aboutsummaryrefslogtreecommitdiff
path: root/src/namestore/namestore_api_monitor.c
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2016-06-25 15:37:04 +0000
committerChristian Grothoff <christian@grothoff.org>2016-06-25 15:37:04 +0000
commit789c62896357697e70bc477bf2ebfc33786d580c (patch)
tree31b8be842e6e58c300b173363c3d9954681c19b5 /src/namestore/namestore_api_monitor.c
parentbc09b870c221f1d9c3c61b8ee251fa0f25c7aa22 (diff)
convert namestore_api_monitor to MQ
Diffstat (limited to 'src/namestore/namestore_api_monitor.c')
-rw-r--r--src/namestore/namestore_api_monitor.c249
1 files changed, 133 insertions, 116 deletions
diff --git a/src/namestore/namestore_api_monitor.c b/src/namestore/namestore_api_monitor.c
index 9fd45600d6..85131f9ccb 100644
--- a/src/namestore/namestore_api_monitor.c
+++ b/src/namestore/namestore_api_monitor.c
@@ -1,6 +1,6 @@
/*
This file is part of GNUnet.
- Copyright (C) 2013 GNUnet e.V.
+ Copyright (C) 2013, 2016 GNUnet e.V.
GNUnet is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published
@@ -48,7 +48,7 @@ struct GNUNET_NAMESTORE_ZoneMonitor
/**
* Handle to namestore service.
*/
- struct GNUNET_CLIENT_Connection *h;
+ struct GNUNET_MQ_Handle *mq;
/**
* Function to call on events.
@@ -61,16 +61,11 @@ struct GNUNET_NAMESTORE_ZoneMonitor
GNUNET_NAMESTORE_RecordsSynchronizedCallback sync_cb;
/**
- * Closure for 'monitor' and 'sync_cb'.
+ * Closure for @e monitor and @e sync_cb.
*/
void *cls;
/**
- * Transmission handle to client.
- */
- struct GNUNET_CLIENT_TransmitHandle *th;
-
- /**
* Monitored zone.
*/
struct GNUNET_CRYPTO_EcdsaPrivateKey zone;
@@ -84,55 +79,42 @@ struct GNUNET_NAMESTORE_ZoneMonitor
/**
- * Send our request to start monitoring to the service.
+ * Reconnect to the namestore service.
*
- * @param cls the monitor handle
- * @param size number of bytes available in buf
- * @param buf where to copy the message to the service
- * @return number of bytes copied to buf
+ * @param zm monitor to reconnect
*/
-static size_t
-transmit_monitor_message (void *cls,
- size_t size,
- void *buf);
+static void
+reconnect (struct GNUNET_NAMESTORE_ZoneMonitor *zm);
/**
- * Reconnect to the namestore service.
+ * Handle SYNC message from the namestore service.
*
- * @param zm monitor to reconnect
+ * @param cls the monitor
+ * @param msg the sync message
*/
static void
-reconnect (struct GNUNET_NAMESTORE_ZoneMonitor *zm)
+handle_sync (void *cls,
+ const struct GNUNET_MessageHeader *msg)
{
- if (NULL != zm->h)
- GNUNET_CLIENT_disconnect (zm->h);
- zm->monitor (zm->cls,
- NULL,
- NULL, 0, NULL);
- GNUNET_assert (NULL != (zm->h = GNUNET_CLIENT_connect ("namestore", zm->cfg)));
- zm->th = GNUNET_CLIENT_notify_transmit_ready (zm->h,
- sizeof (struct ZoneMonitorStartMessage),
- GNUNET_TIME_UNIT_FOREVER_REL,
- GNUNET_YES,
- &transmit_monitor_message,
- zm);
+ struct GNUNET_NAMESTORE_ZoneMonitor *zm = cls;
+
+ if (NULL != zm->sync_cb)
+ zm->sync_cb (zm->cls);
}
/**
* We've received a notification about a change to our zone.
- * Forward to monitor callback.
+ * Check that it is well-formed.
*
* @param cls the zone monitor handle
- * @param msg the message from the service.
+ * @param lrm the message from the service.
*/
-static void
-handle_updates (void *cls,
- const struct GNUNET_MessageHeader *msg)
+static int
+check_result (void *cls,
+ const struct RecordResultMessage *lrm)
{
- struct GNUNET_NAMESTORE_ZoneMonitor *zm = cls;
- const struct RecordResultMessage *lrm;
size_t lrm_len;
size_t exp_lrm_len;
size_t name_len;
@@ -141,30 +123,6 @@ handle_updates (void *cls,
const char *name_tmp;
const char *rd_ser_tmp;
- if (NULL == msg)
- {
- reconnect (zm);
- return;
- }
- if ( (ntohs (msg->size) == sizeof (struct GNUNET_MessageHeader)) &&
- (GNUNET_MESSAGE_TYPE_NAMESTORE_MONITOR_SYNC == ntohs (msg->type) ) )
- {
- GNUNET_CLIENT_receive (zm->h,
- &handle_updates,
- zm,
- GNUNET_TIME_UNIT_FOREVER_REL);
- if (NULL != zm->sync_cb)
- zm->sync_cb (zm->cls);
- return;
- }
- if ( (ntohs (msg->size) < sizeof (struct RecordResultMessage)) ||
- (GNUNET_MESSAGE_TYPE_NAMESTORE_RECORD_RESULT != ntohs (msg->type) ) )
- {
- GNUNET_break (0);
- reconnect (zm);
- return;
- }
- lrm = (const struct RecordResultMessage *) msg;
lrm_len = ntohs (lrm->gns_header.header.size);
rd_len = ntohs (lrm->rd_len);
rd_count = ntohs (lrm->rd_count);
@@ -173,79 +131,143 @@ handle_updates (void *cls,
if (lrm_len != exp_lrm_len)
{
GNUNET_break (0);
- reconnect (zm);
- return;
+ return GNUNET_SYSERR;
}
if (0 == name_len)
{
GNUNET_break (0);
- reconnect (zm);
- return;
+ return GNUNET_SYSERR;
}
name_tmp = (const char *) &lrm[1];
if ((name_tmp[name_len -1] != '\0') || (name_len > MAX_NAME_LEN))
{
GNUNET_break (0);
- reconnect (zm);
- return;
+ return GNUNET_SYSERR;
}
rd_ser_tmp = (const char *) &name_tmp[name_len];
{
struct GNUNET_GNSRECORD_Data rd[rd_count];
- if (GNUNET_OK != GNUNET_GNSRECORD_records_deserialize (rd_len, rd_ser_tmp, rd_count, rd))
+ if (GNUNET_OK !=
+ GNUNET_GNSRECORD_records_deserialize (rd_len,
+ rd_ser_tmp,
+ rd_count,
+ rd))
{
GNUNET_break (0);
- reconnect (zm);
- return;
+ return GNUNET_SYSERR;
}
- GNUNET_CLIENT_receive (zm->h,
- &handle_updates,
- zm,
- GNUNET_TIME_UNIT_FOREVER_REL);
+ }
+ return GNUNET_OK;
+}
+
+
+/**
+ * We've received a notification about a change to our zone.
+ * Forward to monitor callback.
+ *
+ * @param cls the zone monitor handle
+ * @param lrm the message from the service.
+ */
+static void
+handle_result (void *cls,
+ const struct RecordResultMessage *lrm)
+{
+ struct GNUNET_NAMESTORE_ZoneMonitor *zm = cls;
+ size_t name_len;
+ size_t rd_len;
+ unsigned rd_count;
+ const char *name_tmp;
+ const char *rd_ser_tmp;
+
+ rd_len = ntohs (lrm->rd_len);
+ rd_count = ntohs (lrm->rd_count);
+ name_len = ntohs (lrm->name_len);
+ name_tmp = (const char *) &lrm[1];
+ rd_ser_tmp = (const char *) &name_tmp[name_len];
+ {
+ struct GNUNET_GNSRECORD_Data rd[rd_count];
+
+ GNUNET_assert (GNUNET_OK ==
+ GNUNET_GNSRECORD_records_deserialize (rd_len,
+ rd_ser_tmp,
+ rd_count,
+ rd));
zm->monitor (zm->cls,
&lrm->private_key,
name_tmp,
- rd_count, rd);
+ rd_count,
+ rd);
}
}
/**
- * Send our request to start monitoring to the service.
+ * Generic error handler, called with the appropriate error code and
+ * the same closure specified at the creation of the message queue.
+ * Not every message queue implementation supports an error handler.
*
- * @param cls the monitor handle
- * @param size number of bytes available in buf
- * @param buf where to copy the message to the service
- * @return number of bytes copied to buf
+ * @param cls closure with the `struct GNUNET_NAMESTORE_ZoneMonitor *`
+ * @param error error code
*/
-static size_t
-transmit_monitor_message (void *cls,
- size_t size,
- void *buf)
+static void
+mq_error_handler (void *cls,
+ enum GNUNET_MQ_Error error)
{
struct GNUNET_NAMESTORE_ZoneMonitor *zm = cls;
- struct ZoneMonitorStartMessage sm;
- zm->th = NULL;
- if (size < sizeof (struct ZoneMonitorStartMessage))
+ reconnect (zm);
+}
+
+
+/**
+ * Reconnect to the namestore service.
+ *
+ * @param zm monitor to reconnect
+ */
+static void
+reconnect (struct GNUNET_NAMESTORE_ZoneMonitor *zm)
+{
+ GNUNET_MQ_hd_fixed_size (sync,
+ GNUNET_MESSAGE_TYPE_NAMESTORE_MONITOR_SYNC,
+ struct GNUNET_MessageHeader);
+ GNUNET_MQ_hd_var_size (result,
+ GNUNET_MESSAGE_TYPE_NAMESTORE_RECORD_RESULT,
+ struct RecordResultMessage);
+ struct GNUNET_MQ_MessageHandler handlers[] = {
+ make_sync_handler (zm),
+ make_result_handler (zm),
+ GNUNET_MQ_handler_end ()
+ };
+ struct GNUNET_MQ_Envelope *env;
+ struct ZoneMonitorStartMessage *sm;
+
+ if (NULL != zm->mq)
{
- reconnect (zm);
- return 0;
+ GNUNET_MQ_destroy (zm->mq);
+ zm->monitor (zm->cls,
+ NULL,
+ NULL,
+ 0,
+ NULL);
}
- sm.header.type = htons (GNUNET_MESSAGE_TYPE_NAMESTORE_MONITOR_START);
- sm.header.size = htons (sizeof (struct ZoneMonitorStartMessage));
- sm.iterate_first = htonl (zm->iterate_first);
- sm.zone = zm->zone;
- memcpy (buf, &sm, sizeof (sm));
- GNUNET_CLIENT_receive (zm->h,
- &handle_updates,
- zm,
- GNUNET_TIME_UNIT_FOREVER_REL);
- return sizeof (sm);
+ zm->mq = GNUNET_CLIENT_connecT (zm->cfg,
+ "namestore",
+ handlers,
+ &mq_error_handler,
+ zm);
+ if (NULL == zm->mq)
+ return;
+ env = GNUNET_MQ_msg (sm,
+ GNUNET_MESSAGE_TYPE_NAMESTORE_MONITOR_START);
+ sm->iterate_first = htonl (zm->iterate_first);
+ sm->zone = zm->zone;
+ GNUNET_MQ_send (zm->mq,
+ env);
}
+
/**
* Begin monitoring a zone for changes. If @a iterate_first is set,
* we Will first call the @a monitor function on all existing records
@@ -270,25 +292,21 @@ GNUNET_NAMESTORE_zone_monitor_start (const struct GNUNET_CONFIGURATION_Handle *c
void *cls)
{
struct GNUNET_NAMESTORE_ZoneMonitor *zm;
- struct GNUNET_CLIENT_Connection *client;
- if (NULL == (client = GNUNET_CLIENT_connect ("namestore", cfg)))
- return NULL;
zm = GNUNET_new (struct GNUNET_NAMESTORE_ZoneMonitor);
- zm->cfg = cfg;
- zm->h = client;
if (NULL != zone)
zm->zone = *zone;
zm->iterate_first = iterate_first;
zm->monitor = monitor;
zm->sync_cb = sync_cb;
zm->cls = cls;
- zm->th = GNUNET_CLIENT_notify_transmit_ready (zm->h,
- sizeof (struct ZoneMonitorStartMessage),
- GNUNET_TIME_UNIT_FOREVER_REL,
- GNUNET_YES,
- &transmit_monitor_message,
- zm);
+ zm->cfg = cfg;
+ reconnect (zm);
+ if (NULL == zm->mq)
+ {
+ GNUNET_free (zm);
+ return NULL;
+ }
return zm;
}
@@ -301,12 +319,11 @@ GNUNET_NAMESTORE_zone_monitor_start (const struct GNUNET_CONFIGURATION_Handle *c
void
GNUNET_NAMESTORE_zone_monitor_stop (struct GNUNET_NAMESTORE_ZoneMonitor *zm)
{
- if (NULL != zm->th)
+ if (NULL != zm->mq)
{
- GNUNET_CLIENT_notify_transmit_ready_cancel (zm->th);
- zm->th = NULL;
+ GNUNET_MQ_destroy (zm->mq);
+ zm->mq = NULL;
}
- GNUNET_CLIENT_disconnect (zm->h);
GNUNET_free (zm);
}