diff options
Diffstat (limited to 'src/arm/arm_api.c')
-rw-r--r-- | src/arm/arm_api.c | 1219 |
1 files changed, 505 insertions, 714 deletions
diff --git a/src/arm/arm_api.c b/src/arm/arm_api.c index a89d423ecd..ed36c61cd5 100644 --- a/src/arm/arm_api.c +++ b/src/arm/arm_api.c @@ -32,151 +32,131 @@ #define LOG(kind,...) GNUNET_log_from (kind, "arm-api",__VA_ARGS__) + /** - * Handle for interacting with ARM. + * Entry in a doubly-linked list of operations awaiting for replies + * (in-order) from the ARM service. */ -struct GNUNET_ARM_Handle +struct GNUNET_ARM_Operation { /** - * Our control connection to the ARM service. - */ - struct GNUNET_CLIENT_Connection *client; - - /** - * The configuration that we are using. - */ - struct GNUNET_CONFIGURATION_Handle *cfg; - - /** - * Handle for our current transmission request. - */ - struct GNUNET_CLIENT_TransmitHandle *cth; - - /** - * Head of doubly-linked list of pending requests. - */ - struct ARMControlMessage *control_pending_head; - - /** - * Tail of doubly-linked list of pending requests. - */ - struct ARMControlMessage *control_pending_tail; - - /** - * Head of doubly-linked list of sent requests. + * This is a doubly-linked list. */ - struct ARMControlMessage *control_sent_head; + struct GNUNET_ARM_Operation *next; /** - * Tail of doubly-linked list of sent requests. + * This is a doubly-linked list. */ - struct ARMControlMessage *control_sent_tail; + struct GNUNET_ARM_Operation *prev; /** - * Callback to invoke on connection/disconnection. + * ARM handle. */ - GNUNET_ARM_ConnectionStatusCallback conn_status; + struct GNUNET_ARM_Handle *h; /** - * Closure for conn_status. + * Callback for service state change requests. */ - void *conn_status_cls; + GNUNET_ARM_ResultCallback result_cont; /** - * ARM control message for the 'arm_termination_handler' - * with the continuation to call once the ARM shutdown is done. + * Callback for service list requests. */ - struct ARMControlMessage *thm; + GNUNET_ARM_ServiceListCallback list_cont; /** - * ID of the reconnect task (if any). + * Closure for @e result_cont or @e list_cont. */ - struct GNUNET_SCHEDULER_Task *reconnect_task; + void *cont_cls; /** - * Current delay we use for re-trying to connect to core. + * Task for async completion. */ - struct GNUNET_TIME_Relative retry_backoff; + struct GNUNET_SCHEDULER_Task *async; /** - * Counter for request identifiers + * Unique ID for the request. */ - uint64_t request_id_counter; + uint64_t id; /** - * Are we currently disconnected and hence unable to send? + * Result of this operation for #notify_starting(). */ - unsigned char currently_down; + enum GNUNET_ARM_Result starting_ret; /** - * #GNUNET_YES if we're running a service test. + * Is this an operation to stop the ARM service? */ - unsigned char service_test_is_active; + int is_arm_stop; }; /** - * Entry in a doubly-linked list of control messages to be transmitted - * to the arm service. - * - * The actual message is allocated at the end of this struct. + * Handle for interacting with ARM. */ -struct ARMControlMessage +struct GNUNET_ARM_Handle { /** - * This is a doubly-linked list. + * Our connection to the ARM service. */ - struct ARMControlMessage *next; + struct GNUNET_MQ_Handle *mq; /** - * This is a doubly-linked list. + * The configuration that we are using. */ - struct ARMControlMessage *prev; + const struct GNUNET_CONFIGURATION_Handle *cfg; /** - * ARM handle. + * Head of doubly-linked list of pending operations. */ - struct GNUNET_ARM_Handle *h; + struct GNUNET_ARM_Operation *operation_pending_head; /** - * Message to send. + * Tail of doubly-linked list of pending operations. */ - struct GNUNET_ARM_Message *msg; + struct GNUNET_ARM_Operation *operation_pending_tail; /** - * Callback for service state change requests. + * Callback to invoke on connection/disconnection. */ - GNUNET_ARM_ResultCallback result_cont; + GNUNET_ARM_ConnectionStatusCallback conn_status; /** - * Callback for service list requests. + * Closure for @e conn_status. */ - GNUNET_ARM_ServiceListCallback list_cont; + void *conn_status_cls; /** - * Closure for @e result_cont or @e list_cont. + * ARM operation where the goal is to wait for ARM shutdown to + * complete. This operation is special in that it waits for an + * error on the @e mq. So we complete it by calling the + * continuation in the #mq_error_handler(). Note that the operation + * is no longer in the @e operation_pending_head DLL once it is + * referenced from this field. */ - void *cont_cls; + struct GNUNET_ARM_Operation *thm; /** - * Timeout for the operation. + * ID of the reconnect task (if any). */ - struct GNUNET_TIME_Absolute timeout; + struct GNUNET_SCHEDULER_Task *reconnect_task; /** - * Task to run when request times out. + * Current delay we use for re-trying to connect to core. */ - struct GNUNET_SCHEDULER_Task *timeout_task_id; + struct GNUNET_TIME_Relative retry_backoff; /** - * Flags for passing std descriptors to ARM (when starting ARM). + * Counter for request identifiers. They are used to match replies + * from ARM to operations in the @e operation_pending_head DLL. */ - enum GNUNET_OS_InheritStdioFlags std_inheritance; + uint64_t request_id_counter; /** - * Type of the request expressed as a message type (start, stop or list). + * Have we detected that ARM is up? */ - uint16_t type; + int currently_up; + }; @@ -191,18 +171,6 @@ reconnect_arm (struct GNUNET_ARM_Handle *h); /** - * Check the list of pending requests, send the next - * one to the arm. - * - * @param h arm handle - * @param ignore_currently_down transmit message even if not initialized? - */ -static void -trigger_next_request (struct GNUNET_ARM_Handle *h, - int ignore_currently_down); - - -/** * Task scheduled to try to re-connect to arm. * * @param cls the `struct GNUNET_ARM_Handle` @@ -213,8 +181,6 @@ reconnect_arm_task (void *cls) struct GNUNET_ARM_Handle *h = cls; h->reconnect_task = NULL; - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Connecting to ARM service after delay\n"); reconnect_arm (h); } @@ -228,28 +194,33 @@ reconnect_arm_task (void *cls) static void reconnect_arm_later (struct GNUNET_ARM_Handle *h) { - if (GNUNET_NO != h->currently_down) - return; - if (NULL != h->cth) - { - GNUNET_CLIENT_notify_transmit_ready_cancel (h->cth); - h->cth = NULL; - } - if (NULL != h->client) + struct GNUNET_ARM_Operation *op; + + if (NULL != h->mq) { - GNUNET_CLIENT_disconnect (h->client); - h->client = NULL; + GNUNET_MQ_destroy (h->mq); + h->mq = NULL; } - h->currently_down = GNUNET_YES; + h->currently_up = GNUNET_NO; GNUNET_assert (NULL == h->reconnect_task); h->reconnect_task = GNUNET_SCHEDULER_add_delayed (h->retry_backoff, &reconnect_arm_task, h); - /* Don't clear pending messages on disconnection, deliver them later - clear_pending_messages (h, GNUNET_ARM_REQUEST_DISCONNECTED); - GNUNET_assert (NULL == h->control_pending_head); - */ + while (NULL != (op = h->operation_pending_head)) + { + if (NULL != op->result_cont) + op->result_cont (op->cont_cls, + GNUNET_ARM_REQUEST_DISCONNECTED, + 0); + if (NULL != op->list_cont) + op->list_cont (op->cont_cls, + GNUNET_ARM_REQUEST_DISCONNECTED, + 0, + NULL); + GNUNET_ARM_operation_cancel (op); + } + GNUNET_assert (NULL == h->operation_pending_head); h->retry_backoff = GNUNET_TIME_STD_BACKOFF (h->retry_backoff); if (NULL != h->conn_status) h->conn_status (h->conn_status_cls, @@ -264,176 +235,50 @@ reconnect_arm_later (struct GNUNET_ARM_Handle *h) * @param id unique message ID to use for the lookup * @return NULL if not found */ -static struct ARMControlMessage * -find_cm_by_id (struct GNUNET_ARM_Handle *h, +static struct GNUNET_ARM_Operation * +find_op_by_id (struct GNUNET_ARM_Handle *h, uint64_t id) { - struct ARMControlMessage *result; + struct GNUNET_ARM_Operation *result; - for (result = h->control_sent_head; NULL != result; result = result->next) - if (id == result->msg->request_id) + for (result = h->operation_pending_head; NULL != result; result = result->next) + if (id == result->id) return result; return NULL; } /** - * Handler for ARM 'termination' reply (failure to receive). - * - * @param cls our `struct GNUNET_ARM_Handle` - * @param msg expected to be NULL - */ -static void -arm_termination_handler (void *cls, - const struct GNUNET_MessageHeader *msg) -{ - struct GNUNET_ARM_Handle *h = cls; - struct ARMControlMessage *cm; - - if (NULL != msg) - { - GNUNET_break (0); - GNUNET_CLIENT_receive (h->client, - &arm_termination_handler, - h, - GNUNET_TIME_UNIT_FOREVER_REL); - return; - } - cm = h->thm; - h->thm = NULL; - h->currently_down = GNUNET_YES; - GNUNET_CLIENT_disconnect (h->client); - h->client = NULL; - if (NULL != cm->result_cont) - cm->result_cont (cm->cont_cls, - GNUNET_ARM_REQUEST_SENT_OK, - (const char *) &cm->msg[1], - GNUNET_ARM_RESULT_STOPPED); - GNUNET_free (cm->msg); - GNUNET_free (cm); -} - - -/** * Handler for ARM replies. * * @param cls our `struct GNUNET_ARM_Handle` - * @param msg the message received from the arm service + * @param res the message received from the arm service */ static void -client_notify_handler (void *cls, - const struct GNUNET_MessageHeader *msg) +handle_arm_result (void *cls, + const struct GNUNET_ARM_ResultMessage *res) { struct GNUNET_ARM_Handle *h = cls; - const struct GNUNET_ARM_Message *arm_msg; - const struct GNUNET_ARM_ResultMessage *res; - const struct GNUNET_ARM_ListResultMessage *lres; - struct ARMControlMessage *cm; - const char **list; - const char *pos; + struct GNUNET_ARM_Operation *op; uint64_t id; enum GNUNET_ARM_Result result; - uint16_t size_check; - uint16_t rcount; - uint16_t msize; - unsigned char fail; + GNUNET_ARM_ResultCallback result_cont; + void *result_cont_cls; - list = NULL; - rcount = 0; - if (NULL == msg) - { - LOG (GNUNET_ERROR_TYPE_INFO, - _("Client was disconnected from arm service, trying to reconnect.\n")); - reconnect_arm_later (h); - return; - } - msize = ntohs (msg->size); - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Processing message of type %u and size %u from arm service\n", - ntohs (msg->type), msize); - if (msize < sizeof (struct GNUNET_ARM_Message)) - { - GNUNET_break (0); - reconnect_arm_later (h); - return; - } - arm_msg = (const struct GNUNET_ARM_Message *) msg; - GNUNET_break (0 == ntohl (arm_msg->reserved)); - id = GNUNET_ntohll (arm_msg->request_id); - cm = find_cm_by_id (h, id); - if (NULL == cm) + id = GNUNET_ntohll (res->arm_msg.request_id); + op = find_op_by_id (h, + id); + if (NULL == op) { LOG (GNUNET_ERROR_TYPE_DEBUG, "Message with unknown id %llu\n", - id); - return; - } - fail = GNUNET_NO; - switch (ntohs (msg->type)) - { - case GNUNET_MESSAGE_TYPE_ARM_RESULT: - if (msize < sizeof (struct GNUNET_ARM_ResultMessage)) - { - GNUNET_assert (0); - fail = GNUNET_YES; - } - break; - case GNUNET_MESSAGE_TYPE_ARM_LIST_RESULT: - if (msize < sizeof (struct GNUNET_ARM_ListResultMessage)) - { - GNUNET_break (0); - fail = GNUNET_YES; - break; - } - size_check = 0; - lres = (const struct GNUNET_ARM_ListResultMessage *) msg; - rcount = ntohs (lres->count); - { - unsigned int i; - - list = GNUNET_malloc (sizeof (const char *) * rcount); - pos = (const char *)&lres[1]; - for (i = 0; i < rcount; i++) - { - const char *end = memchr (pos, 0, msize - size_check); - if (NULL == end) - { - GNUNET_break (0); - fail = GNUNET_YES; - break; - } - list[i] = pos; - size_check += (end - pos) + 1; - pos = end + 1; - } - if (GNUNET_YES == fail) - { - GNUNET_free (list); - list = NULL; - } - } - break; - default: - fail = GNUNET_YES; - break; - } - GNUNET_assert (NULL != cm->timeout_task_id); - GNUNET_SCHEDULER_cancel (cm->timeout_task_id); - GNUNET_CONTAINER_DLL_remove (h->control_sent_head, - h->control_sent_tail, - cm); - if (GNUNET_YES == fail) - { - reconnect_arm_later (h); - GNUNET_free (cm->msg); - GNUNET_free (cm); + (unsigned long long) id); return; } - if ( (GNUNET_MESSAGE_TYPE_ARM_RESULT == ntohs (msg->type)) && - (0 == strcasecmp ((const char *) &cm->msg[1], - "arm")) && - (NULL != (res = (const struct GNUNET_ARM_ResultMessage *) msg)) && - (GNUNET_ARM_RESULT_STOPPING == ntohl (res->result)) ) + + result = (enum GNUNET_ARM_Result) ntohl (res->result); + if ( (GNUNET_YES == op->is_arm_stop) && + (GNUNET_ARM_RESULT_STOPPING == result) ) { /* special case: if we are stopping 'gnunet-service-arm', we do not just wait for the result message, but also wait for the service to close @@ -443,184 +288,159 @@ client_notify_handler (void *cls, if (NULL != h->thm) { GNUNET_break (0); - cm->result_cont (h->thm->cont_cls, + op->result_cont (h->thm->cont_cls, GNUNET_ARM_REQUEST_SENT_OK, - (const char *) &h->thm->msg[1], GNUNET_ARM_RESULT_IS_NOT_KNOWN); - GNUNET_free (h->thm->msg); GNUNET_free (h->thm); } - h->thm = cm; - GNUNET_CLIENT_receive (h->client, - &arm_termination_handler, - h, - GNUNET_TIME_UNIT_FOREVER_REL); + GNUNET_CONTAINER_DLL_remove (h->operation_pending_head, + h->operation_pending_tail, + op); + h->thm = op; return; } - GNUNET_CLIENT_receive (h->client, - &client_notify_handler, - h, - GNUNET_TIME_UNIT_FOREVER_REL); - switch (ntohs (msg->type)) - { - case GNUNET_MESSAGE_TYPE_ARM_RESULT: - res = (const struct GNUNET_ARM_ResultMessage *) msg; - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Received response from ARM for service `%s': %u\n", - (const char *) &cm->msg[1], ntohs (msg->type)); - result = (enum GNUNET_ARM_Result) ntohl (res->result); - if (NULL != cm->result_cont) - cm->result_cont (cm->cont_cls, - GNUNET_ARM_REQUEST_SENT_OK, - (const char *) &cm->msg[1], - result); - break; - case GNUNET_MESSAGE_TYPE_ARM_LIST_RESULT: - if (NULL != cm->list_cont) - cm->list_cont (cm->cont_cls, - GNUNET_ARM_REQUEST_SENT_OK, - rcount, - list); - GNUNET_free_non_null (list); - break; - } - GNUNET_free (cm->msg); - GNUNET_free (cm); + result_cont = op->result_cont; + result_cont_cls = op->cont_cls; + GNUNET_ARM_operation_cancel (op); + if (NULL != result_cont) + result_cont (result_cont_cls, + GNUNET_ARM_REQUEST_SENT_OK, + result); } /** - * Transmit the next message to the arm service. + * Checked that list result message is well-formed. * - * @param cls closure with the `struct GNUNET_ARM_Handle` - * @param size number of bytes available in @a buf - * @param buf where the callee should write the message - * @return number of bytes written to @a buf + * @param cls our `struct GNUNET_ARM_Handle` + * @param lres the message received from the arm service + * @return #GNUNET_OK if message is well-formed */ -static size_t -transmit_arm_message (void *cls, - size_t size, - void *buf) +static int +check_arm_list_result (void *cls, + const struct GNUNET_ARM_ListResultMessage *lres) { - struct GNUNET_ARM_Handle *h = cls; - struct ARMControlMessage *cm; - struct GNUNET_ARM_Message *arm_msg; - uint64_t request_id; - int notify_connection; - uint16_t msize; + const char *pos = (const char *) &lres[1]; + uint16_t rcount = ntohs (lres->count); + uint16_t msize = ntohs (lres->arm_msg.header.size); + uint16_t size_check; - notify_connection = GNUNET_NO; - LOG (GNUNET_ERROR_TYPE_DEBUG, - "transmit_arm_message is running with %p buffer of size %lu. ARM is known to be %s\n", - buf, size, h->currently_down ? "unconnected" : "connected"); - GNUNET_assert (NULL == h->reconnect_task); - h->cth = NULL; - if ((GNUNET_YES == h->currently_down) && (NULL != buf)) - { - h->currently_down = GNUNET_NO; - notify_connection = GNUNET_YES; - h->retry_backoff = GNUNET_TIME_UNIT_MILLISECONDS; - GNUNET_CLIENT_receive (h->client, &client_notify_handler, h, - GNUNET_TIME_UNIT_FOREVER_REL); - } - if (NULL == buf) + size_check = 0; + for (unsigned int i = 0; i < rcount; i++) { - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Transmission failed, initiating reconnect\n"); - reconnect_arm_later (h); - return 0; - } - if (NULL == (cm = h->control_pending_head)) - { - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Queue is empty, not sending anything\n"); - msize = 0; - goto end; - } - GNUNET_assert (NULL != cm->msg); - msize = ntohs (cm->msg->header.size); - if (size < msize) - { - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Request is too big (%u < %u), not sending it\n", size, msize); - trigger_next_request (h, GNUNET_NO); - msize = 0; - goto end; + const char *end = memchr (pos, 0, msize - size_check); + if (NULL == end) + { + GNUNET_break (0); + return GNUNET_SYSERR; + } + size_check += (end - pos) + 1; + pos = end + 1; } - arm_msg = cm->msg; - if (0 == h->request_id_counter) - h->request_id_counter++; - request_id = h->request_id_counter++; - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Transmitting control message with %u bytes of type %u to arm with id %llu\n", - (unsigned int) msize, - (unsigned int) ntohs (cm->msg->header.type), - request_id); - arm_msg->reserved = htonl (0); - arm_msg->request_id = GNUNET_htonll (request_id); - memcpy (buf, cm->msg, msize); - /* Otherwise we won't be able to find it later! */ - arm_msg->request_id = request_id; - GNUNET_CONTAINER_DLL_remove (h->control_pending_head, - h->control_pending_tail, - cm); - GNUNET_CONTAINER_DLL_insert_tail (h->control_sent_head, - h->control_sent_tail, - cm); - /* Don't free msg, keep it around (kind of wasteful, but then we don't - * really have many messages to handle, and it'll be freed when it times - * out anyway. - */ - trigger_next_request (h, GNUNET_NO); - - end: - if ((GNUNET_YES == notify_connection) && (NULL != h->conn_status)) - h->conn_status (h->conn_status_cls, GNUNET_YES); - return msize; + return GNUNET_OK; } /** - * Check the list of pending requests, send the next - * one to the arm. + * Handler for ARM list replies. * - * @param h arm handle - * @param ignore_currently_down transmit message even if not initialized? + * @param cls our `struct GNUNET_ARM_Handle` + * @param lres the message received from the arm service */ static void -trigger_next_request (struct GNUNET_ARM_Handle *h, - int ignore_currently_down) +handle_arm_list_result (void *cls, + const struct GNUNET_ARM_ListResultMessage *lres) { - uint16_t msize; + struct GNUNET_ARM_Handle *h = cls; + uint16_t rcount = ntohs (lres->count); + const char *list[rcount]; + const char *pos = (const char *) &lres[1]; + uint16_t msize = ntohs (lres->arm_msg.header.size); + struct GNUNET_ARM_Operation *op; + uint16_t size_check; + uint64_t id; - msize = sizeof (struct GNUNET_MessageHeader); - if ((GNUNET_YES == h->currently_down) && (ignore_currently_down == GNUNET_NO)) + id = GNUNET_ntohll (lres->arm_msg.request_id); + op = find_op_by_id (h, + id); + if (NULL == op) { LOG (GNUNET_ERROR_TYPE_DEBUG, - "ARM connection down, not processing queue\n"); + "Message with unknown id %llu\n", + (unsigned long long) id); return; } - if (NULL != h->cth) + size_check = 0; + for (unsigned int i = 0; i < rcount; i++) + { + const char *end = memchr (pos, + 0, + msize - size_check); + + /* Assert, as this was already checked in #check_arm_list_result() */ + GNUNET_assert (NULL != end); + list[i] = pos; + size_check += (end - pos) + 1; + pos = end + 1; + } + if (NULL != op->list_cont) + op->list_cont (op->cont_cls, + GNUNET_ARM_REQUEST_SENT_OK, + rcount, + list); + GNUNET_ARM_operation_cancel (op); +} + + +/** + * Receive confirmation from test, ARM service is up. + * + * @param cls closure with the `struct GNUNET_ARM_Handle` + * @param msg message received + */ +static void +handle_confirm (void *cls, + const struct GNUNET_MessageHeader *msg) +{ + struct GNUNET_ARM_Handle *h = cls; + + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Got confirmation from ARM that we are up!\n"); + if (GNUNET_NO == h->currently_up) { - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Request pending, not processing queue\n"); - return; + h->currently_up = GNUNET_YES; + if (NULL != h->conn_status) + h->conn_status (h->conn_status_cls, + GNUNET_YES); } - if (NULL != h->control_pending_head) - msize = - ntohs (h->control_pending_head->msg->header.size); - else if (GNUNET_NO == ignore_currently_down) +} + + +/** + * Generic error handler, called with the appropriate error code and + * the same closure specified at the creation of the message queue. + * Not every message queue implementation supports an error handler. + * + * @param cls closure with the `struct GNUNET_ARM_Handle *` + * @param error error code + */ +static void +mq_error_handler (void *cls, + enum GNUNET_MQ_Error error) +{ + struct GNUNET_ARM_Handle *h = cls; + struct GNUNET_ARM_Operation *op; + + h->currently_up = GNUNET_NO; + if (NULL != (op = h->thm)) { - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Request queue empty, not processing queue\n"); - return; /* no pending message */ + h->thm = NULL; + op->result_cont (op->cont_cls, + GNUNET_ARM_REQUEST_SENT_OK, + GNUNET_ARM_RESULT_STOPPED); + GNUNET_free (op); } - h->cth = - GNUNET_CLIENT_notify_transmit_ready (h->client, - msize, - GNUNET_TIME_UNIT_FOREVER_REL, - GNUNET_NO, - &transmit_arm_message, h); + reconnect_arm_later (h); } @@ -633,22 +453,47 @@ trigger_next_request (struct GNUNET_ARM_Handle *h, static int reconnect_arm (struct GNUNET_ARM_Handle *h) { - GNUNET_assert (NULL == h->client); - GNUNET_assert (GNUNET_YES == h->currently_down); - h->client = GNUNET_CLIENT_connect ("arm", h->cfg); - if (NULL == h->client) + GNUNET_MQ_hd_fixed_size (arm_result, + GNUNET_MESSAGE_TYPE_ARM_RESULT, + struct GNUNET_ARM_ResultMessage); + GNUNET_MQ_hd_var_size (arm_list_result, + GNUNET_MESSAGE_TYPE_ARM_LIST_RESULT, + struct GNUNET_ARM_ListResultMessage); + GNUNET_MQ_hd_fixed_size (confirm, + GNUNET_MESSAGE_TYPE_TEST, + struct GNUNET_MessageHeader); + struct GNUNET_MQ_MessageHandler handlers[] = { + make_arm_result_handler (h), + make_arm_list_result_handler (h), + make_confirm_handler (h), + GNUNET_MQ_handler_end () + }; + struct GNUNET_MessageHeader *test; + struct GNUNET_MQ_Envelope *env; + + if (NULL != h->mq) + return GNUNET_OK; + GNUNET_assert (GNUNET_NO == h->currently_up); + h->mq = GNUNET_CLIENT_connecT (h->cfg, + "arm", + handlers, + &mq_error_handler, + h); + if (NULL == h->mq) { LOG (GNUNET_ERROR_TYPE_DEBUG, - "arm_api, GNUNET_CLIENT_connect returned NULL\n"); + "GNUNET_CLIENT_connect returned NULL\n"); if (NULL != h->conn_status) h->conn_status (h->conn_status_cls, GNUNET_SYSERR); return GNUNET_SYSERR; } LOG (GNUNET_ERROR_TYPE_DEBUG, - "arm_api, GNUNET_CLIENT_connect returned non-NULL\n"); - trigger_next_request (h, - GNUNET_YES); + "Sending TEST message to ARM\n"); + env = GNUNET_MQ_msg (test, + GNUNET_MESSAGE_TYPE_TEST); + GNUNET_MQ_send (h->mq, + env); return GNUNET_OK; } @@ -661,22 +506,20 @@ reconnect_arm (struct GNUNET_ARM_Handle *h) * the ARM service may internally use a different * configuration to determine how to start the service). * @param conn_status will be called when connecting/disconnecting - * @param cls closure for conn_status + * @param conn_status_cls closure for @a conn_status * @return context to use for further ARM operations, NULL on error. */ struct GNUNET_ARM_Handle * GNUNET_ARM_connect (const struct GNUNET_CONFIGURATION_Handle *cfg, GNUNET_ARM_ConnectionStatusCallback conn_status, - void *cls) + void *conn_status_cls) { struct GNUNET_ARM_Handle *h; h = GNUNET_new (struct GNUNET_ARM_Handle); - h->cfg = GNUNET_CONFIGURATION_dup (cfg); - h->currently_down = GNUNET_YES; - h->reconnect_task = NULL; + h->cfg = cfg; h->conn_status = conn_status; - h->conn_status_cls = cls; + h->conn_status_cls = conn_status_cls; if (GNUNET_OK != reconnect_arm (h)) { GNUNET_free (h); @@ -692,113 +535,60 @@ GNUNET_ARM_connect (const struct GNUNET_CONFIGURATION_Handle *cfg, * @param h the handle that was being used */ void -GNUNET_ARM_disconnect_and_free (struct GNUNET_ARM_Handle *h) +GNUNET_ARM_disconnect (struct GNUNET_ARM_Handle *h) { - struct ARMControlMessage *cm; + struct GNUNET_ARM_Operation *op; LOG (GNUNET_ERROR_TYPE_DEBUG, "Disconnecting from ARM service\n"); - if (NULL != h->cth) + while (NULL != (op = h->operation_pending_head)) { - GNUNET_CLIENT_notify_transmit_ready_cancel (h->cth); - h->cth = NULL; - } - while ((NULL != (cm = h->control_pending_head)) - || (NULL != (cm = h->control_sent_head)) ) - { - if (NULL != h->control_pending_head) - GNUNET_CONTAINER_DLL_remove (h->control_pending_head, - h->control_pending_tail, - cm); - else - GNUNET_CONTAINER_DLL_remove (h->control_sent_head, - h->control_sent_tail, - cm); - GNUNET_assert (NULL != cm->timeout_task_id); - GNUNET_SCHEDULER_cancel (cm->timeout_task_id); - if (NULL != cm->result_cont) - cm->result_cont (cm->cont_cls, + GNUNET_CONTAINER_DLL_remove (h->operation_pending_head, + h->operation_pending_tail, + op); + if (NULL != op->result_cont) + op->result_cont (op->cont_cls, GNUNET_ARM_REQUEST_DISCONNECTED, - NULL, 0); - /* FIXME: What about list callback? */ - GNUNET_free_non_null (cm->msg); - GNUNET_free (cm); + if (NULL != op->list_cont) + op->list_cont (op->cont_cls, + GNUNET_ARM_REQUEST_DISCONNECTED, + 0, + NULL); + if (NULL != op->async) + { + GNUNET_SCHEDULER_cancel (op->async); + op->async = NULL; + } + GNUNET_free (op); } - if (NULL != h->client) + if (NULL != h->mq) { - GNUNET_CLIENT_disconnect (h->client); - h->client = NULL; + GNUNET_MQ_destroy (h->mq); + h->mq = NULL; } if (NULL != h->reconnect_task) { GNUNET_SCHEDULER_cancel (h->reconnect_task); h->reconnect_task = NULL; } - if (GNUNET_NO == h->service_test_is_active) - { - GNUNET_CONFIGURATION_destroy (h->cfg); - GNUNET_free (h); - } -} - - -/** - * Message timed out. Remove it from the queue. - * - * @param cls the message (struct ARMControlMessage *) - */ -static void -control_message_timeout (void *cls) -{ - struct ARMControlMessage *cm = cls; - struct GNUNET_ARM_Message *arm_msg; - - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Control message timed out\n"); - arm_msg = cm->msg; - if ((NULL == arm_msg) || (0 == arm_msg->request_id)) - { - GNUNET_CONTAINER_DLL_remove (cm->h->control_pending_head, - cm->h->control_pending_tail, - cm); - } - else - { - GNUNET_CONTAINER_DLL_remove (cm->h->control_sent_head, - cm->h->control_sent_tail, - cm); - } - if (NULL != cm->result_cont) - cm->result_cont (cm->cont_cls, - GNUNET_ARM_REQUEST_TIMEOUT, - NULL, 0); - else if (NULL != cm->list_cont) - cm->list_cont (cm->cont_cls, - GNUNET_ARM_REQUEST_TIMEOUT, - 0, NULL); - GNUNET_free_non_null (cm->msg); - GNUNET_free (cm); + GNUNET_free (h); } /** * A client specifically requested starting of ARM itself. - * This function is called with information about whether - * or not ARM is running; if it is, report success. If - * it is not, start the ARM process. + * Starts the ARM service. * - * @param cls the context for the request that we will report on (struct ARMControlMessage *) - * @param result #GNUNET_YES if ARM is running + * @param h the handle with configuration details + * @param std_inheritance inheritance of std streams + * @return operation status code */ -static void -arm_service_report (void *cls, - int result) +static enum GNUNET_ARM_Result +start_arm_service (struct GNUNET_ARM_Handle *h, + enum GNUNET_OS_InheritStdioFlags std_inheritance) { - struct ARMControlMessage *cm = cls; - struct GNUNET_ARM_Handle *h; struct GNUNET_OS_Process *proc; - unsigned char test_is_active; char *cbinary; char *binary; char *quotedbinary; @@ -806,51 +596,20 @@ arm_service_report (void *cls, char *loprefix; char *lopostfix; - test_is_active = cm->h->service_test_is_active; - if ((GNUNET_YES == test_is_active) && - (GNUNET_YES == result)) - { - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Looks like `%s' is already running.\n", - "gnunet-service-arm"); - /* arm is running! */ - if (cm->result_cont) - cm->result_cont (cm->cont_cls, - GNUNET_ARM_REQUEST_SENT_OK, "arm", - GNUNET_ARM_RESULT_IS_STARTED_ALREADY); - } - if (GNUNET_NO == test_is_active) - { - /* User disconnected & destroyed ARM handle in the middle of - * the service test, so we kept the handle around until now. - */ - GNUNET_CONFIGURATION_destroy (cm->h->cfg); - GNUNET_free (cm->h); - } - if ((GNUNET_YES == result) || - (GNUNET_NO == test_is_active)) - { - GNUNET_free (cm); - return; - } - cm->h->service_test_is_active = GNUNET_NO; - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Looks like `%s' is not running, will start it.\n", - "gnunet-service-arm"); if (GNUNET_OK != - GNUNET_CONFIGURATION_get_value_string (cm->h->cfg, + GNUNET_CONFIGURATION_get_value_string (h->cfg, "arm", "PREFIX", &loprefix)) loprefix = GNUNET_strdup (""); if (GNUNET_OK != - GNUNET_CONFIGURATION_get_value_string (cm->h->cfg, + GNUNET_CONFIGURATION_get_value_string (h->cfg, "arm", "OPTIONS", &lopostfix)) lopostfix = GNUNET_strdup (""); if (GNUNET_OK != - GNUNET_CONFIGURATION_get_value_string (cm->h->cfg, + GNUNET_CONFIGURATION_get_value_string (h->cfg, "arm", "BINARY", &cbinary)) @@ -858,18 +617,14 @@ arm_service_report (void *cls, GNUNET_log_config_missing (GNUNET_ERROR_TYPE_WARNING, "arm", "BINARY"); - if (cm->result_cont) - cm->result_cont (cm->cont_cls, - GNUNET_ARM_REQUEST_SENT_OK, "arm", - GNUNET_ARM_RESULT_IS_NOT_KNOWN); - GNUNET_free (cm); GNUNET_free (loprefix); GNUNET_free (lopostfix); - return; + return GNUNET_ARM_RESULT_IS_NOT_KNOWN; } if (GNUNET_OK != - GNUNET_CONFIGURATION_get_value_filename (cm->h->cfg, - "arm", "CONFIG", + GNUNET_CONFIGURATION_get_value_filename (h->cfg, + "arm", + "CONFIG", &config)) config = NULL; binary = GNUNET_OS_get_libexec_binary_path (cbinary); @@ -878,15 +633,15 @@ arm_service_report (void *cls, binary); GNUNET_free (cbinary); if ( (GNUNET_YES == - GNUNET_CONFIGURATION_have_value (cm->h->cfg, + GNUNET_CONFIGURATION_have_value (h->cfg, "TESTING", "WEAKRANDOM")) && (GNUNET_YES == - GNUNET_CONFIGURATION_get_value_yesno (cm->h->cfg, + GNUNET_CONFIGURATION_get_value_yesno (h->cfg, "TESTING", "WEAKRANDOM")) && (GNUNET_NO == - GNUNET_CONFIGURATION_have_value (cm->h->cfg, + GNUNET_CONFIGURATION_have_value (h->cfg, "TESTING", "HOSTFILE"))) { @@ -894,39 +649,43 @@ arm_service_report (void *cls, /* we're clearly running a test, don't daemonize */ if (NULL == config) proc = GNUNET_OS_start_process_s (GNUNET_NO, - cm->std_inheritance, + std_inheritance, NULL, loprefix, quotedbinary, /* no daemonization! */ - lopostfix, NULL); + lopostfix, + NULL); else proc = GNUNET_OS_start_process_s (GNUNET_NO, - cm->std_inheritance, + std_inheritance, NULL, loprefix, quotedbinary, "-c", config, /* no daemonization! */ - lopostfix, NULL); + lopostfix, + NULL); } else { if (NULL == config) proc = GNUNET_OS_start_process_s (GNUNET_NO, - cm->std_inheritance, + std_inheritance, NULL, loprefix, quotedbinary, - "-d", lopostfix, NULL); + "-d", /* do daemonize */ + lopostfix, NULL); else proc = GNUNET_OS_start_process_s (GNUNET_NO, - cm->std_inheritance, + std_inheritance, NULL, loprefix, quotedbinary, "-c", config, - "-d", lopostfix, + "-d", /* do daemonize */ + lopostfix, NULL); } GNUNET_free (binary); @@ -935,22 +694,32 @@ arm_service_report (void *cls, GNUNET_free (loprefix); GNUNET_free (lopostfix); if (NULL == proc) + return GNUNET_ARM_RESULT_START_FAILED; + GNUNET_OS_process_destroy (proc); + return GNUNET_ARM_RESULT_STARTING; +} + + +/** + * Abort an operation. Only prevents the callback from being + * called, the operation may still complete. + * + * @param op operation to cancel + */ +void +GNUNET_ARM_operation_cancel (struct GNUNET_ARM_Operation *op) +{ + struct GNUNET_ARM_Handle *h = op->h; + + if (h->thm == op) { - if (cm->result_cont) - cm->result_cont (cm->cont_cls, - GNUNET_ARM_REQUEST_SENT_OK, "arm", - GNUNET_ARM_RESULT_START_FAILED); - GNUNET_free (cm); + op->result_cont = NULL; return; } - if (cm->result_cont) - cm->result_cont (cm->cont_cls, - GNUNET_ARM_REQUEST_SENT_OK, "arm", - GNUNET_ARM_RESULT_STARTING); - GNUNET_OS_process_destroy (proc); - h = cm->h; - GNUNET_free (cm); - reconnect_arm (h); + GNUNET_CONTAINER_DLL_remove (h->operation_pending_head, + h->operation_pending_tail, + op); + GNUNET_free (op); } @@ -959,21 +728,21 @@ arm_service_report (void *cls, * * @param h handle to ARM * @param service_name name of the service - * @param timeout how long to wait before failing for good * @param cb callback to invoke when service is ready * @param cb_cls closure for @a cb * @param type type of the request + * @return handle to queue, NULL on error */ -static void +static struct GNUNET_ARM_Operation * change_service (struct GNUNET_ARM_Handle *h, const char *service_name, - struct GNUNET_TIME_Relative timeout, GNUNET_ARM_ResultCallback cb, void *cb_cls, uint16_t type) { - struct ARMControlMessage *cm; + struct GNUNET_ARM_Operation *op; size_t slen; + struct GNUNET_MQ_Envelope *env; struct GNUNET_ARM_Message *msg; slen = strlen (service_name) + 1; @@ -981,38 +750,81 @@ change_service (struct GNUNET_ARM_Handle *h, GNUNET_SERVER_MAX_MESSAGE_SIZE) { GNUNET_break (0); - if (cb != NULL) - cb (cb_cls, GNUNET_ARM_REQUEST_TOO_LONG, NULL, 0); - return; + return NULL; } - LOG (GNUNET_ERROR_TYPE_DEBUG, "Requesting %s of service `%s'.\n", - (GNUNET_MESSAGE_TYPE_ARM_START == type) ? "start" : "termination", - service_name); - cm = GNUNET_malloc (sizeof (struct ARMControlMessage) + slen); - cm->h = h; - cm->result_cont = cb; - cm->cont_cls = cb_cls; - cm->timeout = GNUNET_TIME_relative_to_absolute (timeout); - memcpy (&cm[1], service_name, slen); - msg = GNUNET_malloc (sizeof (struct GNUNET_ARM_Message) + slen); - msg->header.size = htons (sizeof (struct GNUNET_ARM_Message) + slen); - msg->header.type = htons (type); + if (0 == h->request_id_counter) + h->request_id_counter++; + op = GNUNET_new (struct GNUNET_ARM_Operation); + op->h = h; + op->result_cont = cb; + op->cont_cls = cb_cls; + op->id = h->request_id_counter++; + GNUNET_CONTAINER_DLL_insert_tail (h->operation_pending_head, + h->operation_pending_tail, + op); + env = GNUNET_MQ_msg_extra (msg, + slen, + type); msg->reserved = htonl (0); - memcpy (&msg[1], service_name, slen); - cm->msg = msg; + msg->request_id = GNUNET_htonll (op->id); + memcpy (&msg[1], + service_name, + slen); + GNUNET_MQ_send (h->mq, + env); + return op; +} + + +/** + * Task run to notify application that ARM is already up. + * + * @param cls the operation that asked ARM to be started + */ +static void +notify_running (void *cls) +{ + struct GNUNET_ARM_Operation *op = cls; + struct GNUNET_ARM_Handle *h = op->h; + + op->async = NULL; + GNUNET_CONTAINER_DLL_remove (h->operation_pending_head, + h->operation_pending_tail, + op); + if (NULL != op->result_cont) + op->result_cont (op->cont_cls, + GNUNET_ARM_REQUEST_SENT_OK, + GNUNET_ARM_RESULT_IS_STARTED_ALREADY); + if ( (GNUNET_YES == h->currently_up) && + (NULL != h->conn_status) ) + h->conn_status (h->conn_status_cls, + GNUNET_YES); + GNUNET_free (op); +} + + +/** + * Task run to notify application that ARM is being started. + * + * @param cls the operation that asked ARM to be started + */ +static void +notify_starting (void *cls) +{ + struct GNUNET_ARM_Operation *op = cls; + struct GNUNET_ARM_Handle *h = op->h; + + op->async = NULL; LOG (GNUNET_ERROR_TYPE_DEBUG, - "Inserting a control message into the queue. Timeout is %s\n", - GNUNET_STRINGS_relative_time_to_string (GNUNET_TIME_absolute_get_remaining (cm->timeout), - GNUNET_NO)); - GNUNET_CONTAINER_DLL_insert_tail (h->control_pending_head, - h->control_pending_tail, - cm); - cm->timeout_task_id = - GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_absolute_get_remaining - (cm->timeout), - &control_message_timeout, - cm); - trigger_next_request (h, GNUNET_NO); + "Notifying client that we started the ARM service\n"); + GNUNET_CONTAINER_DLL_remove (h->operation_pending_head, + h->operation_pending_tail, + op); + if (NULL != op->result_cont) + op->result_cont (op->cont_cls, + GNUNET_ARM_REQUEST_SENT_OK, + op->starting_ret); + GNUNET_free (op); } @@ -1022,132 +834,116 @@ change_service (struct GNUNET_ARM_Handle *h, * @param h handle to ARM * @param service_name name of the service * @param std_inheritance inheritance of std streams - * @param timeout how long to wait before failing for good * @param cont callback to invoke after request is sent or not sent * @param cont_cls closure for @a cont + * @return handle for the operation, NULL on error */ -void +struct GNUNET_ARM_Operation * GNUNET_ARM_request_service_start (struct GNUNET_ARM_Handle *h, const char *service_name, enum GNUNET_OS_InheritStdioFlags std_inheritance, - struct GNUNET_TIME_Relative timeout, GNUNET_ARM_ResultCallback cont, void *cont_cls) { - struct ARMControlMessage *cm; - size_t slen; + struct GNUNET_ARM_Operation *op; + enum GNUNET_ARM_Result ret; LOG (GNUNET_ERROR_TYPE_DEBUG, - "Asked to start service `%s' within %s\n", service_name, - GNUNET_STRINGS_relative_time_to_string (timeout, GNUNET_NO)); - if (0 == strcasecmp ("arm", service_name)) + "Starting service `%s'\n", + service_name); + if (0 != strcasecmp ("arm", + service_name)) + return change_service (h, + service_name, + cont, + cont_cls, + GNUNET_MESSAGE_TYPE_ARM_START); + + /* Possible cases: + * 1) We're connected to ARM already. Invoke the callback immediately. + * 2) We're not connected to ARM. + * Cancel any reconnection attempts temporarily, then perform + * a service test. + */ + if (GNUNET_YES == h->currently_up) { - /* Possible cases: - * 1) We're connected to ARM already. Invoke the callback immediately. - * 2) We're not connected to ARM. - * Cancel any reconnection attempts temporarily, then perform - * a service test. - */ - if (GNUNET_NO == h->currently_down) - { - LOG (GNUNET_ERROR_TYPE_DEBUG, - "ARM is already running\n"); - if (NULL != cont) - cont (cont_cls, - GNUNET_ARM_REQUEST_SENT_OK, - "arm", - GNUNET_ARM_RESULT_IS_STARTED_ALREADY); - } - else if (GNUNET_NO == h->service_test_is_active) - { - if (NULL != h->cth) - { - GNUNET_CLIENT_notify_transmit_ready_cancel (h->cth); - h->cth = NULL; - } - if (NULL != h->client) - { - GNUNET_CLIENT_disconnect (h->client); - h->client = NULL; - } - if (NULL != h->reconnect_task) - { - GNUNET_SCHEDULER_cancel (h->reconnect_task); - h->reconnect_task = NULL; - } - - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Not connected to ARM, will do a service test\n"); - - slen = strlen ("arm") + 1; - cm = GNUNET_malloc (sizeof (struct ARMControlMessage) + slen); - cm->h = h; - cm->result_cont = cont; - cm->cont_cls = cont_cls; - cm->timeout = GNUNET_TIME_relative_to_absolute (timeout); - cm->std_inheritance = std_inheritance; - memcpy (&cm[1], service_name, slen); - h->service_test_is_active = GNUNET_YES; - GNUNET_CLIENT_service_test ("arm", - h->cfg, - timeout, - &arm_service_report, - cm); - } - else - { - /* Service test is already running - tell user to chill out and try - * again later. - */ - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Service test is already in progress, we're busy\n"); - if (NULL != cont) - cont (cont_cls, - GNUNET_ARM_REQUEST_BUSY, - NULL, 0); - } - return; - } - change_service (h, - service_name, - timeout, - cont, cont_cls, - GNUNET_MESSAGE_TYPE_ARM_START); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "ARM is already running\n"); + op = GNUNET_new (struct GNUNET_ARM_Operation); + op->h = h; + op->result_cont = cont; + op->cont_cls = cont_cls; + GNUNET_CONTAINER_DLL_insert_tail (h->operation_pending_head, + h->operation_pending_tail, + op); + op->async = GNUNET_SCHEDULER_add_now (¬ify_running, + op); + return op; + } + /* This is an inherently uncertain choice, as it is of course + theoretically possible that ARM is up and we just did not + yet complete the MQ handshake. However, given that users + are unlikely to hammer 'gnunet-arm -s' on a busy system, + the above check should catch 99.99% of the cases where ARM + is already running. */ + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Starting ARM service\n"); + ret = start_arm_service (h, + std_inheritance); + if (GNUNET_ARM_RESULT_STARTING == ret) + reconnect_arm (h); + op = GNUNET_new (struct GNUNET_ARM_Operation); + op->h = h; + op->result_cont = cont; + op->cont_cls = cont_cls; + GNUNET_CONTAINER_DLL_insert_tail (h->operation_pending_head, + h->operation_pending_tail, + op); + op->starting_ret = ret; + op->async = GNUNET_SCHEDULER_add_now (¬ify_starting, + op); + return op; } /** - * Request a service to be stopped. - * Stopping arm itself will not invalidate its handle, and - * ARM API will try to restore connection to the ARM service, - * even if ARM connection was lost because you asked for ARM to be stopped. - * Call #GNUNET_ARM_disconnect_and_free() to free the handle and prevent + * Request a service to be stopped. Stopping arm itself will not + * invalidate its handle, and ARM API will try to restore connection + * to the ARM service, even if ARM connection was lost because you + * asked for ARM to be stopped. Call + * #GNUNET_ARM_disconnect() to free the handle and prevent * further connection attempts. * * @param h handle to ARM * @param service_name name of the service - * @param timeout how long to wait before failing for good * @param cont callback to invoke after request is sent or is not sent * @param cont_cls closure for @a cont + * @return handle for the operation, NULL on error */ -void +struct GNUNET_ARM_Operation * GNUNET_ARM_request_service_stop (struct GNUNET_ARM_Handle *h, const char *service_name, - struct GNUNET_TIME_Relative timeout, GNUNET_ARM_ResultCallback cont, void *cont_cls) { + struct GNUNET_ARM_Operation *op; + LOG (GNUNET_ERROR_TYPE_DEBUG, - "Stopping service `%s' within %s\n", - service_name, - GNUNET_STRINGS_relative_time_to_string (timeout, - GNUNET_NO)); - change_service (h, - service_name, - timeout, - cont, - cont_cls, - GNUNET_MESSAGE_TYPE_ARM_STOP); + "Stopping service `%s'\n", + service_name); + op = change_service (h, + service_name, + cont, + cont_cls, + GNUNET_MESSAGE_TYPE_ARM_STOP); + if (NULL == op) + return NULL; + /* If the service is ARM, set a flag as we will use MQ errors + to detect that the process is really gone. */ + if (0 == strcasecmp (service_name, + "arm")) + op->is_arm_stop = GNUNET_YES; + return op; } @@ -1155,43 +951,38 @@ GNUNET_ARM_request_service_stop (struct GNUNET_ARM_Handle *h, * Request a list of running services. * * @param h handle to ARM - * @param timeout how long to wait before failing for good * @param cont callback to invoke after request is sent or is not sent * @param cont_cls closure for @a cont + * @return handle for the operation, NULL on error */ -void +struct GNUNET_ARM_Operation * GNUNET_ARM_request_service_list (struct GNUNET_ARM_Handle *h, - struct GNUNET_TIME_Relative timeout, GNUNET_ARM_ServiceListCallback cont, void *cont_cls) { - struct ARMControlMessage *cm; + struct GNUNET_ARM_Operation *op; + struct GNUNET_MQ_Envelope *env; struct GNUNET_ARM_Message *msg; LOG (GNUNET_ERROR_TYPE_DEBUG, - "Requesting LIST from ARM service with timeout: %s\n", - GNUNET_STRINGS_relative_time_to_string (timeout, - GNUNET_YES)); - cm = GNUNET_new (struct ARMControlMessage); - cm->h = h; - cm->list_cont = cont; - cm->cont_cls = cont_cls; - cm->timeout = GNUNET_TIME_relative_to_absolute (timeout); - msg = GNUNET_malloc (sizeof (struct GNUNET_ARM_Message)); - msg->header.size = htons (sizeof (struct GNUNET_ARM_Message)); - msg->header.type = htons (GNUNET_MESSAGE_TYPE_ARM_LIST); + "Requesting LIST from ARM service\n"); + if (0 == h->request_id_counter) + h->request_id_counter++; + op = GNUNET_new (struct GNUNET_ARM_Operation); + op->h = h; + op->list_cont = cont; + op->cont_cls = cont_cls; + op->id = h->request_id_counter++; + GNUNET_CONTAINER_DLL_insert_tail (h->operation_pending_head, + h->operation_pending_tail, + op); + env = GNUNET_MQ_msg (msg, + GNUNET_MESSAGE_TYPE_ARM_LIST); msg->reserved = htonl (0); - cm->msg = msg; - GNUNET_CONTAINER_DLL_insert_tail (h->control_pending_head, - h->control_pending_tail, - cm); - cm->timeout_task_id = - GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_absolute_get_remaining - (cm->timeout), - &control_message_timeout, - cm); - trigger_next_request (h, - GNUNET_NO); + msg->request_id = GNUNET_htonll (op->id); + GNUNET_MQ_send (h->mq, + env); + return op; } |