diff options
Diffstat (limited to 'src/psyc/psyc_api.c')
-rw-r--r-- | src/psyc/psyc_api.c | 1295 |
1 files changed, 221 insertions, 1074 deletions
diff --git a/src/psyc/psyc_api.c b/src/psyc/psyc_api.c index 7ec9d21b72..bfb6f43fb2 100644 --- a/src/psyc/psyc_api.c +++ b/src/psyc/psyc_api.c @@ -37,29 +37,11 @@ #include "gnunet_env_lib.h" #include "gnunet_multicast_service.h" #include "gnunet_psyc_service.h" +#include "gnunet_psyc_util_lib.h" #include "psyc.h" #define LOG(kind,...) GNUNET_log_from (kind, "psyc-api",__VA_ARGS__) -struct MessageQueue -{ - struct MessageQueue *prev; - struct MessageQueue *next; - /* Followed by struct GNUNET_MessageHeader msg */ -}; - - -/** - * Handle for a pending PSYC transmission operation. - */ -struct GNUNET_PSYC_ChannelTransmitHandle -{ - struct GNUNET_PSYC_Channel *ch; - GNUNET_PSYC_TransmitNotifyModifier notify_mod; - GNUNET_PSYC_TransmitNotifyData notify_data; - void *notify_cls; - enum MessageState state; -}; /** * Handle to access PSYC channel operations for both the master and slaves. @@ -67,109 +49,29 @@ struct GNUNET_PSYC_ChannelTransmitHandle struct GNUNET_PSYC_Channel { /** - * Transmission handle; - */ - struct GNUNET_PSYC_ChannelTransmitHandle tmit; - - /** * Configuration to use. */ const struct GNUNET_CONFIGURATION_Handle *cfg; /** - * Socket (if available). - */ - struct GNUNET_CLIENT_Connection *client; - - /** - * Currently pending transmission request, or NULL for none. + * Client connection to the service. */ - struct GNUNET_CLIENT_TransmitHandle *th; + struct GNUNET_CLIENT_MANAGER_Connection *client; /** - * Head of messages to transmit to the service. - */ - struct MessageQueue *tmit_head; - - /** - * Tail of operations to transmit to the service. + * Transmission handle; */ - struct MessageQueue *tmit_tail; + struct GNUNET_PSYC_TransmitHandle *tmit; /** - * Message currently being transmitted to the service. + * Receipt handle; */ - struct MessageQueue *tmit_msg; + struct GNUNET_PSYC_ReceiveHandle *recv; /** * Message to send on reconnect. */ - struct GNUNET_MessageHeader *reconnect_msg; - - /** - * Task doing exponential back-off trying to reconnect. - */ - GNUNET_SCHEDULER_TaskIdentifier reconnect_task; - - /** - * Time for next connect retry. - */ - struct GNUNET_TIME_Relative reconnect_delay; - - /** - * Message part callback. - */ - GNUNET_PSYC_MessageCallback message_cb; - - /** - * Message part callback for historic message. - */ - GNUNET_PSYC_MessageCallback hist_message_cb; - - /** - * Closure for @a message_cb. - */ - void *cb_cls; - - /** - * ID of the message being received from the PSYC service. - */ - uint64_t recv_message_id; - - /** - * Public key of the slave from which a message is being received. - */ - struct GNUNET_CRYPTO_EddsaPublicKey recv_slave_key; - - /** - * State of the currently being received message from the PSYC service. - */ - enum MessageState recv_state; - - /** - * Flags for the currently being received message from the PSYC service. - */ - enum GNUNET_PSYC_MessageFlags recv_flags; - - /** - * Expected value size for the modifier being received from the PSYC service. - */ - uint32_t recv_mod_value_size_expected; - - /** - * Actual value size for the modifier being received from the PSYC service. - */ - uint32_t recv_mod_value_size; - - /** - * Is transmission paused? - */ - uint8_t tmit_paused; - - /** - * Are we still waiting for a PSYC_TRANSMIT_ACK? - */ - uint8_t tmit_ack_pending; + struct GNUNET_MessageHeader *connect_msg; /** * Are we polling for incoming messages right now? @@ -177,11 +79,6 @@ struct GNUNET_PSYC_Channel uint8_t in_receive; /** - * Are we currently transmitting a message? - */ - uint8_t in_transmit; - - /** * Is this a master or slave channel? */ uint8_t is_master; @@ -193,7 +90,7 @@ struct GNUNET_PSYC_Channel */ struct GNUNET_PSYC_Master { - struct GNUNET_PSYC_Channel ch; + struct GNUNET_PSYC_Channel chn; GNUNET_PSYC_MasterStartCallback start_cb; @@ -201,6 +98,11 @@ struct GNUNET_PSYC_Master * Join request callback. */ GNUNET_PSYC_JoinRequestCallback join_req_cb; + + /** + * Closure for the callbacks. + */ + void *cb_cls; }; @@ -209,11 +111,16 @@ struct GNUNET_PSYC_Master */ struct GNUNET_PSYC_Slave { - struct GNUNET_PSYC_Channel ch; + struct GNUNET_PSYC_Channel chn; GNUNET_PSYC_SlaveConnectCallback connect_cb; GNUNET_PSYC_JoinDecisionCallback join_dcsn_cb; + + /** + * Closure for the callbacks. + */ + void *cb_cls; }; @@ -258,934 +165,170 @@ struct GNUNET_PSYC_StateQuery static void -reconnect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc); - - -static void -channel_transmit_data (struct GNUNET_PSYC_Channel *ch); - - -/** - * Reschedule a connect attempt to the service. - * - * @param ch Channel to reconnect. - */ -static void -reschedule_connect (struct GNUNET_PSYC_Channel *ch) +channel_send_connect_msg (struct GNUNET_PSYC_Channel *chn) { - GNUNET_assert (ch->reconnect_task == GNUNET_SCHEDULER_NO_TASK); - - if (NULL != ch->th) - { - GNUNET_CLIENT_notify_transmit_ready_cancel (ch->th); - ch->th = NULL; - } - if (NULL != ch->client) - { - GNUNET_CLIENT_disconnect (ch->client); - ch->client = NULL; - } - ch->in_receive = GNUNET_NO; - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Scheduling task to reconnect to PSYC service in %s.\n", - GNUNET_STRINGS_relative_time_to_string (ch->reconnect_delay, GNUNET_YES)); - ch->reconnect_task = - GNUNET_SCHEDULER_add_delayed (ch->reconnect_delay, &reconnect, ch); - ch->reconnect_delay = GNUNET_TIME_STD_BACKOFF (ch->reconnect_delay); + uint16_t cmsg_size = ntohs (chn->connect_msg->size); + struct GNUNET_MessageHeader * cmsg = GNUNET_malloc (cmsg_size); + memcpy (cmsg, chn->connect_msg, cmsg_size); + GNUNET_CLIENT_MANAGER_transmit_now (chn->client, cmsg); } -/** - * Schedule transmission of the next message from our queue. - * - * @param ch PSYC channel handle - */ -static void -transmit_next (struct GNUNET_PSYC_Channel *ch); - - -/** - * Reset stored data related to the last received message. - */ static void -recv_reset (struct GNUNET_PSYC_Channel *ch) +channel_recv_disconnect (void *cls, + struct GNUNET_CLIENT_MANAGER_Connection *client, + const struct GNUNET_MessageHeader *msg) { - ch->recv_state = MSG_STATE_START; - ch->recv_flags = 0; - ch->recv_message_id = 0; - //FIXME: ch->recv_slave_key = { 0 }; - ch->recv_mod_value_size = 0; - ch->recv_mod_value_size_expected = 0; + struct GNUNET_PSYC_Channel * + chn = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*chn)); + GNUNET_CLIENT_MANAGER_reconnect (client); + channel_send_connect_msg (chn); } static void -recv_error (struct GNUNET_PSYC_Channel *ch) +channel_recv_message (void *cls, + struct GNUNET_CLIENT_MANAGER_Connection *client, + const struct GNUNET_MessageHeader *msg) { - GNUNET_PSYC_MessageCallback message_cb - = ch->recv_flags & GNUNET_PSYC_MESSAGE_HISTORIC - ? ch->hist_message_cb - : ch->message_cb; - - if (NULL != message_cb) - message_cb (ch->cb_cls, ch->recv_message_id, ch->recv_flags, NULL); - - recv_reset (ch); + struct GNUNET_PSYC_Channel * + chn = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*chn)); + GNUNET_PSYC_receive_message (chn->recv, + (const struct GNUNET_PSYC_MessageHeader *) msg); } -/** - * Queue a message part for transmission to the PSYC service. - * - * The message part is added to the current message buffer. - * When this buffer is full, it is added to the transmission queue. - * - * @param ch Channel struct for the client. - * @param msg Modifier message part, or NULL when there's no more modifiers. - * @param end End of message. - */ static void -queue_message (struct GNUNET_PSYC_Channel *ch, - const struct GNUNET_MessageHeader *msg, - uint8_t end) +channel_recv_message_ack (void *cls, + struct GNUNET_CLIENT_MANAGER_Connection *client, + const struct GNUNET_MessageHeader *msg) { - uint16_t size = msg ? ntohs (msg->size) : 0; - - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Queueing message of type %u and size %u (end: %u)).\n", - ntohs (msg->type), size, end); - - struct MessageQueue *mq = ch->tmit_msg; - struct GNUNET_MessageHeader *qmsg = NULL; - if (NULL != mq) - { - qmsg = (struct GNUNET_MessageHeader *) &mq[1]; - if (NULL == msg - || GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < qmsg->size + size) - { - /* End of message or buffer is full, add it to transmission queue - * and start with empty buffer */ - qmsg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE); - qmsg->size = htons (qmsg->size); - GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, mq); - ch->tmit_msg = mq = NULL; - ch->tmit_ack_pending++; - } - else - { - /* Message fits in current buffer, append */ - ch->tmit_msg - = mq = GNUNET_realloc (mq, sizeof (*mq) + qmsg->size + size); - qmsg = (struct GNUNET_MessageHeader *) &mq[1]; - memcpy ((char *) qmsg + qmsg->size, msg, size); - qmsg->size += size; - } - } - - if (NULL == mq && NULL != msg) - { - /* Empty buffer, copy over message. */ - ch->tmit_msg - = mq = GNUNET_malloc (sizeof (*mq) + sizeof (*qmsg) + size); - qmsg = (struct GNUNET_MessageHeader *) &mq[1]; - qmsg->size = sizeof (*qmsg) + size; - memcpy (&qmsg[1], msg, size); - } - - if (NULL != mq - && (GNUNET_YES == end - || (GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD - < qmsg->size + sizeof (struct GNUNET_MessageHeader)))) - { - /* End of message or buffer is full, add it to transmission queue. */ - qmsg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE); - qmsg->size = htons (qmsg->size); - GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, mq); - ch->tmit_msg = mq = NULL; - ch->tmit_ack_pending++; - } - - if (GNUNET_YES == end) - ch->in_transmit = GNUNET_NO; - - transmit_next (ch); + struct GNUNET_PSYC_Channel * + chn = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*chn)); + GNUNET_PSYC_transmit_got_ack (chn->tmit); } -/** - * Request a modifier from a client to transmit. - * - * @param mst Master handle. - */ static void -channel_transmit_mod (struct GNUNET_PSYC_Channel *ch) +master_recv_start_ack (void *cls, + struct GNUNET_CLIENT_MANAGER_Connection *client, + const struct GNUNET_MessageHeader *msg) { - uint16_t max_data_size, data_size; - char data[GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD] = ""; - struct GNUNET_MessageHeader *msg = (struct GNUNET_MessageHeader *) data; - int notify_ret; - - switch (ch->tmit.state) - { - case MSG_STATE_MODIFIER: - { - struct GNUNET_PSYC_MessageModifier *mod - = (struct GNUNET_PSYC_MessageModifier *) msg; - max_data_size = data_size = GNUNET_PSYC_MODIFIER_MAX_PAYLOAD; - msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER); - msg->size = sizeof (struct GNUNET_PSYC_MessageModifier); - notify_ret = ch->tmit.notify_mod (ch->tmit.notify_cls, &data_size, &mod[1], - &mod->oper, &mod->value_size); - mod->name_size = strnlen ((char *) &mod[1], data_size); - if (mod->name_size < data_size) - { - mod->value_size = htonl (mod->value_size); - mod->name_size = htons (mod->name_size); - } - else if (0 < data_size) - { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Got invalid modifier name.\n"); - notify_ret = GNUNET_SYSERR; - } - break; - } - case MSG_STATE_MOD_CONT: - { - max_data_size = data_size = GNUNET_PSYC_MOD_CONT_MAX_PAYLOAD; - msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT); - msg->size = sizeof (struct GNUNET_MessageHeader); - notify_ret = ch->tmit.notify_mod (ch->tmit.notify_cls, - &data_size, &msg[1], NULL, NULL); - break; - } - default: - GNUNET_assert (0); - } - - switch (notify_ret) - { - case GNUNET_NO: - if (0 == data_size) - { /* Transmission paused, nothing to send. */ - ch->tmit_paused = GNUNET_YES; - return; - } - ch->tmit.state = MSG_STATE_MOD_CONT; - break; - - case GNUNET_YES: - if (0 == data_size) - { - /* End of modifiers. */ - ch->tmit.state = MSG_STATE_DATA; - if (0 == ch->tmit_ack_pending) - channel_transmit_data (ch); - - return; - } - ch->tmit.state = MSG_STATE_MODIFIER; - break; - - default: - LOG (GNUNET_ERROR_TYPE_ERROR, - "MasterTransmitNotifyModifier returned error " - "when requesting a modifier.\n"); - - ch->tmit.state = MSG_STATE_CANCEL; - msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL); - msg->size = htons (sizeof (*msg)); - - queue_message (ch, msg, GNUNET_YES); - return; - } - - if (0 < data_size) - { - GNUNET_assert (data_size <= max_data_size); - msg->size = htons (msg->size + data_size); - queue_message (ch, msg, GNUNET_NO); - } - - channel_transmit_mod (ch); -} + struct GNUNET_PSYC_Master * + mst = GNUNET_CLIENT_MANAGER_get_user_context_ (client, + sizeof (struct GNUNET_PSYC_Channel)); - -/** - * Request data from a client to transmit. - * - * @param mst Master handle. - */ -static void -channel_transmit_data (struct GNUNET_PSYC_Channel *ch) -{ - uint16_t data_size = GNUNET_PSYC_DATA_MAX_PAYLOAD; - char data[GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD] = ""; - struct GNUNET_MessageHeader *msg = (struct GNUNET_MessageHeader *) data; - - msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA); - - int notify_ret = ch->tmit.notify_data (ch->tmit.notify_cls, - &data_size, &msg[1]); - switch (notify_ret) - { - case GNUNET_NO: - if (0 == data_size) - { - /* Transmission paused, nothing to send. */ - ch->tmit_paused = GNUNET_YES; - return; - } - break; - - case GNUNET_YES: - ch->tmit.state = MSG_STATE_END; - break; - - default: - LOG (GNUNET_ERROR_TYPE_ERROR, - "MasterTransmitNotify returned error when requesting data.\n"); - - ch->tmit.state = MSG_STATE_CANCEL; - msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL); - msg->size = htons (sizeof (*msg)); - queue_message (ch, msg, GNUNET_YES); - return; - } - - if (0 < data_size) - { - GNUNET_assert (data_size <= GNUNET_PSYC_DATA_MAX_PAYLOAD); - msg->size = htons (sizeof (*msg) + data_size); - queue_message (ch, msg, !notify_ret); - } - - /* End of message. */ - if (GNUNET_YES == notify_ret) - { - msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END); - msg->size = htons (sizeof (*msg)); - queue_message (ch, msg, GNUNET_YES); - } + struct CountersResult *cres = (struct CountersResult *) msg; + if (NULL != mst->start_cb) + mst->start_cb (mst->cb_cls, GNUNET_ntohll (cres->max_message_id)); } -/** - * Send a message to a channel. - * - * @param ch Handle to the PSYC channel. - * @param method_name Which method should be invoked. - * @param notify_mod Function to call to obtain modifiers. - * @param notify_data Function to call to obtain fragments of the data. - * @param notify_cls Closure for @a notify_mod and @a notify_data. - * @param flags Flags for the message being transmitted. - * - * @return Transmission handle, NULL on error (i.e. more than one request queued). - */ -static struct GNUNET_PSYC_ChannelTransmitHandle * -channel_transmit (struct GNUNET_PSYC_Channel *ch, - const char *method_name, - GNUNET_PSYC_TransmitNotifyModifier notify_mod, - GNUNET_PSYC_TransmitNotifyData notify_data, - void *notify_cls, - uint32_t flags) -{ - if (GNUNET_NO != ch->in_transmit) - return NULL; - ch->in_transmit = GNUNET_YES; - - size_t size = strlen (method_name) + 1; - struct GNUNET_PSYC_MessageMethod *pmeth; - struct GNUNET_MessageHeader *qmsg; - struct MessageQueue * - mq = ch->tmit_msg = GNUNET_malloc (sizeof (*mq) + sizeof (*qmsg) - + sizeof (*pmeth) + size); - qmsg = (struct GNUNET_MessageHeader *) &mq[1]; - qmsg->size = sizeof (*qmsg) + sizeof (*pmeth) + size; - - pmeth = (struct GNUNET_PSYC_MessageMethod *) &qmsg[1]; - pmeth->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD); - pmeth->header.size = htons (sizeof (*pmeth) + size); - pmeth->flags = htonl (flags); - memcpy (&pmeth[1], method_name, size); - - ch->tmit.ch = ch; - ch->tmit.notify_mod = notify_mod; - ch->tmit.notify_data = notify_data; - ch->tmit.notify_cls = notify_cls; - ch->tmit.state = MSG_STATE_MODIFIER; - - channel_transmit_mod (ch); - return &ch->tmit; -} - - -/** - * Resume transmission to the channel. - * - * @param th Handle of the request that is being resumed. - */ static void -channel_transmit_resume (struct GNUNET_PSYC_ChannelTransmitHandle *th) +master_recv_join_request (void *cls, + struct GNUNET_CLIENT_MANAGER_Connection *client, + const struct GNUNET_MessageHeader *msg) { - struct GNUNET_PSYC_Channel *ch = th->ch; - if (0 == ch->tmit_ack_pending) - { - ch->tmit_paused = GNUNET_NO; - channel_transmit_data (ch); - } -} + struct GNUNET_PSYC_Master * + mst = GNUNET_CLIENT_MANAGER_get_user_context_ (client, + sizeof (struct GNUNET_PSYC_Channel)); + const struct MasterJoinRequest *req = (const struct MasterJoinRequest *) msg; -/** - * Abort transmission request to channel. - * - * @param th Handle of the request that is being aborted. - */ -static void -channel_transmit_cancel (struct GNUNET_PSYC_ChannelTransmitHandle *th) -{ - struct GNUNET_PSYC_Channel *ch = th->ch; - if (GNUNET_NO == ch->in_transmit) - return; -} + struct GNUNET_PSYC_MessageHeader *pmsg = NULL; + if (ntohs (req->header.size) <= sizeof (*req) + sizeof (*pmsg)) + pmsg = (struct GNUNET_PSYC_MessageHeader *) &req[1]; + struct GNUNET_PSYC_JoinHandle *jh = GNUNET_malloc (sizeof (*jh)); + jh->mst = mst; + jh->slave_key = req->slave_key; -/** - * Handle incoming message from the PSYC service. - * - * @param ch The channel the message is sent to. - * @param pmsg The message. - */ -static void -handle_psyc_message (struct GNUNET_PSYC_Channel *ch, - const struct GNUNET_PSYC_MessageHeader *msg) -{ - uint16_t size = ntohs (msg->header.size); - uint32_t flags = ntohl (msg->flags); - - GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG, - (struct GNUNET_MessageHeader *) msg); - - if (MSG_STATE_START == ch->recv_state) - { - ch->recv_message_id = GNUNET_ntohll (msg->message_id); - ch->recv_flags = flags; - ch->recv_slave_key = msg->slave_key; - ch->recv_mod_value_size = 0; - ch->recv_mod_value_size_expected = 0; - } - else if (GNUNET_ntohll (msg->message_id) != ch->recv_message_id) - { - // FIXME - LOG (GNUNET_ERROR_TYPE_WARNING, - "Unexpected message ID. Got: %" PRIu64 ", expected: %" PRIu64 "\n", - GNUNET_ntohll (msg->message_id), ch->recv_message_id); - GNUNET_break_op (0); - recv_error (ch); - return; - } - else if (flags != ch->recv_flags) - { - LOG (GNUNET_ERROR_TYPE_WARNING, - "Unexpected message flags. Got: %lu, expected: %lu\n", - flags, ch->recv_flags); - GNUNET_break_op (0); - recv_error (ch); - return; - } - - uint16_t pos = 0, psize = 0, ptype, size_eq, size_min; - - for (pos = 0; sizeof (*msg) + pos < size; pos += psize) - { - const struct GNUNET_MessageHeader *pmsg - = (const struct GNUNET_MessageHeader *) ((char *) &msg[1] + pos); - psize = ntohs (pmsg->size); - ptype = ntohs (pmsg->type); - size_eq = size_min = 0; - - if (psize < sizeof (*pmsg) || sizeof (*msg) + pos + psize > size) - { - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, - "Dropping message of type %u with invalid size %u.\n", - ptype, psize); - recv_error (ch); - return; - } - - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Received message part from PSYC.\n"); - GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG, pmsg); - - switch (ptype) - { - case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD: - size_min = sizeof (struct GNUNET_PSYC_MessageMethod); - break; - case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER: - size_min = sizeof (struct GNUNET_PSYC_MessageModifier); - break; - case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT: - case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA: - size_min = sizeof (struct GNUNET_MessageHeader); - break; - case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END: - case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL: - size_eq = sizeof (struct GNUNET_MessageHeader); - break; - default: - GNUNET_break_op (0); - recv_error (ch); - return; - } - - if (! ((0 < size_eq && psize == size_eq) - || (0 < size_min && size_min <= psize))) - { - GNUNET_break_op (0); - recv_error (ch); - return; - } - - switch (ptype) - { - case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD: - { - struct GNUNET_PSYC_MessageMethod *meth - = (struct GNUNET_PSYC_MessageMethod *) pmsg; - - if (MSG_STATE_START != ch->recv_state) - { - LOG (GNUNET_ERROR_TYPE_WARNING, - "Dropping out of order message method (%u).\n", - ch->recv_state); - /* It is normal to receive an incomplete message right after connecting, - * but should not happen later. - * FIXME: add a check for this condition. - */ - GNUNET_break_op (0); - recv_error (ch); - return; - } - - if ('\0' != *((char *) meth + psize - 1)) - { - LOG (GNUNET_ERROR_TYPE_WARNING, - "Dropping message with malformed method. " - "Message ID: %" PRIu64 "\n", ch->recv_message_id); - GNUNET_break_op (0); - recv_error (ch); - return; - } - ch->recv_state = MSG_STATE_METHOD; - break; - } - case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER: - { - if (!(MSG_STATE_METHOD == ch->recv_state - || MSG_STATE_MODIFIER == ch->recv_state - || MSG_STATE_MOD_CONT == ch->recv_state)) - { - LOG (GNUNET_ERROR_TYPE_WARNING, - "Dropping out of order message modifier (%u).\n", - ch->recv_state); - GNUNET_break_op (0); - recv_error (ch); - return; - } - - struct GNUNET_PSYC_MessageModifier *mod - = (struct GNUNET_PSYC_MessageModifier *) pmsg; - - uint16_t name_size = ntohs (mod->name_size); - ch->recv_mod_value_size_expected = ntohl (mod->value_size); - ch->recv_mod_value_size = psize - sizeof (*mod) - name_size - 1; - - if (psize < sizeof (*mod) + name_size + 1 - || '\0' != *((char *) &mod[1] + name_size) - || ch->recv_mod_value_size_expected < ch->recv_mod_value_size) - { - LOG (GNUNET_ERROR_TYPE_WARNING, "Dropping malformed modifier.\n"); - GNUNET_break_op (0); - recv_error (ch); - return; - } - ch->recv_state = MSG_STATE_MODIFIER; - break; - } - case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT: - { - ch->recv_mod_value_size += psize - sizeof (*pmsg); - - if (!(MSG_STATE_MODIFIER == ch->recv_state - || MSG_STATE_MOD_CONT == ch->recv_state) - || ch->recv_mod_value_size_expected < ch->recv_mod_value_size) - { - LOG (GNUNET_ERROR_TYPE_WARNING, - "Dropping out of order message modifier continuation " - "!(%u == %u || %u == %u) || %lu < %lu.\n", - MSG_STATE_MODIFIER, ch->recv_state, - MSG_STATE_MOD_CONT, ch->recv_state, - ch->recv_mod_value_size_expected, ch->recv_mod_value_size); - GNUNET_break_op (0); - recv_error (ch); - return; - } - break; - } - case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA: - { - if (ch->recv_state < MSG_STATE_METHOD - || ch->recv_mod_value_size_expected != ch->recv_mod_value_size) - { - LOG (GNUNET_ERROR_TYPE_WARNING, - "Dropping out of order message data fragment " - "(%u < %u || %lu != %lu).\n", - ch->recv_state, MSG_STATE_METHOD, - ch->recv_mod_value_size_expected, ch->recv_mod_value_size); - - GNUNET_break_op (0); - recv_error (ch); - return; - } - ch->recv_state = MSG_STATE_DATA; - break; - } - } - - GNUNET_PSYC_MessageCallback message_cb - = ch->recv_flags & GNUNET_PSYC_MESSAGE_HISTORIC - ? ch->hist_message_cb - : ch->message_cb; - - if (NULL != message_cb) - message_cb (ch->cb_cls, ch->recv_message_id, ch->recv_flags, pmsg); - - switch (ptype) - { - case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END: - case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL: - recv_reset (ch); - break; - } - } + if (NULL != mst->join_req_cb) + mst->join_req_cb (mst->cb_cls, &req->slave_key, pmsg, jh); } -/** - * Handle incoming message acknowledgement from the PSYC service. - * - * @param ch The channel the acknowledgement is sent to. - */ static void -handle_psyc_message_ack (struct GNUNET_PSYC_Channel *ch) +slave_recv_join_ack (void *cls, + struct GNUNET_CLIENT_MANAGER_Connection *client, + const struct GNUNET_MessageHeader *msg) { - if (0 == ch->tmit_ack_pending) - { - LOG (GNUNET_ERROR_TYPE_WARNING, "Ignoring extraneous message ACK\n"); - GNUNET_break (0); - return; - } - ch->tmit_ack_pending--; - - switch (ch->tmit.state) - { - case MSG_STATE_MODIFIER: - case MSG_STATE_MOD_CONT: - if (GNUNET_NO == ch->tmit_paused) - channel_transmit_mod (ch); - break; - - case MSG_STATE_DATA: - if (GNUNET_NO == ch->tmit_paused) - channel_transmit_data (ch); - break; - - case MSG_STATE_END: - case MSG_STATE_CANCEL: - break; - - default: - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Ignoring message ACK in state %u.\n", ch->tmit.state); - } + struct GNUNET_PSYC_Slave * + slv = GNUNET_CLIENT_MANAGER_get_user_context_ (client, + sizeof (struct GNUNET_PSYC_Channel)); + struct CountersResult *cres = (struct CountersResult *) msg; + if (NULL != slv->connect_cb) + slv->connect_cb (slv->cb_cls, GNUNET_ntohll (cres->max_message_id)); } static void -handle_psyc_join_request (struct GNUNET_PSYC_Master *mst, - const struct MasterJoinRequest *req) +slave_recv_join_decision (void *cls, + struct GNUNET_CLIENT_MANAGER_Connection *client, + const struct GNUNET_MessageHeader *msg) { - struct GNUNET_PSYC_MessageHeader *msg = NULL; - if (ntohs (req->header.size) <= sizeof (*req) + sizeof (*msg)) - msg = (struct GNUNET_PSYC_MessageHeader *) &req[1]; + struct GNUNET_PSYC_Slave * + slv = GNUNET_CLIENT_MANAGER_get_user_context_ (client, + sizeof (struct GNUNET_PSYC_Channel)); + const struct SlaveJoinDecision * + dcsn = (const struct SlaveJoinDecision *) msg; - struct GNUNET_PSYC_JoinHandle *jh = GNUNET_malloc (sizeof (*jh)); - jh->mst = mst; - jh->slave_key = req->slave_key; + struct GNUNET_PSYC_MessageHeader *pmsg = NULL; + if (ntohs (dcsn->header.size) <= sizeof (*dcsn) + sizeof (*pmsg)) + pmsg = (struct GNUNET_PSYC_MessageHeader *) &dcsn[1]; - if (NULL != mst->join_req_cb) - mst->join_req_cb (mst->ch.cb_cls, &req->slave_key, msg, jh); + struct GNUNET_PSYC_JoinHandle *jh = GNUNET_malloc (sizeof (*jh)); + if (NULL != slv->join_dcsn_cb) + slv->join_dcsn_cb (slv->cb_cls, ntohl (dcsn->is_admitted), pmsg); } -static void -handle_psyc_join_decision (struct GNUNET_PSYC_Slave *slv, - const struct SlaveJoinDecision *dcsn) +static struct GNUNET_CLIENT_MANAGER_MessageHandler master_handlers[] = { - struct GNUNET_PSYC_MessageHeader *msg = NULL; - if (ntohs (dcsn->header.size) <= sizeof (*dcsn) + sizeof (*msg)) - msg = (struct GNUNET_PSYC_MessageHeader *) &dcsn[1]; + { &channel_recv_disconnect, NULL, 0, 0, GNUNET_NO }, - struct GNUNET_PSYC_JoinHandle *jh = GNUNET_malloc (sizeof (*jh)); - if (NULL != slv->join_dcsn_cb) - slv->join_dcsn_cb (slv->ch.cb_cls, ntohl (dcsn->is_admitted), msg); -} + { &channel_recv_message, NULL, + GNUNET_MESSAGE_TYPE_PSYC_MESSAGE, + sizeof (struct GNUNET_PSYC_MessageHeader), GNUNET_YES }, + { &channel_recv_message_ack, NULL, + GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_ACK, + sizeof (struct GNUNET_MessageHeader), GNUNET_NO }, -/** - * Type of a function to call when we receive a message - * from the service. - * - * @param cls closure - * @param msg message received, NULL on timeout or fatal error - */ -static void -message_handler (void *cls, - const struct GNUNET_MessageHeader *msg) -{ - struct GNUNET_PSYC_Channel *ch = cls; - struct GNUNET_PSYC_Master *mst = cls; - struct GNUNET_PSYC_Slave *slv = cls; - - if (NULL == msg) - { - // timeout / disconnected from service, reconnect - reschedule_connect (ch); - return; - } - uint16_t size_eq = 0; - uint16_t size_min = 0; - uint16_t size = ntohs (msg->size); - uint16_t type = ntohs (msg->type); - - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Received message of type %d and size %u from PSYC service\n", - type, size); - - switch (type) - { - case GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK: - case GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK: - size_eq = sizeof (struct CountersResult); - break; - case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE: - size_min = sizeof (struct GNUNET_PSYC_MessageHeader); - break; - case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_ACK: - size_eq = sizeof (struct GNUNET_MessageHeader); - break; - case GNUNET_MESSAGE_TYPE_PSYC_JOIN_REQUEST: - size_min = sizeof (struct MasterJoinRequest); - break; - case GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION: - size_min = sizeof (struct SlaveJoinDecision); - break; - default: - GNUNET_break_op (0); - return; - } - - if (! ((0 < size_eq && size == size_eq) - || (0 < size_min && size_min <= size))) - { - GNUNET_break_op (0); - return; - } - - switch (type) - { - case GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK: - { - struct CountersResult *cres = (struct CountersResult *) msg; - if (NULL != mst->start_cb) - mst->start_cb (ch->cb_cls, GNUNET_ntohll (cres->max_message_id)); - break; - } - case GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK: - { - struct CountersResult *cres = (struct CountersResult *) msg; - if (NULL != slv->connect_cb) - slv->connect_cb (ch->cb_cls, GNUNET_ntohll (cres->max_message_id)); - break; - } - case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_ACK: - { - handle_psyc_message_ack (ch); - break; - } - - case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE: - handle_psyc_message (ch, (const struct GNUNET_PSYC_MessageHeader *) msg); - break; - - case GNUNET_MESSAGE_TYPE_PSYC_JOIN_REQUEST: - handle_psyc_join_request ((struct GNUNET_PSYC_Master *) ch, - (const struct MasterJoinRequest *) msg); - break; - - case GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION: - handle_psyc_join_decision ((struct GNUNET_PSYC_Slave *) ch, - (const struct SlaveJoinDecision *) msg); - break; - } - - if (NULL != ch->client) - { - GNUNET_CLIENT_receive (ch->client, &message_handler, ch, - GNUNET_TIME_UNIT_FOREVER_REL); - } -} + { &master_recv_start_ack, NULL, + GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK, + sizeof (struct CountersResult), GNUNET_NO }, + { &master_recv_join_request, NULL, + GNUNET_MESSAGE_TYPE_PSYC_JOIN_REQUEST, + sizeof (struct MasterJoinRequest), GNUNET_YES }, -/** - * Transmit next message to service. - * - * @param cls The struct GNUNET_PSYC_Channel. - * @param size Number of bytes available in @a buf. - * @param buf Where to copy the message. - * - * @return Number of bytes copied to @a buf. - */ -static size_t -send_next_message (void *cls, size_t size, void *buf) -{ - LOG (GNUNET_ERROR_TYPE_DEBUG, "send_next_message()\n"); - struct GNUNET_PSYC_Channel *ch = cls; - struct MessageQueue *mq = ch->tmit_head; - if (NULL == mq) - return 0; - struct GNUNET_MessageHeader *qmsg = (struct GNUNET_MessageHeader *) &mq[1]; - size_t ret = ntohs (qmsg->size); - ch->th = NULL; - if (ret > size) - { - reschedule_connect (ch); - return 0; - } - memcpy (buf, qmsg, ret); - - GNUNET_CONTAINER_DLL_remove (ch->tmit_head, ch->tmit_tail, mq); - GNUNET_free (mq); - - if (NULL != ch->tmit_head) - transmit_next (ch); - - if (GNUNET_NO == ch->in_receive) - { - ch->in_receive = GNUNET_YES; - GNUNET_CLIENT_receive (ch->client, &message_handler, ch, - GNUNET_TIME_UNIT_FOREVER_REL); - } - return ret; -} + { NULL, NULL, 0, 0, GNUNET_NO } +}; -/** - * Schedule transmission of the next message from our queue. - * - * @param ch PSYC handle. - */ -static void -transmit_next (struct GNUNET_PSYC_Channel *ch) +static struct GNUNET_CLIENT_MANAGER_MessageHandler slave_handlers[] = { - LOG (GNUNET_ERROR_TYPE_DEBUG, "transmit_next()\n"); - if (NULL != ch->th || NULL == ch->client) - return; - - struct MessageQueue *mq = ch->tmit_head; - if (NULL == mq) - return; - struct GNUNET_MessageHeader *qmsg = (struct GNUNET_MessageHeader *) &mq[1]; - - ch->th = GNUNET_CLIENT_notify_transmit_ready (ch->client, - ntohs (qmsg->size), - GNUNET_TIME_UNIT_FOREVER_REL, - GNUNET_NO, - &send_next_message, - ch); -} + { &channel_recv_disconnect, NULL, 0, 0, GNUNET_NO }, + { &channel_recv_message, NULL, + GNUNET_MESSAGE_TYPE_PSYC_MESSAGE, + sizeof (struct GNUNET_PSYC_MessageHeader), GNUNET_YES }, -/** - * Try again to connect to the PSYC service. - * - * @param cls Channel handle. - * @param tc Scheduler context. - */ -static void -reconnect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) -{ - struct GNUNET_PSYC_Channel *ch = cls; - - recv_reset (ch); - ch->reconnect_task = GNUNET_SCHEDULER_NO_TASK; - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Connecting to PSYC service.\n"); - GNUNET_assert (NULL == ch->client); - ch->client = GNUNET_CLIENT_connect ("psyc", ch->cfg); - GNUNET_assert (NULL != ch->client); - uint16_t reconn_size = ntohs (ch->reconnect_msg->size); - - if (NULL == ch->tmit_head || - 0 != memcmp (&ch->tmit_head[1], ch->reconnect_msg, reconn_size)) - { - struct MessageQueue *mq = GNUNET_malloc (sizeof (*mq) + reconn_size); - memcpy (&mq[1], ch->reconnect_msg, reconn_size); - GNUNET_CONTAINER_DLL_insert (ch->tmit_head, ch->tmit_tail, mq); - } - transmit_next (ch); -} + { &channel_recv_message_ack, NULL, + GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_ACK, + sizeof (struct GNUNET_MessageHeader), GNUNET_NO }, + { &slave_recv_join_ack, NULL, + GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK, + sizeof (struct CountersResult), GNUNET_NO }, -/** - * Disconnect from the PSYC service. - * - * @param c Channel handle to disconnect. - */ -static void -disconnect (void *c) -{ - struct GNUNET_PSYC_Channel *ch = c; - - GNUNET_assert (NULL != ch); - if (ch->tmit_head != ch->tmit_tail) - { - LOG (GNUNET_ERROR_TYPE_ERROR, - "Disconnecting while there are still outstanding messages!\n"); - GNUNET_break (0); - } - if (ch->reconnect_task != GNUNET_SCHEDULER_NO_TASK) - { - GNUNET_SCHEDULER_cancel (ch->reconnect_task); - ch->reconnect_task = GNUNET_SCHEDULER_NO_TASK; - } - if (NULL != ch->th) - { - GNUNET_CLIENT_notify_transmit_ready_cancel (ch->th); - ch->th = NULL; - } - if (NULL != ch->client) - { - GNUNET_CLIENT_disconnect (ch->client); - ch->client = NULL; - } - if (NULL != ch->reconnect_msg) - { - GNUNET_free (ch->reconnect_msg); - ch->reconnect_msg = NULL; - } -} + { &slave_recv_join_decision, NULL, + GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION, + sizeof (struct SlaveJoinDecision), GNUNET_YES }, + + { NULL, NULL, 0, 0, GNUNET_NO } +}; /** @@ -1227,24 +370,29 @@ GNUNET_PSYC_master_start (const struct GNUNET_CONFIGURATION_Handle *cfg, void *cls) { struct GNUNET_PSYC_Master *mst = GNUNET_malloc (sizeof (*mst)); - struct GNUNET_PSYC_Channel *ch = &mst->ch; - struct MasterStartRequest *req = GNUNET_malloc (sizeof (*req)); + struct GNUNET_PSYC_Channel *chn = &mst->chn; + struct MasterStartRequest *req = GNUNET_malloc (sizeof (*req)); req->header.size = htons (sizeof (*req)); req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MASTER_START); req->channel_key = *channel_key; req->policy = policy; + chn->connect_msg = (struct GNUNET_MessageHeader *) req; + chn->cfg = cfg; + chn->is_master = GNUNET_YES; + mst->start_cb = start_cb; mst->join_req_cb = join_request_cb; - ch->message_cb = message_cb; - ch->cb_cls = cls; - ch->cfg = cfg; - ch->is_master = GNUNET_YES; - ch->reconnect_msg = (struct GNUNET_MessageHeader *) req; - ch->reconnect_delay = GNUNET_TIME_UNIT_ZERO; - ch->reconnect_task = GNUNET_SCHEDULER_add_now (&reconnect, mst); + mst->cb_cls = cls; + + chn->client = GNUNET_CLIENT_MANAGER_connect (cfg, "psyc", master_handlers); + GNUNET_CLIENT_MANAGER_set_user_context_ (chn->client, mst, sizeof (*chn)); + + chn->tmit = GNUNET_PSYC_transmit_create (chn->client); + chn->recv = GNUNET_PSYC_receive_create (message_cb, message_cb, cls); + channel_send_connect_msg (chn); return mst; } @@ -1253,12 +401,13 @@ GNUNET_PSYC_master_start (const struct GNUNET_CONFIGURATION_Handle *cfg, * Stop a PSYC master channel. * * @param master PSYC channel master to stop. + * @param keep_active FIXME */ void -GNUNET_PSYC_master_stop (struct GNUNET_PSYC_Master *master) +GNUNET_PSYC_master_stop (struct GNUNET_PSYC_Master *mst) { - disconnect (master); - GNUNET_free (master); + GNUNET_CLIENT_MANAGER_disconnect (mst->chn.client, GNUNET_YES); + GNUNET_free (mst); } @@ -1292,7 +441,7 @@ GNUNET_PSYC_join_decision (struct GNUNET_PSYC_JoinHandle *jh, const struct GNUNET_PeerIdentity *relays, const struct GNUNET_PSYC_MessageHeader *join_resp) { - struct GNUNET_PSYC_Channel *ch = &jh->mst->ch; + struct GNUNET_PSYC_Channel *chn = &jh->mst->chn; struct MasterJoinDecision *dcsn; uint16_t join_resp_size = (NULL != join_resp) ? ntohs (join_resp->header.size) : 0; @@ -1302,9 +451,7 @@ GNUNET_PSYC_join_decision (struct GNUNET_PSYC_JoinHandle *jh, < sizeof (*dcsn) + relay_size + join_resp_size) return GNUNET_SYSERR; - struct MessageQueue *mq = GNUNET_malloc (sizeof (*mq) + sizeof (*dcsn) - + relay_size + join_resp_size); - dcsn = (struct MasterJoinDecision *) &mq[1]; + dcsn = GNUNET_malloc (sizeof (*dcsn) + relay_size + join_resp_size); dcsn->header.size = htons (sizeof (*dcsn) + relay_size + join_resp_size); dcsn->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION); dcsn->is_admitted = htonl (is_admitted); @@ -1313,8 +460,7 @@ GNUNET_PSYC_join_decision (struct GNUNET_PSYC_JoinHandle *jh, if (0 < join_resp_size) memcpy (&dcsn[1], join_resp, join_resp_size); - GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, mq); - transmit_next (ch); + GNUNET_CLIENT_MANAGER_transmit (chn->client, &dcsn->header); return GNUNET_OK; } @@ -1332,40 +478,59 @@ GNUNET_PSYC_join_decision (struct GNUNET_PSYC_JoinHandle *jh, * @return Transmission handle, NULL on error (i.e. more than one request queued). */ struct GNUNET_PSYC_MasterTransmitHandle * -GNUNET_PSYC_master_transmit (struct GNUNET_PSYC_Master *master, +GNUNET_PSYC_master_transmit (struct GNUNET_PSYC_Master *mst, const char *method_name, GNUNET_PSYC_TransmitNotifyModifier notify_mod, GNUNET_PSYC_TransmitNotifyData notify_data, void *notify_cls, enum GNUNET_PSYC_MasterTransmitFlags flags) { - return (struct GNUNET_PSYC_MasterTransmitHandle *) - channel_transmit (&master->ch, method_name, notify_mod, notify_data, - notify_cls, flags); + if (GNUNET_OK + == GNUNET_PSYC_transmit_message (mst->chn.tmit, method_name, NULL, + notify_mod, notify_data, notify_cls, + flags)) + return (struct GNUNET_PSYC_MasterTransmitHandle *) mst->chn.tmit; + else + return NULL; } /** * Resume transmission to the channel. * - * @param th Handle of the request that is being resumed. + * @param tmit Handle of the request that is being resumed. */ void -GNUNET_PSYC_master_transmit_resume (struct GNUNET_PSYC_MasterTransmitHandle *th) +GNUNET_PSYC_master_transmit_resume (struct GNUNET_PSYC_MasterTransmitHandle *tmit) { - channel_transmit_resume ((struct GNUNET_PSYC_ChannelTransmitHandle *) th); + GNUNET_PSYC_transmit_resume ((struct GNUNET_PSYC_TransmitHandle *) tmit); } /** * Abort transmission request to the channel. * - * @param th Handle of the request that is being aborted. + * @param tmit Handle of the request that is being aborted. */ void -GNUNET_PSYC_master_transmit_cancel (struct GNUNET_PSYC_MasterTransmitHandle *th) +GNUNET_PSYC_master_transmit_cancel (struct GNUNET_PSYC_MasterTransmitHandle *tmit) { - channel_transmit_cancel ((struct GNUNET_PSYC_ChannelTransmitHandle *) th); + GNUNET_PSYC_transmit_cancel ((struct GNUNET_PSYC_TransmitHandle *) tmit); +} + + +/** + * Convert a channel @a master to a @e channel handle to access the @e channel + * APIs. + * + * @param master Channel master handle. + * + * @return Channel handle, valid for as long as @a master is valid. + */ +struct GNUNET_PSYC_Channel * +GNUNET_PSYC_master_get_channel (struct GNUNET_PSYC_Master *master) +{ + return &master->chn; } @@ -1420,7 +585,7 @@ GNUNET_PSYC_slave_join (const struct GNUNET_CONFIGURATION_Handle *cfg, uint16_t data_size) { struct GNUNET_PSYC_Slave *slv = GNUNET_malloc (sizeof (*slv)); - struct GNUNET_PSYC_Channel *ch = &slv->ch; + struct GNUNET_PSYC_Channel *chn = &slv->chn; struct SlaveJoinRequest *req = GNUNET_malloc (sizeof (*req) + relay_count * sizeof (*relays)); req->header.size = htons (sizeof (*req) @@ -1432,17 +597,21 @@ GNUNET_PSYC_slave_join (const struct GNUNET_CONFIGURATION_Handle *cfg, req->relay_count = htonl (relay_count); memcpy (&req[1], relays, relay_count * sizeof (*relays)); + chn->connect_msg = (struct GNUNET_MessageHeader *) req; + chn->cfg = cfg; + chn->is_master = GNUNET_NO; + slv->connect_cb = connect_cb; slv->join_dcsn_cb = join_decision_cb; - ch->message_cb = message_cb; - ch->cb_cls = cls; + slv->cb_cls = cls; - ch->cfg = cfg; - ch->is_master = GNUNET_NO; - ch->reconnect_msg = (struct GNUNET_MessageHeader *) req; - ch->reconnect_delay = GNUNET_TIME_UNIT_ZERO; - ch->reconnect_task = GNUNET_SCHEDULER_add_now (&reconnect, slv); + chn->client = GNUNET_CLIENT_MANAGER_connect (cfg, "psyc", slave_handlers); + GNUNET_CLIENT_MANAGER_set_user_context_ (chn->client, slv, sizeof (*chn)); + chn->recv = GNUNET_PSYC_receive_create (message_cb, message_cb, cls); + chn->tmit = GNUNET_PSYC_transmit_create (chn->client); + + channel_send_connect_msg (chn); return slv; } @@ -1456,10 +625,10 @@ GNUNET_PSYC_slave_join (const struct GNUNET_CONFIGURATION_Handle *cfg, * @param slave Slave handle. */ void -GNUNET_PSYC_slave_part (struct GNUNET_PSYC_Slave *slave) +GNUNET_PSYC_slave_part (struct GNUNET_PSYC_Slave *slv) { - disconnect (slave); - GNUNET_free (slave); + GNUNET_CLIENT_MANAGER_disconnect (slv->chn.client, GNUNET_YES); + GNUNET_free (slv); } @@ -1477,69 +646,59 @@ GNUNET_PSYC_slave_part (struct GNUNET_PSYC_Slave *slave) * queued). */ struct GNUNET_PSYC_SlaveTransmitHandle * -GNUNET_PSYC_slave_transmit (struct GNUNET_PSYC_Slave *slave, +GNUNET_PSYC_slave_transmit (struct GNUNET_PSYC_Slave *slv, const char *method_name, GNUNET_PSYC_TransmitNotifyModifier notify_mod, GNUNET_PSYC_TransmitNotifyData notify_data, void *notify_cls, enum GNUNET_PSYC_SlaveTransmitFlags flags) + { - return (struct GNUNET_PSYC_SlaveTransmitHandle *) - channel_transmit (&slave->ch, method_name, - notify_mod, notify_data, notify_cls, flags); + if (GNUNET_OK + == GNUNET_PSYC_transmit_message (slv->chn.tmit, method_name, NULL, + notify_mod, notify_data, notify_cls, + flags)) + return (struct GNUNET_PSYC_SlaveTransmitHandle *) slv->chn.tmit; + else + return NULL; } /** * Resume transmission to the master. * - * @param th Handle of the request that is being resumed. + * @param tmit Handle of the request that is being resumed. */ void -GNUNET_PSYC_slave_transmit_resume (struct GNUNET_PSYC_SlaveTransmitHandle *th) +GNUNET_PSYC_slave_transmit_resume (struct GNUNET_PSYC_SlaveTransmitHandle *tmit) { - channel_transmit_resume ((struct GNUNET_PSYC_ChannelTransmitHandle *) th); + GNUNET_PSYC_transmit_resume ((struct GNUNET_PSYC_TransmitHandle *) tmit); } /** * Abort transmission request to master. * - * @param th Handle of the request that is being aborted. + * @param tmit Handle of the request that is being aborted. */ void -GNUNET_PSYC_slave_transmit_cancel (struct GNUNET_PSYC_SlaveTransmitHandle *th) +GNUNET_PSYC_slave_transmit_cancel (struct GNUNET_PSYC_SlaveTransmitHandle *tmit) { - channel_transmit_cancel ((struct GNUNET_PSYC_ChannelTransmitHandle *) th); -} - - -/** - * Convert a channel @a master to a @e channel handle to access the @e channel - * APIs. - * - * @param master Channel master handle. - * - * @return Channel handle, valid for as long as @a master is valid. - */ -struct GNUNET_PSYC_Channel * -GNUNET_PSYC_master_get_channel (struct GNUNET_PSYC_Master *master) -{ - return &master->ch; + GNUNET_PSYC_transmit_cancel ((struct GNUNET_PSYC_TransmitHandle *) tmit); } /** * Convert @a slave to a @e channel handle to access the @e channel APIs. * - * @param slave Slave handle. + * @param slv Slave handle. * * @return Channel handle, valid for as long as @a slave is valid. */ struct GNUNET_PSYC_Channel * -GNUNET_PSYC_slave_get_channel (struct GNUNET_PSYC_Slave *slave) +GNUNET_PSYC_slave_get_channel (struct GNUNET_PSYC_Slave *slv) { - return &slave->ch; + return &slv->chn; } @@ -1565,23 +724,17 @@ GNUNET_PSYC_slave_get_channel (struct GNUNET_PSYC_Slave *slave) * @param effective_since Addition of slave is in effect since this message ID. */ void -GNUNET_PSYC_channel_slave_add (struct GNUNET_PSYC_Channel *channel, +GNUNET_PSYC_channel_slave_add (struct GNUNET_PSYC_Channel *chn, const struct GNUNET_CRYPTO_EddsaPublicKey *slave_key, uint64_t announced_at, uint64_t effective_since) { - struct ChannelSlaveAdd *slvadd; - struct MessageQueue *mq = GNUNET_malloc (sizeof (*mq) + sizeof (*slvadd)); - - slvadd = (struct ChannelSlaveAdd *) &mq[1]; - slvadd->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_SLAVE_ADD); - slvadd->header.size = htons (sizeof (*slvadd)); - slvadd->announced_at = GNUNET_htonll (announced_at); - slvadd->effective_since = GNUNET_htonll (effective_since); - GNUNET_CONTAINER_DLL_insert_tail (channel->tmit_head, - channel->tmit_tail, - mq); - transmit_next (channel); + struct ChannelSlaveAdd *add = GNUNET_malloc (sizeof (*add)); + add->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_SLAVE_ADD); + add->header.size = htons (sizeof (*add)); + add->announced_at = GNUNET_htonll (announced_at); + add->effective_since = GNUNET_htonll (effective_since); + GNUNET_CLIENT_MANAGER_transmit (chn->client, &add->header); } @@ -1607,21 +760,15 @@ GNUNET_PSYC_channel_slave_add (struct GNUNET_PSYC_Channel *channel, * @param announced_at ID of the message that announced the membership change. */ void -GNUNET_PSYC_channel_slave_remove (struct GNUNET_PSYC_Channel *channel, +GNUNET_PSYC_channel_slave_remove (struct GNUNET_PSYC_Channel *chn, const struct GNUNET_CRYPTO_EddsaPublicKey *slave_key, uint64_t announced_at) { - struct ChannelSlaveRemove *slvrm; - struct MessageQueue *mq = GNUNET_malloc (sizeof (*mq) + sizeof (*slvrm)); - - slvrm = (struct ChannelSlaveRemove *) &mq[1]; - slvrm->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_SLAVE_RM); - slvrm->header.size = htons (sizeof (*slvrm)); - slvrm->announced_at = GNUNET_htonll (announced_at); - GNUNET_CONTAINER_DLL_insert_tail (channel->tmit_head, - channel->tmit_tail, - mq); - transmit_next (channel); + struct ChannelSlaveRemove *rm = GNUNET_malloc (sizeof (*rm)); + rm->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_SLAVE_RM); + rm->header.size = htons (sizeof (*rm)); + rm->announced_at = GNUNET_htonll (announced_at); + GNUNET_CLIENT_MANAGER_transmit (chn->client, &rm->header); } |