diff options
Diffstat (limited to 'src/core/core_api.c')
-rw-r--r-- | src/core/core_api.c | 513 |
1 files changed, 235 insertions, 278 deletions
diff --git a/src/core/core_api.c b/src/core/core_api.c index 526dc9f..2b1291d 100644 --- a/src/core/core_api.c +++ b/src/core/core_api.c @@ -31,6 +31,65 @@ #define LOG(kind,...) GNUNET_log_from (kind, "core-api",__VA_ARGS__) + +/** + * Handle for a transmission request. + */ +struct GNUNET_CORE_TransmitHandle +{ + + /** + * Corresponding peer record. + */ + struct PeerRecord *peer; + + /** + * Corresponding SEND_REQUEST message. Only non-NULL + * while SEND_REQUEST message is pending. + */ + struct ControlMessage *cm; + + /** + * Function that will be called to get the actual request + * (once we are ready to transmit this request to the core). + * The function will be called with a NULL buffer to signal + * timeout. + */ + GNUNET_CONNECTION_TransmitReadyNotify get_message; + + /** + * Closure for get_message. + */ + void *get_message_cls; + + /** + * Timeout for this handle. + */ + struct GNUNET_TIME_Absolute timeout; + + /** + * How important is this message? + */ + uint32_t priority; + + /** + * Size of this request. + */ + uint16_t msize; + + /** + * Send message request ID for this request. + */ + uint16_t smr_id; + + /** + * Is corking allowed? + */ + int cork; + +}; + + /** * Information we track for each peer. */ @@ -62,16 +121,10 @@ struct PeerRecord struct GNUNET_CORE_Handle *ch; /** - * Head of doubly-linked list of pending requests. - * Requests are sorted by deadline *except* for HEAD, - * which is only modified upon transmission to core. - */ - struct GNUNET_CORE_TransmitHandle *pending_head; - - /** - * Tail of doubly-linked list of pending requests. + * Pending request, if any. 'th->peer' is set to NULL if the + * request is not active. */ - struct GNUNET_CORE_TransmitHandle *pending_tail; + struct GNUNET_CORE_TransmitHandle th; /** * ID of timeout task for the 'pending_head' handle @@ -85,11 +138,6 @@ struct PeerRecord GNUNET_SCHEDULER_TaskIdentifier ntr_task; /** - * Current size of the queue of pending requests. - */ - unsigned int queue_size; - - /** * SendMessageRequest ID generator for this peer. */ uint16_t smr_id_gen; @@ -247,11 +295,6 @@ struct GNUNET_CORE_Handle struct GNUNET_TIME_Relative retry_backoff; /** - * Number of messages we are allowed to queue per target. - */ - unsigned int queue_size; - - /** * Number of entries in the handlers array. */ unsigned int hcnt; @@ -278,74 +321,6 @@ struct GNUNET_CORE_Handle /** - * Handle for a transmission request. - */ -struct GNUNET_CORE_TransmitHandle -{ - - /** - * We keep active transmit handles in a doubly-linked list. - */ - struct GNUNET_CORE_TransmitHandle *next; - - /** - * We keep active transmit handles in a doubly-linked list. - */ - struct GNUNET_CORE_TransmitHandle *prev; - - /** - * Corresponding peer record. - */ - struct PeerRecord *peer; - - /** - * Corresponding SEND_REQUEST message. Only non-NULL - * while SEND_REQUEST message is pending. - */ - struct ControlMessage *cm; - - /** - * Function that will be called to get the actual request - * (once we are ready to transmit this request to the core). - * The function will be called with a NULL buffer to signal - * timeout. - */ - GNUNET_CONNECTION_TransmitReadyNotify get_message; - - /** - * Closure for get_message. - */ - void *get_message_cls; - - /** - * Timeout for this handle. - */ - struct GNUNET_TIME_Absolute timeout; - - /** - * How important is this message? - */ - uint32_t priority; - - /** - * Size of this request. - */ - uint16_t msize; - - /** - * Send message request ID for this request. - */ - uint16_t smr_id; - - /** - * Is corking allowed? - */ - int cork; - -}; - - -/** * Our current client connection went down. Clean it up * and try to reconnect! * @@ -382,47 +357,42 @@ reconnect_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) * @return GNUNET_YES (continue) */ static int -disconnect_and_free_peer_entry (void *cls, const GNUNET_HashCode * key, +disconnect_and_free_peer_entry (void *cls, const struct GNUNET_HashCode * key, void *value) { struct GNUNET_CORE_Handle *h = cls; struct GNUNET_CORE_TransmitHandle *th; struct PeerRecord *pr = value; - if (pr->timeout_task != GNUNET_SCHEDULER_NO_TASK) + if (GNUNET_SCHEDULER_NO_TASK != pr->timeout_task) { GNUNET_SCHEDULER_cancel (pr->timeout_task); pr->timeout_task = GNUNET_SCHEDULER_NO_TASK; } - if (pr->ntr_task != GNUNET_SCHEDULER_NO_TASK) + if (GNUNET_SCHEDULER_NO_TASK != pr->ntr_task) { GNUNET_SCHEDULER_cancel (pr->ntr_task); pr->ntr_task = GNUNET_SCHEDULER_NO_TASK; } - if ((pr->prev != NULL) || (pr->next != NULL) || (h->ready_peer_head == pr)) + if ((NULL != pr->prev) || (NULL != pr->next) || (h->ready_peer_head == pr)) GNUNET_CONTAINER_DLL_remove (h->ready_peer_head, h->ready_peer_tail, pr); - if (h->disconnects != NULL) + if (NULL != h->disconnects) h->disconnects (h->cls, &pr->peer); /* all requests should have been cancelled, clean up anyway, just in case */ - GNUNET_break (pr->queue_size == 0); - while (NULL != (th = pr->pending_head)) + th = &pr->th; + if (NULL != th->peer) { GNUNET_break (0); - GNUNET_CONTAINER_DLL_remove (pr->pending_head, pr->pending_tail, th); - pr->queue_size--; - if (th->cm != NULL) + th->peer = NULL; + if (NULL != th->cm) th->cm->th = NULL; - GNUNET_free (th); } /* done with 'voluntary' cleanups, now on to normal freeing */ GNUNET_assert (GNUNET_YES == GNUNET_CONTAINER_multihashmap_remove (h->peers, key, pr)); - GNUNET_assert (pr->pending_head == NULL); - GNUNET_assert (pr->pending_tail == NULL); GNUNET_assert (pr->ch == h); - GNUNET_assert (pr->queue_size == 0); - GNUNET_assert (pr->timeout_task == GNUNET_SCHEDULER_NO_TASK); - GNUNET_assert (pr->ntr_task == GNUNET_SCHEDULER_NO_TASK); + GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == pr->timeout_task); + GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == pr->ntr_task); GNUNET_free (pr); return GNUNET_YES; } @@ -440,13 +410,13 @@ reconnect_later (struct GNUNET_CORE_Handle *h) struct ControlMessage *cm; struct PeerRecord *pr; - GNUNET_assert (h->reconnect_task == GNUNET_SCHEDULER_NO_TASK); + GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == h->reconnect_task); if (NULL != h->cth) { GNUNET_CLIENT_notify_transmit_ready_cancel (h->cth); h->cth = NULL; } - if (h->client != NULL) + if (NULL != h->client) { GNUNET_CLIENT_disconnect (h->client); h->client = NULL; @@ -459,9 +429,9 @@ reconnect_later (struct GNUNET_CORE_Handle *h) { GNUNET_CONTAINER_DLL_remove (h->control_pending_head, h->control_pending_tail, cm); - if (cm->th != NULL) + if (NULL != cm->th) cm->th->cm = NULL; - if (cm->cont != NULL) + if (NULL != cm->cont) cm->cont (cm->cont_cls, GNUNET_NO); GNUNET_free (cm); } @@ -470,9 +440,7 @@ reconnect_later (struct GNUNET_CORE_Handle *h) while (NULL != (pr = h->ready_peer_head)) GNUNET_CONTAINER_DLL_remove (h->ready_peer_head, h->ready_peer_tail, pr); GNUNET_assert (h->control_pending_head == NULL); - h->retry_backoff = - GNUNET_TIME_relative_min (GNUNET_TIME_UNIT_SECONDS, h->retry_backoff); - h->retry_backoff = GNUNET_TIME_relative_multiply (h->retry_backoff, 2); + h->retry_backoff = GNUNET_TIME_STD_BACKOFF (h->retry_backoff); } @@ -517,7 +485,8 @@ request_next_transmission (struct PeerRecord *pr) GNUNET_SCHEDULER_cancel (pr->timeout_task); pr->timeout_task = GNUNET_SCHEDULER_NO_TASK; } - if (NULL == (th = pr->pending_head)) + th = &pr->th; + if (NULL == th->peer) { trigger_next_request (h, GNUNET_NO); return; @@ -539,7 +508,7 @@ request_next_transmission (struct PeerRecord *pr) smr->priority = htonl (th->priority); smr->deadline = GNUNET_TIME_absolute_hton (th->timeout); smr->peer = pr->peer; - smr->queue_size = htonl (pr->queue_size); + smr->reserved = htonl (0); smr->size = htons (th->msize); smr->smr_id = htons (th->smr_id = pr->smr_id_gen++); GNUNET_CONTAINER_DLL_insert_tail (h->control_pending_head, @@ -566,10 +535,14 @@ transmission_timeout (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) struct GNUNET_CORE_TransmitHandle *th; pr->timeout_task = GNUNET_SCHEDULER_NO_TASK; - th = pr->pending_head; - GNUNET_CONTAINER_DLL_remove (pr->pending_head, pr->pending_tail, th); - pr->queue_size--; - if ((pr->prev != NULL) || (pr->next != NULL) || (pr == h->ready_peer_head)) + if (GNUNET_SCHEDULER_NO_TASK != pr->ntr_task) + { + GNUNET_SCHEDULER_cancel (pr->ntr_task); + pr->ntr_task = GNUNET_SCHEDULER_NO_TASK; + } + th = &pr->th; + th->peer = NULL; + if ((NULL != pr->prev) || (NULL != pr->next) || (pr == h->ready_peer_head)) { /* the request that was 'approved' by core was * canceled before it could be transmitted; remove @@ -584,10 +557,11 @@ transmission_timeout (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) GNUNET_free (th->cm); } LOG (GNUNET_ERROR_TYPE_DEBUG, - "Signalling timeout of request for transmission to CORE service\n"); - request_next_transmission (pr); + "Signalling timeout of request for transmission to peer `%s' via CORE\n", + GNUNET_i2s (&pr->peer)); + trigger_next_request (h, GNUNET_NO); + GNUNET_assert (0 == th->get_message (th->get_message_cls, 0, NULL)); - GNUNET_free (th); } @@ -613,7 +587,7 @@ transmit_message (void *cls, size_t size, void *buf) GNUNET_assert (h->reconnect_task == GNUNET_SCHEDULER_NO_TASK); h->cth = NULL; - if (buf == NULL) + if (NULL == buf) { LOG (GNUNET_ERROR_TYPE_DEBUG, "Transmission failed, initiating reconnect\n"); @@ -636,7 +610,7 @@ transmit_message (void *cls, size_t size, void *buf) memcpy (buf, hdr, msize); GNUNET_CONTAINER_DLL_remove (h->control_pending_head, h->control_pending_tail, cm); - if (cm->th != NULL) + if (NULL != cm->th) cm->th->cm = NULL; if (NULL != cm->cont) cm->cont (cm->cont_cls, GNUNET_OK); @@ -645,67 +619,63 @@ transmit_message (void *cls, size_t size, void *buf) return msize; } /* now check for 'ready' P2P messages */ - if (NULL != (pr = h->ready_peer_head)) + if (NULL == (pr = h->ready_peer_head)) + return 0; + GNUNET_assert (NULL != pr->th.peer); + th = &pr->th; + if (size < th->msize + sizeof (struct SendMessage)) + { + trigger_next_request (h, GNUNET_NO); + return 0; + } + GNUNET_CONTAINER_DLL_remove (h->ready_peer_head, h->ready_peer_tail, pr); + th->peer = NULL; + if (GNUNET_SCHEDULER_NO_TASK != pr->timeout_task) + { + GNUNET_SCHEDULER_cancel (pr->timeout_task); + pr->timeout_task = GNUNET_SCHEDULER_NO_TASK; + } + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Transmitting SEND request to `%s' with %u bytes.\n", + GNUNET_i2s (&pr->peer), (unsigned int) th->msize); + sm = (struct SendMessage *) buf; + sm->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_SEND); + sm->priority = htonl (th->priority); + sm->deadline = GNUNET_TIME_absolute_hton (th->timeout); + sm->peer = pr->peer; + sm->cork = htonl ((uint32_t) th->cork); + sm->reserved = htonl (0); + ret = + th->get_message (th->get_message_cls, + size - sizeof (struct SendMessage), &sm[1]); + + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Transmitting SEND request to `%s' yielded %u bytes.\n", + GNUNET_i2s (&pr->peer), ret); + if (0 == ret) { - GNUNET_assert (pr->pending_head != NULL); - th = pr->pending_head; - if (size < th->msize + sizeof (struct SendMessage)) - { - trigger_next_request (h, GNUNET_NO); - return 0; - } - GNUNET_CONTAINER_DLL_remove (h->ready_peer_head, h->ready_peer_tail, pr); - GNUNET_CONTAINER_DLL_remove (pr->pending_head, pr->pending_tail, th); - pr->queue_size--; - if (pr->timeout_task != GNUNET_SCHEDULER_NO_TASK) - { - GNUNET_SCHEDULER_cancel (pr->timeout_task); - pr->timeout_task = GNUNET_SCHEDULER_NO_TASK; - } - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Transmitting SEND request to `%s' with %u bytes.\n", - GNUNET_i2s (&pr->peer), (unsigned int) th->msize); - sm = (struct SendMessage *) buf; - sm->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_SEND); - sm->priority = htonl (th->priority); - sm->deadline = GNUNET_TIME_absolute_hton (th->timeout); - sm->peer = pr->peer; - sm->cork = htonl ((uint32_t) th->cork); - sm->reserved = htonl (0); - ret = - th->get_message (th->get_message_cls, - size - sizeof (struct SendMessage), &sm[1]); - - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Transmitting SEND request to `%s' yielded %u bytes.\n", - GNUNET_i2s (&pr->peer), ret); - GNUNET_free (th); - if (0 == ret) - { - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Size of clients message to peer %s is 0!\n", - GNUNET_i2s (&pr->peer)); - /* client decided to send nothing! */ - request_next_transmission (pr); - return 0; - } LOG (GNUNET_ERROR_TYPE_DEBUG, - "Produced SEND message to core with %u bytes payload\n", - (unsigned int) ret); - GNUNET_assert (ret >= sizeof (struct GNUNET_MessageHeader)); - if (ret + sizeof (struct SendMessage) >= GNUNET_SERVER_MAX_MESSAGE_SIZE) - { - GNUNET_break (0); - request_next_transmission (pr); - return 0; - } - ret += sizeof (struct SendMessage); - sm->header.size = htons (ret); - GNUNET_assert (ret <= size); + "Size of clients message to peer %s is 0!\n", + GNUNET_i2s (&pr->peer)); + /* client decided to send nothing! */ request_next_transmission (pr); - return ret; + return 0; } - return 0; + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Produced SEND message to core with %u bytes payload\n", + (unsigned int) ret); + GNUNET_assert (ret >= sizeof (struct GNUNET_MessageHeader)); + if (ret + sizeof (struct SendMessage) >= GNUNET_SERVER_MAX_MESSAGE_SIZE) + { + GNUNET_break (0); + request_next_transmission (pr); + return 0; + } + ret += sizeof (struct SendMessage); + sm->header.size = htons (ret); + GNUNET_assert (ret <= size); + request_next_transmission (pr); + return ret; } @@ -732,13 +702,13 @@ trigger_next_request (struct GNUNET_CORE_Handle *h, int ignore_currently_down) LOG (GNUNET_ERROR_TYPE_DEBUG, "Request pending, not processing queue\n"); return; } - if (h->control_pending_head != NULL) + if (NULL != h->control_pending_head) msize = ntohs (((struct GNUNET_MessageHeader *) &h-> control_pending_head[1])->size); else if (h->ready_peer_head != NULL) msize = - h->ready_peer_head->pending_head->msize + sizeof (struct SendMessage); + h->ready_peer_head->th.msize + sizeof (struct SendMessage); else { LOG (GNUNET_ERROR_TYPE_DEBUG, @@ -782,8 +752,7 @@ main_notify_handler (void *cls, const struct GNUNET_MessageHeader *msg) if (NULL == msg) { LOG (GNUNET_ERROR_TYPE_INFO, - _ - ("Client was disconnected from core service, trying to reconnect.\n")); + _("Client was disconnected from core service, trying to reconnect.\n")); reconnect_later (h); return; } @@ -951,9 +920,9 @@ main_notify_handler (void *cls, const struct GNUNET_MessageHeader *msg) continue; if ((mh->expected_size != ntohs (em->size)) && (mh->expected_size != 0)) { - GNUNET_log (GNUNET_ERROR_TYPE_ERROR, - "Unexpected message size %u for message of type %u from peer `%4s'\n", - htons (em->size), mh->type, GNUNET_i2s (&ntm->peer)); + LOG (GNUNET_ERROR_TYPE_ERROR, + "Unexpected message size %u for message of type %u from peer `%4s'\n", + htons (em->size), mh->type, GNUNET_i2s (&ntm->peer)); GNUNET_break_op (0); continue; } @@ -1032,14 +1001,14 @@ main_notify_handler (void *cls, const struct GNUNET_MessageHeader *msg) LOG (GNUNET_ERROR_TYPE_DEBUG, "Received notification about transmission readiness to `%s'.\n", GNUNET_i2s (&smr->peer)); - if (NULL == pr->pending_head) + if (NULL == pr->th.peer) { /* request must have been cancelled between the original request * and the response from core, ignore core's readiness */ break; } - th = pr->pending_head; + th = &pr->th; if (ntohs (smr->smr_id) != th->smr_id) { /* READY message is for expired or cancelled message, @@ -1158,7 +1127,6 @@ reconnect (struct GNUNET_CORE_Handle *h) * complete (or fail) asynchronously. * * @param cfg configuration to use - * @param queue_size size of the per-peer message queue * @param cls closure for the various callbacks that follow (including handlers in the handlers array) * @param init callback to call once we have successfully * connected to the core service @@ -1178,7 +1146,7 @@ reconnect (struct GNUNET_CORE_Handle *h) */ struct GNUNET_CORE_Handle * GNUNET_CORE_connect (const struct GNUNET_CONFIGURATION_Handle *cfg, - unsigned int queue_size, void *cls, + void *cls, GNUNET_CORE_StartupCallback init, GNUNET_CORE_ConnectEventHandler connects, GNUNET_CORE_DisconnectEventHandler disconnects, @@ -1192,7 +1160,6 @@ GNUNET_CORE_connect (const struct GNUNET_CONFIGURATION_Handle *cfg, h = GNUNET_malloc (sizeof (struct GNUNET_CORE_Handle)); h->cfg = cfg; - h->queue_size = queue_size; h->cls = cls; h->init = init; h->connects = connects; @@ -1204,8 +1171,7 @@ GNUNET_CORE_connect (const struct GNUNET_CONFIGURATION_Handle *cfg, h->handlers = handlers; h->hcnt = 0; h->currently_down = GNUNET_YES; - h->peers = GNUNET_CONTAINER_multihashmap_create (128); - h->retry_backoff = GNUNET_TIME_UNIT_MILLISECONDS; + h->peers = GNUNET_CONTAINER_multihashmap_create (128, GNUNET_NO); if (NULL != handlers) while (handlers[h->hcnt].callback != NULL) h->hcnt++; @@ -1285,9 +1251,13 @@ run_request_next_transmission (void *cls, /** * Ask the core to call "notify" once it is ready to transmit the - * given number of bytes to the specified "target". Must only be + * given number of bytes to the specified "target". Must only be * called after a connection to the respective peer has been - * established (and the client has been informed about this). + * established (and the client has been informed about this). You may + * have one request of this type pending for each connected peer at + * any time. If a peer disconnects, the application MUST call + * "GNUNET_CORE_notify_transmit_ready_cancel" on the respective + * transmission request, if one such request is pending. * * @param handle connection to core service * @param cork is corking allowed for this transmission? @@ -1295,11 +1265,14 @@ run_request_next_transmission (void *cls, * @param maxdelay how long can the message wait? * @param target who should receive the message, never NULL (can be this peer's identity for loopback) * @param notify_size how many bytes of buffer space does notify want? - * @param notify function to call when buffer space is available + * @param notify function to call when buffer space is available; + * will be called with NULL on timeout; clients MUST cancel + * all pending transmission requests DURING the disconnect + * handler * @param notify_cls closure for notify * @return non-NULL if the notify callback was queued, - * NULL if we can not even queue the request (insufficient - * memory); if NULL is returned, "notify" will NOT be called. + * NULL if we can not even queue the request (request already pending); + * if NULL is returned, "notify" will NOT be called. */ struct GNUNET_CORE_TransmitHandle * GNUNET_CORE_notify_transmit_ready (struct GNUNET_CORE_Handle *handle, int cork, @@ -1312,89 +1285,45 @@ GNUNET_CORE_notify_transmit_ready (struct GNUNET_CORE_Handle *handle, int cork, { struct PeerRecord *pr; struct GNUNET_CORE_TransmitHandle *th; - struct GNUNET_CORE_TransmitHandle *pos; - struct GNUNET_CORE_TransmitHandle *prev; - struct GNUNET_CORE_TransmitHandle *minp; + if (notify_size > GNUNET_CONSTANTS_MAX_ENCRYPTED_MESSAGE_SIZE) + { + GNUNET_break (0); + return NULL; + } + GNUNET_assert (NULL != notify); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Asking core for transmission of %u bytes to `%s'\n", + (unsigned int) notify_size, + GNUNET_i2s (target)); pr = GNUNET_CONTAINER_multihashmap_get (handle->peers, &target->hashPubKey); if (NULL == pr) { /* attempt to send to peer that is not connected */ - LOG (GNUNET_ERROR_TYPE_WARNING, - "Attempting to send to peer `%s' from peer `%s', but not connected!\n", - GNUNET_i2s (target), GNUNET_h2s (&handle->me.hashPubKey)); + GNUNET_break (0); + return NULL; + } + if (NULL != pr->th.peer) + { + /* attempting to queue a second request for the same destination */ GNUNET_break (0); return NULL; } GNUNET_assert (notify_size + sizeof (struct SendMessage) < GNUNET_SERVER_MAX_MESSAGE_SIZE); - th = GNUNET_malloc (sizeof (struct GNUNET_CORE_TransmitHandle)); + th = &pr->th; + memset (th, 0, sizeof (struct GNUNET_CORE_TransmitHandle)); th->peer = pr; - GNUNET_assert (NULL != notify); th->get_message = notify; th->get_message_cls = notify_cls; th->timeout = GNUNET_TIME_relative_to_absolute (maxdelay); th->priority = priority; th->msize = notify_size; th->cork = cork; - /* bound queue size */ - if (pr->queue_size == handle->queue_size) - { - /* find lowest-priority entry, but skip the head of the list */ - minp = pr->pending_head->next; - prev = minp; - while (prev != NULL) - { - if (prev->priority < minp->priority) - minp = prev; - prev = prev->next; - } - if (minp == NULL) - { - GNUNET_break (handle->queue_size != 0); - GNUNET_break (pr->queue_size == 1); - GNUNET_free (th); - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Dropping transmission request: cannot drop queue head and limit is one\n"); - return NULL; - } - if (priority <= minp->priority) - { - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Dropping transmission request: priority too low\n"); - GNUNET_free (th); - return NULL; /* priority too low */ - } - GNUNET_CONTAINER_DLL_remove (pr->pending_head, pr->pending_tail, minp); - pr->queue_size--; - GNUNET_assert (0 == minp->get_message (minp->get_message_cls, 0, NULL)); - GNUNET_free (minp); - } - - /* Order entries by deadline, but SKIP 'HEAD' (as we may have transmitted - * that request already or might even already be approved to transmit that - * message to core) */ - pos = pr->pending_head; - if (pos != NULL) - pos = pos->next; /* skip head */ - - /* insertion sort */ - prev = pos; - while ((NULL != pos) && (pos->timeout.abs_value < th->timeout.abs_value)) - { - prev = pos; - pos = pos->next; - } - GNUNET_CONTAINER_DLL_insert_after (pr->pending_head, pr->pending_tail, prev, - th); - pr->queue_size++; - /* was the request queue previously empty? */ + GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == pr->ntr_task); + pr->ntr_task = + GNUNET_SCHEDULER_add_now (&run_request_next_transmission, pr); LOG (GNUNET_ERROR_TYPE_DEBUG, "Transmission request added to queue\n"); - if ((pr->pending_head == th) && (pr->ntr_task == GNUNET_SCHEDULER_NO_TASK) && - (pr->next == NULL) && (pr->prev == NULL) && - (handle->ready_peer_head != pr)) - pr->ntr_task = - GNUNET_SCHEDULER_add_now (&run_request_next_transmission, pr); return th; } @@ -1408,33 +1337,61 @@ void GNUNET_CORE_notify_transmit_ready_cancel (struct GNUNET_CORE_TransmitHandle *th) { struct PeerRecord *pr = th->peer; - struct GNUNET_CORE_Handle *h = pr->ch; - int was_head; + struct GNUNET_CORE_Handle *h; - was_head = (pr->pending_head == th); - GNUNET_CONTAINER_DLL_remove (pr->pending_head, pr->pending_tail, th); - pr->queue_size--; + GNUNET_assert (NULL != pr); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Aborting transmission request to core for %u bytes to `%s'\n", + (unsigned int) th->msize, + GNUNET_i2s (&pr->peer)); + th->peer = NULL; + h = pr->ch; if (NULL != th->cm) { /* we're currently in the control queue, remove */ GNUNET_CONTAINER_DLL_remove (h->control_pending_head, h->control_pending_tail, th->cm); GNUNET_free (th->cm); + th->cm = NULL; } - GNUNET_free (th); - if (was_head) + if ((NULL != pr->prev) || (NULL != pr->next) || (pr == h->ready_peer_head)) { - if ((NULL != pr->prev) || (NULL != pr->next) || (pr == h->ready_peer_head)) - { - /* the request that was 'approved' by core was - * canceled before it could be transmitted; remove - * us from the 'ready' list */ - GNUNET_CONTAINER_DLL_remove (h->ready_peer_head, h->ready_peer_tail, pr); - } - if (NULL != h->client) - request_next_transmission (pr); + /* the request that was 'approved' by core was + * canceled before it could be transmitted; remove + * us from the 'ready' list */ + GNUNET_CONTAINER_DLL_remove (h->ready_peer_head, h->ready_peer_tail, pr); + } + if (GNUNET_SCHEDULER_NO_TASK != pr->ntr_task) + { + GNUNET_SCHEDULER_cancel (pr->ntr_task); + pr->ntr_task = GNUNET_SCHEDULER_NO_TASK; } } +/** + * Check if the given peer is currently connected. This function is for special + * cirumstances (GNUNET_TESTBED uses it), normal users of the CORE API are + * expected to track which peers are connected based on the connect/disconnect + * callbacks from GNUNET_CORE_connect. This function is NOT part of the + * 'versioned', 'official' API. The difference between this function and the + * function GNUNET_CORE_is_peer_connected() is that this one returns + * synchronously after looking in the CORE API cache. The function + * GNUNET_CORE_is_peer_connected() sends a message to the CORE service and hence + * its response is given asynchronously. + * + * @param h the core handle + * @param pid the identity of the peer to check if it has been connected to us + * @return GNUNET_YES if the peer is connected to us; GNUNET_NO if not + */ +int +GNUNET_CORE_is_peer_connected_sync (const struct GNUNET_CORE_Handle *h, + const struct GNUNET_PeerIdentity *pid) +{ + GNUNET_assert (NULL != h); + GNUNET_assert (NULL != pid); + return GNUNET_CONTAINER_multihashmap_contains (h->peers, &pid->hashPubKey); +} + + /* end of core_api.c */ |