diff options
author | Christian Grothoff <christian@grothoff.org> | 2016-06-22 07:19:52 +0000 |
---|---|---|
committer | Christian Grothoff <christian@grothoff.org> | 2016-06-22 07:19:52 +0000 |
commit | eed68e90e8564b578fd92ef130d305465cecf936 (patch) | |
tree | 0e2bc52e369179487bde1c047165c94d93669b36 /src/arm | |
parent | dd9ed7931e52705b216c346127108520c5e4460b (diff) |
convert monitor API to use MQ
Diffstat (limited to 'src/arm')
-rw-r--r-- | src/arm/arm_monitor_api.c | 282 | ||||
-rw-r--r-- | src/arm/gnunet-service-arm.c | 13 |
2 files changed, 111 insertions, 184 deletions
diff --git a/src/arm/arm_monitor_api.c b/src/arm/arm_monitor_api.c index 19a2f4eb93..6d4129928e 100644 --- a/src/arm/arm_monitor_api.c +++ b/src/arm/arm_monitor_api.c @@ -42,22 +42,17 @@ struct GNUNET_ARM_MonitorHandle /** * Our control connection to the ARM service. */ - struct GNUNET_CLIENT_Connection *monitor; + struct GNUNET_MQ_Handle *mq; /** * The configuration that we are using. */ - struct GNUNET_CONFIGURATION_Handle *cfg; - - /** - * Handle for our current transmission request. - */ - struct GNUNET_CLIENT_TransmitHandle *cth; + const struct GNUNET_CONFIGURATION_Handle *cfg; /** * ID of the reconnect task (if any). */ - struct GNUNET_SCHEDULER_Task * reconnect_task; + struct GNUNET_SCHEDULER_Task *reconnect_task; /** * Current delay we use for re-trying to connect to core. @@ -65,32 +60,24 @@ struct GNUNET_ARM_MonitorHandle struct GNUNET_TIME_Relative retry_backoff; /** - * Are we currently disconnected and hence unable to send? - */ - unsigned char currently_down; - - /** * Callback to invoke on status updates. */ GNUNET_ARM_ServiceStatusCallback service_status; /** - * Closure for service_status. + * Closure for @e service_status. */ - void *cls; + void *service_status_cls; - /** - * ID of a task to run if we fail to get a reply to the init message in time. - */ - struct GNUNET_SCHEDULER_Task * init_timeout_task_id; }; -static void -monitor_notify_handler (void *cls, - const struct GNUNET_MessageHeader *msg); - - +/** + * Connect to the ARM service for monitoring. + * + * @param h handle to connect + * @return #GNUNET_OK on success + */ static int reconnect_arm_monitor (struct GNUNET_ARM_MonitorHandle *h); @@ -98,7 +85,7 @@ reconnect_arm_monitor (struct GNUNET_ARM_MonitorHandle *h); /** * Task scheduled to try to re-connect to arm. * - * @param cls the 'struct GNUNET_ARM_MonitorHandle' + * @param cls the `struct GNUNET_ARM_MonitorHandle` */ static void reconnect_arm_monitor_task (void *cls) @@ -108,7 +95,7 @@ reconnect_arm_monitor_task (void *cls) h->reconnect_task = NULL; LOG (GNUNET_ERROR_TYPE_DEBUG, "Connecting to ARM service for monitoring after delay\n"); - reconnect_arm_monitor (h); + GNUNET_break (GNUNET_OK == reconnect_arm_monitor (h)); } @@ -121,118 +108,123 @@ reconnect_arm_monitor_task (void *cls) static void reconnect_arm_monitor_later (struct GNUNET_ARM_MonitorHandle *h) { - if (NULL != h->cth) + if (NULL != h->mq) { - GNUNET_CLIENT_notify_transmit_ready_cancel (h->cth); - h->cth = NULL; + GNUNET_MQ_destroy (h->mq); + h->mq = NULL; } + GNUNET_assert (NULL == h->reconnect_task); + h->reconnect_task + = GNUNET_SCHEDULER_add_delayed (h->retry_backoff, + &reconnect_arm_monitor_task, h); + h->retry_backoff = GNUNET_TIME_STD_BACKOFF (h->retry_backoff); +} - if (NULL != h->monitor) - { - GNUNET_CLIENT_disconnect (h->monitor); - h->monitor = NULL; - } - if (NULL != h->init_timeout_task_id) +/** + * Check notification messages received from ARM is well-formed. + * + * @param cls our `struct GNUNET_ARM_MonitorHandle` + * @param msg the message received from the arm service + * @return #GNUNET_OK if the message is well-formed + */ +static int +check_monitor_notify (void *cls, + const struct GNUNET_ARM_StatusMessage *res) +{ + size_t sl = ntohs (res->header.size) - sizeof (struct GNUNET_ARM_StatusMessage); + const char *name = (const char *) &res[1]; + + if ( (0 == sl) || + ('\0' != name[sl-1]) ) { - GNUNET_SCHEDULER_cancel (h->init_timeout_task_id); - h->init_timeout_task_id = NULL; + GNUNET_break (0); + return GNUNET_SYSERR; } - - GNUNET_assert (NULL == h->reconnect_task); - h->reconnect_task = - GNUNET_SCHEDULER_add_delayed (h->retry_backoff, &reconnect_arm_monitor_task, h); - - h->retry_backoff = GNUNET_TIME_STD_BACKOFF (h->retry_backoff); + return GNUNET_OK; } /** - * Init message timed out. Disconnect and try again. + * Handler for notification messages received from ARM. * - * @param cls arm monitor handle + * @param cls our `struct GNUNET_ARM_MonitorHandle` + * @param msg the message received from the arm service */ static void -init_timeout_task (void *cls) +handle_monitor_notify (void *cls, + const struct GNUNET_ARM_StatusMessage *res) { struct GNUNET_ARM_MonitorHandle *h = cls; + enum GNUNET_ARM_ServiceStatus status; + status = (enum GNUNET_ARM_ServiceStatus) ntohl (res->status); LOG (GNUNET_ERROR_TYPE_DEBUG, - "Init message timed out\n"); - h->init_timeout_task_id = NULL; - reconnect_arm_monitor_later (h); + "Received notification from ARM for service `%s' with status %d\n", + (const char *) &res[1], + (int) status); + if (NULL != h->service_status) + h->service_status (h->service_status_cls, + (const char *) &res[1], + status); } /** - * Transmit the monitoring initialization message to the arm service. + * Generic error handler, called with the appropriate error code and + * the same closure specified at the creation of the message queue. + * Not every message queue implementation supports an error handler. * - * @param cls closure with the 'struct GNUNET_ARM_MonitorHandle' - * @param size number of bytes available in buf - * @param buf where the callee should write the message - * @return number of bytes written to buf + * @param cls closure with the `struct GNUNET_ARM_MonitorHandle *` + * @param error error code */ -static size_t -transmit_monitoring_init_message (void *cls, size_t size, void *buf) +static void +mq_error_handler (void *cls, + enum GNUNET_MQ_Error error) { struct GNUNET_ARM_MonitorHandle *h = cls; - struct GNUNET_MessageHeader *msg; - uint16_t msize; - GNUNET_assert (NULL == h->reconnect_task); - GNUNET_assert (NULL == h->init_timeout_task_id); - h->cth = NULL; - if (NULL == buf) - { - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Transmission failed, initiating reconnect\n"); - reconnect_arm_monitor_later (h); - return 0; - } - msize = sizeof (struct GNUNET_MessageHeader); - if (size < msize) - { - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Request is too big (%u < %u), not sending it\n", size, msize); - h->cth = GNUNET_CLIENT_notify_transmit_ready (h->monitor, msize, - GNUNET_TIME_UNIT_FOREVER_REL, GNUNET_NO, - transmit_monitoring_init_message, h); - return 0; - } - - msg = buf; - msg->size = htons (msize); - msg->type = htons (GNUNET_MESSAGE_TYPE_ARM_MONITOR); - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Transmitting ARM monitoring init message with %u bytes to arm.\n", - (unsigned int) msize); - - h->init_timeout_task_id = GNUNET_SCHEDULER_add_delayed ( - INIT_TIMEOUT, init_timeout_task, h); - GNUNET_CLIENT_receive (h->monitor, &monitor_notify_handler, h, - GNUNET_TIME_UNIT_FOREVER_REL); - return msize; + reconnect_arm_monitor_later (h); } +/** + * Connect to the ARM service for monitoring. + * + * @param h handle to connect + * @return #GNUNET_OK on success + */ static int reconnect_arm_monitor (struct GNUNET_ARM_MonitorHandle *h) { - GNUNET_assert (NULL == h->monitor); - h->monitor = GNUNET_CLIENT_connect ("arm", h->cfg); - if (NULL == h->monitor) + GNUNET_MQ_hd_var_size (monitor_notify, + GNUNET_MESSAGE_TYPE_ARM_STATUS, + struct GNUNET_ARM_StatusMessage); + struct GNUNET_MQ_MessageHandler handlers[] = { + make_monitor_notify_handler (h), + GNUNET_MQ_handler_end () + }; + struct GNUNET_MessageHeader *msg; + struct GNUNET_MQ_Envelope *env; + + GNUNET_assert (NULL == h->mq); + h->mq = GNUNET_CLIENT_connecT (h->cfg, + "arm", + handlers, + &mq_error_handler, + h); + if (NULL == h->mq) { - LOG (GNUNET_ERROR_TYPE_DEBUG, - "arm_api, GNUNET_CLIENT_connect returned NULL\n"); if (NULL != h->service_status) - h->service_status (h->cls, NULL, GNUNET_ARM_SERVICE_STOPPED); + h->service_status (h->service_status_cls, + NULL, + GNUNET_ARM_SERVICE_STOPPED); return GNUNET_SYSERR; } - LOG (GNUNET_ERROR_TYPE_DEBUG, - "arm_api, GNUNET_CLIENT_connect returned non-NULL\n"); - h->cth = GNUNET_CLIENT_notify_transmit_ready (h->monitor, - sizeof (struct GNUNET_MessageHeader), GNUNET_TIME_UNIT_FOREVER_REL, - GNUNET_NO, &transmit_monitoring_init_message, h); + env = GNUNET_MQ_msg (msg, + GNUNET_MESSAGE_TYPE_ARM_MONITOR); + GNUNET_MQ_send (h->mq, + env); return GNUNET_OK; } @@ -245,22 +237,20 @@ reconnect_arm_monitor (struct GNUNET_ARM_MonitorHandle *h) * the ARM service may internally use a different * configuration to determine how to start the service). * @param cont callback to invoke on status updates - * @param cont_cls closure + * @param cont_cls closure for @a cont * @return context to use for further ARM monitor operations, NULL on error. */ struct GNUNET_ARM_MonitorHandle * GNUNET_ARM_monitor (const struct GNUNET_CONFIGURATION_Handle *cfg, - GNUNET_ARM_ServiceStatusCallback cont, void *cont_cls) + GNUNET_ARM_ServiceStatusCallback cont, + void *cont_cls) { struct GNUNET_ARM_MonitorHandle *h; h = GNUNET_new (struct GNUNET_ARM_MonitorHandle); - h->cfg = GNUNET_CONFIGURATION_dup (cfg); - h->currently_down = GNUNET_YES; - h->reconnect_task = NULL; - h->init_timeout_task_id = NULL; + h->cfg = cfg; h->service_status = cont; - h->cls = cont_cls; + h->service_status_cls = cont_cls; if (GNUNET_OK != reconnect_arm_monitor (h)) { GNUNET_free (h); @@ -278,86 +268,18 @@ GNUNET_ARM_monitor (const struct GNUNET_CONFIGURATION_Handle *cfg, void GNUNET_ARM_monitor_disconnect_and_free (struct GNUNET_ARM_MonitorHandle *h) { - LOG (GNUNET_ERROR_TYPE_DEBUG, "Disconnecting from ARM service\n"); - if (NULL != h->cth) + if (NULL != h->mq) { - GNUNET_CLIENT_notify_transmit_ready_cancel (h->cth); - h->cth = NULL; - } - if (NULL != h->init_timeout_task_id) - { - GNUNET_SCHEDULER_cancel (h->init_timeout_task_id); - h->init_timeout_task_id = NULL; - } - if (NULL != h->monitor) - { - GNUNET_CLIENT_disconnect (h->monitor); - h->monitor = NULL; + GNUNET_MQ_destroy (h->mq); + h->mq = NULL; } if (NULL != h->reconnect_task) { GNUNET_SCHEDULER_cancel (h->reconnect_task); h->reconnect_task = NULL; } - GNUNET_CONFIGURATION_destroy (h->cfg); GNUNET_free (h); } -/** - * Handler for notification messages received from ARM. - * - * @param cls our "struct GNUNET_ARM_MonitorHandle" - * @param msg the message received from the arm service - */ -static void -monitor_notify_handler (void *cls, const struct GNUNET_MessageHeader *msg) -{ - struct GNUNET_ARM_MonitorHandle *h = cls; - uint16_t msize; - const struct GNUNET_ARM_StatusMessage *res; - enum GNUNET_ARM_ServiceStatus status; - - if (NULL == msg) - { - LOG (GNUNET_ERROR_TYPE_INFO, - _("Monitoring client was disconnected from arm service, trying to reconnect.\n")); - reconnect_arm_monitor_later (h); - return; - } - msize = ntohs (msg->size); - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Processing message of type %u and size %u from arm service\n", - ntohs (msg->type), msize); - switch (ntohs (msg->type)) - { - case GNUNET_MESSAGE_TYPE_ARM_STATUS: - if (msize <= sizeof (struct GNUNET_ARM_StatusMessage)) - { - GNUNET_break (0); - reconnect_arm_monitor_later (h); - return; - } - if (NULL != h->init_timeout_task_id) - { - GNUNET_SCHEDULER_cancel (h->init_timeout_task_id); - h->init_timeout_task_id = NULL; - } - res = (const struct GNUNET_ARM_StatusMessage *) msg; - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Received response from ARM for service `%s': %u\n", - (const char *) &res[1], ntohs (msg->type)); - status = (enum GNUNET_ARM_ServiceStatus) ntohl (res->status); - GNUNET_CLIENT_receive (h->monitor, &monitor_notify_handler, h, - GNUNET_TIME_UNIT_FOREVER_REL); - if (NULL != h->service_status) - h->service_status (h->cls, (const char *) &res[1], status); - break; - default: - reconnect_arm_monitor_later (h); - return; - } -} - - /* end of arm_api.c */ diff --git a/src/arm/gnunet-service-arm.c b/src/arm/gnunet-service-arm.c index 0ccffa27ba..8bc6e9e07c 100644 --- a/src/arm/gnunet-service-arm.c +++ b/src/arm/gnunet-service-arm.c @@ -989,7 +989,8 @@ handle_stop (void *cls, * @param message the actual message */ static void -handle_list (void *cls, struct GNUNET_SERVER_Client *client, +handle_list (void *cls, + struct GNUNET_SERVER_Client *client, const struct GNUNET_MessageHeader *message) { struct GNUNET_ARM_ListResultMessage *msg; @@ -1595,7 +1596,8 @@ setup_service (void *cls, * @param client identification of the client */ static void -handle_client_connecting (void *cls, struct GNUNET_SERVER_Client *client) +handle_client_connecting (void *cls, + struct GNUNET_SERVER_Client *client) { /* All clients are considered to be of the "monitor" kind * (that is, they don't affect ARM shutdown). @@ -1615,9 +1617,12 @@ handle_client_connecting (void *cls, struct GNUNET_SERVER_Client *client) * #GNUNET_SYSERR to close it (signal serious error) */ static void -handle_monitor (void *cls, struct GNUNET_SERVER_Client *client, - const struct GNUNET_MessageHeader *message) +handle_monitor (void *cls, + struct GNUNET_SERVER_Client *client, + const struct GNUNET_MessageHeader *message) { + /* FIXME: might want to start by letting monitor know about + services that are already running */ /* Removal is handled by the server implementation, internally. */ if ((NULL != client) && (NULL != notifier)) { |