From 3d7b29ec1c5d1c2de96cf4c9badaa112e86ef899 Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Tue, 21 Jun 2016 18:29:03 +0000 Subject: update statistics API to use new MQ API style, also get rid of timeout argument --- src/core/test_core_quota_compliance.c | 16 +- src/datastore/gnunet-service-datastore.c | 35 +- src/fs/perf_gnunet_service_fs_p2p.c | 2 +- src/fs/perf_gnunet_service_fs_p2p_respect.c | 2 +- src/hostlist/gnunet-daemon-hostlist_client.c | 36 +- src/include/gnunet_statistics_service.h | 15 +- src/include/gnunet_strings_lib.h | 2 +- src/regex/gnunet-regex-profiler.c | 1 - src/set/gnunet-set-profiler.c | 5 +- src/statistics/Makefile.am | 22 +- src/statistics/gnunet-statistics.c | 88 ++- src/statistics/statistics_api.c | 934 +++++++++++---------------- src/statistics/test_statistics_api.c | 86 ++- src/statistics/test_statistics_api_loop.c | 2 +- src/testbed/testbed_api_statistics.c | 7 +- 15 files changed, 616 insertions(+), 637 deletions(-) diff --git a/src/core/test_core_quota_compliance.c b/src/core/test_core_quota_compliance.c index 59a3e8a7fb..05b1ae3d9f 100644 --- a/src/core/test_core_quota_compliance.c +++ b/src/core/test_core_quota_compliance.c @@ -242,29 +242,29 @@ measurement_stop (void *cls) else ok = 0; /* pass */ GNUNET_STATISTICS_get (p1.stats, "core", "# discarded CORE_SEND requests", - GNUNET_TIME_UNIT_FOREVER_REL, NULL, &print_stat, &p1); + NULL, &print_stat, &p1); GNUNET_STATISTICS_get (p1.stats, "core", "# discarded CORE_SEND request bytes", - GNUNET_TIME_UNIT_FOREVER_REL, NULL, &print_stat, &p1); + NULL, &print_stat, &p1); GNUNET_STATISTICS_get (p1.stats, "core", "# discarded lower priority CORE_SEND requests", - GNUNET_TIME_UNIT_FOREVER_REL, NULL, &print_stat, NULL); + NULL, &print_stat, NULL); GNUNET_STATISTICS_get (p1.stats, "core", "# discarded lower priority CORE_SEND request bytes", - GNUNET_TIME_UNIT_FOREVER_REL, NULL, &print_stat, &p1); + NULL, &print_stat, &p1); GNUNET_STATISTICS_get (p2.stats, "core", "# discarded CORE_SEND requests", - GNUNET_TIME_UNIT_FOREVER_REL, NULL, &print_stat, &p2); + NULL, &print_stat, &p2); GNUNET_STATISTICS_get (p2.stats, "core", "# discarded CORE_SEND request bytes", - GNUNET_TIME_UNIT_FOREVER_REL, NULL, &print_stat, &p2); + NULL, &print_stat, &p2); GNUNET_STATISTICS_get (p2.stats, "core", "# discarded lower priority CORE_SEND requests", - GNUNET_TIME_UNIT_FOREVER_REL, NULL, &print_stat, &p2); + NULL, &print_stat, &p2); GNUNET_STATISTICS_get (p2.stats, "core", "# discarded lower priority CORE_SEND request bytes", - GNUNET_TIME_UNIT_FOREVER_REL, NULL, &print_stat, &p2); + NULL, &print_stat, &p2); if (ok != 0) kind = GNUNET_ERROR_TYPE_ERROR; diff --git a/src/datastore/gnunet-service-datastore.c b/src/datastore/gnunet-service-datastore.c index 64c3640ad8..a67d1c7725 100644 --- a/src/datastore/gnunet-service-datastore.c +++ b/src/datastore/gnunet-service-datastore.c @@ -53,6 +53,11 @@ */ static char *quota_stat_name; +/** + * Task to timeout stat GET. + */ +static struct GNUNET_SCHEDULER_Task *stat_timeout_task; + /** * After how many payload-changing operations * do we sync our statistics? @@ -1526,8 +1531,12 @@ static void process_stat_done (void *cls, int success) { - stat_get = NULL; + if (NULL != stat_timeout_task) + { + GNUNET_SCHEDULER_cancel (stat_timeout_task); + stat_timeout_task = NULL; + } plugin = load_plugin (); if (NULL == plugin) { @@ -1575,6 +1584,20 @@ process_stat_done (void *cls, } +/** + * Fetching stats took to long, run without. + * + * @param cls NULL + */ +static void +stat_timeout (void *cls) +{ + stat_timeout_task = NULL; + GNUNET_STATISTICS_get_cancel (stat_get); + process_stat_done (NULL, GNUNET_NO); +} + + /** * Task run during shutdown. */ @@ -1617,6 +1640,11 @@ cleaning_task (void *cls) GNUNET_STATISTICS_get_cancel (stat_get); stat_get = NULL; } + if (NULL != stat_timeout_task) + { + GNUNET_SCHEDULER_cancel (stat_timeout_task); + stat_timeout_task = NULL; + } GNUNET_free_non_null (plugin_name); plugin_name = NULL; if (last_sync > 0) @@ -1813,12 +1841,15 @@ run (void *cls, GNUNET_STATISTICS_get (stats, "datastore", quota_stat_name, - GNUNET_TIME_UNIT_SECONDS, &process_stat_done, &process_stat_in, NULL); if (NULL == stat_get) process_stat_done (NULL, GNUNET_SYSERR); + else + stat_timeout_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_SECONDS, + &stat_timeout, + NULL); GNUNET_SERVER_disconnect_notify (server, &cleanup_reservations, NULL); diff --git a/src/fs/perf_gnunet_service_fs_p2p.c b/src/fs/perf_gnunet_service_fs_p2p.c index 40b2b977b6..5211beef8a 100644 --- a/src/fs/perf_gnunet_service_fs_p2p.c +++ b/src/fs/perf_gnunet_service_fs_p2p.c @@ -213,7 +213,7 @@ stat_run (void *cls, #else stats[sm->value].subsystem, stats[sm->value].name, #endif - GNUNET_TIME_UNIT_FOREVER_REL, &get_done, &print_stat, + &get_done, &print_stat, sm); return; } diff --git a/src/fs/perf_gnunet_service_fs_p2p_respect.c b/src/fs/perf_gnunet_service_fs_p2p_respect.c index 2527c89784..8098afbe96 100644 --- a/src/fs/perf_gnunet_service_fs_p2p_respect.c +++ b/src/fs/perf_gnunet_service_fs_p2p_respect.c @@ -254,7 +254,7 @@ stat_run (void *cls, #else stats[sm->value].subsystem, stats[sm->value].name, #endif - GNUNET_TIME_UNIT_FOREVER_REL, &get_done, &print_stat, + &get_done, &print_stat, sm); return; } diff --git a/src/hostlist/gnunet-daemon-hostlist_client.c b/src/hostlist/gnunet-daemon-hostlist_client.c index 15a82c2d54..df0cabe1d3 100644 --- a/src/hostlist/gnunet-daemon-hostlist_client.c +++ b/src/hostlist/gnunet-daemon-hostlist_client.c @@ -1265,8 +1265,14 @@ handler_advertisement (void *cls, const struct GNUNET_PeerIdentity *peer, * successfully obtained, #GNUNET_SYSERR if not. */ static void -primary_task (void *cls, int success) +primary_task (void *cls, + int success) { + if (NULL != ti_check_download) + { + GNUNET_SCHEDULER_cancel (ti_check_download); + ti_check_download = NULL; + } sget = NULL; GNUNET_assert (NULL != stats); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, @@ -1275,6 +1281,24 @@ primary_task (void *cls, int success) } +/** + * Continuation called by the statistics code once + * we go the stat. Initiates hostlist download scheduling. + * + * @param cls closure + * @param success #GNUNET_OK if statistics were + * successfully obtained, #GNUNET_SYSERR if not. + */ +static void +stat_timeout_task (void *cls) +{ + GNUNET_STATISTICS_get_cancel (sget); + sget = NULL; + ti_check_download = GNUNET_SCHEDULER_add_now (&task_check, + NULL); +} + + /** * We've received the previous delay value from statistics. Remember it. * @@ -1637,7 +1661,6 @@ GNUNET_HOSTLIST_client_start (const struct GNUNET_CONFIGURATION_Handle *c, sget = GNUNET_STATISTICS_get (stats, "hostlist", gettext_noop ("# milliseconds between hostlist downloads"), - GNUNET_TIME_UNIT_MINUTES, &primary_task, &process_stat, NULL); @@ -1645,7 +1668,14 @@ GNUNET_HOSTLIST_client_start (const struct GNUNET_CONFIGURATION_Handle *c, { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Statistics request failed, scheduling hostlist download\n"); - ti_check_download = GNUNET_SCHEDULER_add_now (&task_check, NULL); + ti_check_download = GNUNET_SCHEDULER_add_now (&task_check, + NULL); + } + else + { + ti_check_download = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES, + &stat_timeout_task, + NULL); } return GNUNET_OK; } diff --git a/src/include/gnunet_statistics_service.h b/src/include/gnunet_statistics_service.h index 715f06130c..2765a07a9e 100644 --- a/src/include/gnunet_statistics_service.h +++ b/src/include/gnunet_statistics_service.h @@ -1,6 +1,6 @@ /* This file is part of GNUnet - Copyright (C) 2009-2013 GNUnet e.V. + Copyright (C) 2009-2013, 2016 GNUnet e.V. GNUnet is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published @@ -138,14 +138,15 @@ GNUNET_STATISTICS_watch_cancel (struct GNUNET_STATISTICS_Handle *handle, /** - * Continuation called by #GNUNET_STATISTICS_get functions. + * Continuation called by #GNUNET_STATISTICS_get() functions. * * @param cls closure * @param success #GNUNET_OK if statistics were * successfully obtained, #GNUNET_SYSERR if not. */ -typedef void (*GNUNET_STATISTICS_Callback) (void *cls, - int success); +typedef void +(*GNUNET_STATISTICS_Callback) (void *cls, + int success); /** @@ -160,8 +161,6 @@ struct GNUNET_STATISTICS_GetHandle; * @param handle identification of the statistics service * @param subsystem limit to the specified subsystem, NULL for all subsystems * @param name name of the statistic value, NULL for all values - * @param timeout after how long should we give up (and call - * notify with buf NULL and size 0)? * @param cont continuation to call when done (can be NULL) * This callback CANNOT destroy the statistics handle in the same call. * @param proc function to call on each value @@ -172,9 +171,9 @@ struct GNUNET_STATISTICS_GetHandle * GNUNET_STATISTICS_get (struct GNUNET_STATISTICS_Handle *handle, const char *subsystem, const char *name, - struct GNUNET_TIME_Relative timeout, GNUNET_STATISTICS_Callback cont, - GNUNET_STATISTICS_Iterator proc, void *cls); + GNUNET_STATISTICS_Iterator proc, + void *cls); /** diff --git a/src/include/gnunet_strings_lib.h b/src/include/gnunet_strings_lib.h index 03276134c9..15d4f57ac0 100644 --- a/src/include/gnunet_strings_lib.h +++ b/src/include/gnunet_strings_lib.h @@ -222,7 +222,7 @@ GNUNET_STRINGS_buffer_fill (char *buffer, * in the buffer and assign the count (varargs) of type "const char**" * to the locations of the respective strings in the buffer. * - * @param buffer the buffer to parse + * @param buffer the buffer to parse FIXME: not 'const', is it? * @param size size of the @a buffer * @param count number of strings to locate * @param ... pointers to where to store the strings diff --git a/src/regex/gnunet-regex-profiler.c b/src/regex/gnunet-regex-profiler.c index f656818488..2114e2cb15 100644 --- a/src/regex/gnunet-regex-profiler.c +++ b/src/regex/gnunet-regex-profiler.c @@ -663,7 +663,6 @@ stats_connect_cb (void *cls, peer->stats_handle = ca_result; if (NULL == GNUNET_STATISTICS_get (peer->stats_handle, NULL, NULL, - GNUNET_TIME_UNIT_FOREVER_REL, &stats_cb, &stats_iterator, peer)) { diff --git a/src/set/gnunet-set-profiler.c b/src/set/gnunet-set-profiler.c index 6d97f0bb1f..df94435293 100644 --- a/src/set/gnunet-set-profiler.c +++ b/src/set/gnunet-set-profiler.c @@ -149,8 +149,9 @@ check_all_done (void) } statistics_file = fopen (statistics_filename, "w"); - GNUNET_STATISTICS_get (statistics, NULL, NULL, GNUNET_TIME_UNIT_FOREVER_REL, - statistics_done, statistics_result, NULL); + GNUNET_STATISTICS_get (statistics, NULL, NULL, + &statistics_done, + &statistics_result, NULL); } diff --git a/src/statistics/Makefile.am b/src/statistics/Makefile.am index 1168fd4765..0907c4a371 100644 --- a/src/statistics/Makefile.am +++ b/src/statistics/Makefile.am @@ -2,7 +2,7 @@ AM_CPPFLAGS = -I$(top_srcdir)/src/include if MINGW - WINFLAGS = -Wl,--no-undefined -Wl,--export-all-symbols + WINFLAGS = -Wl,--no-undefined -Wl,--export-all-symbols endif if USE_COVERAGE @@ -26,23 +26,23 @@ libgnunetstatistics_la_LIBADD = \ $(GN_LIBINTL) $(XLIB) libgnunetstatistics_la_LDFLAGS = \ $(GN_LIB_LDFLAGS) $(WINFLAGS) \ - -version-info 1:3:1 + -version-info 2:0:0 libexec_PROGRAMS = \ gnunet-service-statistics bin_PROGRAMS = \ - gnunet-statistics + gnunet-statistics gnunet_statistics_SOURCES = \ - gnunet-statistics.c + gnunet-statistics.c gnunet_statistics_LDADD = \ libgnunetstatistics.la \ $(top_builddir)/src/util/libgnunetutil.la \ $(GN_LIBINTL) gnunet_service_statistics_SOURCES = \ - gnunet-service-statistics.c + gnunet-service-statistics.c gnunet_service_statistics_LDADD = \ libgnunetstatistics.la \ $(top_builddir)/src/util/libgnunetutil.la \ @@ -52,7 +52,7 @@ check_PROGRAMS = \ test_statistics_api \ test_statistics_api_loop \ test_statistics_api_watch \ - test_statistics_api_watch_zero_value + test_statistics_api_watch_zero_value if ENABLE_TEST_RUN AM_TESTS_ENVIRONMENT=export GNUNET_PREFIX=$${GNUNET_PREFIX:-@libdir@};export PATH=$${GNUNET_PREFIX:-@prefix@}/bin:$$PATH; @@ -63,25 +63,25 @@ test_statistics_api_SOURCES = \ test_statistics_api.c test_statistics_api_LDADD = \ libgnunetstatistics.la \ - $(top_builddir)/src/util/libgnunetutil.la + $(top_builddir)/src/util/libgnunetutil.la test_statistics_api_loop_SOURCES = \ test_statistics_api_loop.c test_statistics_api_loop_LDADD = \ libgnunetstatistics.la \ - $(top_builddir)/src/util/libgnunetutil.la + $(top_builddir)/src/util/libgnunetutil.la test_statistics_api_watch_SOURCES = \ test_statistics_api_watch.c test_statistics_api_watch_LDADD = \ libgnunetstatistics.la \ - $(top_builddir)/src/util/libgnunetutil.la + $(top_builddir)/src/util/libgnunetutil.la test_statistics_api_watch_zero_value_SOURCES = \ test_statistics_api_watch_zero_value.c test_statistics_api_watch_zero_value_LDADD = \ libgnunetstatistics.la \ - $(top_builddir)/src/util/libgnunetutil.la + $(top_builddir)/src/util/libgnunetutil.la if HAVE_PYTHON check_SCRIPTS = \ @@ -101,5 +101,3 @@ test_gnunet_statistics.py: test_gnunet_statistics.py.in Makefile EXTRA_DIST = \ test_statistics_api_data.conf \ test_gnunet_statistics.py.in - - diff --git a/src/statistics/gnunet-statistics.c b/src/statistics/gnunet-statistics.c index f21da6059d..192a450ac6 100644 --- a/src/statistics/gnunet-statistics.c +++ b/src/statistics/gnunet-statistics.c @@ -29,7 +29,6 @@ #include "gnunet_statistics_service.h" #include "statistics.h" -#define GET_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 1) /** * Final status code. @@ -69,7 +68,7 @@ static char *remote_host; /** * Remote host's port */ -static unsigned long long remote_port; +static unsigned long long remote_port; /** * Value to set @@ -81,6 +80,11 @@ static unsigned long long set_val; */ static int set_value; +/** + * Handle for pending GET operation. + */ +static struct GNUNET_STATISTICS_GetHandle *gh; + /** * Callback function to process statistic values. @@ -134,10 +138,11 @@ printer (void *cls, * successfully obtained, #GNUNET_SYSERR if not. */ static void -cleanup (void *cls, int success) +cleanup (void *cls, + int success) { - - if (success != GNUNET_OK) + gh = NULL; + if (GNUNET_OK != success) { if (NULL == remote_host) FPRINTF (stderr, @@ -166,12 +171,21 @@ shutdown_task (void *cls) if (NULL == h) return; + if (NULL != gh) + { + GNUNET_STATISTICS_get_cancel (gh); + gh = NULL; + } if ( (GNUNET_YES == watch) && (NULL != subsystem) && (NULL != name) ) GNUNET_assert (GNUNET_OK == - GNUNET_STATISTICS_watch_cancel (h, subsystem, name, &printer, h)); - GNUNET_STATISTICS_destroy (h, GNUNET_NO); + GNUNET_STATISTICS_watch_cancel (h, + subsystem, + name, + &printer, h)); + GNUNET_STATISTICS_destroy (h, + GNUNET_NO); h = NULL; } @@ -207,12 +221,17 @@ main_task (void *cls) ret = 1; return; } - GNUNET_STATISTICS_set (h, name, (uint64_t) set_val, persistent); - GNUNET_STATISTICS_destroy (h, GNUNET_YES); + GNUNET_STATISTICS_set (h, + name, + (uint64_t) set_val, + persistent); + GNUNET_STATISTICS_destroy (h, + GNUNET_YES); h = NULL; return; } - if (NULL == (h = GNUNET_STATISTICS_create ("gnunet-statistics", cfg))) + if (NULL == (h = GNUNET_STATISTICS_create ("gnunet-statistics", + cfg))) { ret = 1; return; @@ -220,10 +239,12 @@ main_task (void *cls) if (GNUNET_NO == watch) { if (NULL == - GNUNET_STATISTICS_get (h, subsystem, name, GET_TIMEOUT, - &cleanup, - &printer, h)) - cleanup (h, GNUNET_SYSERR); + (gh = GNUNET_STATISTICS_get (h, + subsystem, + name, + &cleanup, + &printer, h)) ) + cleanup (h, GNUNET_SYSERR); } else { @@ -235,15 +256,21 @@ main_task (void *cls) ret = 1; return; } - if (GNUNET_OK != GNUNET_STATISTICS_watch (h, subsystem, name, - &printer, h)) + if (GNUNET_OK != + GNUNET_STATISTICS_watch (h, + subsystem, + name, + &printer, h)) { - fprintf (stderr, _("Failed to initialize watch routine\n")); - GNUNET_SCHEDULER_add_now (&shutdown_task, h); + fprintf (stderr, + _("Failed to initialize watch routine\n")); + GNUNET_SCHEDULER_add_now (&shutdown_task, + h); return; } } - GNUNET_SCHEDULER_add_shutdown (&shutdown_task, h); + GNUNET_SCHEDULER_add_shutdown (&shutdown_task, + h); } @@ -291,11 +318,17 @@ resolver_test_task (void *cls, /* Manipulate configuration */ GNUNET_CONFIGURATION_set_value_string (cfg, - "statistics", "UNIXPATH", ""); + "statistics", + "UNIXPATH", + ""); GNUNET_CONFIGURATION_set_value_string (cfg, - "statistics", "HOSTNAME", remote_host); + "statistics", + "HOSTNAME", + remote_host); GNUNET_CONFIGURATION_set_value_number (cfg, - "statistics", "PORT", remote_port); + "statistics", + "PORT", + remote_port); GNUNET_SCHEDULER_add_now (&main_task, cfg); } @@ -309,7 +342,9 @@ resolver_test_task (void *cls, * @param cfg configuration */ static void -run (void *cls, char *const *args, const char *cfgfile, +run (void *cls, + char *const *args, + const char *cfgfile, const struct GNUNET_CONFIGURATION_Handle *cfg) { set_value = GNUNET_NO; @@ -324,7 +359,9 @@ run (void *cls, char *const *args, const char *cfgfile, set_value = GNUNET_YES; } if (NULL != remote_host) - GNUNET_CLIENT_service_test ("resolver", cfg, GNUNET_TIME_UNIT_SECONDS, + GNUNET_CLIENT_service_test ("resolver", + cfg, + GNUNET_TIME_UNIT_SECONDS, &resolver_test_task, (void *) cfg); else GNUNET_SCHEDULER_add_now (&main_task, (void *) cfg); @@ -367,7 +404,8 @@ main (int argc, char *const *argv) }; remote_port = 0; remote_host = NULL; - if (GNUNET_OK != GNUNET_STRINGS_get_utf8_args (argc, argv, &argc, &argv)) + if (GNUNET_OK != GNUNET_STRINGS_get_utf8_args (argc, argv, + &argc, &argv)) return 2; ret = (GNUNET_OK == diff --git a/src/statistics/statistics_api.c b/src/statistics/statistics_api.c index 32b973eecc..37aa990176 100644 --- a/src/statistics/statistics_api.c +++ b/src/statistics/statistics_api.c @@ -1,6 +1,6 @@ /* This file is part of GNUnet. - Copyright (C) 2009, 2010, 2011 GNUnet e.V. + Copyright (C) 2009, 2010, 2011, 2016 GNUnet e.V. GNUnet is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published @@ -88,7 +88,7 @@ struct GNUNET_STATISTICS_WatchEntry GNUNET_STATISTICS_Iterator proc; /** - * Closure for proc + * Closure for @e proc */ void *proc_cls; @@ -137,7 +137,7 @@ struct GNUNET_STATISTICS_GetHandle GNUNET_STATISTICS_Iterator proc; /** - * Closure for proc and cont. + * Closure for @e proc and @e cont. */ void *cls; @@ -146,11 +146,6 @@ struct GNUNET_STATISTICS_GetHandle */ struct GNUNET_TIME_Absolute timeout; - /** - * Task run on timeout. - */ - struct GNUNET_SCHEDULER_Task * timeout_task; - /** * Associated value. */ @@ -167,7 +162,7 @@ struct GNUNET_STATISTICS_GetHandle int aborted; /** - * Is this a GET, SET, UPDATE or WATCH? + * Is this a #ACTION_GET, #ACTION_SET, #ACTION_UPDATE or #ACTION_WATCH? */ enum ActionType type; @@ -195,14 +190,9 @@ struct GNUNET_STATISTICS_Handle const struct GNUNET_CONFIGURATION_Handle *cfg; /** - * Socket (if available). + * Message queue to the service. */ - struct GNUNET_CLIENT_Connection *client; - - /** - * Currently pending transmission request. - */ - struct GNUNET_CLIENT_TransmitHandle *th; + struct GNUNET_MQ_Handle *mq; /** * Head of the linked list of pending actions (first action @@ -230,7 +220,7 @@ struct GNUNET_STATISTICS_Handle /** * Task doing exponential back-off trying to reconnect. */ - struct GNUNET_SCHEDULER_Task * backoff_task; + struct GNUNET_SCHEDULER_Task *backoff_task; /** * Time for next connect retry. @@ -248,7 +238,7 @@ struct GNUNET_STATISTICS_Handle uint64_t peak_rss; /** - * Size of the 'watches' array. + * Size of the @e watches array. */ unsigned int watches_size; @@ -320,6 +310,15 @@ static void schedule_action (struct GNUNET_STATISTICS_Handle *h); +/** + * Reconnect at a later time, respecting back-off. + * + * @param h statistics handle + */ +static void +reconnect_later (struct GNUNET_STATISTICS_Handle *h); + + /** * Transmit request to service that we want to watch * the development of a particular value. @@ -353,7 +352,8 @@ schedule_watch_request (struct GNUNET_STATISTICS_Handle *h, ai->type = ACTION_WATCH; ai->proc = watch->proc; ai->cls = watch->proc_cls; - GNUNET_CONTAINER_DLL_insert_tail (h->action_head, h->action_tail, + GNUNET_CONTAINER_DLL_insert_tail (h->action_head, + h->action_tail, ai); schedule_action (h); } @@ -367,11 +367,6 @@ schedule_watch_request (struct GNUNET_STATISTICS_Handle *h, static void free_action_item (struct GNUNET_STATISTICS_GetHandle *gh) { - if (NULL != gh->timeout_task) - { - GNUNET_SCHEDULER_cancel (gh->timeout_task); - gh->timeout_task = NULL; - } GNUNET_free_non_null (gh->subsystem); GNUNET_free_non_null (gh->name); GNUNET_free (gh); @@ -388,11 +383,6 @@ do_disconnect (struct GNUNET_STATISTICS_Handle *h) { struct GNUNET_STATISTICS_GetHandle *c; - if (NULL != h->th) - { - GNUNET_CLIENT_notify_transmit_ready_cancel (h->th); - h->th = NULL; - } h->receiving = GNUNET_NO; if (NULL != (c = h->current)) { @@ -400,393 +390,400 @@ do_disconnect (struct GNUNET_STATISTICS_Handle *h) if ( (NULL != c->cont) && (GNUNET_YES != c->aborted) ) { - c->cont (c->cls, GNUNET_SYSERR); + c->cont (c->cls, + GNUNET_SYSERR); c->cont = NULL; } free_action_item (c); } - if (NULL != h->client) + if (NULL != h->mq) { - GNUNET_CLIENT_disconnect (h->client); - h->client = NULL; + GNUNET_MQ_destroy (h->mq); + h->mq = NULL; } } /** - * Try to (re)connect to the statistics service. + * Process a #GNUNET_MESSAGE_TYPE_STATISTICS_VALUE message. * - * @param h statistics handle to reconnect - * @return #GNUNET_YES on success, #GNUNET_NO on failure. + * @param cls statistics handle + * @param smsg message received from the service, never NULL + * @return #GNUNET_OK if the message was well-formed */ static int -try_connect (struct GNUNET_STATISTICS_Handle *h) +check_statistics_value (void *cls, + const struct GNUNET_STATISTICS_ReplyMessage *smsg) { - struct GNUNET_STATISTICS_GetHandle *gh; - struct GNUNET_STATISTICS_GetHandle *gn; - unsigned int i; + const char *service; + const char *name; + uint16_t size; - if (NULL != h->backoff_task) - return GNUNET_NO; - if (NULL != h->client) - return GNUNET_YES; - h->client = GNUNET_CLIENT_connect ("statistics", h->cfg); - if (NULL != h->client) + size = ntohs (smsg->header.size); + size -= sizeof (struct GNUNET_STATISTICS_ReplyMessage); + if (size != + GNUNET_STRINGS_buffer_tokenize ((const char *) &smsg[1], + size, + 2, + &service, + &name)) { - gn = h->action_head; - while (NULL != (gh = gn)) - { - gn = gh->next; - if (gh->type == ACTION_WATCH) - { - GNUNET_CONTAINER_DLL_remove (h->action_head, - h->action_tail, - gh); - free_action_item (gh); - } - } - for (i = 0; i < h->watches_size; i++) - { - if (NULL != h->watches[i]) - schedule_watch_request (h, h->watches[i]); - } - return GNUNET_YES; + GNUNET_break (0); + return GNUNET_SYSERR; } - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Failed to connect to statistics service!\n"); - return GNUNET_NO; + return GNUNET_OK; } /** - * We've waited long enough, reconnect now. + * Process a #GNUNET_MESSAGE_TYPE_STATISTICS_VALUE message. * - * @param cls the `struct GNUNET_STATISTICS_Handle` to reconnect + * @param cls statistics handle + * @param msg message received from the service, never NULL + * @return #GNUNET_OK if the message was well-formed */ static void -reconnect_task (void *cls) +handle_statistics_value (void *cls, + const struct GNUNET_STATISTICS_ReplyMessage *smsg) { struct GNUNET_STATISTICS_Handle *h = cls; + const char *service; + const char *name; + uint16_t size; - h->backoff_task = NULL; - schedule_action (h); + if (h->current->aborted) + return; /* iteration aborted, don't bother */ + + size = ntohs (smsg->header.size); + size -= sizeof (struct GNUNET_STATISTICS_ReplyMessage); + GNUNET_assert (size == + GNUNET_STRINGS_buffer_tokenize ((const char *) &smsg[1], + size, + 2, + &service, + &name)); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Received valid statistic on `%s:%s': %llu\n", + service, name, + GNUNET_ntohll (smsg->value)); + if (GNUNET_OK != + h->current->proc (h->current->cls, + service, + name, + GNUNET_ntohll (smsg->value), + 0 != + (ntohl (smsg->uid) & GNUNET_STATISTICS_PERSIST_BIT))) + { + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Processing of remaining statistics aborted by client.\n"); + h->current->aborted = GNUNET_YES; + } } /** - * Task used by 'reconnect_later' to shutdown the handle + * We have received a watch value from the service. Process it. * - * @param cls the statistics handle + * @param cls statistics handle + * @param msg the watch value message */ static void -do_destroy (void *cls) +handle_statistics_watch_value (void *cls, + const struct GNUNET_STATISTICS_WatchValueMessage *wvm) { struct GNUNET_STATISTICS_Handle *h = cls; + struct GNUNET_STATISTICS_WatchEntry *w; + uint32_t wid; - GNUNET_STATISTICS_destroy (h, GNUNET_NO); + GNUNET_break (0 == ntohl (wvm->reserved)); + wid = ntohl (wvm->wid); + if (wid >= h->watches_size) + { + do_disconnect (h); + reconnect_later (h); + return; + } + w = h->watches[wid]; + if (NULL == w) + return; + (void) w->proc (w->proc_cls, + w->subsystem, + w->name, + GNUNET_ntohll (wvm->value), + 0 != (ntohl (wvm->flags) & GNUNET_STATISTICS_PERSIST_BIT)); } /** - * Reconnect at a later time, respecting back-off. + * Generic error handler, called with the appropriate error code and + * the same closure specified at the creation of the message queue. + * Not every message queue implementation supports an error handler. * - * @param h statistics handle + * @param cls closure with the `struct GNUNET_STATISTICS_Handle *` + * @param error error code */ static void -reconnect_later (struct GNUNET_STATISTICS_Handle *h) +mq_error_handler (void *cls, + enum GNUNET_MQ_Error error) { - int loss; - struct GNUNET_STATISTICS_GetHandle *gh; + struct GNUNET_STATISTICS_Handle *h = cls; - GNUNET_assert (NULL == h->backoff_task); - if (GNUNET_YES == h->do_destroy) + if (GNUNET_NO != h->do_destroy) { - /* So we are shutting down and the service is not reachable. - * Chances are that it's down for good and we are not going to connect to - * it anymore. - * Give up and don't sync the rest of the data. - */ - loss = GNUNET_NO; - for (gh = h->action_head; NULL != gh; gh = gh->next) - if ( (gh->make_persistent) && (ACTION_SET == gh->type) ) - loss = GNUNET_YES; - if (GNUNET_YES == loss) - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, - _("Could not save some persistent statistics\n")); h->do_destroy = GNUNET_NO; - GNUNET_SCHEDULER_add_now (&do_destroy, h); + GNUNET_STATISTICS_destroy (h, + GNUNET_NO); return; } - h->backoff_task = - GNUNET_SCHEDULER_add_delayed (h->backoff, &reconnect_task, h); - h->backoff = GNUNET_TIME_STD_BACKOFF (h->backoff); + do_disconnect (h); + reconnect_later (h); } /** - * Process a #GNUNET_MESSAGE_TYPE_STATISTICS_VALUE message. + * Task used to destroy the statistics handle. * - * @param h statistics handle - * @param msg message received from the service, never NULL - * @return #GNUNET_OK if the message was well-formed + * @param cls the `struct GNUNET_STATISTICS_Handle` */ -static int -process_statistics_value_message (struct GNUNET_STATISTICS_Handle *h, - const struct GNUNET_MessageHeader *msg) +static void +destroy_task (void *cls) { - char *service; - char *name; - const struct GNUNET_STATISTICS_ReplyMessage *smsg; - uint16_t size; + struct GNUNET_STATISTICS_Handle *h = cls; - if (h->current->aborted) - { - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Iteration was aborted, ignoring VALUE\n"); - return GNUNET_OK; /* don't bother */ - } - size = ntohs (msg->size); - if (size < sizeof (struct GNUNET_STATISTICS_ReplyMessage)) + GNUNET_STATISTICS_destroy (h, GNUNET_NO); +} + + +/** + * Handle a #GNUNET_MESSAGE_TYPE_TEST (sic) message. We receive this + * message at the end of the shutdown when the service confirms that + * all data has been written to disk. + * + * @param cls our `struct GNUNET_STATISTICS_Handle *` + * @param msg the message + */ +static void +handle_test (void *cls, + const struct GNUNET_MessageHeader *msg) +{ + struct GNUNET_STATISTICS_Handle *h = cls; + + if (GNUNET_SYSERR != h->do_destroy) { + /* not in shutdown, why do we get 'TEST'? */ GNUNET_break (0); - return GNUNET_SYSERR; + do_disconnect (h); + reconnect_later (h); + return; } - smsg = (const struct GNUNET_STATISTICS_ReplyMessage *) msg; - size -= sizeof (struct GNUNET_STATISTICS_ReplyMessage); - if (size != - GNUNET_STRINGS_buffer_tokenize ((const char *) &smsg[1], size, 2, - &service, &name)) + h->do_destroy = GNUNET_NO; + GNUNET_SCHEDULER_add_now (&destroy_task, + h); +} + + +/** + * Handle a #GNUNET_MESSAGE_TYPE_STATISTICS_END message. We receive + * this message in response to a query to indicate that there are no + * further matching results. + * + * @param cls our `struct GNUNET_STATISTICS_Handle *` + * @param msg the message + */ +static void +handle_statistics_end (void *cls, + const struct GNUNET_MessageHeader *msg) +{ + struct GNUNET_STATISTICS_Handle *h = cls; + struct GNUNET_STATISTICS_GetHandle *c; + + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Received end of statistics marker\n"); + if (NULL == (c = h->current)) { GNUNET_break (0); - return GNUNET_SYSERR; + do_disconnect (h); + reconnect_later (h); + return; } - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Received valid statistic on `%s:%s': %llu\n", - service, name, - GNUNET_ntohll (smsg->value)); - if (GNUNET_OK != - h->current->proc (h->current->cls, service, name, - GNUNET_ntohll (smsg->value), - 0 != - (ntohl (smsg->uid) & GNUNET_STATISTICS_PERSIST_BIT))) + h->backoff = GNUNET_TIME_UNIT_MILLISECONDS; + h->current = NULL; + schedule_action (h); + if (NULL != c->cont) { - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Processing of remaining statistics aborted by client.\n"); - h->current->aborted = GNUNET_YES; + c->cont (c->cls, + GNUNET_OK); + c->cont = NULL; } - LOG (GNUNET_ERROR_TYPE_DEBUG, - "VALUE processed successfully\n"); - return GNUNET_OK; + free_action_item (c); } /** - * We have received a watch value from the service. Process it. + * Try to (re)connect to the statistics service. * - * @param h statistics handle - * @param msg the watch value message - * @return #GNUNET_OK if the message was well-formed, #GNUNET_SYSERR if not, - * #GNUNET_NO if this watch has been cancelled + * @param h statistics handle to reconnect + * @return #GNUNET_YES on success, #GNUNET_NO on failure. */ static int -process_watch_value (struct GNUNET_STATISTICS_Handle *h, - const struct GNUNET_MessageHeader *msg) +try_connect (struct GNUNET_STATISTICS_Handle *h) { - const struct GNUNET_STATISTICS_WatchValueMessage *wvm; - struct GNUNET_STATISTICS_WatchEntry *w; - uint32_t wid; + GNUNET_MQ_hd_fixed_size (test, + GNUNET_MESSAGE_TYPE_TEST, + struct GNUNET_MessageHeader); + GNUNET_MQ_hd_fixed_size (statistics_end, + GNUNET_MESSAGE_TYPE_STATISTICS_END, + struct GNUNET_MessageHeader); + GNUNET_MQ_hd_var_size (statistics_value, + GNUNET_MESSAGE_TYPE_STATISTICS_VALUE, + struct GNUNET_STATISTICS_ReplyMessage); + GNUNET_MQ_hd_fixed_size (statistics_watch_value, + GNUNET_MESSAGE_TYPE_STATISTICS_WATCH_VALUE, + struct GNUNET_STATISTICS_WatchValueMessage); + struct GNUNET_MQ_MessageHandler handlers[] = { + make_test_handler (h), + make_statistics_end_handler (h), + make_statistics_value_handler (h), + make_statistics_watch_value_handler (h), + GNUNET_MQ_handler_end () + }; + struct GNUNET_STATISTICS_GetHandle *gh; + struct GNUNET_STATISTICS_GetHandle *gn; - if (sizeof (struct GNUNET_STATISTICS_WatchValueMessage) != ntohs (msg->size)) + if (NULL != h->backoff_task) + return GNUNET_NO; + if (NULL != h->mq) + return GNUNET_YES; + h->mq = GNUNET_CLIENT_connecT (h->cfg, + "statistics", + handlers, + &mq_error_handler, + h); + if (NULL == h->mq) { - GNUNET_break (0); - return GNUNET_SYSERR; + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Failed to connect to statistics service!\n"); + return GNUNET_NO; } - wvm = (const struct GNUNET_STATISTICS_WatchValueMessage *) msg; - GNUNET_break (0 == ntohl (wvm->reserved)); - wid = ntohl (wvm->wid); - if (wid >= h->watches_size) + gn = h->action_head; + while (NULL != (gh = gn)) { - GNUNET_break (0); - return GNUNET_SYSERR; + gn = gh->next; + if (gh->type == ACTION_WATCH) + { + GNUNET_CONTAINER_DLL_remove (h->action_head, + h->action_tail, + gh); + free_action_item (gh); + } } - w = h->watches[wid]; - if (NULL == w) - return GNUNET_NO; - (void) w->proc (w->proc_cls, w->subsystem, w->name, - GNUNET_ntohll (wvm->value), - 0 != (ntohl (wvm->flags) & GNUNET_STATISTICS_PERSIST_BIT)); - return GNUNET_OK; + for (unsigned int i = 0; i < h->watches_size; i++) + if (NULL != h->watches[i]) + schedule_watch_request (h, + h->watches[i]); + return GNUNET_YES; } /** - * Task used to destroy the statistics handle. + * We've waited long enough, reconnect now. * - * @param cls the `struct GNUNET_STATISTICS_Handle` + * @param cls the `struct GNUNET_STATISTICS_Handle` to reconnect */ static void -destroy_task (void *cls) +reconnect_task (void *cls) { struct GNUNET_STATISTICS_Handle *h = cls; - GNUNET_STATISTICS_destroy (h, GNUNET_NO); + h->backoff_task = NULL; + schedule_action (h); } /** - * Function called with messages from stats service. + * Task used by #reconnect_later() to shutdown the handle * - * @param cls closure - * @param msg message received, NULL on timeout or fatal error + * @param cls the statistics handle */ static void -receive_stats (void *cls, - const struct GNUNET_MessageHeader *msg) +do_destroy (void *cls) { struct GNUNET_STATISTICS_Handle *h = cls; - struct GNUNET_STATISTICS_GetHandle *c; - int ret; - if (NULL == msg) - { - LOG (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK, - "Error receiving statistics from service, is the service running?\n"); - do_disconnect (h); - reconnect_later (h); - return; - } - switch (ntohs (msg->type)) + GNUNET_STATISTICS_destroy (h, + GNUNET_NO); +} + + +/** + * Reconnect at a later time, respecting back-off. + * + * @param h statistics handle + */ +static void +reconnect_later (struct GNUNET_STATISTICS_Handle *h) +{ + int loss; + struct GNUNET_STATISTICS_GetHandle *gh; + + GNUNET_assert (NULL == h->backoff_task); + if (GNUNET_YES == h->do_destroy) { - case GNUNET_MESSAGE_TYPE_TEST: - if (GNUNET_SYSERR != h->do_destroy) - { - /* not in shutdown, why do we get 'TEST'? */ - GNUNET_break (0); - do_disconnect (h); - reconnect_later (h); - return; - } + /* So we are shutting down and the service is not reachable. + * Chances are that it's down for good and we are not going to connect to + * it anymore. + * Give up and don't sync the rest of the data. + */ + loss = GNUNET_NO; + for (gh = h->action_head; NULL != gh; gh = gh->next) + if ( (gh->make_persistent) && (ACTION_SET == gh->type) ) + loss = GNUNET_YES; + if (GNUNET_YES == loss) + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + _("Could not save some persistent statistics\n")); h->do_destroy = GNUNET_NO; - GNUNET_SCHEDULER_add_now (&destroy_task, h); - break; - case GNUNET_MESSAGE_TYPE_STATISTICS_END: - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Received end of statistics marker\n"); - if (NULL == (c = h->current)) - { - GNUNET_break (0); - do_disconnect (h); - reconnect_later (h); - return; - } - h->backoff = GNUNET_TIME_UNIT_MILLISECONDS; - if (h->watches_size > 0) - { - GNUNET_CLIENT_receive (h->client, &receive_stats, h, - GNUNET_TIME_UNIT_FOREVER_REL); - } - else - { - h->receiving = GNUNET_NO; - } - h->current = NULL; - schedule_action (h); - if (NULL != c->cont) - { - c->cont (c->cls, GNUNET_OK); - c->cont = NULL; - } - free_action_item (c); - return; - case GNUNET_MESSAGE_TYPE_STATISTICS_VALUE: - if (GNUNET_OK != process_statistics_value_message (h, msg)) - { - do_disconnect (h); - reconnect_later (h); - return; - } - /* finally, look for more! */ - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Processing VALUE done, now reading more\n"); - GNUNET_CLIENT_receive (h->client, &receive_stats, h, - GNUNET_TIME_absolute_get_remaining (h-> - current->timeout)); - h->backoff = GNUNET_TIME_UNIT_MILLISECONDS; - return; - case GNUNET_MESSAGE_TYPE_STATISTICS_WATCH_VALUE: - if (GNUNET_OK != - (ret = process_watch_value (h, msg))) - { - do_disconnect (h); - if (GNUNET_NO == ret) - h->backoff = GNUNET_TIME_UNIT_MILLISECONDS; - reconnect_later (h); - return; - } - h->backoff = GNUNET_TIME_UNIT_MILLISECONDS; - GNUNET_assert (h->watches_size > 0); - GNUNET_CLIENT_receive (h->client, &receive_stats, h, - GNUNET_TIME_UNIT_FOREVER_REL); - return; - default: - GNUNET_break (0); - do_disconnect (h); - reconnect_later (h); + GNUNET_SCHEDULER_add_now (&do_destroy, + h); return; } + h->backoff_task + = GNUNET_SCHEDULER_add_delayed (h->backoff, + &reconnect_task, + h); + h->backoff = GNUNET_TIME_STD_BACKOFF (h->backoff); } + /** * Transmit a GET request (and if successful, start to receive * the response). * * @param handle statistics handle - * @param size how many bytes can we write to @a buf - * @param buf where to write requests to the service - * @return number of bytes written to @a buf */ -static size_t -transmit_get (struct GNUNET_STATISTICS_Handle *handle, - size_t size, - void *buf) +static void +transmit_get (struct GNUNET_STATISTICS_Handle *handle) { struct GNUNET_STATISTICS_GetHandle *c; struct GNUNET_MessageHeader *hdr; + struct GNUNET_MQ_Envelope *env; size_t slen1; size_t slen2; - uint16_t msize; GNUNET_assert (NULL != (c = handle->current)); - if (NULL == buf) - { - /* timeout / error */ - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Transmission of request for statistics failed!\n"); - do_disconnect (handle); - reconnect_later (handle); - return 0; - } slen1 = strlen (c->subsystem) + 1; slen2 = strlen (c->name) + 1; - msize = slen1 + slen2 + sizeof (struct GNUNET_MessageHeader); - GNUNET_assert (msize <= size); - hdr = (struct GNUNET_MessageHeader *) buf; - hdr->size = htons (msize); - hdr->type = htons (GNUNET_MESSAGE_TYPE_STATISTICS_GET); + env = GNUNET_MQ_msg_extra (hdr, + slen1 + slen2, + GNUNET_MESSAGE_TYPE_STATISTICS_GET); GNUNET_assert (slen1 + slen2 == - GNUNET_STRINGS_buffer_fill ((char *) &hdr[1], slen1 + slen2, 2, + GNUNET_STRINGS_buffer_fill ((char *) &hdr[1], + slen1 + slen2, + 2, c->subsystem, c->name)); - if (GNUNET_YES != handle->receiving) - { - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Transmission of GET done, now reading response\n"); - handle->receiving = GNUNET_YES; - GNUNET_CLIENT_receive (handle->client, &receive_stats, handle, - GNUNET_TIME_absolute_get_remaining (c->timeout)); - } - return msize; + GNUNET_MQ_send (handle->mq, + env); } @@ -795,53 +792,34 @@ transmit_get (struct GNUNET_STATISTICS_Handle *handle, * the response). * * @param handle statistics handle - * @param size how many bytes can we write to @a buf - * @param buf where to write requests to the service - * @return number of bytes written to @a buf */ -static size_t -transmit_watch (struct GNUNET_STATISTICS_Handle *handle, - size_t size, - void *buf) +static void +transmit_watch (struct GNUNET_STATISTICS_Handle *handle) { struct GNUNET_MessageHeader *hdr; + struct GNUNET_MQ_Envelope *env; size_t slen1; size_t slen2; - uint16_t msize; - if (NULL == buf) - { - /* timeout / error */ - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Transmission of request for statistics failed!\n"); - do_disconnect (handle); - reconnect_later (handle); - return 0; - } LOG (GNUNET_ERROR_TYPE_DEBUG, "Transmitting watch request for `%s'\n", handle->current->name); slen1 = strlen (handle->current->subsystem) + 1; slen2 = strlen (handle->current->name) + 1; - msize = slen1 + slen2 + sizeof (struct GNUNET_MessageHeader); - GNUNET_assert (msize <= size); - hdr = (struct GNUNET_MessageHeader *) buf; - hdr->size = htons (msize); - hdr->type = htons (GNUNET_MESSAGE_TYPE_STATISTICS_WATCH); + env = GNUNET_MQ_msg_extra (hdr, + slen1 + slen2, + GNUNET_MESSAGE_TYPE_STATISTICS_WATCH); GNUNET_assert (slen1 + slen2 == - GNUNET_STRINGS_buffer_fill ((char *) &hdr[1], slen1 + slen2, 2, + GNUNET_STRINGS_buffer_fill ((char *) &hdr[1], + slen1 + slen2, + 2, handle->current->subsystem, handle->current->name)); - if (GNUNET_YES != handle->receiving) - { - handle->receiving = GNUNET_YES; - GNUNET_CLIENT_receive (handle->client, &receive_stats, handle, - GNUNET_TIME_UNIT_FOREVER_REL); - } + GNUNET_MQ_send (handle->mq, + env); GNUNET_assert (NULL == handle->current->cont); free_action_item (handle->current); handle->current = NULL; - return msize; } @@ -849,39 +827,20 @@ transmit_watch (struct GNUNET_STATISTICS_Handle *handle, * Transmit a SET/UPDATE request. * * @param handle statistics handle - * @param size how many bytes can we write to @a buf - * @param buf where to write requests to the service - * @return number of bytes written to @a buf */ -static size_t -transmit_set (struct GNUNET_STATISTICS_Handle *handle, - size_t size, - void *buf) +static void +transmit_set (struct GNUNET_STATISTICS_Handle *handle) { struct GNUNET_STATISTICS_SetMessage *r; + struct GNUNET_MQ_Envelope *env; size_t slen; size_t nlen; - size_t nsize; - if (NULL == buf) - { - do_disconnect (handle); - reconnect_later (handle); - return 0; - } slen = strlen (handle->current->subsystem) + 1; nlen = strlen (handle->current->name) + 1; - nsize = sizeof (struct GNUNET_STATISTICS_SetMessage) + slen + nlen; - if (size < nsize) - { - GNUNET_break (0); - do_disconnect (handle); - reconnect_later (handle); - return 0; - } - r = buf; - r->header.size = htons (nsize); - r->header.type = htons (GNUNET_MESSAGE_TYPE_STATISTICS_SET); + env = GNUNET_MQ_msg_extra (r, + slen + nlen, + GNUNET_MESSAGE_TYPE_STATISTICS_SET); r->flags = 0; r->value = GNUNET_htonll (handle->current->value); if (handle->current->make_persistent) @@ -889,52 +848,17 @@ transmit_set (struct GNUNET_STATISTICS_Handle *handle, if (handle->current->type == ACTION_UPDATE) r->flags |= htonl (GNUNET_STATISTICS_SETFLAG_RELATIVE); GNUNET_assert (slen + nlen == - GNUNET_STRINGS_buffer_fill ((char *) &r[1], slen + nlen, 2, + GNUNET_STRINGS_buffer_fill ((char *) &r[1], + slen + nlen, + 2, handle->current->subsystem, handle->current->name)); GNUNET_assert (NULL == handle->current->cont); free_action_item (handle->current); handle->current = NULL; update_memory_statistics (handle); - return nsize; -} - - -/** - * Function called when we are ready to transmit a request to the service. - * - * @param cls the `struct GNUNET_STATISTICS_Handle` - * @param size how many bytes can we write to @a buf - * @param buf where to write requests to the service - * @return number of bytes written to @a buf - */ -static size_t -transmit_action (void *cls, size_t size, void *buf) -{ - struct GNUNET_STATISTICS_Handle *h = cls; - size_t ret; - - h->th = NULL; - ret = 0; - if (NULL != h->current) - switch (h->current->type) - { - case ACTION_GET: - ret = transmit_get (h, size, buf); - break; - case ACTION_SET: - case ACTION_UPDATE: - ret = transmit_set (h, size, buf); - break; - case ACTION_WATCH: - ret = transmit_watch (h, size, buf); - break; - default: - GNUNET_assert (0); - break; - } - schedule_action (h); - return ret; + GNUNET_MQ_send (handle->mq, + env); } @@ -952,10 +876,10 @@ GNUNET_STATISTICS_create (const char *subsystem, struct GNUNET_STATISTICS_Handle *ret; if (GNUNET_YES == - GNUNET_CONFIGURATION_get_value_yesno (cfg, "statistics", "DISABLE")) + GNUNET_CONFIGURATION_get_value_yesno (cfg, + "statistics", + "DISABLE")) return NULL; - GNUNET_assert (NULL != subsystem); - GNUNET_assert (NULL != cfg); ret = GNUNET_new (struct GNUNET_STATISTICS_Handle); ret->cfg = cfg; ret->subsystem = GNUNET_strdup (subsystem); @@ -978,8 +902,6 @@ GNUNET_STATISTICS_destroy (struct GNUNET_STATISTICS_Handle *h, { struct GNUNET_STATISTICS_GetHandle *pos; struct GNUNET_STATISTICS_GetHandle *next; - struct GNUNET_TIME_Relative timeout; - int i; if (NULL == h) return; @@ -989,26 +911,19 @@ GNUNET_STATISTICS_destroy (struct GNUNET_STATISTICS_Handle *h, GNUNET_SCHEDULER_cancel (h->backoff_task); h->backoff_task = NULL; } - if (sync_first) + if ( (sync_first) && + (GNUNET_YES == try_connect (h)) ) { - if (NULL != h->current) - { - if (ACTION_GET == h->current->type) - { - if (NULL != h->th) - { - GNUNET_CLIENT_notify_transmit_ready_cancel (h->th); - h->th = NULL; - } - free_action_item (h->current); - h->current = NULL; - } - } + if ( (NULL != h->current) && + (ACTION_GET == h->current->type) ) + h->current->aborted = GNUNET_YES; next = h->action_head; while (NULL != (pos = next)) { next = pos->next; - if (ACTION_GET == pos->type) + if ( (ACTION_GET == pos->type) || + (ACTION_WATCH == pos->type) || + (GNUNET_NO == pos->make_persistent) ) { GNUNET_CONTAINER_DLL_remove (h->action_head, h->action_tail, @@ -1016,25 +931,11 @@ GNUNET_STATISTICS_destroy (struct GNUNET_STATISTICS_Handle *h, free_action_item (pos); } } - if ( (NULL == h->current) && - (NULL != (h->current = h->action_head)) ) - GNUNET_CONTAINER_DLL_remove (h->action_head, - h->action_tail, - h->current); h->do_destroy = GNUNET_YES; - if ((NULL != h->current) && (NULL == h->th) && - (NULL != h->client)) - { - timeout = GNUNET_TIME_absolute_get_remaining (h->current->timeout); - h->th = - GNUNET_CLIENT_notify_transmit_ready (h->client, h->current->msize, - timeout, GNUNET_YES, - &transmit_action, h); - GNUNET_assert (NULL != h->th); - } - if (NULL != h->th) - return; /* do not finish destruction just yet */ + schedule_action (h); + return; /* do not finish destruction just yet */ } + /* do clean up all */ while (NULL != (pos = h->action_head)) { GNUNET_CONTAINER_DLL_remove (h->action_head, @@ -1043,7 +944,7 @@ GNUNET_STATISTICS_destroy (struct GNUNET_STATISTICS_Handle *h, free_action_item (pos); } do_disconnect (h); - for (i = 0; i < h->watches_size; i++) + for (unsigned int i = 0; i < h->watches_size; i++) { if (NULL == h->watches[i]) continue; @@ -1051,52 +952,14 @@ GNUNET_STATISTICS_destroy (struct GNUNET_STATISTICS_Handle *h, GNUNET_free (h->watches[i]->name); GNUNET_free (h->watches[i]); } - GNUNET_array_grow (h->watches, h->watches_size, 0); + GNUNET_array_grow (h->watches, + h->watches_size, + 0); GNUNET_free (h->subsystem); GNUNET_free (h); } -/** - * Function called to transmit TEST message to service to - * confirm that the service has received all of our 'SET' - * messages (during statistics disconnect/shutdown). - * - * @param cls the `struct GNUNET_STATISTICS_Handle` - * @param size how many bytes can we write to @a buf - * @param buf where to write requests to the service - * @return number of bytes written to @a buf - */ -static size_t -transmit_test_on_shutdown (void *cls, - size_t size, - void *buf) -{ - struct GNUNET_STATISTICS_Handle *h = cls; - struct GNUNET_MessageHeader hdr; - - h->th = NULL; - if (NULL == buf) - { - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, - _("Failed to receive acknowledgement from statistics service, some statistics might have been lost!\n")); - h->do_destroy = GNUNET_NO; - GNUNET_SCHEDULER_add_now (&destroy_task, h); - return 0; - } - hdr.type = htons (GNUNET_MESSAGE_TYPE_TEST); - hdr.size = htons (sizeof (struct GNUNET_MessageHeader)); - memcpy (buf, &hdr, sizeof (hdr)); - if (GNUNET_YES != h->receiving) - { - h->receiving = GNUNET_YES; - GNUNET_CLIENT_receive (h->client, &receive_stats, h, - GNUNET_TIME_UNIT_FOREVER_REL); - } - return sizeof (struct GNUNET_MessageHeader); -} - - /** * Schedule the next action to be performed. * @@ -1105,76 +968,61 @@ transmit_test_on_shutdown (void *cls, static void schedule_action (struct GNUNET_STATISTICS_Handle *h) { - struct GNUNET_TIME_Relative timeout; - - if ( (NULL != h->th) || - (NULL != h->backoff_task) ) + if (NULL != h->backoff_task) return; /* action already pending */ if (GNUNET_YES != try_connect (h)) { reconnect_later (h); return; } - if (NULL != h->current) - return; /* action already pending */ /* schedule next action */ - h->current = h->action_head; - if (NULL == h->current) + while (NULL == h->current) { - if (GNUNET_YES == h->do_destroy) + h->current = h->action_head; + if (NULL == h->current) { + struct GNUNET_MessageHeader *hdr; + struct GNUNET_MQ_Envelope *env; + + if (GNUNET_YES != h->do_destroy) + return; /* nothing to do */ + /* let service know that we're done */ h->do_destroy = GNUNET_SYSERR; /* in 'TEST' mode */ - h->th = GNUNET_CLIENT_notify_transmit_ready (h->client, - sizeof (struct GNUNET_MessageHeader), - SET_TRANSMIT_TIMEOUT, - GNUNET_NO, - &transmit_test_on_shutdown, h); + env = GNUNET_MQ_msg (hdr, + GNUNET_MESSAGE_TYPE_TEST); + GNUNET_MQ_send (h->mq, + env); + return; + } + GNUNET_CONTAINER_DLL_remove (h->action_head, + h->action_tail, + h->current); + switch (h->current->type) + { + case ACTION_GET: + transmit_get (h); + break; + case ACTION_SET: + case ACTION_UPDATE: + transmit_set (h); + break; + case ACTION_WATCH: + transmit_watch (h); + break; + default: + GNUNET_assert (0); + break; } - return; - } - GNUNET_CONTAINER_DLL_remove (h->action_head, h->action_tail, h->current); - timeout = GNUNET_TIME_absolute_get_remaining (h->current->timeout); - if (NULL == - (h->th = - GNUNET_CLIENT_notify_transmit_ready (h->client, h->current->msize, - timeout, GNUNET_YES, - &transmit_action, h))) - { - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Failed to transmit request to statistics service.\n"); - do_disconnect (h); - reconnect_later (h); } } -/** - * We have run into a timeout on a #GNUNET_STATISTICS_get() operation, - * call the continuation. - * - * @param cls the `struct GNUNET_STATISTICS_GetHandle` - */ -static void -run_get_timeout (void *cls) -{ - struct GNUNET_STATISTICS_GetHandle *gh = cls; - GNUNET_STATISTICS_Callback cont = gh->cont; - void *cont_cls = gh->cls; - - gh->timeout_task = NULL; - GNUNET_STATISTICS_get_cancel (gh); - cont (cont_cls, GNUNET_SYSERR); -} - - /** * Get statistic from the peer. * * @param handle identification of the statistics service * @param subsystem limit to the specified subsystem, NULL for our subsystem * @param name name of the statistic value, NULL for all values - * @param timeout after how long should we give up (and call - * cont with an error code)? * @param cont continuation to call when done (can be NULL) * This callback CANNOT destroy the statistics handle in the same call. * @param proc function to call on each value @@ -1183,10 +1031,11 @@ run_get_timeout (void *cls) */ struct GNUNET_STATISTICS_GetHandle * GNUNET_STATISTICS_get (struct GNUNET_STATISTICS_Handle *handle, - const char *subsystem, const char *name, - struct GNUNET_TIME_Relative timeout, + const char *subsystem, + const char *name, GNUNET_STATISTICS_Callback cont, - GNUNET_STATISTICS_Iterator proc, void *cls) + GNUNET_STATISTICS_Iterator proc, + void *cls) { size_t slen1; size_t slen2; @@ -1211,12 +1060,8 @@ GNUNET_STATISTICS_get (struct GNUNET_STATISTICS_Handle *handle, ai->cont = cont; ai->proc = proc; ai->cls = cls; - ai->timeout = GNUNET_TIME_relative_to_absolute (timeout); ai->type = ACTION_GET; ai->msize = slen1 + slen2 + sizeof (struct GNUNET_MessageHeader); - ai->timeout_task = GNUNET_SCHEDULER_add_delayed (timeout, - &run_get_timeout, - ai); GNUNET_CONTAINER_DLL_insert_tail (handle->action_head, handle->action_tail, ai); @@ -1236,23 +1081,18 @@ GNUNET_STATISTICS_get_cancel (struct GNUNET_STATISTICS_GetHandle *gh) { if (NULL == gh) return; - if (NULL != gh->timeout_task) - { - GNUNET_SCHEDULER_cancel (gh->timeout_task); - gh->timeout_task = NULL; - } gh->cont = NULL; if (gh->sh->current == gh) { gh->aborted = GNUNET_YES; + return; } - else - { - GNUNET_CONTAINER_DLL_remove (gh->sh->action_head, gh->sh->action_tail, gh); - GNUNET_free (gh->name); - GNUNET_free (gh->subsystem); - GNUNET_free (gh); - } + GNUNET_CONTAINER_DLL_remove (gh->sh->action_head, + gh->sh->action_tail, + gh); + GNUNET_free (gh->name); + GNUNET_free (gh->subsystem); + GNUNET_free (gh); } @@ -1268,8 +1108,10 @@ GNUNET_STATISTICS_get_cancel (struct GNUNET_STATISTICS_GetHandle *gh) */ int GNUNET_STATISTICS_watch (struct GNUNET_STATISTICS_Handle *handle, - const char *subsystem, const char *name, - GNUNET_STATISTICS_Iterator proc, void *proc_cls) + const char *subsystem, + const char *name, + GNUNET_STATISTICS_Iterator proc, + void *proc_cls) { struct GNUNET_STATISTICS_WatchEntry *w; @@ -1280,8 +1122,11 @@ GNUNET_STATISTICS_watch (struct GNUNET_STATISTICS_Handle *handle, w->name = GNUNET_strdup (name); w->proc = proc; w->proc_cls = proc_cls; - GNUNET_array_append (handle->watches, handle->watches_size, w); - schedule_watch_request (handle, w); + GNUNET_array_append (handle->watches, + handle->watches_size, + w); + schedule_watch_request (handle, + w); return GNUNET_OK; } @@ -1304,11 +1149,10 @@ GNUNET_STATISTICS_watch_cancel (struct GNUNET_STATISTICS_Handle *handle, void *proc_cls) { struct GNUNET_STATISTICS_WatchEntry *w; - unsigned int i; if (NULL == handle) return GNUNET_SYSERR; - for (i=0;iwatches_size;i++) + for (unsigned int i=0;iwatches_size;i++) { w = handle->watches[i]; if (NULL == w) @@ -1329,7 +1173,6 @@ GNUNET_STATISTICS_watch_cancel (struct GNUNET_STATISTICS_Handle *handle, } - /** * Queue a request to change a statistic. * @@ -1421,7 +1264,8 @@ add_setter_action (struct GNUNET_STATISTICS_Handle *h, ai->msize = nsize; ai->value = value; ai->type = type; - GNUNET_CONTAINER_DLL_insert_tail (h->action_head, h->action_tail, + GNUNET_CONTAINER_DLL_insert_tail (h->action_head, + h->action_tail, ai); schedule_action (h); } @@ -1445,7 +1289,11 @@ GNUNET_STATISTICS_set (struct GNUNET_STATISTICS_Handle *handle, if (NULL == handle) return; GNUNET_assert (GNUNET_NO == handle->do_destroy); - add_setter_action (handle, name, make_persistent, value, ACTION_SET); + add_setter_action (handle, + name, + make_persistent, + value, + ACTION_SET); } diff --git a/src/statistics/test_statistics_api.c b/src/statistics/test_statistics_api.c index fedc56c3bb..0c7bb3488d 100644 --- a/src/statistics/test_statistics_api.c +++ b/src/statistics/test_statistics_api.c @@ -31,11 +31,17 @@ static struct GNUNET_STATISTICS_Handle *h; static int -check_1 (void *cls, const char *subsystem, const char *name, uint64_t value, +check_1 (void *cls, + const char *subsystem, + const char *name, + uint64_t value, int is_persistent) { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Received value %llu for `%s:%s\n", - (unsigned long long) value, subsystem, name); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Received value %llu for `%s:%s\n", + (unsigned long long) value, + subsystem, + name); GNUNET_assert (0 == strcmp (name, "test-1")); GNUNET_assert (0 == strcmp (subsystem, "test-statistics-api")); GNUNET_assert (value == 1); @@ -45,11 +51,17 @@ check_1 (void *cls, const char *subsystem, const char *name, uint64_t value, static int -check_2 (void *cls, const char *subsystem, const char *name, uint64_t value, +check_2 (void *cls, + const char *subsystem, + const char *name, + uint64_t value, int is_persistent) { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Received value %llu for `%s:%s\n", - (unsigned long long) value, subsystem, name); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Received value %llu for `%s:%s\n", + (unsigned long long) value, + subsystem, + name); GNUNET_assert (0 == strcmp (name, "test-2")); GNUNET_assert (0 == strcmp (subsystem, "test-statistics-api")); GNUNET_assert (value == 2); @@ -59,11 +71,17 @@ check_2 (void *cls, const char *subsystem, const char *name, uint64_t value, static int -check_3 (void *cls, const char *subsystem, const char *name, uint64_t value, +check_3 (void *cls, + const char *subsystem, + const char *name, + uint64_t value, int is_persistent) { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Received value %llu for `%s:%s\n", - (unsigned long long) value, subsystem, name); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Received value %llu for `%s:%s\n", + (unsigned long long) value, + subsystem, + name); GNUNET_assert (0 == strcmp (name, "test-3")); GNUNET_assert (0 == strcmp (subsystem, "test-statistics-api")); GNUNET_assert (value == 3); @@ -73,7 +91,8 @@ check_3 (void *cls, const char *subsystem, const char *name, uint64_t value, static void -next_fin (void *cls, int success) +next_fin (void *cls, + int success) { int *ok = cls; @@ -87,16 +106,19 @@ static void next (void *cls, int success) { GNUNET_assert (success == GNUNET_OK); - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Issuing GET request\n"); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Issuing GET request\n"); GNUNET_break (NULL != GNUNET_STATISTICS_get (h, NULL, "test-2", - GNUNET_TIME_UNIT_SECONDS, &next_fin, + &next_fin, &check_2, cls)); } static void -run (void *cls, char *const *args, const char *cfgfile, +run (void *cls, + char *const *args, + const char *cfgfile, const struct GNUNET_CONFIGURATION_Handle *cfg) { h = GNUNET_STATISTICS_create ("test-statistics-api", cfg); @@ -107,19 +129,23 @@ run (void *cls, char *const *args, const char *cfgfile, GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Issuing GET request\n"); GNUNET_break (NULL != GNUNET_STATISTICS_get (h, NULL, "test-1", - GNUNET_TIME_UNIT_SECONDS, &next, + &next, &check_1, cls)); } static void -run_more (void *cls, char *const *args, const char *cfgfile, +run_more (void *cls, + char *const *args, + const char *cfgfile, const struct GNUNET_CONFIGURATION_Handle *cfg) { - h = GNUNET_STATISTICS_create ("test-statistics-api", cfg); + h = GNUNET_STATISTICS_create ("test-statistics-api", + cfg); GNUNET_break (NULL != - GNUNET_STATISTICS_get (h, NULL, "test-3", - GNUNET_TIME_UNIT_SECONDS, &next_fin, + GNUNET_STATISTICS_get (h, NULL, + "test-3", + &next_fin, &check_3, cls)); } @@ -128,7 +154,6 @@ int main (int argc, char *argv_ign[]) { int ok = 1; - char *const argv[] = { "test-statistics-api", "-c", "test_statistics_api_data.conf", @@ -146,15 +171,19 @@ main (int argc, char *argv_ign[]) NULL); binary = GNUNET_OS_get_libexec_binary_path ("gnunet-service-statistics"); proc = - GNUNET_OS_start_process (GNUNET_YES, GNUNET_OS_INHERIT_STD_OUT_AND_ERR, + GNUNET_OS_start_process (GNUNET_YES, + GNUNET_OS_INHERIT_STD_OUT_AND_ERR, NULL, NULL, NULL, binary, "gnunet-service-statistics", "-c", "test_statistics_api_data.conf", NULL); GNUNET_assert (NULL != proc); - GNUNET_PROGRAM_run (5, argv, "test-statistics-api", "nohelp", options, &run, + GNUNET_PROGRAM_run (5, argv, + "test-statistics-api", "nohelp", + options, &run, &ok); - if (0 != GNUNET_OS_process_kill (proc, GNUNET_TERM_SIG)) + if (0 != GNUNET_OS_process_kill (proc, + GNUNET_TERM_SIG)) { GNUNET_log_strerror (GNUNET_ERROR_TYPE_WARNING, "kill"); ok = 1; @@ -170,14 +199,19 @@ main (int argc, char *argv_ign[]) ok = 1; /* restart to check persistence! */ proc = - GNUNET_OS_start_process (GNUNET_YES, GNUNET_OS_INHERIT_STD_OUT_AND_ERR, + GNUNET_OS_start_process (GNUNET_YES, + GNUNET_OS_INHERIT_STD_OUT_AND_ERR, NULL, NULL, NULL, binary, "gnunet-service-statistics", - "-c", "test_statistics_api_data.conf", NULL); - GNUNET_PROGRAM_run (5, argv, "test-statistics-api", "nohelp", options, + "-c", "test_statistics_api_data.conf", + NULL); + GNUNET_PROGRAM_run (5, argv, + "test-statistics-api", "nohelp", + options, &run_more, &ok); - if (0 != GNUNET_OS_process_kill (proc, GNUNET_TERM_SIG)) + if (0 != GNUNET_OS_process_kill (proc, + GNUNET_TERM_SIG)) { GNUNET_log_strerror (GNUNET_ERROR_TYPE_WARNING, "kill"); ok = 1; diff --git a/src/statistics/test_statistics_api_loop.c b/src/statistics/test_statistics_api_loop.c index 64e6dec5f2..a7e2cbab50 100644 --- a/src/statistics/test_statistics_api_loop.c +++ b/src/statistics/test_statistics_api_loop.c @@ -70,7 +70,7 @@ run (void *cls, char *const *args, const char *cfgfile, i = 0; GNUNET_break (NULL != GNUNET_STATISTICS_get (h, NULL, "test-0", - GNUNET_TIME_UNIT_MINUTES, &next, + &next, &check_1, cls)); } diff --git a/src/testbed/testbed_api_statistics.c b/src/testbed/testbed_api_statistics.c index a4778f84d5..4fd117953e 100644 --- a/src/testbed/testbed_api_statistics.c +++ b/src/testbed/testbed_api_statistics.c @@ -262,10 +262,11 @@ service_connect_comp (void *cls, struct PeerGetStatsContext *peer_sc = cls; struct GNUNET_STATISTICS_Handle *h = ca_result; - LOG_DEBUG ("Retrieving statistics of peer %u\n", peer_sc->peer_index); + LOG_DEBUG ("Retrieving statistics of peer %u\n", + peer_sc->peer_index); peer_sc->get_handle = - GNUNET_STATISTICS_get (h, peer_sc->sc->subsystem, peer_sc->sc->name, - GNUNET_TIME_UNIT_FOREVER_REL, + GNUNET_STATISTICS_get (h, peer_sc->sc->subsystem, + peer_sc->sc->name, &iteration_completion_cb, iterator_cb, peer_sc); } -- cgit v1.2.3-18-g5258