aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/peerstore/gnunet-service-peerstore.c4
-rw-r--r--src/peerstore/peerstore_api.c89
-rw-r--r--src/sensor/Makefile.am1
-rw-r--r--src/sensor/gnunet-service-sensor.c57
4 files changed, 140 insertions, 11 deletions
diff --git a/src/peerstore/gnunet-service-peerstore.c b/src/peerstore/gnunet-service-peerstore.c
index c620bd57ba..140db80d87 100644
--- a/src/peerstore/gnunet-service-peerstore.c
+++ b/src/peerstore/gnunet-service-peerstore.c
@@ -32,7 +32,7 @@
/**
* Interval for expired records cleanup (in seconds)
*/
-#define CLEANUP_INTERVAL 300 /* 5mins */
+#define EXPIRED_RECORDS_CLEANUP_INTERVAL 300 /* 5mins */
/**
* Our configuration.
@@ -96,7 +96,7 @@ cleanup_expired_records(void *cls,
deleted = db->expire_records(db->cls, GNUNET_TIME_absolute_get());
GNUNET_log(GNUNET_ERROR_TYPE_INFO, "%d records expired.\n", deleted);
GNUNET_SCHEDULER_add_delayed(
- GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, CLEANUP_INTERVAL),
+ GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, EXPIRED_RECORDS_CLEANUP_INTERVAL),
&cleanup_expired_records, NULL);
}
diff --git a/src/peerstore/peerstore_api.c b/src/peerstore/peerstore_api.c
index 2b1cc6a1dd..b53bc2f1a2 100644
--- a/src/peerstore/peerstore_api.c
+++ b/src/peerstore/peerstore_api.c
@@ -245,6 +245,27 @@ static void
reconnect (struct GNUNET_PEERSTORE_Handle *h);
/**
+ * Callback after MQ envelope is sent
+ *
+ * @param cls a 'struct GNUNET_PEERSTORE_WatchContext *'
+ */
+void watch_request_sent (void *cls);
+
+/**
+ * Callback after MQ envelope is sent
+ *
+ * @param cls a 'struct GNUNET_PEERSTORE_IterateContext *'
+ */
+void iterate_request_sent (void *cls);
+
+/**
+ * Callback after MQ envelope is sent
+ *
+ * @param cls a 'struct GNUNET_PEERSTORE_StoreContext *'
+ */
+void store_request_sent (void *cls);
+
+/**
* MQ message handlers
*/
static const struct GNUNET_MQ_MessageHandler mq_handlers[] = {
@@ -268,6 +289,28 @@ handle_client_error (void *cls, enum GNUNET_MQ_Error error)
}
/**
+ * Iterator over previous watches to resend them
+ */
+int rewatch_it(void *cls,
+ const struct GNUNET_HashCode *key,
+ void *value)
+{
+ struct GNUNET_PEERSTORE_Handle *h = cls;
+ struct GNUNET_PEERSTORE_WatchContext *wc = value;
+ struct StoreKeyHashMessage *hm;
+
+ if(GNUNET_YES == wc->request_sent)
+ { /* Envelope gone, create new one. */
+ wc->ev = GNUNET_MQ_msg(hm, GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH);
+ hm->keyhash = wc->keyhash;
+ wc->request_sent = GNUNET_NO;
+ }
+ GNUNET_MQ_notify_sent(wc->ev, &watch_request_sent, wc);
+ GNUNET_MQ_send(h->mq, wc->ev);
+ return GNUNET_YES;
+}
+
+/**
* Close the existing connection to PEERSTORE and reconnect.
*
* @param h handle to the service
@@ -275,6 +318,11 @@ handle_client_error (void *cls, enum GNUNET_MQ_Error error)
static void
reconnect (struct GNUNET_PEERSTORE_Handle *h)
{
+ struct GNUNET_PEERSTORE_IterateContext *ic;
+ GNUNET_PEERSTORE_Processor icb;
+ void *icb_cls;
+ struct GNUNET_PEERSTORE_StoreContext *sc;
+
LOG(GNUNET_ERROR_TYPE_DEBUG, "Reconnecting...\n");
if (NULL != h->mq)
{
@@ -287,13 +335,43 @@ reconnect (struct GNUNET_PEERSTORE_Handle *h)
h->client = NULL;
}
h->client = GNUNET_CLIENT_connect ("peerstore", h->cfg);
- //FIXME: retry connecting if fails again (client == NULL)
+ GNUNET_assert(NULL != h->client);
h->mq = GNUNET_MQ_queue_for_connection_client(h->client,
mq_handlers,
&handle_client_error,
h);
- //FIXME: resend pending requests after reconnecting
-
+ LOG(GNUNET_ERROR_TYPE_DEBUG,
+ "Resending pending requests after reconnect.\n");
+ if (NULL != h->watches)
+ {
+ GNUNET_CONTAINER_multihashmap_iterate(h->watches,
+ &rewatch_it, h);
+ }
+ ic = h->iterate_head;
+ while (NULL != ic)
+ {
+ if (GNUNET_YES == ic->request_sent)
+ {
+ icb = ic->callback;
+ icb_cls = ic->callback_cls;
+ GNUNET_PEERSTORE_iterate_cancel(ic);
+ if(NULL != icb)
+ icb(icb_cls, NULL,_("Iteration canceled due to reconnection."));
+ }
+ else
+ {
+ GNUNET_MQ_notify_sent(ic->ev, &iterate_request_sent, ic);
+ GNUNET_MQ_send(h->mq, ic->ev);
+ }
+ ic = ic->next;
+ }
+ sc = h->store_head;
+ while (NULL != sc)
+ {
+ GNUNET_MQ_notify_sent(sc->ev, &store_request_sent, sc);
+ GNUNET_MQ_send(h->mq, sc->ev);
+ sc = sc->next;
+ }
}
/**
@@ -336,6 +414,7 @@ GNUNET_PEERSTORE_connect (const struct GNUNET_CONFIGURATION_Handle *cfg)
void
GNUNET_PEERSTORE_disconnect(struct GNUNET_PEERSTORE_Handle *h)
{
+ LOG(GNUNET_ERROR_TYPE_DEBUG, "Disconnecting.\n");
if(NULL != h->watches)
{
GNUNET_CONTAINER_multihashmap_destroy(h->watches);
@@ -442,7 +521,7 @@ GNUNET_PEERSTORE_store (struct GNUNET_PEERSTORE_Handle *h,
sc->cont = cont;
sc->cont_cls = cont_cls;
sc->h = h;
- GNUNET_CONTAINER_DLL_insert(h->store_head, h->store_tail, sc);
+ GNUNET_CONTAINER_DLL_insert_tail(h->store_head, h->store_tail, sc);
GNUNET_MQ_notify_sent(ev, &store_request_sent, sc);
GNUNET_MQ_send(h->mq, ev);
return sc;
@@ -604,7 +683,7 @@ GNUNET_PEERSTORE_iterate (struct GNUNET_PEERSTORE_Handle *h,
ic->ev = ev;
ic->h = h;
ic->request_sent = GNUNET_NO;
- GNUNET_CONTAINER_DLL_insert(h->iterate_head, h->iterate_tail, ic);
+ GNUNET_CONTAINER_DLL_insert_tail(h->iterate_head, h->iterate_tail, ic);
LOG(GNUNET_ERROR_TYPE_DEBUG,
"Sending an iterate request for sub system `%s'\n", sub_system);
GNUNET_MQ_notify_sent(ev, &iterate_request_sent, ic);
diff --git a/src/sensor/Makefile.am b/src/sensor/Makefile.am
index e3d9bbe696..b6c55f5e61 100644
--- a/src/sensor/Makefile.am
+++ b/src/sensor/Makefile.am
@@ -36,6 +36,7 @@ gnunet_service_sensor_SOURCES = \
gnunet_service_sensor_LDADD = \
$(top_builddir)/src/util/libgnunetutil.la \
$(top_builddir)/src/statistics/libgnunetstatistics.la \
+ $(top_builddir)/src/peerstore/libgnunetpeerstore.la \
$(GN_LIBINTL)
libgnunetsensor_la_SOURCES = \
diff --git a/src/sensor/gnunet-service-sensor.c b/src/sensor/gnunet-service-sensor.c
index 6d47cb1e27..b42562ace6 100644
--- a/src/sensor/gnunet-service-sensor.c
+++ b/src/sensor/gnunet-service-sensor.c
@@ -28,6 +28,7 @@
#include "gnunet_util_lib.h"
#include "sensor.h"
#include "gnunet_statistics_service.h"
+#include "gnunet_peerstore_service.h"
/**
* Minimum sensor execution interval (in seconds)
@@ -204,6 +205,21 @@ static const char *datatypes[] = { "uint64", "double", "string", NULL };
struct GNUNET_STATISTICS_Handle *statistics;
/**
+ * Handle to peerstore service
+ */
+struct GNUNET_PEERSTORE_Handle *peerstore;
+
+/**
+ * Service name
+ */
+char *subsystem = "sensor";
+
+/**
+ * My peer id
+ */
+struct GNUNET_PeerIdentity peerid;
+
+/**
* Remove sensor execution from scheduler
*
* @param cls unused
@@ -290,7 +306,15 @@ shutdown_task (void *cls,
GNUNET_CONTAINER_multihashmap_iterate(sensors, &destroy_sensor, NULL);
GNUNET_CONTAINER_multihashmap_destroy(sensors);
if(NULL != statistics)
+ {
GNUNET_STATISTICS_destroy(statistics, GNUNET_YES);
+ statistics = NULL;
+ }
+ if(NULL != peerstore)
+ {
+ GNUNET_PEERSTORE_disconnect(peerstore);
+ peerstore = NULL;
+ }
GNUNET_SCHEDULER_shutdown();
}
@@ -816,8 +840,21 @@ int sensor_statistics_iterator (void *cls,
int is_persistent)
{
struct SensorInfo *sensorinfo = cls;
+ struct GNUNET_TIME_Absolute expiry;
GNUNET_log(GNUNET_ERROR_TYPE_INFO, "Received a value for sensor `%s': %" PRIu64 "\n", sensorinfo->name, value);
+ //FIXME: store first line, last line or all ??
+ expiry = GNUNET_TIME_relative_to_absolute(sensorinfo->interval);
+ GNUNET_PEERSTORE_store(peerstore,
+ subsystem,
+ &peerid,
+ sensorinfo->name,
+ &value,
+ sizeof(value),
+ expiry,
+ GNUNET_PEERSTORE_STOREOPTION_MULTIPLE,
+ NULL,
+ NULL);
return GNUNET_OK;
}
@@ -845,6 +882,7 @@ void end_sensor_run_stat (void *cls, int success)
void sensor_process_callback (void *cls, const char *line)
{
struct SensorInfo *sensorinfo = cls;
+ struct GNUNET_TIME_Absolute expiry;
if(NULL == line) //end of output
{
@@ -854,6 +892,18 @@ void sensor_process_callback (void *cls, const char *line)
return;
}
GNUNET_log(GNUNET_ERROR_TYPE_INFO, "Received a value for sensor `%s': %s\n", sensorinfo->name, line);
+ //FIXME: store first line, last line or all ??
+ expiry = GNUNET_TIME_relative_to_absolute(sensorinfo->interval);
+ GNUNET_PEERSTORE_store(peerstore,
+ subsystem,
+ &peerid,
+ sensorinfo->name,
+ line,
+ strlen(line) + 1,
+ expiry,
+ GNUNET_PEERSTORE_STOREOPTION_MULTIPLE,
+ NULL,
+ NULL);
}
/**
@@ -903,10 +953,6 @@ sensor_run (void *cls,
GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "Starting the execution of sensor `%s'\n", sensorinfo->name);
if(sources[0] == sensorinfo->source) //gnunet-statistics
{
- if(NULL == statistics)
- {
- statistics = GNUNET_STATISTICS_create("sensor", cfg);
- }
sensorinfo->gnunet_stat_get_handle = GNUNET_STATISTICS_get(statistics,
sensorinfo->gnunet_stat_service,
sensorinfo->gnunet_stat_name,
@@ -1032,6 +1078,9 @@ run (void *cls,
sensors = GNUNET_CONTAINER_multihashmap_create(10, GNUNET_NO);
reload_sensors();
schedule_all_sensors();
+ statistics = GNUNET_STATISTICS_create("sensor", cfg);
+ GNUNET_CRYPTO_get_peer_identity(cfg, &peerid);
+ peerstore = GNUNET_PEERSTORE_connect(cfg);
GNUNET_SERVER_add_handlers (server, handlers);
GNUNET_SERVER_disconnect_notify (server,
&handle_client_disconnect,