diff options
Diffstat (limited to 'src/statistics/statistics_api.c')
-rw-r--r-- | src/statistics/statistics_api.c | 244 |
1 files changed, 163 insertions, 81 deletions
diff --git a/src/statistics/statistics_api.c b/src/statistics/statistics_api.c index 2838b70..e1b3698 100644 --- a/src/statistics/statistics_api.c +++ b/src/statistics/statistics_api.c @@ -280,7 +280,9 @@ schedule_watch_request (struct GNUNET_STATISTICS_Handle *h, size_t nlen; size_t nsize; - GNUNET_assert (h != NULL); + GNUNET_assert (NULL != h); + GNUNET_assert (NULL != watch); + slen = strlen (watch->subsystem) + 1; nlen = strlen (watch->name) + 1; nsize = sizeof (struct GNUNET_MessageHeader) + slen + nlen; @@ -335,14 +337,14 @@ do_disconnect (struct GNUNET_STATISTICS_Handle *h) } if (NULL != h->client) { - GNUNET_CLIENT_disconnect (h->client, GNUNET_NO); + GNUNET_CLIENT_disconnect (h->client); h->client = NULL; } h->receiving = GNUNET_NO; if (NULL != (c = h->current)) { h->current = NULL; - if (c->cont != NULL) + if (NULL != c->cont) c->cont (c->cls, GNUNET_SYSERR); free_action_item (c); } @@ -362,12 +364,12 @@ try_connect (struct GNUNET_STATISTICS_Handle *h) struct GNUNET_STATISTICS_GetHandle *gn; unsigned int i; - if (h->backoff_task != GNUNET_SCHEDULER_NO_TASK) + if (GNUNET_SCHEDULER_NO_TASK != h->backoff_task) return GNUNET_NO; - if (h->client != NULL) + if (NULL != h->client) return GNUNET_YES; h->client = GNUNET_CLIENT_connect ("statistics", h->cfg); - if (h->client != NULL) + if (NULL != h->client) { gn = h->action_head; while (NULL != (gh = gn)) @@ -382,13 +384,14 @@ try_connect (struct GNUNET_STATISTICS_Handle *h) } } for (i = 0; i < h->watches_size; i++) - schedule_watch_request (h, h->watches[i]); + { + if (NULL != h->watches[i]) + schedule_watch_request (h, h->watches[i]); + } return GNUNET_YES; } -#if DEBUG_STATISTICS LOG (GNUNET_ERROR_TYPE_DEBUG, - _("Failed to connect to statistics service!\n")); -#endif + "Failed to connect to statistics service!\n"); return GNUNET_NO; } @@ -410,6 +413,22 @@ reconnect_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) /** + * Task used by 'reconnect_later' to shutdown the handle + * + * @param cls the statistics handle + * @param tc scheduler context + */ +static void +do_destroy (void *cls, + const struct GNUNET_SCHEDULER_TaskContext *tc) +{ + struct GNUNET_STATISTICS_Handle *h = cls; + + GNUNET_STATISTICS_destroy (h, GNUNET_NO); +} + + +/** * Reconnect at a later time, respecting back-off. * * @param h statistics handle @@ -417,7 +436,29 @@ reconnect_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) static void reconnect_later (struct GNUNET_STATISTICS_Handle *h) { + int loss; + struct GNUNET_STATISTICS_GetHandle *gh; + GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == h->backoff_task); + if (GNUNET_YES == h->do_destroy) + { + /* So we are shutting down and the service is not reachable. + * Chances are that it's down for good and we are not going to connect to + * it anymore. + * Give up and don't sync the rest of the data. + */ + loss = GNUNET_NO; + for (gh = h->action_head; NULL != gh; gh = gh->next) + if ( (gh->make_persistent) && (ACTION_SET == gh->type) ) + loss = GNUNET_YES; + if (GNUNET_YES == loss) + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + _("Could not save some persistent statistics\n")); + h->do_destroy = GNUNET_NO; + GNUNET_SCHEDULER_add_continuation (&do_destroy, h, + GNUNET_SCHEDULER_REASON_PREREQ_DONE); + return; + } h->backoff_task = GNUNET_SCHEDULER_add_delayed (h->backoff, &reconnect_task, h); h->backoff = GNUNET_TIME_relative_multiply (h->backoff, 2); @@ -444,9 +485,7 @@ process_statistics_value_message (struct GNUNET_STATISTICS_Handle *h, if (h->current->aborted) { -#if DEBUG_STATISTICS LOG (GNUNET_ERROR_TYPE_DEBUG, "Iteration was aborted, ignoring VALUE\n"); -#endif return GNUNET_OK; /* don't bother */ } size = ntohs (msg->size); @@ -464,25 +503,19 @@ process_statistics_value_message (struct GNUNET_STATISTICS_Handle *h, GNUNET_break (0); return GNUNET_SYSERR; } -#if DEBUG_STATISTICS LOG (GNUNET_ERROR_TYPE_DEBUG, "Received valid statistic on `%s:%s': %llu\n", service, name, GNUNET_ntohll (smsg->value)); -#endif if (GNUNET_OK != h->current->proc (h->current->cls, service, name, GNUNET_ntohll (smsg->value), 0 != (ntohl (smsg->uid) & GNUNET_STATISTICS_PERSIST_BIT))) { -#if DEBUG_STATISTICS LOG (GNUNET_ERROR_TYPE_DEBUG, "Processing of remaining statistics aborted by client.\n"); -#endif h->current->aborted = GNUNET_YES; } -#if DEBUG_STATISTICS LOG (GNUNET_ERROR_TYPE_DEBUG, "VALUE processed successfully\n"); -#endif return GNUNET_OK; } @@ -526,6 +559,16 @@ process_watch_value (struct GNUNET_STATISTICS_Handle *h, } +static void +destroy_task (void *cls, + const struct GNUNET_SCHEDULER_TaskContext *tc) +{ + struct GNUNET_STATISTICS_Handle *h = cls; + + GNUNET_STATISTICS_destroy (h, GNUNET_NO); +} + + /** * Function called with messages from stats service. * @@ -539,22 +582,31 @@ receive_stats (void *cls, const struct GNUNET_MessageHeader *msg) struct GNUNET_STATISTICS_GetHandle *c; int ret; - if (msg == NULL) + if (NULL == msg) { -#if DEBUG_STATISTICS LOG (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK, "Error receiving statistics from service, is the service running?\n"); -#endif do_disconnect (h); reconnect_later (h); return; } switch (ntohs (msg->type)) { + case GNUNET_MESSAGE_TYPE_TEST: + if (GNUNET_SYSERR != h->do_destroy) + { + /* not in shutdown, why do we get 'TEST'? */ + GNUNET_break (0); + do_disconnect (h); + reconnect_later (h); + return; + } + h->do_destroy = GNUNET_NO; + GNUNET_SCHEDULER_add_continuation (&destroy_task, h, + GNUNET_SCHEDULER_REASON_PREREQ_DONE); + break; case GNUNET_MESSAGE_TYPE_STATISTICS_END: -#if DEBUG_STATISTICS LOG (GNUNET_ERROR_TYPE_DEBUG, "Received end of statistics marker\n"); -#endif if (NULL == (c = h->current)) { GNUNET_break (0); @@ -574,7 +626,7 @@ receive_stats (void *cls, const struct GNUNET_MessageHeader *msg) } h->current = NULL; schedule_action (h); - if (c->cont != NULL) + if (NULL != c->cont) c->cont (c->cls, GNUNET_OK); free_action_item (c); return; @@ -586,10 +638,8 @@ receive_stats (void *cls, const struct GNUNET_MessageHeader *msg) return; } /* finally, look for more! */ -#if DEBUG_STATISTICS LOG (GNUNET_ERROR_TYPE_DEBUG, "Processing VALUE done, now reading more\n"); -#endif GNUNET_CLIENT_receive (h->client, &receive_stats, h, GNUNET_TIME_absolute_get_remaining (h-> current->timeout)); @@ -638,13 +688,11 @@ transmit_get (struct GNUNET_STATISTICS_Handle *handle, size_t size, void *buf) uint16_t msize; GNUNET_assert (NULL != (c = handle->current)); - if (buf == NULL) + if (NULL == buf) { /* timeout / error */ -#if DEBUG_STATISTICS LOG (GNUNET_ERROR_TYPE_DEBUG, "Transmission of request for statistics failed!\n"); -#endif do_disconnect (handle); reconnect_later (handle); return 0; @@ -662,10 +710,8 @@ transmit_get (struct GNUNET_STATISTICS_Handle *handle, size_t size, void *buf) c->name)); if (GNUNET_YES != handle->receiving) { -#if DEBUG_STATISTICS LOG (GNUNET_ERROR_TYPE_DEBUG, "Transmission of GET done, now reading response\n"); -#endif handle->receiving = GNUNET_YES; GNUNET_CLIENT_receive (handle->client, &receive_stats, handle, GNUNET_TIME_absolute_get_remaining (c->timeout)); @@ -691,21 +737,17 @@ transmit_watch (struct GNUNET_STATISTICS_Handle *handle, size_t size, void *buf) size_t slen2; uint16_t msize; - if (buf == NULL) + if (NULL == buf) { /* timeout / error */ -#if DEBUG_STATISTICS LOG (GNUNET_ERROR_TYPE_DEBUG, "Transmission of request for statistics failed!\n"); -#endif do_disconnect (handle); reconnect_later (handle); return 0; } -#if DEBUG_STATISTICS LOG (GNUNET_ERROR_TYPE_DEBUG, "Transmitting watch request for `%s'\n", handle->current->name); -#endif slen1 = strlen (handle->current->subsystem) + 1; slen2 = strlen (handle->current->name) + 1; msize = slen1 + slen2 + sizeof (struct GNUNET_MessageHeader); @@ -833,8 +875,8 @@ GNUNET_STATISTICS_create (const char *subsystem, { struct GNUNET_STATISTICS_Handle *ret; - GNUNET_assert (subsystem != NULL); - GNUNET_assert (cfg != NULL); + GNUNET_assert (NULL != subsystem); + GNUNET_assert (NULL != cfg); ret = GNUNET_malloc (sizeof (struct GNUNET_STATISTICS_Handle)); ret->cfg = cfg; ret->subsystem = GNUNET_strdup (subsystem); @@ -859,8 +901,9 @@ GNUNET_STATISTICS_destroy (struct GNUNET_STATISTICS_Handle *h, int sync_first) struct GNUNET_TIME_Relative timeout; int i; - if (h == NULL) + if (NULL == h) return; + GNUNET_assert (GNUNET_NO == h->do_destroy); // Don't call twice. if (GNUNET_SCHEDULER_NO_TASK != h->backoff_task) { GNUNET_SCHEDULER_cancel (h->backoff_task); @@ -868,9 +911,9 @@ GNUNET_STATISTICS_destroy (struct GNUNET_STATISTICS_Handle *h, int sync_first) } if (sync_first) { - if (h->current != NULL) + if (NULL != h->current) { - if (h->current->type == ACTION_GET) + if (ACTION_GET == h->current->type) { GNUNET_CLIENT_notify_transmit_ready_cancel (h->th); h->th = NULL; @@ -882,7 +925,7 @@ GNUNET_STATISTICS_destroy (struct GNUNET_STATISTICS_Handle *h, int sync_first) while (NULL != (pos = next)) { next = pos->next; - if (pos->type == ACTION_GET) + if (ACTION_GET == pos->type) { GNUNET_CONTAINER_DLL_remove (h->action_head, h->action_tail, @@ -896,24 +939,17 @@ GNUNET_STATISTICS_destroy (struct GNUNET_STATISTICS_Handle *h, int sync_first) h->action_tail, h->current); h->do_destroy = GNUNET_YES; - if ((h->current != NULL) && (h->th == NULL)) + if ((NULL != h->current) && (NULL == h->th) && + (NULL != h->client)) { - if (NULL == h->client) - { - /* instant-connect (regardless of back-off) to submit final value */ - h->client = GNUNET_CLIENT_connect ("statistics", h->cfg); - } - if (NULL != h->client) - { - timeout = GNUNET_TIME_absolute_get_remaining (h->current->timeout); - h->th = - GNUNET_CLIENT_notify_transmit_ready (h->client, h->current->msize, - timeout, GNUNET_YES, - &transmit_action, h); - GNUNET_assert (NULL != h->th); - } + timeout = GNUNET_TIME_absolute_get_remaining (h->current->timeout); + h->th = + GNUNET_CLIENT_notify_transmit_ready (h->client, h->current->msize, + timeout, GNUNET_YES, + &transmit_action, h); + GNUNET_assert (NULL != h->th); } - if (h->th != NULL) + if (NULL != h->th) return; /* do not finish destruction just yet */ } while (NULL != (pos = h->action_head)) @@ -939,6 +975,47 @@ GNUNET_STATISTICS_destroy (struct GNUNET_STATISTICS_Handle *h, int sync_first) /** + * Function called to transmit TEST message to service to + * confirm that the service has received all of our 'SET' + * messages (during statistics disconnect/shutdown). + * + * @param cls the 'struct GNUNET_STATISTICS_Handle' + * @param size how many bytes can we write to buf + * @param buf where to write requests to the service + * @return number of bytes written to buf + */ +static size_t +transmit_test_on_shutdown (void *cls, + size_t size, + void *buf) +{ + struct GNUNET_STATISTICS_Handle *h = cls; + struct GNUNET_MessageHeader hdr; + + h->th = NULL; + if (NULL == buf) + { + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + _("Failed to receive acknowledgement from statistics service, some statistics might have been lost!\n")); + h->do_destroy = GNUNET_NO; + GNUNET_SCHEDULER_add_continuation (&destroy_task, h, + GNUNET_SCHEDULER_REASON_PREREQ_DONE); + return 0; + } + hdr.type = htons (GNUNET_MESSAGE_TYPE_TEST); + hdr.size = htons (sizeof (struct GNUNET_MessageHeader)); + memcpy (buf, &hdr, sizeof (hdr)); + if (GNUNET_YES != h->receiving) + { + h->receiving = GNUNET_YES; + GNUNET_CLIENT_receive (h->client, &receive_stats, h, + GNUNET_TIME_UNIT_FOREVER_REL); + } + return sizeof (struct GNUNET_MessageHeader); +} + + +/** * Schedule the next action to be performed. * * @param h statistics handle @@ -948,8 +1025,8 @@ schedule_action (struct GNUNET_STATISTICS_Handle *h) { struct GNUNET_TIME_Relative timeout; - if ( (h->th != NULL) || - (h->backoff_task != GNUNET_SCHEDULER_NO_TASK) ) + if ( (NULL != h->th) || + (GNUNET_SCHEDULER_NO_TASK != h->backoff_task) ) return; /* action already pending */ if (GNUNET_YES != try_connect (h)) { @@ -962,10 +1039,14 @@ schedule_action (struct GNUNET_STATISTICS_Handle *h) h->current = h->action_head; if (NULL == h->current) { - if (h->do_destroy) + if (GNUNET_YES == h->do_destroy) { - h->do_destroy = GNUNET_NO; - GNUNET_STATISTICS_destroy (h, GNUNET_YES); + h->do_destroy = GNUNET_SYSERR; /* in 'TEST' mode */ + h->th = GNUNET_CLIENT_notify_transmit_ready (h->client, + sizeof (struct GNUNET_MessageHeader), + SET_TRANSMIT_TIMEOUT, + GNUNET_NO, + &transmit_test_on_shutdown, h); } return; } @@ -977,10 +1058,8 @@ schedule_action (struct GNUNET_STATISTICS_Handle *h) timeout, GNUNET_YES, &transmit_action, h))) { -#if DEBUG_STATISTICS LOG (GNUNET_ERROR_TYPE_DEBUG, "Failed to transmit request to statistics service.\n"); -#endif do_disconnect (h); reconnect_later (h); } @@ -996,6 +1075,7 @@ schedule_action (struct GNUNET_STATISTICS_Handle *h) * @param timeout after how long should we give up (and call * cont with an error code)? * @param cont continuation to call when done (can be NULL) + * This callback CANNOT destroy the statistics handle in the same call. * @param proc function to call on each value * @param cls closure for cont and proc * @return NULL on error @@ -1013,11 +1093,11 @@ GNUNET_STATISTICS_get (struct GNUNET_STATISTICS_Handle *handle, if (NULL == handle) return NULL; - GNUNET_assert (proc != NULL); + GNUNET_assert (NULL != proc); GNUNET_assert (GNUNET_NO == handle->do_destroy); - if (subsystem == NULL) + if (NULL == subsystem) subsystem = ""; - if (name == NULL) + if (NULL == name) name = ""; slen1 = strlen (subsystem) + 1; slen2 = strlen (name) + 1; @@ -1082,7 +1162,7 @@ GNUNET_STATISTICS_watch (struct GNUNET_STATISTICS_Handle *handle, { struct GNUNET_STATISTICS_WatchEntry *w; - if (handle == NULL) + if (NULL == handle) return GNUNET_SYSERR; w = GNUNET_malloc (sizeof (struct GNUNET_STATISTICS_WatchEntry)); w->subsystem = GNUNET_strdup (subsystem); @@ -1113,11 +1193,13 @@ GNUNET_STATISTICS_watch_cancel (struct GNUNET_STATISTICS_Handle *handle, struct GNUNET_STATISTICS_WatchEntry *w; unsigned int i; - if (handle == NULL) + if (NULL == handle) return GNUNET_SYSERR; for (i=0;i<handle->watches_size;i++) { w = handle->watches[i]; + if (NULL == w) + continue; if ( (w->proc == proc) && (w->proc_cls == proc_cls) && (0 == strcmp (w->name, name)) && @@ -1154,8 +1236,8 @@ add_setter_action (struct GNUNET_STATISTICS_Handle *h, const char *name, size_t nsize; int64_t delta; - GNUNET_assert (h != NULL); - GNUNET_assert (name != NULL); + GNUNET_assert (NULL != h); + GNUNET_assert (NULL != name); slen = strlen (h->subsystem) + 1; nlen = strlen (name) + 1; nsize = sizeof (struct GNUNET_STATISTICS_SetMessage) + slen + nlen; @@ -1164,16 +1246,16 @@ add_setter_action (struct GNUNET_STATISTICS_Handle *h, const char *name, GNUNET_break (0); return; } - for (ai = h->action_head; ai != NULL; ai = ai->next) + for (ai = h->action_head; NULL != ai; ai = ai->next) { if (! ( (0 == strcmp (ai->subsystem, h->subsystem)) && (0 == strcmp (ai->name, name)) && - ( (ai->type == ACTION_UPDATE) || - (ai->type == ACTION_SET) ) ) ) + ( (ACTION_UPDATE == ai->type) || + (ACTION_SET == ai->type) ) ) ) continue; - if (ai->type == ACTION_SET) + if (ACTION_SET == ai->type) { - if (type == ACTION_UPDATE) + if (ACTION_UPDATE == type) { delta = (int64_t) value; if (delta > 0) @@ -1198,7 +1280,7 @@ add_setter_action (struct GNUNET_STATISTICS_Handle *h, const char *name, } else { - if (type == ACTION_UPDATE) + if (ACTION_UPDATE == type) { /* make delta cummulative */ delta = (int64_t) value; @@ -1244,7 +1326,7 @@ void GNUNET_STATISTICS_set (struct GNUNET_STATISTICS_Handle *handle, const char *name, uint64_t value, int make_persistent) { - if (handle == NULL) + if (NULL == handle) return; GNUNET_assert (GNUNET_NO == handle->do_destroy); add_setter_action (handle, name, make_persistent, value, ACTION_SET); @@ -1264,9 +1346,9 @@ void GNUNET_STATISTICS_update (struct GNUNET_STATISTICS_Handle *handle, const char *name, int64_t delta, int make_persistent) { - if (handle == NULL) + if (NULL == handle) return; - if (delta == 0) + if (0 == delta) return; GNUNET_assert (GNUNET_NO == handle->do_destroy); add_setter_action (handle, name, make_persistent, (uint64_t) delta, |