diff options
-rw-r--r-- | src/peerstore/gnunet-service-peerstore.c | 4 | ||||
-rw-r--r-- | src/peerstore/peerstore_api.c | 89 | ||||
-rw-r--r-- | src/sensor/Makefile.am | 1 | ||||
-rw-r--r-- | src/sensor/gnunet-service-sensor.c | 57 |
4 files changed, 140 insertions, 11 deletions
diff --git a/src/peerstore/gnunet-service-peerstore.c b/src/peerstore/gnunet-service-peerstore.c index c620bd57ba..140db80d87 100644 --- a/src/peerstore/gnunet-service-peerstore.c +++ b/src/peerstore/gnunet-service-peerstore.c @@ -32,7 +32,7 @@ /** * Interval for expired records cleanup (in seconds) */ -#define CLEANUP_INTERVAL 300 /* 5mins */ +#define EXPIRED_RECORDS_CLEANUP_INTERVAL 300 /* 5mins */ /** * Our configuration. @@ -96,7 +96,7 @@ cleanup_expired_records(void *cls, deleted = db->expire_records(db->cls, GNUNET_TIME_absolute_get()); GNUNET_log(GNUNET_ERROR_TYPE_INFO, "%d records expired.\n", deleted); GNUNET_SCHEDULER_add_delayed( - GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, CLEANUP_INTERVAL), + GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, EXPIRED_RECORDS_CLEANUP_INTERVAL), &cleanup_expired_records, NULL); } diff --git a/src/peerstore/peerstore_api.c b/src/peerstore/peerstore_api.c index 2b1cc6a1dd..b53bc2f1a2 100644 --- a/src/peerstore/peerstore_api.c +++ b/src/peerstore/peerstore_api.c @@ -245,6 +245,27 @@ static void reconnect (struct GNUNET_PEERSTORE_Handle *h); /** + * Callback after MQ envelope is sent + * + * @param cls a 'struct GNUNET_PEERSTORE_WatchContext *' + */ +void watch_request_sent (void *cls); + +/** + * Callback after MQ envelope is sent + * + * @param cls a 'struct GNUNET_PEERSTORE_IterateContext *' + */ +void iterate_request_sent (void *cls); + +/** + * Callback after MQ envelope is sent + * + * @param cls a 'struct GNUNET_PEERSTORE_StoreContext *' + */ +void store_request_sent (void *cls); + +/** * MQ message handlers */ static const struct GNUNET_MQ_MessageHandler mq_handlers[] = { @@ -268,6 +289,28 @@ handle_client_error (void *cls, enum GNUNET_MQ_Error error) } /** + * Iterator over previous watches to resend them + */ +int rewatch_it(void *cls, + const struct GNUNET_HashCode *key, + void *value) +{ + struct GNUNET_PEERSTORE_Handle *h = cls; + struct GNUNET_PEERSTORE_WatchContext *wc = value; + struct StoreKeyHashMessage *hm; + + if(GNUNET_YES == wc->request_sent) + { /* Envelope gone, create new one. */ + wc->ev = GNUNET_MQ_msg(hm, GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH); + hm->keyhash = wc->keyhash; + wc->request_sent = GNUNET_NO; + } + GNUNET_MQ_notify_sent(wc->ev, &watch_request_sent, wc); + GNUNET_MQ_send(h->mq, wc->ev); + return GNUNET_YES; +} + +/** * Close the existing connection to PEERSTORE and reconnect. * * @param h handle to the service @@ -275,6 +318,11 @@ handle_client_error (void *cls, enum GNUNET_MQ_Error error) static void reconnect (struct GNUNET_PEERSTORE_Handle *h) { + struct GNUNET_PEERSTORE_IterateContext *ic; + GNUNET_PEERSTORE_Processor icb; + void *icb_cls; + struct GNUNET_PEERSTORE_StoreContext *sc; + LOG(GNUNET_ERROR_TYPE_DEBUG, "Reconnecting...\n"); if (NULL != h->mq) { @@ -287,13 +335,43 @@ reconnect (struct GNUNET_PEERSTORE_Handle *h) h->client = NULL; } h->client = GNUNET_CLIENT_connect ("peerstore", h->cfg); - //FIXME: retry connecting if fails again (client == NULL) + GNUNET_assert(NULL != h->client); h->mq = GNUNET_MQ_queue_for_connection_client(h->client, mq_handlers, &handle_client_error, h); - //FIXME: resend pending requests after reconnecting - + LOG(GNUNET_ERROR_TYPE_DEBUG, + "Resending pending requests after reconnect.\n"); + if (NULL != h->watches) + { + GNUNET_CONTAINER_multihashmap_iterate(h->watches, + &rewatch_it, h); + } + ic = h->iterate_head; + while (NULL != ic) + { + if (GNUNET_YES == ic->request_sent) + { + icb = ic->callback; + icb_cls = ic->callback_cls; + GNUNET_PEERSTORE_iterate_cancel(ic); + if(NULL != icb) + icb(icb_cls, NULL,_("Iteration canceled due to reconnection.")); + } + else + { + GNUNET_MQ_notify_sent(ic->ev, &iterate_request_sent, ic); + GNUNET_MQ_send(h->mq, ic->ev); + } + ic = ic->next; + } + sc = h->store_head; + while (NULL != sc) + { + GNUNET_MQ_notify_sent(sc->ev, &store_request_sent, sc); + GNUNET_MQ_send(h->mq, sc->ev); + sc = sc->next; + } } /** @@ -336,6 +414,7 @@ GNUNET_PEERSTORE_connect (const struct GNUNET_CONFIGURATION_Handle *cfg) void GNUNET_PEERSTORE_disconnect(struct GNUNET_PEERSTORE_Handle *h) { + LOG(GNUNET_ERROR_TYPE_DEBUG, "Disconnecting.\n"); if(NULL != h->watches) { GNUNET_CONTAINER_multihashmap_destroy(h->watches); @@ -442,7 +521,7 @@ GNUNET_PEERSTORE_store (struct GNUNET_PEERSTORE_Handle *h, sc->cont = cont; sc->cont_cls = cont_cls; sc->h = h; - GNUNET_CONTAINER_DLL_insert(h->store_head, h->store_tail, sc); + GNUNET_CONTAINER_DLL_insert_tail(h->store_head, h->store_tail, sc); GNUNET_MQ_notify_sent(ev, &store_request_sent, sc); GNUNET_MQ_send(h->mq, ev); return sc; @@ -604,7 +683,7 @@ GNUNET_PEERSTORE_iterate (struct GNUNET_PEERSTORE_Handle *h, ic->ev = ev; ic->h = h; ic->request_sent = GNUNET_NO; - GNUNET_CONTAINER_DLL_insert(h->iterate_head, h->iterate_tail, ic); + GNUNET_CONTAINER_DLL_insert_tail(h->iterate_head, h->iterate_tail, ic); LOG(GNUNET_ERROR_TYPE_DEBUG, "Sending an iterate request for sub system `%s'\n", sub_system); GNUNET_MQ_notify_sent(ev, &iterate_request_sent, ic); diff --git a/src/sensor/Makefile.am b/src/sensor/Makefile.am index e3d9bbe696..b6c55f5e61 100644 --- a/src/sensor/Makefile.am +++ b/src/sensor/Makefile.am @@ -36,6 +36,7 @@ gnunet_service_sensor_SOURCES = \ gnunet_service_sensor_LDADD = \ $(top_builddir)/src/util/libgnunetutil.la \ $(top_builddir)/src/statistics/libgnunetstatistics.la \ + $(top_builddir)/src/peerstore/libgnunetpeerstore.la \ $(GN_LIBINTL) libgnunetsensor_la_SOURCES = \ diff --git a/src/sensor/gnunet-service-sensor.c b/src/sensor/gnunet-service-sensor.c index 6d47cb1e27..b42562ace6 100644 --- a/src/sensor/gnunet-service-sensor.c +++ b/src/sensor/gnunet-service-sensor.c @@ -28,6 +28,7 @@ #include "gnunet_util_lib.h" #include "sensor.h" #include "gnunet_statistics_service.h" +#include "gnunet_peerstore_service.h" /** * Minimum sensor execution interval (in seconds) @@ -204,6 +205,21 @@ static const char *datatypes[] = { "uint64", "double", "string", NULL }; struct GNUNET_STATISTICS_Handle *statistics; /** + * Handle to peerstore service + */ +struct GNUNET_PEERSTORE_Handle *peerstore; + +/** + * Service name + */ +char *subsystem = "sensor"; + +/** + * My peer id + */ +struct GNUNET_PeerIdentity peerid; + +/** * Remove sensor execution from scheduler * * @param cls unused @@ -290,7 +306,15 @@ shutdown_task (void *cls, GNUNET_CONTAINER_multihashmap_iterate(sensors, &destroy_sensor, NULL); GNUNET_CONTAINER_multihashmap_destroy(sensors); if(NULL != statistics) + { GNUNET_STATISTICS_destroy(statistics, GNUNET_YES); + statistics = NULL; + } + if(NULL != peerstore) + { + GNUNET_PEERSTORE_disconnect(peerstore); + peerstore = NULL; + } GNUNET_SCHEDULER_shutdown(); } @@ -816,8 +840,21 @@ int sensor_statistics_iterator (void *cls, int is_persistent) { struct SensorInfo *sensorinfo = cls; + struct GNUNET_TIME_Absolute expiry; GNUNET_log(GNUNET_ERROR_TYPE_INFO, "Received a value for sensor `%s': %" PRIu64 "\n", sensorinfo->name, value); + //FIXME: store first line, last line or all ?? + expiry = GNUNET_TIME_relative_to_absolute(sensorinfo->interval); + GNUNET_PEERSTORE_store(peerstore, + subsystem, + &peerid, + sensorinfo->name, + &value, + sizeof(value), + expiry, + GNUNET_PEERSTORE_STOREOPTION_MULTIPLE, + NULL, + NULL); return GNUNET_OK; } @@ -845,6 +882,7 @@ void end_sensor_run_stat (void *cls, int success) void sensor_process_callback (void *cls, const char *line) { struct SensorInfo *sensorinfo = cls; + struct GNUNET_TIME_Absolute expiry; if(NULL == line) //end of output { @@ -854,6 +892,18 @@ void sensor_process_callback (void *cls, const char *line) return; } GNUNET_log(GNUNET_ERROR_TYPE_INFO, "Received a value for sensor `%s': %s\n", sensorinfo->name, line); + //FIXME: store first line, last line or all ?? + expiry = GNUNET_TIME_relative_to_absolute(sensorinfo->interval); + GNUNET_PEERSTORE_store(peerstore, + subsystem, + &peerid, + sensorinfo->name, + line, + strlen(line) + 1, + expiry, + GNUNET_PEERSTORE_STOREOPTION_MULTIPLE, + NULL, + NULL); } /** @@ -903,10 +953,6 @@ sensor_run (void *cls, GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "Starting the execution of sensor `%s'\n", sensorinfo->name); if(sources[0] == sensorinfo->source) //gnunet-statistics { - if(NULL == statistics) - { - statistics = GNUNET_STATISTICS_create("sensor", cfg); - } sensorinfo->gnunet_stat_get_handle = GNUNET_STATISTICS_get(statistics, sensorinfo->gnunet_stat_service, sensorinfo->gnunet_stat_name, @@ -1032,6 +1078,9 @@ run (void *cls, sensors = GNUNET_CONTAINER_multihashmap_create(10, GNUNET_NO); reload_sensors(); schedule_all_sensors(); + statistics = GNUNET_STATISTICS_create("sensor", cfg); + GNUNET_CRYPTO_get_peer_identity(cfg, &peerid); + peerstore = GNUNET_PEERSTORE_connect(cfg); GNUNET_SERVER_add_handlers (server, handlers); GNUNET_SERVER_disconnect_notify (server, &handle_client_disconnect, |