diff options
author | Christian Grothoff <christian@grothoff.org> | 2016-06-25 15:37:04 +0000 |
---|---|---|
committer | Christian Grothoff <christian@grothoff.org> | 2016-06-25 15:37:04 +0000 |
commit | 789c62896357697e70bc477bf2ebfc33786d580c (patch) | |
tree | 31b8be842e6e58c300b173363c3d9954681c19b5 /src/namestore/namestore_api_monitor.c | |
parent | bc09b870c221f1d9c3c61b8ee251fa0f25c7aa22 (diff) |
convert namestore_api_monitor to MQ
Diffstat (limited to 'src/namestore/namestore_api_monitor.c')
-rw-r--r-- | src/namestore/namestore_api_monitor.c | 249 |
1 files changed, 133 insertions, 116 deletions
diff --git a/src/namestore/namestore_api_monitor.c b/src/namestore/namestore_api_monitor.c index 9fd45600d6..85131f9ccb 100644 --- a/src/namestore/namestore_api_monitor.c +++ b/src/namestore/namestore_api_monitor.c @@ -1,6 +1,6 @@ /* This file is part of GNUnet. - Copyright (C) 2013 GNUnet e.V. + Copyright (C) 2013, 2016 GNUnet e.V. GNUnet is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published @@ -48,7 +48,7 @@ struct GNUNET_NAMESTORE_ZoneMonitor /** * Handle to namestore service. */ - struct GNUNET_CLIENT_Connection *h; + struct GNUNET_MQ_Handle *mq; /** * Function to call on events. @@ -61,16 +61,11 @@ struct GNUNET_NAMESTORE_ZoneMonitor GNUNET_NAMESTORE_RecordsSynchronizedCallback sync_cb; /** - * Closure for 'monitor' and 'sync_cb'. + * Closure for @e monitor and @e sync_cb. */ void *cls; /** - * Transmission handle to client. - */ - struct GNUNET_CLIENT_TransmitHandle *th; - - /** * Monitored zone. */ struct GNUNET_CRYPTO_EcdsaPrivateKey zone; @@ -84,55 +79,42 @@ struct GNUNET_NAMESTORE_ZoneMonitor /** - * Send our request to start monitoring to the service. + * Reconnect to the namestore service. * - * @param cls the monitor handle - * @param size number of bytes available in buf - * @param buf where to copy the message to the service - * @return number of bytes copied to buf + * @param zm monitor to reconnect */ -static size_t -transmit_monitor_message (void *cls, - size_t size, - void *buf); +static void +reconnect (struct GNUNET_NAMESTORE_ZoneMonitor *zm); /** - * Reconnect to the namestore service. + * Handle SYNC message from the namestore service. * - * @param zm monitor to reconnect + * @param cls the monitor + * @param msg the sync message */ static void -reconnect (struct GNUNET_NAMESTORE_ZoneMonitor *zm) +handle_sync (void *cls, + const struct GNUNET_MessageHeader *msg) { - if (NULL != zm->h) - GNUNET_CLIENT_disconnect (zm->h); - zm->monitor (zm->cls, - NULL, - NULL, 0, NULL); - GNUNET_assert (NULL != (zm->h = GNUNET_CLIENT_connect ("namestore", zm->cfg))); - zm->th = GNUNET_CLIENT_notify_transmit_ready (zm->h, - sizeof (struct ZoneMonitorStartMessage), - GNUNET_TIME_UNIT_FOREVER_REL, - GNUNET_YES, - &transmit_monitor_message, - zm); + struct GNUNET_NAMESTORE_ZoneMonitor *zm = cls; + + if (NULL != zm->sync_cb) + zm->sync_cb (zm->cls); } /** * We've received a notification about a change to our zone. - * Forward to monitor callback. + * Check that it is well-formed. * * @param cls the zone monitor handle - * @param msg the message from the service. + * @param lrm the message from the service. */ -static void -handle_updates (void *cls, - const struct GNUNET_MessageHeader *msg) +static int +check_result (void *cls, + const struct RecordResultMessage *lrm) { - struct GNUNET_NAMESTORE_ZoneMonitor *zm = cls; - const struct RecordResultMessage *lrm; size_t lrm_len; size_t exp_lrm_len; size_t name_len; @@ -141,30 +123,6 @@ handle_updates (void *cls, const char *name_tmp; const char *rd_ser_tmp; - if (NULL == msg) - { - reconnect (zm); - return; - } - if ( (ntohs (msg->size) == sizeof (struct GNUNET_MessageHeader)) && - (GNUNET_MESSAGE_TYPE_NAMESTORE_MONITOR_SYNC == ntohs (msg->type) ) ) - { - GNUNET_CLIENT_receive (zm->h, - &handle_updates, - zm, - GNUNET_TIME_UNIT_FOREVER_REL); - if (NULL != zm->sync_cb) - zm->sync_cb (zm->cls); - return; - } - if ( (ntohs (msg->size) < sizeof (struct RecordResultMessage)) || - (GNUNET_MESSAGE_TYPE_NAMESTORE_RECORD_RESULT != ntohs (msg->type) ) ) - { - GNUNET_break (0); - reconnect (zm); - return; - } - lrm = (const struct RecordResultMessage *) msg; lrm_len = ntohs (lrm->gns_header.header.size); rd_len = ntohs (lrm->rd_len); rd_count = ntohs (lrm->rd_count); @@ -173,79 +131,143 @@ handle_updates (void *cls, if (lrm_len != exp_lrm_len) { GNUNET_break (0); - reconnect (zm); - return; + return GNUNET_SYSERR; } if (0 == name_len) { GNUNET_break (0); - reconnect (zm); - return; + return GNUNET_SYSERR; } name_tmp = (const char *) &lrm[1]; if ((name_tmp[name_len -1] != '\0') || (name_len > MAX_NAME_LEN)) { GNUNET_break (0); - reconnect (zm); - return; + return GNUNET_SYSERR; } rd_ser_tmp = (const char *) &name_tmp[name_len]; { struct GNUNET_GNSRECORD_Data rd[rd_count]; - if (GNUNET_OK != GNUNET_GNSRECORD_records_deserialize (rd_len, rd_ser_tmp, rd_count, rd)) + if (GNUNET_OK != + GNUNET_GNSRECORD_records_deserialize (rd_len, + rd_ser_tmp, + rd_count, + rd)) { GNUNET_break (0); - reconnect (zm); - return; + return GNUNET_SYSERR; } - GNUNET_CLIENT_receive (zm->h, - &handle_updates, - zm, - GNUNET_TIME_UNIT_FOREVER_REL); + } + return GNUNET_OK; +} + + +/** + * We've received a notification about a change to our zone. + * Forward to monitor callback. + * + * @param cls the zone monitor handle + * @param lrm the message from the service. + */ +static void +handle_result (void *cls, + const struct RecordResultMessage *lrm) +{ + struct GNUNET_NAMESTORE_ZoneMonitor *zm = cls; + size_t name_len; + size_t rd_len; + unsigned rd_count; + const char *name_tmp; + const char *rd_ser_tmp; + + rd_len = ntohs (lrm->rd_len); + rd_count = ntohs (lrm->rd_count); + name_len = ntohs (lrm->name_len); + name_tmp = (const char *) &lrm[1]; + rd_ser_tmp = (const char *) &name_tmp[name_len]; + { + struct GNUNET_GNSRECORD_Data rd[rd_count]; + + GNUNET_assert (GNUNET_OK == + GNUNET_GNSRECORD_records_deserialize (rd_len, + rd_ser_tmp, + rd_count, + rd)); zm->monitor (zm->cls, &lrm->private_key, name_tmp, - rd_count, rd); + rd_count, + rd); } } /** - * Send our request to start monitoring to the service. + * Generic error handler, called with the appropriate error code and + * the same closure specified at the creation of the message queue. + * Not every message queue implementation supports an error handler. * - * @param cls the monitor handle - * @param size number of bytes available in buf - * @param buf where to copy the message to the service - * @return number of bytes copied to buf + * @param cls closure with the `struct GNUNET_NAMESTORE_ZoneMonitor *` + * @param error error code */ -static size_t -transmit_monitor_message (void *cls, - size_t size, - void *buf) +static void +mq_error_handler (void *cls, + enum GNUNET_MQ_Error error) { struct GNUNET_NAMESTORE_ZoneMonitor *zm = cls; - struct ZoneMonitorStartMessage sm; - zm->th = NULL; - if (size < sizeof (struct ZoneMonitorStartMessage)) + reconnect (zm); +} + + +/** + * Reconnect to the namestore service. + * + * @param zm monitor to reconnect + */ +static void +reconnect (struct GNUNET_NAMESTORE_ZoneMonitor *zm) +{ + GNUNET_MQ_hd_fixed_size (sync, + GNUNET_MESSAGE_TYPE_NAMESTORE_MONITOR_SYNC, + struct GNUNET_MessageHeader); + GNUNET_MQ_hd_var_size (result, + GNUNET_MESSAGE_TYPE_NAMESTORE_RECORD_RESULT, + struct RecordResultMessage); + struct GNUNET_MQ_MessageHandler handlers[] = { + make_sync_handler (zm), + make_result_handler (zm), + GNUNET_MQ_handler_end () + }; + struct GNUNET_MQ_Envelope *env; + struct ZoneMonitorStartMessage *sm; + + if (NULL != zm->mq) { - reconnect (zm); - return 0; + GNUNET_MQ_destroy (zm->mq); + zm->monitor (zm->cls, + NULL, + NULL, + 0, + NULL); } - sm.header.type = htons (GNUNET_MESSAGE_TYPE_NAMESTORE_MONITOR_START); - sm.header.size = htons (sizeof (struct ZoneMonitorStartMessage)); - sm.iterate_first = htonl (zm->iterate_first); - sm.zone = zm->zone; - memcpy (buf, &sm, sizeof (sm)); - GNUNET_CLIENT_receive (zm->h, - &handle_updates, - zm, - GNUNET_TIME_UNIT_FOREVER_REL); - return sizeof (sm); + zm->mq = GNUNET_CLIENT_connecT (zm->cfg, + "namestore", + handlers, + &mq_error_handler, + zm); + if (NULL == zm->mq) + return; + env = GNUNET_MQ_msg (sm, + GNUNET_MESSAGE_TYPE_NAMESTORE_MONITOR_START); + sm->iterate_first = htonl (zm->iterate_first); + sm->zone = zm->zone; + GNUNET_MQ_send (zm->mq, + env); } + /** * Begin monitoring a zone for changes. If @a iterate_first is set, * we Will first call the @a monitor function on all existing records @@ -270,25 +292,21 @@ GNUNET_NAMESTORE_zone_monitor_start (const struct GNUNET_CONFIGURATION_Handle *c void *cls) { struct GNUNET_NAMESTORE_ZoneMonitor *zm; - struct GNUNET_CLIENT_Connection *client; - if (NULL == (client = GNUNET_CLIENT_connect ("namestore", cfg))) - return NULL; zm = GNUNET_new (struct GNUNET_NAMESTORE_ZoneMonitor); - zm->cfg = cfg; - zm->h = client; if (NULL != zone) zm->zone = *zone; zm->iterate_first = iterate_first; zm->monitor = monitor; zm->sync_cb = sync_cb; zm->cls = cls; - zm->th = GNUNET_CLIENT_notify_transmit_ready (zm->h, - sizeof (struct ZoneMonitorStartMessage), - GNUNET_TIME_UNIT_FOREVER_REL, - GNUNET_YES, - &transmit_monitor_message, - zm); + zm->cfg = cfg; + reconnect (zm); + if (NULL == zm->mq) + { + GNUNET_free (zm); + return NULL; + } return zm; } @@ -301,12 +319,11 @@ GNUNET_NAMESTORE_zone_monitor_start (const struct GNUNET_CONFIGURATION_Handle *c void GNUNET_NAMESTORE_zone_monitor_stop (struct GNUNET_NAMESTORE_ZoneMonitor *zm) { - if (NULL != zm->th) + if (NULL != zm->mq) { - GNUNET_CLIENT_notify_transmit_ready_cancel (zm->th); - zm->th = NULL; + GNUNET_MQ_destroy (zm->mq); + zm->mq = NULL; } - GNUNET_CLIENT_disconnect (zm->h); GNUNET_free (zm); } |