aboutsummaryrefslogtreecommitdiff
path: root/src/core/core_api.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/core_api.c')
-rw-r--r--src/core/core_api.c167
1 files changed, 57 insertions, 110 deletions
diff --git a/src/core/core_api.c b/src/core/core_api.c
index 66df134..526dc9f 100644
--- a/src/core/core_api.c
+++ b/src/core/core_api.c
@@ -367,9 +367,7 @@ reconnect_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
struct GNUNET_CORE_Handle *h = cls;
h->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
-#if DEBUG_CORE
LOG (GNUNET_ERROR_TYPE_DEBUG, "Connecting to CORE service after delay\n");
-#endif
reconnect (h);
}
@@ -450,7 +448,7 @@ reconnect_later (struct GNUNET_CORE_Handle *h)
}
if (h->client != NULL)
{
- GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
+ GNUNET_CLIENT_disconnect (h->client);
h->client = NULL;
}
h->currently_down = GNUNET_YES;
@@ -546,11 +544,9 @@ request_next_transmission (struct PeerRecord *pr)
smr->smr_id = htons (th->smr_id = pr->smr_id_gen++);
GNUNET_CONTAINER_DLL_insert_tail (h->control_pending_head,
h->control_pending_tail, cm);
-#if DEBUG_CORE
LOG (GNUNET_ERROR_TYPE_DEBUG,
"Adding SEND REQUEST for peer `%s' to message queue\n",
GNUNET_i2s (&pr->peer));
-#endif
trigger_next_request (h, GNUNET_NO);
}
@@ -580,10 +576,15 @@ transmission_timeout (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
* us from the 'ready' list */
GNUNET_CONTAINER_DLL_remove (h->ready_peer_head, h->ready_peer_tail, pr);
}
-#if DEBUG_CORE
+ if (NULL != th->cm)
+ {
+ /* we're currently in the control queue, remove */
+ GNUNET_CONTAINER_DLL_remove (h->control_pending_head,
+ h->control_pending_tail, th->cm);
+ GNUNET_free (th->cm);
+ }
LOG (GNUNET_ERROR_TYPE_DEBUG,
"Signalling timeout of request for transmission to CORE service\n");
-#endif
request_next_transmission (pr);
GNUNET_assert (0 == th->get_message (th->get_message_cls, 0, NULL));
GNUNET_free (th);
@@ -592,6 +593,11 @@ transmission_timeout (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
/**
* Transmit the next message to the core service.
+ *
+ * @param cls closure with the 'struct GNUNET_CORE_Handle'
+ * @param size number of bytes available in buf
+ * @param buf where the callee should write the message
+ * @return number of bytes written to buf
*/
static size_t
transmit_message (void *cls, size_t size, void *buf)
@@ -609,10 +615,8 @@ transmit_message (void *cls, size_t size, void *buf)
h->cth = NULL;
if (buf == NULL)
{
-#if DEBUG_CORE
LOG (GNUNET_ERROR_TYPE_DEBUG,
"Transmission failed, initiating reconnect\n");
-#endif
reconnect_later (h);
return 0;
}
@@ -626,11 +630,9 @@ transmit_message (void *cls, size_t size, void *buf)
trigger_next_request (h, GNUNET_NO);
return 0;
}
-#if DEBUG_CORE
LOG (GNUNET_ERROR_TYPE_DEBUG,
"Transmitting control message with %u bytes of type %u to core.\n",
(unsigned int) msize, (unsigned int) ntohs (hdr->type));
-#endif
memcpy (buf, hdr, msize);
GNUNET_CONTAINER_DLL_remove (h->control_pending_head,
h->control_pending_tail, cm);
@@ -660,11 +662,9 @@ transmit_message (void *cls, size_t size, void *buf)
GNUNET_SCHEDULER_cancel (pr->timeout_task);
pr->timeout_task = GNUNET_SCHEDULER_NO_TASK;
}
-#if DEBUG_CORE
LOG (GNUNET_ERROR_TYPE_DEBUG,
"Transmitting SEND request to `%s' with %u bytes.\n",
GNUNET_i2s (&pr->peer), (unsigned int) th->msize);
-#endif
sm = (struct SendMessage *) buf;
sm->header.type = htons (GNUNET_MESSAGE_TYPE_CORE_SEND);
sm->priority = htonl (th->priority);
@@ -676,28 +676,22 @@ transmit_message (void *cls, size_t size, void *buf)
th->get_message (th->get_message_cls,
size - sizeof (struct SendMessage), &sm[1]);
-#if DEBUG_CORE
LOG (GNUNET_ERROR_TYPE_DEBUG,
"Transmitting SEND request to `%s' yielded %u bytes.\n",
GNUNET_i2s (&pr->peer), ret);
-#endif
GNUNET_free (th);
if (0 == ret)
{
-#if DEBUG_CORE
LOG (GNUNET_ERROR_TYPE_DEBUG,
"Size of clients message to peer %s is 0!\n",
GNUNET_i2s (&pr->peer));
-#endif
/* client decided to send nothing! */
request_next_transmission (pr);
return 0;
}
-#if DEBUG_CORE
LOG (GNUNET_ERROR_TYPE_DEBUG,
"Produced SEND message to core with %u bytes payload\n",
(unsigned int) ret);
-#endif
GNUNET_assert (ret >= sizeof (struct GNUNET_MessageHeader));
if (ret + sizeof (struct SendMessage) >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
{
@@ -729,17 +723,13 @@ trigger_next_request (struct GNUNET_CORE_Handle *h, int ignore_currently_down)
if ((GNUNET_YES == h->currently_down) && (ignore_currently_down == GNUNET_NO))
{
-#if DEBUG_CORE
LOG (GNUNET_ERROR_TYPE_DEBUG,
"Core connection down, not processing queue\n");
-#endif
return;
}
if (NULL != h->cth)
{
-#if DEBUG_CORE
LOG (GNUNET_ERROR_TYPE_DEBUG, "Request pending, not processing queue\n");
-#endif
return;
}
if (h->control_pending_head != NULL)
@@ -751,10 +741,8 @@ trigger_next_request (struct GNUNET_CORE_Handle *h, int ignore_currently_down)
h->ready_peer_head->pending_head->msize + sizeof (struct SendMessage);
else
{
-#if DEBUG_CORE
LOG (GNUNET_ERROR_TYPE_DEBUG,
"Request queue empty, not processing queue\n");
-#endif
return; /* no pending message */
}
h->cth =
@@ -791,7 +779,7 @@ main_notify_handler (void *cls, const struct GNUNET_MessageHeader *msg)
uint16_t et;
uint32_t ats_count;
- if (msg == NULL)
+ if (NULL == msg)
{
LOG (GNUNET_ERROR_TYPE_INFO,
_
@@ -800,11 +788,9 @@ main_notify_handler (void *cls, const struct GNUNET_MessageHeader *msg)
return;
}
msize = ntohs (msg->size);
-#if DEBUG_CORE > 2
LOG (GNUNET_ERROR_TYPE_DEBUG,
"Processing message of type %u and size %u from core service\n",
ntohs (msg->type), msize);
-#endif
switch (ntohs (msg->type))
{
case GNUNET_MESSAGE_TYPE_CORE_INIT_REPLY:
@@ -828,22 +814,18 @@ main_notify_handler (void *cls, const struct GNUNET_MessageHeader *msg)
{
/* mark so we don't call init on reconnect */
h->init = NULL;
-#if DEBUG_CORE
LOG (GNUNET_ERROR_TYPE_DEBUG, "Connected to core service of peer `%s'.\n",
GNUNET_i2s (&h->me));
-#endif
init (h->cls, h, &h->me);
}
else
{
-#if DEBUG_CORE
LOG (GNUNET_ERROR_TYPE_DEBUG,
"Successfully reconnected to core service.\n");
-#endif
}
/* fake 'connect to self' */
pr = GNUNET_CONTAINER_multihashmap_get (h->peers, &h->me.hashPubKey);
- GNUNET_assert (pr == NULL);
+ GNUNET_assert (NULL == pr);
pr = GNUNET_malloc (sizeof (struct PeerRecord));
pr->peer = h->me;
pr->ch = h;
@@ -871,11 +853,9 @@ main_notify_handler (void *cls, const struct GNUNET_MessageHeader *msg)
reconnect_later (h);
return;
}
-#if DEBUG_CORE
LOG (GNUNET_ERROR_TYPE_DEBUG,
"Received notification about connection from `%s'.\n",
GNUNET_i2s (&cnm->peer));
-#endif
if (0 == memcmp (&h->me, &cnm->peer, sizeof (struct GNUNET_PeerIdentity)))
{
/* connect to self!? */
@@ -883,7 +863,7 @@ main_notify_handler (void *cls, const struct GNUNET_MessageHeader *msg)
return;
}
pr = GNUNET_CONTAINER_multihashmap_get (h->peers, &cnm->peer.hashPubKey);
- if (pr != NULL)
+ if (NULL != pr)
{
GNUNET_break (0);
reconnect_later (h);
@@ -915,13 +895,11 @@ main_notify_handler (void *cls, const struct GNUNET_MessageHeader *msg)
return;
}
GNUNET_break (0 == ntohl (dnm->reserved));
-#if DEBUG_CORE
LOG (GNUNET_ERROR_TYPE_DEBUG,
"Received notification about disconnect from `%s'.\n",
GNUNET_i2s (&dnm->peer));
-#endif
pr = GNUNET_CONTAINER_multihashmap_get (h->peers, &dnm->peer.hashPubKey);
- if (pr == NULL)
+ if (NULL == pr)
{
GNUNET_break (0);
reconnect_later (h);
@@ -941,31 +919,21 @@ main_notify_handler (void *cls, const struct GNUNET_MessageHeader *msg)
return;
}
ntm = (const struct NotifyTrafficMessage *) msg;
-
ats_count = ntohl (ntm->ats_count);
if ((msize <
sizeof (struct NotifyTrafficMessage) +
ats_count * sizeof (struct GNUNET_ATS_Information) +
- sizeof (struct GNUNET_MessageHeader)) ||
- (GNUNET_ATS_ARRAY_TERMINATOR != ntohl ((&ntm->ats)[ats_count].type)))
+ sizeof (struct GNUNET_MessageHeader)) )
{
GNUNET_break (0);
reconnect_later (h);
return;
}
- em = (const struct GNUNET_MessageHeader *) &(&ntm->ats)[ats_count + 1];
-#if DEBUG_CORE
+ ats = (const struct GNUNET_ATS_Information*) &ntm[1];
+ em = (const struct GNUNET_MessageHeader *) &ats[ats_count];
LOG (GNUNET_ERROR_TYPE_DEBUG,
"Received message of type %u and size %u from peer `%4s'\n",
ntohs (em->type), ntohs (em->size), GNUNET_i2s (&ntm->peer));
-#endif
- pr = GNUNET_CONTAINER_multihashmap_get (h->peers, &ntm->peer.hashPubKey);
- if (pr == NULL)
- {
- GNUNET_break (0);
- reconnect_later (h);
- return;
- }
if ((GNUNET_NO == h->inbound_hdr_only) &&
(msize !=
ntohs (em->size) + sizeof (struct NotifyTrafficMessage) +
@@ -989,8 +957,15 @@ main_notify_handler (void *cls, const struct GNUNET_MessageHeader *msg)
GNUNET_break_op (0);
continue;
}
+ pr = GNUNET_CONTAINER_multihashmap_get (h->peers, &ntm->peer.hashPubKey);
+ if (NULL == pr)
+ {
+ GNUNET_break (0);
+ reconnect_later (h);
+ return;
+ }
if (GNUNET_OK !=
- h->handlers[hpos].callback (h->cls, &ntm->peer, em, &ntm->ats,
+ h->handlers[hpos].callback (h->cls, &ntm->peer, em, ats,
ats_count))
{
/* error in processing, do not process other messages! */
@@ -998,7 +973,7 @@ main_notify_handler (void *cls, const struct GNUNET_MessageHeader *msg)
}
}
if (NULL != h->inbound_notify)
- h->inbound_notify (h->cls, &ntm->peer, em, &ntm->ats, ats_count);
+ h->inbound_notify (h->cls, &ntm->peer, em, ats, ats_count);
break;
case GNUNET_MESSAGE_TYPE_CORE_NOTIFY_OUTBOUND:
if (msize < sizeof (struct NotifyTrafficMessage))
@@ -1008,36 +983,21 @@ main_notify_handler (void *cls, const struct GNUNET_MessageHeader *msg)
return;
}
ntm = (const struct NotifyTrafficMessage *) msg;
- if (0 == memcmp (&h->me, &ntm->peer, sizeof (struct GNUNET_PeerIdentity)))
- {
- /* self-change!? */
- GNUNET_break (0);
- return;
- }
ats_count = ntohl (ntm->ats_count);
if ((msize <
sizeof (struct NotifyTrafficMessage) +
ats_count * sizeof (struct GNUNET_ATS_Information) +
- sizeof (struct GNUNET_MessageHeader)) ||
- (GNUNET_ATS_ARRAY_TERMINATOR != ntohl ((&ntm->ats)[ats_count].type)))
- {
- GNUNET_break (0);
- reconnect_later (h);
- return;
- }
- em = (const struct GNUNET_MessageHeader *) &(&ntm->ats)[ats_count + 1];
- pr = GNUNET_CONTAINER_multihashmap_get (h->peers, &ntm->peer.hashPubKey);
- if (pr == NULL)
+ sizeof (struct GNUNET_MessageHeader)) )
{
GNUNET_break (0);
reconnect_later (h);
return;
}
-#if DEBUG_CORE
+ ats = (const struct GNUNET_ATS_Information*) &ntm[1];
+ em = (const struct GNUNET_MessageHeader *) &ats[ats_count];
LOG (GNUNET_ERROR_TYPE_DEBUG,
"Received notification about transmission to `%s'.\n",
GNUNET_i2s (&ntm->peer));
-#endif
if ((GNUNET_NO == h->outbound_hdr_only) &&
(msize !=
ntohs (em->size) + sizeof (struct NotifyTrafficMessage) +
@@ -1052,7 +1012,7 @@ main_notify_handler (void *cls, const struct GNUNET_MessageHeader *msg)
GNUNET_break (0);
break;
}
- h->outbound_notify (h->cls, &ntm->peer, em, &ntm->ats, ats_count);
+ h->outbound_notify (h->cls, &ntm->peer, em, ats, ats_count);
break;
case GNUNET_MESSAGE_TYPE_CORE_SEND_READY:
if (msize != sizeof (struct SendMessageReady))
@@ -1063,18 +1023,16 @@ main_notify_handler (void *cls, const struct GNUNET_MessageHeader *msg)
}
smr = (const struct SendMessageReady *) msg;
pr = GNUNET_CONTAINER_multihashmap_get (h->peers, &smr->peer.hashPubKey);
- if (pr == NULL)
+ if (NULL == pr)
{
GNUNET_break (0);
reconnect_later (h);
return;
}
-#if DEBUG_CORE
LOG (GNUNET_ERROR_TYPE_DEBUG,
"Received notification about transmission readiness to `%s'.\n",
GNUNET_i2s (&smr->peer));
-#endif
- if (pr->pending_head == NULL)
+ if (NULL == pr->pending_head)
{
/* request must have been cancelled between the original request
* and the response from core, ignore core's readiness */
@@ -1088,7 +1046,7 @@ main_notify_handler (void *cls, const struct GNUNET_MessageHeader *msg)
* ignore! (we should have already sent another request) */
break;
}
- if ((pr->prev != NULL) || (pr->next != NULL) || (h->ready_peer_head == pr))
+ if ((NULL != pr->prev) || (NULL != pr->next) || (h->ready_peer_head == pr))
{
/* we should not already be on the ready list... */
GNUNET_break (0);
@@ -1119,14 +1077,12 @@ init_done_task (void *cls, int success)
{
struct GNUNET_CORE_Handle *h = cls;
- if (success == GNUNET_SYSERR)
+ if (GNUNET_SYSERR == success)
return; /* shutdown */
- if (success == GNUNET_NO)
+ if (GNUNET_NO == success)
{
-#if DEBUG_CORE
LOG (GNUNET_ERROR_TYPE_DEBUG,
"Failed to exchange INIT with core, retrying\n");
-#endif
if (h->reconnect_task == GNUNET_SCHEDULER_NO_TASK)
reconnect_later (h);
return;
@@ -1152,13 +1108,10 @@ reconnect (struct GNUNET_CORE_Handle *h)
uint16_t *ts;
unsigned int hpos;
-#if DEBUG_CORE
- LOG (GNUNET_ERROR_TYPE_DEBUG, "Reconnecting to CORE service\n");
-#endif
- GNUNET_assert (h->client == NULL);
+ GNUNET_assert (NULL == h->client);
GNUNET_assert (h->currently_down == GNUNET_YES);
h->client = GNUNET_CLIENT_connect ("core", h->cfg);
- if (h->client == NULL)
+ if (NULL == h->client)
{
reconnect_later (h);
return;
@@ -1185,6 +1138,10 @@ reconnect (struct GNUNET_CORE_Handle *h)
else
opt |= GNUNET_CORE_OPTION_SEND_FULL_OUTBOUND;
}
+ LOG (GNUNET_ERROR_TYPE_INFO,
+ "(Re)connecting to CORE service, monitoring messages of type %u\n",
+ opt);
+
init->options = htonl (opt);
ts = (uint16_t *) & init[1];
for (hpos = 0; hpos < h->hcnt; hpos++)
@@ -1203,8 +1160,8 @@ reconnect (struct GNUNET_CORE_Handle *h)
* @param cfg configuration to use
* @param queue_size size of the per-peer message queue
* @param cls closure for the various callbacks that follow (including handlers in the handlers array)
- * @param init callback to call on timeout or once we have successfully
- * connected to the core service; note that timeout is only meaningful if init is not NULL
+ * @param init callback to call once we have successfully
+ * connected to the core service
* @param connects function to call on peer connect, can be NULL
* @param disconnects function to call on peer disconnect / timeout, can be NULL
* @param inbound_notify function to call for all inbound messages, can be NULL
@@ -1255,9 +1212,7 @@ GNUNET_CORE_connect (const struct GNUNET_CONFIGURATION_Handle *cfg,
GNUNET_assert (h->hcnt <
(GNUNET_SERVER_MAX_MESSAGE_SIZE -
sizeof (struct InitMessage)) / sizeof (uint16_t));
-#if DEBUG_CORE
LOG (GNUNET_ERROR_TYPE_DEBUG, "Connecting to CORE service\n");
-#endif
reconnect (h);
return h;
}
@@ -1275,10 +1230,8 @@ GNUNET_CORE_disconnect (struct GNUNET_CORE_Handle *handle)
{
struct ControlMessage *cm;
-#if DEBUG_CORE
LOG (GNUNET_ERROR_TYPE_DEBUG, "Disconnecting from CORE service\n");
-#endif
- if (handle->cth != NULL)
+ if (NULL != handle->cth)
{
GNUNET_CLIENT_notify_transmit_ready_cancel (handle->cth);
handle->cth = NULL;
@@ -1287,15 +1240,15 @@ GNUNET_CORE_disconnect (struct GNUNET_CORE_Handle *handle)
{
GNUNET_CONTAINER_DLL_remove (handle->control_pending_head,
handle->control_pending_tail, cm);
- if (cm->th != NULL)
+ if (NULL != cm->th)
cm->th->cm = NULL;
- if (cm->cont != NULL)
+ if (NULL != cm->cont)
cm->cont (cm->cont_cls, GNUNET_SYSERR);
GNUNET_free (cm);
}
- if (handle->client != NULL)
+ if (NULL != handle->client)
{
- GNUNET_CLIENT_disconnect (handle->client, GNUNET_NO);
+ GNUNET_CLIENT_disconnect (handle->client);
handle->client = NULL;
}
GNUNET_CONTAINER_multihashmap_iterate (handle->peers,
@@ -1340,8 +1293,7 @@ run_request_next_transmission (void *cls,
* @param cork is corking allowed for this transmission?
* @param priority how important is the message?
* @param maxdelay how long can the message wait?
- * @param target who should receive the message,
- * use NULL for this peer (loopback)
+ * @param target who should receive the message, never NULL (can be this peer's identity for loopback)
* @param notify_size how many bytes of buffer space does notify want?
* @param notify function to call when buffer space is available
* @param notify_cls closure for notify
@@ -1402,18 +1354,14 @@ GNUNET_CORE_notify_transmit_ready (struct GNUNET_CORE_Handle *handle, int cork,
GNUNET_break (handle->queue_size != 0);
GNUNET_break (pr->queue_size == 1);
GNUNET_free (th);
-#if DEBUG_CORE
LOG (GNUNET_ERROR_TYPE_DEBUG,
"Dropping transmission request: cannot drop queue head and limit is one\n");
-#endif
return NULL;
}
if (priority <= minp->priority)
{
-#if DEBUG_CORE
LOG (GNUNET_ERROR_TYPE_DEBUG,
"Dropping transmission request: priority too low\n");
-#endif
GNUNET_free (th);
return NULL; /* priority too low */
}
@@ -1432,7 +1380,7 @@ GNUNET_CORE_notify_transmit_ready (struct GNUNET_CORE_Handle *handle, int cork,
/* insertion sort */
prev = pos;
- while ((pos != NULL) && (pos->timeout.abs_value < th->timeout.abs_value))
+ while ((NULL != pos) && (pos->timeout.abs_value < th->timeout.abs_value))
{
prev = pos;
pos = pos->next;
@@ -1441,9 +1389,7 @@ GNUNET_CORE_notify_transmit_ready (struct GNUNET_CORE_Handle *handle, int cork,
th);
pr->queue_size++;
/* was the request queue previously empty? */
-#if DEBUG_CORE
LOG (GNUNET_ERROR_TYPE_DEBUG, "Transmission request added to queue\n");
-#endif
if ((pr->pending_head == th) && (pr->ntr_task == GNUNET_SCHEDULER_NO_TASK) &&
(pr->next == NULL) && (pr->prev == NULL) &&
(handle->ready_peer_head != pr))
@@ -1468,7 +1414,7 @@ GNUNET_CORE_notify_transmit_ready_cancel (struct GNUNET_CORE_TransmitHandle *th)
was_head = (pr->pending_head == th);
GNUNET_CONTAINER_DLL_remove (pr->pending_head, pr->pending_tail, th);
pr->queue_size--;
- if (th->cm != NULL)
+ if (NULL != th->cm)
{
/* we're currently in the control queue, remove */
GNUNET_CONTAINER_DLL_remove (h->control_pending_head,
@@ -1478,14 +1424,15 @@ GNUNET_CORE_notify_transmit_ready_cancel (struct GNUNET_CORE_TransmitHandle *th)
GNUNET_free (th);
if (was_head)
{
- if ((pr->prev != NULL) || (pr->next != NULL) || (pr == h->ready_peer_head))
+ if ((NULL != pr->prev) || (NULL != pr->next) || (pr == h->ready_peer_head))
{
/* the request that was 'approved' by core was
* canceled before it could be transmitted; remove
* us from the 'ready' list */
GNUNET_CONTAINER_DLL_remove (h->ready_peer_head, h->ready_peer_tail, pr);
}
- request_next_transmission (pr);
+ if (NULL != h->client)
+ request_next_transmission (pr);
}
}