diff options
Diffstat (limited to 'src/core/core_api.c')
-rw-r--r-- | src/core/core_api.c | 167 |
1 files changed, 57 insertions, 110 deletions
diff --git a/src/core/core_api.c b/src/core/core_api.c index 66df134..526dc9f 100644 --- a/src/core/core_api.c +++ b/src/core/core_api.c @@ -367,9 +367,7 @@ reconnect_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) struct GNUNET_CORE_Handle *h = cls; h->reconnect_task = GNUNET_SCHEDULER_NO_TASK; -#if DEBUG_CORE LOG (GNUNET_ERROR_TYPE_DEBUG, "Connecting to CORE service after delay\n"); -#endif reconnect (h); } @@ -450,7 +448,7 @@ reconnect_later (struct GNUNET_CORE_Handle *h) } if (h->client != NULL) { - GNUNET_CLIENT_disconnect (h->client, GNUNET_NO); + GNUNET_CLIENT_disconnect (h->client); h->client = NULL; } h->currently_down = GNUNET_YES; @@ -546,11 +544,9 @@ request_next_transmission (struct PeerRecord *pr) smr->smr_id = htons (th->smr_id = pr->smr_id_gen++); GNUNET_CONTAINER_DLL_insert_tail (h->control_pending_head, h->control_pending_tail, cm); -#if DEBUG_CORE LOG (GNUNET_ERROR_TYPE_DEBUG, "Adding SEND REQUEST for peer `%s' to message queue\n", GNUNET_i2s (&pr->peer)); -#endif trigger_next_request (h, GNUNET_NO); } @@ -580,10 +576,15 @@ transmission_timeout (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) * us from the 'ready' list */ GNUNET_CONTAINER_DLL_remove (h->ready_peer_head, h->ready_peer_tail, pr); } -#if DEBUG_CORE + if (NULL != th->cm) + { + /* we're currently in the control queue, remove */ + GNUNET_CONTAINER_DLL_remove (h->control_pending_head, + h->control_pending_tail, th->cm); + GNUNET_free (th->cm); + } LOG (GNUNET_ERROR_TYPE_DEBUG, "Signalling timeout of request for transmission to CORE service\n"); -#endif request_next_transmission (pr); GNUNET_assert (0 == th->get_message (th->get_message_cls, 0, NULL)); GNUNET_free (th); @@ -592,6 +593,11 @@ transmission_timeout (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) /** * Transmit the next message to the core service. + * + * @param cls closure with the 'struct GNUNET_CORE_Handle' + * @param size number of bytes available in buf + * @param buf where the callee should write the message + * @return number of bytes written to buf */ static size_t transmit_message (void *cls, size_t size, void *buf) @@ -609,10 +615,8 @@ transmit_message (void *cls, size_t size, void *buf) h->cth = NULL; if (buf == NULL) { -#if DEBUG_CORE LOG (GNUNET_ERROR_TYPE_DEBUG, "Transmission failed, initiating reconnect\n"); -#endif reconnect_later (h); return 0; } @@ -626,11 +630,9 @@ transmit_message (void *cls, size_t size, void *buf) trigger_next_request (h, GNUNET_NO); return 0; } -#if DEBUG_CORE LOG (GNUNET_ERROR_TYPE_DEBUG, "Transmitting control message with %u bytes of type %u to core.\n", (unsigned int) msize, (unsigned int) ntohs (hdr->type)); -#endif memcpy (buf, hdr, msize); GNUNET_CONTAINER_DLL_remove (h->control_pending_head, h->control_pending_tail, cm); @@ -660,11 +662,9 @@ transmit_message (void *cls, size_t size, void *buf) GNUNET_SCHEDULER_cancel (pr->timeout_task); pr->timeout_task = GNUNET_SCHEDULER_NO_TASK; } -#if DEBUG_CORE LOG (GNUNET_ERROR_TYPE_DEBUG, "Transmitting SEND request to `%s' with %u bytes.\n", GNUNET_i2s (&pr->peer), (unsigned int) th->msize); -#endif sm = (struct SendMessage *) buf; sm->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_SEND); sm->priority = htonl (th->priority); @@ -676,28 +676,22 @@ transmit_message (void *cls, size_t size, void *buf) th->get_message (th->get_message_cls, size - sizeof (struct SendMessage), &sm[1]); -#if DEBUG_CORE LOG (GNUNET_ERROR_TYPE_DEBUG, "Transmitting SEND request to `%s' yielded %u bytes.\n", GNUNET_i2s (&pr->peer), ret); -#endif GNUNET_free (th); if (0 == ret) { -#if DEBUG_CORE LOG (GNUNET_ERROR_TYPE_DEBUG, "Size of clients message to peer %s is 0!\n", GNUNET_i2s (&pr->peer)); -#endif /* client decided to send nothing! */ request_next_transmission (pr); return 0; } -#if DEBUG_CORE LOG (GNUNET_ERROR_TYPE_DEBUG, "Produced SEND message to core with %u bytes payload\n", (unsigned int) ret); -#endif GNUNET_assert (ret >= sizeof (struct GNUNET_MessageHeader)); if (ret + sizeof (struct SendMessage) >= GNUNET_SERVER_MAX_MESSAGE_SIZE) { @@ -729,17 +723,13 @@ trigger_next_request (struct GNUNET_CORE_Handle *h, int ignore_currently_down) if ((GNUNET_YES == h->currently_down) && (ignore_currently_down == GNUNET_NO)) { -#if DEBUG_CORE LOG (GNUNET_ERROR_TYPE_DEBUG, "Core connection down, not processing queue\n"); -#endif return; } if (NULL != h->cth) { -#if DEBUG_CORE LOG (GNUNET_ERROR_TYPE_DEBUG, "Request pending, not processing queue\n"); -#endif return; } if (h->control_pending_head != NULL) @@ -751,10 +741,8 @@ trigger_next_request (struct GNUNET_CORE_Handle *h, int ignore_currently_down) h->ready_peer_head->pending_head->msize + sizeof (struct SendMessage); else { -#if DEBUG_CORE LOG (GNUNET_ERROR_TYPE_DEBUG, "Request queue empty, not processing queue\n"); -#endif return; /* no pending message */ } h->cth = @@ -791,7 +779,7 @@ main_notify_handler (void *cls, const struct GNUNET_MessageHeader *msg) uint16_t et; uint32_t ats_count; - if (msg == NULL) + if (NULL == msg) { LOG (GNUNET_ERROR_TYPE_INFO, _ @@ -800,11 +788,9 @@ main_notify_handler (void *cls, const struct GNUNET_MessageHeader *msg) return; } msize = ntohs (msg->size); -#if DEBUG_CORE > 2 LOG (GNUNET_ERROR_TYPE_DEBUG, "Processing message of type %u and size %u from core service\n", ntohs (msg->type), msize); -#endif switch (ntohs (msg->type)) { case GNUNET_MESSAGE_TYPE_CORE_INIT_REPLY: @@ -828,22 +814,18 @@ main_notify_handler (void *cls, const struct GNUNET_MessageHeader *msg) { /* mark so we don't call init on reconnect */ h->init = NULL; -#if DEBUG_CORE LOG (GNUNET_ERROR_TYPE_DEBUG, "Connected to core service of peer `%s'.\n", GNUNET_i2s (&h->me)); -#endif init (h->cls, h, &h->me); } else { -#if DEBUG_CORE LOG (GNUNET_ERROR_TYPE_DEBUG, "Successfully reconnected to core service.\n"); -#endif } /* fake 'connect to self' */ pr = GNUNET_CONTAINER_multihashmap_get (h->peers, &h->me.hashPubKey); - GNUNET_assert (pr == NULL); + GNUNET_assert (NULL == pr); pr = GNUNET_malloc (sizeof (struct PeerRecord)); pr->peer = h->me; pr->ch = h; @@ -871,11 +853,9 @@ main_notify_handler (void *cls, const struct GNUNET_MessageHeader *msg) reconnect_later (h); return; } -#if DEBUG_CORE LOG (GNUNET_ERROR_TYPE_DEBUG, "Received notification about connection from `%s'.\n", GNUNET_i2s (&cnm->peer)); -#endif if (0 == memcmp (&h->me, &cnm->peer, sizeof (struct GNUNET_PeerIdentity))) { /* connect to self!? */ @@ -883,7 +863,7 @@ main_notify_handler (void *cls, const struct GNUNET_MessageHeader *msg) return; } pr = GNUNET_CONTAINER_multihashmap_get (h->peers, &cnm->peer.hashPubKey); - if (pr != NULL) + if (NULL != pr) { GNUNET_break (0); reconnect_later (h); @@ -915,13 +895,11 @@ main_notify_handler (void *cls, const struct GNUNET_MessageHeader *msg) return; } GNUNET_break (0 == ntohl (dnm->reserved)); -#if DEBUG_CORE LOG (GNUNET_ERROR_TYPE_DEBUG, "Received notification about disconnect from `%s'.\n", GNUNET_i2s (&dnm->peer)); -#endif pr = GNUNET_CONTAINER_multihashmap_get (h->peers, &dnm->peer.hashPubKey); - if (pr == NULL) + if (NULL == pr) { GNUNET_break (0); reconnect_later (h); @@ -941,31 +919,21 @@ main_notify_handler (void *cls, const struct GNUNET_MessageHeader *msg) return; } ntm = (const struct NotifyTrafficMessage *) msg; - ats_count = ntohl (ntm->ats_count); if ((msize < sizeof (struct NotifyTrafficMessage) + ats_count * sizeof (struct GNUNET_ATS_Information) + - sizeof (struct GNUNET_MessageHeader)) || - (GNUNET_ATS_ARRAY_TERMINATOR != ntohl ((&ntm->ats)[ats_count].type))) + sizeof (struct GNUNET_MessageHeader)) ) { GNUNET_break (0); reconnect_later (h); return; } - em = (const struct GNUNET_MessageHeader *) &(&ntm->ats)[ats_count + 1]; -#if DEBUG_CORE + ats = (const struct GNUNET_ATS_Information*) &ntm[1]; + em = (const struct GNUNET_MessageHeader *) &ats[ats_count]; LOG (GNUNET_ERROR_TYPE_DEBUG, "Received message of type %u and size %u from peer `%4s'\n", ntohs (em->type), ntohs (em->size), GNUNET_i2s (&ntm->peer)); -#endif - pr = GNUNET_CONTAINER_multihashmap_get (h->peers, &ntm->peer.hashPubKey); - if (pr == NULL) - { - GNUNET_break (0); - reconnect_later (h); - return; - } if ((GNUNET_NO == h->inbound_hdr_only) && (msize != ntohs (em->size) + sizeof (struct NotifyTrafficMessage) + @@ -989,8 +957,15 @@ main_notify_handler (void *cls, const struct GNUNET_MessageHeader *msg) GNUNET_break_op (0); continue; } + pr = GNUNET_CONTAINER_multihashmap_get (h->peers, &ntm->peer.hashPubKey); + if (NULL == pr) + { + GNUNET_break (0); + reconnect_later (h); + return; + } if (GNUNET_OK != - h->handlers[hpos].callback (h->cls, &ntm->peer, em, &ntm->ats, + h->handlers[hpos].callback (h->cls, &ntm->peer, em, ats, ats_count)) { /* error in processing, do not process other messages! */ @@ -998,7 +973,7 @@ main_notify_handler (void *cls, const struct GNUNET_MessageHeader *msg) } } if (NULL != h->inbound_notify) - h->inbound_notify (h->cls, &ntm->peer, em, &ntm->ats, ats_count); + h->inbound_notify (h->cls, &ntm->peer, em, ats, ats_count); break; case GNUNET_MESSAGE_TYPE_CORE_NOTIFY_OUTBOUND: if (msize < sizeof (struct NotifyTrafficMessage)) @@ -1008,36 +983,21 @@ main_notify_handler (void *cls, const struct GNUNET_MessageHeader *msg) return; } ntm = (const struct NotifyTrafficMessage *) msg; - if (0 == memcmp (&h->me, &ntm->peer, sizeof (struct GNUNET_PeerIdentity))) - { - /* self-change!? */ - GNUNET_break (0); - return; - } ats_count = ntohl (ntm->ats_count); if ((msize < sizeof (struct NotifyTrafficMessage) + ats_count * sizeof (struct GNUNET_ATS_Information) + - sizeof (struct GNUNET_MessageHeader)) || - (GNUNET_ATS_ARRAY_TERMINATOR != ntohl ((&ntm->ats)[ats_count].type))) - { - GNUNET_break (0); - reconnect_later (h); - return; - } - em = (const struct GNUNET_MessageHeader *) &(&ntm->ats)[ats_count + 1]; - pr = GNUNET_CONTAINER_multihashmap_get (h->peers, &ntm->peer.hashPubKey); - if (pr == NULL) + sizeof (struct GNUNET_MessageHeader)) ) { GNUNET_break (0); reconnect_later (h); return; } -#if DEBUG_CORE + ats = (const struct GNUNET_ATS_Information*) &ntm[1]; + em = (const struct GNUNET_MessageHeader *) &ats[ats_count]; LOG (GNUNET_ERROR_TYPE_DEBUG, "Received notification about transmission to `%s'.\n", GNUNET_i2s (&ntm->peer)); -#endif if ((GNUNET_NO == h->outbound_hdr_only) && (msize != ntohs (em->size) + sizeof (struct NotifyTrafficMessage) + @@ -1052,7 +1012,7 @@ main_notify_handler (void *cls, const struct GNUNET_MessageHeader *msg) GNUNET_break (0); break; } - h->outbound_notify (h->cls, &ntm->peer, em, &ntm->ats, ats_count); + h->outbound_notify (h->cls, &ntm->peer, em, ats, ats_count); break; case GNUNET_MESSAGE_TYPE_CORE_SEND_READY: if (msize != sizeof (struct SendMessageReady)) @@ -1063,18 +1023,16 @@ main_notify_handler (void *cls, const struct GNUNET_MessageHeader *msg) } smr = (const struct SendMessageReady *) msg; pr = GNUNET_CONTAINER_multihashmap_get (h->peers, &smr->peer.hashPubKey); - if (pr == NULL) + if (NULL == pr) { GNUNET_break (0); reconnect_later (h); return; } -#if DEBUG_CORE LOG (GNUNET_ERROR_TYPE_DEBUG, "Received notification about transmission readiness to `%s'.\n", GNUNET_i2s (&smr->peer)); -#endif - if (pr->pending_head == NULL) + if (NULL == pr->pending_head) { /* request must have been cancelled between the original request * and the response from core, ignore core's readiness */ @@ -1088,7 +1046,7 @@ main_notify_handler (void *cls, const struct GNUNET_MessageHeader *msg) * ignore! (we should have already sent another request) */ break; } - if ((pr->prev != NULL) || (pr->next != NULL) || (h->ready_peer_head == pr)) + if ((NULL != pr->prev) || (NULL != pr->next) || (h->ready_peer_head == pr)) { /* we should not already be on the ready list... */ GNUNET_break (0); @@ -1119,14 +1077,12 @@ init_done_task (void *cls, int success) { struct GNUNET_CORE_Handle *h = cls; - if (success == GNUNET_SYSERR) + if (GNUNET_SYSERR == success) return; /* shutdown */ - if (success == GNUNET_NO) + if (GNUNET_NO == success) { -#if DEBUG_CORE LOG (GNUNET_ERROR_TYPE_DEBUG, "Failed to exchange INIT with core, retrying\n"); -#endif if (h->reconnect_task == GNUNET_SCHEDULER_NO_TASK) reconnect_later (h); return; @@ -1152,13 +1108,10 @@ reconnect (struct GNUNET_CORE_Handle *h) uint16_t *ts; unsigned int hpos; -#if DEBUG_CORE - LOG (GNUNET_ERROR_TYPE_DEBUG, "Reconnecting to CORE service\n"); -#endif - GNUNET_assert (h->client == NULL); + GNUNET_assert (NULL == h->client); GNUNET_assert (h->currently_down == GNUNET_YES); h->client = GNUNET_CLIENT_connect ("core", h->cfg); - if (h->client == NULL) + if (NULL == h->client) { reconnect_later (h); return; @@ -1185,6 +1138,10 @@ reconnect (struct GNUNET_CORE_Handle *h) else opt |= GNUNET_CORE_OPTION_SEND_FULL_OUTBOUND; } + LOG (GNUNET_ERROR_TYPE_INFO, + "(Re)connecting to CORE service, monitoring messages of type %u\n", + opt); + init->options = htonl (opt); ts = (uint16_t *) & init[1]; for (hpos = 0; hpos < h->hcnt; hpos++) @@ -1203,8 +1160,8 @@ reconnect (struct GNUNET_CORE_Handle *h) * @param cfg configuration to use * @param queue_size size of the per-peer message queue * @param cls closure for the various callbacks that follow (including handlers in the handlers array) - * @param init callback to call on timeout or once we have successfully - * connected to the core service; note that timeout is only meaningful if init is not NULL + * @param init callback to call once we have successfully + * connected to the core service * @param connects function to call on peer connect, can be NULL * @param disconnects function to call on peer disconnect / timeout, can be NULL * @param inbound_notify function to call for all inbound messages, can be NULL @@ -1255,9 +1212,7 @@ GNUNET_CORE_connect (const struct GNUNET_CONFIGURATION_Handle *cfg, GNUNET_assert (h->hcnt < (GNUNET_SERVER_MAX_MESSAGE_SIZE - sizeof (struct InitMessage)) / sizeof (uint16_t)); -#if DEBUG_CORE LOG (GNUNET_ERROR_TYPE_DEBUG, "Connecting to CORE service\n"); -#endif reconnect (h); return h; } @@ -1275,10 +1230,8 @@ GNUNET_CORE_disconnect (struct GNUNET_CORE_Handle *handle) { struct ControlMessage *cm; -#if DEBUG_CORE LOG (GNUNET_ERROR_TYPE_DEBUG, "Disconnecting from CORE service\n"); -#endif - if (handle->cth != NULL) + if (NULL != handle->cth) { GNUNET_CLIENT_notify_transmit_ready_cancel (handle->cth); handle->cth = NULL; @@ -1287,15 +1240,15 @@ GNUNET_CORE_disconnect (struct GNUNET_CORE_Handle *handle) { GNUNET_CONTAINER_DLL_remove (handle->control_pending_head, handle->control_pending_tail, cm); - if (cm->th != NULL) + if (NULL != cm->th) cm->th->cm = NULL; - if (cm->cont != NULL) + if (NULL != cm->cont) cm->cont (cm->cont_cls, GNUNET_SYSERR); GNUNET_free (cm); } - if (handle->client != NULL) + if (NULL != handle->client) { - GNUNET_CLIENT_disconnect (handle->client, GNUNET_NO); + GNUNET_CLIENT_disconnect (handle->client); handle->client = NULL; } GNUNET_CONTAINER_multihashmap_iterate (handle->peers, @@ -1340,8 +1293,7 @@ run_request_next_transmission (void *cls, * @param cork is corking allowed for this transmission? * @param priority how important is the message? * @param maxdelay how long can the message wait? - * @param target who should receive the message, - * use NULL for this peer (loopback) + * @param target who should receive the message, never NULL (can be this peer's identity for loopback) * @param notify_size how many bytes of buffer space does notify want? * @param notify function to call when buffer space is available * @param notify_cls closure for notify @@ -1402,18 +1354,14 @@ GNUNET_CORE_notify_transmit_ready (struct GNUNET_CORE_Handle *handle, int cork, GNUNET_break (handle->queue_size != 0); GNUNET_break (pr->queue_size == 1); GNUNET_free (th); -#if DEBUG_CORE LOG (GNUNET_ERROR_TYPE_DEBUG, "Dropping transmission request: cannot drop queue head and limit is one\n"); -#endif return NULL; } if (priority <= minp->priority) { -#if DEBUG_CORE LOG (GNUNET_ERROR_TYPE_DEBUG, "Dropping transmission request: priority too low\n"); -#endif GNUNET_free (th); return NULL; /* priority too low */ } @@ -1432,7 +1380,7 @@ GNUNET_CORE_notify_transmit_ready (struct GNUNET_CORE_Handle *handle, int cork, /* insertion sort */ prev = pos; - while ((pos != NULL) && (pos->timeout.abs_value < th->timeout.abs_value)) + while ((NULL != pos) && (pos->timeout.abs_value < th->timeout.abs_value)) { prev = pos; pos = pos->next; @@ -1441,9 +1389,7 @@ GNUNET_CORE_notify_transmit_ready (struct GNUNET_CORE_Handle *handle, int cork, th); pr->queue_size++; /* was the request queue previously empty? */ -#if DEBUG_CORE LOG (GNUNET_ERROR_TYPE_DEBUG, "Transmission request added to queue\n"); -#endif if ((pr->pending_head == th) && (pr->ntr_task == GNUNET_SCHEDULER_NO_TASK) && (pr->next == NULL) && (pr->prev == NULL) && (handle->ready_peer_head != pr)) @@ -1468,7 +1414,7 @@ GNUNET_CORE_notify_transmit_ready_cancel (struct GNUNET_CORE_TransmitHandle *th) was_head = (pr->pending_head == th); GNUNET_CONTAINER_DLL_remove (pr->pending_head, pr->pending_tail, th); pr->queue_size--; - if (th->cm != NULL) + if (NULL != th->cm) { /* we're currently in the control queue, remove */ GNUNET_CONTAINER_DLL_remove (h->control_pending_head, @@ -1478,14 +1424,15 @@ GNUNET_CORE_notify_transmit_ready_cancel (struct GNUNET_CORE_TransmitHandle *th) GNUNET_free (th); if (was_head) { - if ((pr->prev != NULL) || (pr->next != NULL) || (pr == h->ready_peer_head)) + if ((NULL != pr->prev) || (NULL != pr->next) || (pr == h->ready_peer_head)) { /* the request that was 'approved' by core was * canceled before it could be transmitted; remove * us from the 'ready' list */ GNUNET_CONTAINER_DLL_remove (h->ready_peer_head, h->ready_peer_tail, pr); } - request_next_transmission (pr); + if (NULL != h->client) + request_next_transmission (pr); } } |