diff options
author | Gabor X Toth <*@tg-x.net> | 2014-05-29 16:35:55 +0000 |
---|---|---|
committer | Gabor X Toth <*@tg-x.net> | 2014-05-29 16:35:55 +0000 |
commit | 9955561e1b204ccf23fbf841f409bd3ef79be88c (patch) | |
tree | 0271c23ae9f1dad72266a0e6073d696e5afca027 | |
parent | a5877668ba805c5e0efe622e6ce4c58ff5609bf9 (diff) |
psyc, multicast: reorg code, use new client manager & psyc util lib
-rw-r--r-- | src/include/gnunet_multicast_service.h | 6 | ||||
-rw-r--r-- | src/include/gnunet_psyc_service.h | 4 | ||||
-rw-r--r-- | src/multicast/multicast_api.c | 614 | ||||
-rw-r--r-- | src/psyc/gnunet-service-psyc.c | 791 | ||||
-rw-r--r-- | src/psyc/psyc_api.c | 1295 | ||||
-rw-r--r-- | src/psyc/psyc_util_lib.c | 1 | ||||
-rw-r--r-- | src/psyc/test_psyc.c | 129 |
7 files changed, 832 insertions, 2008 deletions
diff --git a/src/include/gnunet_multicast_service.h b/src/include/gnunet_multicast_service.h index 5079a087b6..bb109f4de0 100644 --- a/src/include/gnunet_multicast_service.h +++ b/src/include/gnunet_multicast_service.h @@ -368,9 +368,7 @@ typedef void */ typedef void (*GNUNET_MULTICAST_RequestCallback) (void *cls, - const struct GNUNET_CRYPTO_EddsaPublicKey *member_key, - const struct GNUNET_MessageHeader *req, - enum GNUNET_MULTICAST_MessageFlags flags); + const struct GNUNET_MULTICAST_RequestHeader *req); /** @@ -394,7 +392,7 @@ typedef void */ typedef void (*GNUNET_MULTICAST_MessageCallback) (void *cls, - const struct GNUNET_MessageHeader *msg); + const struct GNUNET_MULTICAST_MessageHeader *msg); /** diff --git a/src/include/gnunet_psyc_service.h b/src/include/gnunet_psyc_service.h index e6f2a78c14..4806767fbb 100644 --- a/src/include/gnunet_psyc_service.h +++ b/src/include/gnunet_psyc_service.h @@ -471,7 +471,7 @@ typedef int * Only needed during the first call to this callback at the beginning * of the modifier. In case of subsequent calls asking for value * continuations @a oper is set to #NULL. - * @param[out] value_size Where to write the full size of the value. + * @param[out] full_value_size Where to write the full size of the value. * Only needed during the first call to this callback at the beginning * of the modifier. In case of subsequent calls asking for value * continuations @a value_size is set to #NULL. @@ -489,7 +489,7 @@ typedef int uint16_t *data_size, void *data, uint8_t *oper, - uint32_t *value_size); + uint32_t *full_value_size); /** * Flags for transmitting messages to a channel by the master. diff --git a/src/multicast/multicast_api.c b/src/multicast/multicast_api.c index 36d564d524..b6d51896d7 100644 --- a/src/multicast/multicast_api.c +++ b/src/multicast/multicast_api.c @@ -24,6 +24,7 @@ * @author Christian Grothoff * @author Gabor X Toth */ + #include "platform.h" #include "gnunet_util_lib.h" #include "gnunet_multicast_service.h" @@ -33,26 +34,6 @@ /** - * Started origins. - * Group's pub_key_hash -> struct GNUNET_MULTICAST_Origin - */ -static struct GNUNET_CONTAINER_MultiHashMap *origins; - -/** - * Joined members. - * group_key_hash -> struct GNUNET_MULTICAST_Member - */ -static struct GNUNET_CONTAINER_MultiHashMap *members; - - -struct MessageQueue -{ - struct MessageQueue *prev; - struct MessageQueue *next; -}; - - -/** * Handle for a request to send a message to all multicast group members * (from the origin). */ @@ -90,47 +71,14 @@ struct GNUNET_MULTICAST_Group const struct GNUNET_CONFIGURATION_Handle *cfg; /** - * Socket (if available). + * Client connection to the service. */ - struct GNUNET_CLIENT_Connection *client; - - /** - * Currently pending transmission request, or NULL for none. - */ - struct GNUNET_CLIENT_TransmitHandle *th; - - /** - * Head of operations to transmit. - */ - struct MessageQueue *tmit_head; - - /** - * Tail of operations to transmit. - */ - struct MessageQueue *tmit_tail; - - /** - * Message being transmitted to the Multicast service. - */ - struct MessageQueue *tmit_msg; + struct GNUNET_CLIENT_MANAGER_Connection *client; /** * Message to send on reconnect. */ - struct GNUNET_MessageHeader *reconnect_msg; - - /** - * Task doing exponential back-off trying to reconnect. - */ - GNUNET_SCHEDULER_TaskIdentifier reconnect_task; - - /** - * Time for next connect retry. - */ - struct GNUNET_TIME_Relative reconnect_delay; - - struct GNUNET_CRYPTO_EddsaPublicKey pub_key; - struct GNUNET_HashCode pub_key_hash; + struct GNUNET_MessageHeader *connect_msg; GNUNET_MULTICAST_JoinRequestCallback join_req_cb; GNUNET_MULTICAST_MembershipTestCallback member_test_cb; @@ -140,11 +88,6 @@ struct GNUNET_MULTICAST_Group void *cb_cls; /** - * Are we polling for incoming messages right now? - */ - uint8_t in_receive; - - /** * Are we currently transmitting a message? */ uint8_t in_transmit; @@ -163,7 +106,6 @@ struct GNUNET_MULTICAST_Origin { struct GNUNET_MULTICAST_Group grp; struct GNUNET_MULTICAST_OriginTransmitHandle tmit; - struct GNUNET_CRYPTO_EddsaPrivateKey priv_key; GNUNET_MULTICAST_RequestCallback request_cb; }; @@ -229,294 +171,125 @@ struct GNUNET_MULTICAST_MemberReplayHandle }; -static void -reconnect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc); - - -static void -reschedule_connect (struct GNUNET_MULTICAST_Group *grp); - - /** - * Schedule transmission of the next message from our queue. - * - * @param grp PSYC channel handle + * Send first message to the service after connecting. */ static void -transmit_next (struct GNUNET_MULTICAST_Group *grp); - - -static void -message_handler (void *cls, const struct GNUNET_MessageHeader *msg); - - -/** - * Reschedule a connect attempt to the service. - * - * @param c channel to reconnect - */ -static void -reschedule_connect (struct GNUNET_MULTICAST_Group *grp) +group_send_connect_msg (struct GNUNET_MULTICAST_Group *grp) { - GNUNET_assert (grp->reconnect_task == GNUNET_SCHEDULER_NO_TASK); - - if (NULL != grp->th) - { - GNUNET_CLIENT_notify_transmit_ready_cancel (grp->th); - grp->th = NULL; - } - if (NULL != grp->client) - { - GNUNET_CLIENT_disconnect (grp->client); - grp->client = NULL; - } - grp->in_receive = GNUNET_NO; - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Scheduling task to reconnect to Multicast service in %s.\n", - GNUNET_STRINGS_relative_time_to_string (grp->reconnect_delay, GNUNET_YES)); - grp->reconnect_task = - GNUNET_SCHEDULER_add_delayed (grp->reconnect_delay, &reconnect, grp); - grp->reconnect_delay = GNUNET_TIME_STD_BACKOFF (grp->reconnect_delay); + uint16_t cmsg_size = ntohs (grp->connect_msg->size); + struct GNUNET_MessageHeader * cmsg = GNUNET_malloc (cmsg_size); + memcpy (cmsg, grp->connect_msg, cmsg_size); + GNUNET_CLIENT_MANAGER_transmit_now (grp->client, cmsg); } /** - * Reset stored data related to the last received message. + * Got disconnected from service. Reconnect. */ static void -recv_reset (struct GNUNET_MULTICAST_Group *grp) -{ -} - - -static void -recv_error (struct GNUNET_MULTICAST_Group *grp) -{ - if (NULL != grp->message_cb) - grp->message_cb (grp->cb_cls, NULL); - - recv_reset (grp); -} - - -/** - * Transmit next message to service. - * - * @param cls The struct GNUNET_MULTICAST_Group. - * @param size Number of bytes available in @a buf. - * @param buf Where to copy the message. - * - * @return Number of bytes copied to @a buf. - */ -static size_t -send_next_message (void *cls, size_t size, void *buf) +group_recv_disconnect (void *cls, + struct GNUNET_CLIENT_MANAGER_Connection *client, + const struct GNUNET_MessageHeader *msg) { - LOG (GNUNET_ERROR_TYPE_DEBUG, "send_next_message()\n"); - struct GNUNET_MULTICAST_Group *grp = cls; - struct MessageQueue *mq = grp->tmit_head; - if (NULL == mq) - return 0; - struct GNUNET_MessageHeader *qmsg = (struct GNUNET_MessageHeader *) &mq[1]; - size_t ret = ntohs (qmsg->size); - grp->th = NULL; - if (ret > size) - { - reschedule_connect (grp); - return 0; - } - memcpy (buf, qmsg, ret); - - GNUNET_CONTAINER_DLL_remove (grp->tmit_head, grp->tmit_tail, mq); - GNUNET_free (mq); - - if (NULL != grp->tmit_head) - transmit_next (grp); - - if (GNUNET_NO == grp->in_receive) - { - grp->in_receive = GNUNET_YES; - GNUNET_CLIENT_receive (grp->client, &message_handler, grp, - GNUNET_TIME_UNIT_FOREVER_REL); - } - return ret; + struct GNUNET_MULTICAST_Group * + grp = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*grp)); + GNUNET_CLIENT_MANAGER_reconnect (client); + group_send_connect_msg (grp); } /** - * Schedule transmission of the next message from our queue. - * - * @param grp Multicast group handle. + * Receive join request from service. */ static void -transmit_next (struct GNUNET_MULTICAST_Group *grp) +group_recv_join_request (void *cls, + struct GNUNET_CLIENT_MANAGER_Connection *client, + const struct GNUNET_MessageHeader *msg) { - LOG (GNUNET_ERROR_TYPE_DEBUG, "transmit_next()\n"); - if (NULL != grp->th || NULL == grp->client) - return; - - struct MessageQueue *mq = grp->tmit_head; - if (NULL == mq) - return; - struct GNUNET_MessageHeader *qmsg = (struct GNUNET_MessageHeader *) &mq[1]; - - grp->th = GNUNET_CLIENT_notify_transmit_ready (grp->client, - ntohs (qmsg->size), - GNUNET_TIME_UNIT_FOREVER_REL, - GNUNET_NO, - &send_next_message, - grp); -} + struct GNUNET_MULTICAST_Group * + grp = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*grp)); + const struct MulticastJoinRequestMessage * + jreq = (const struct MulticastJoinRequestMessage *) msg; -/** - * Try again to connect to the Multicast service. - * - * @param cls Channel handle. - * @param tc Scheduler context. - */ -static void -reconnect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) -{ - struct GNUNET_MULTICAST_Group *grp = cls; - - recv_reset (grp); - grp->reconnect_task = GNUNET_SCHEDULER_NO_TASK; - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Connecting to Multicast service.\n"); - GNUNET_assert (NULL == grp->client); - grp->client = GNUNET_CLIENT_connect ("multicast", grp->cfg); - GNUNET_assert (NULL != grp->client); - uint16_t reconn_size = ntohs (grp->reconnect_msg->size); - - if (NULL == grp->tmit_head || - 0 != memcmp (&grp->tmit_head[1], grp->reconnect_msg, reconn_size)) - { - struct MessageQueue *mq = GNUNET_malloc (sizeof (*mq) + reconn_size); - memcpy (&mq[1], grp->reconnect_msg, reconn_size); - GNUNET_CONTAINER_DLL_insert (grp->tmit_head, grp->tmit_tail, mq); - } - transmit_next (grp); -} + struct GNUNET_MULTICAST_JoinHandle *jh = GNUNET_malloc (sizeof (*jh)); + jh->group = grp; + jh->member_key = jreq->member_key; + jh->member_peer = jreq->member_peer; + const struct GNUNET_MessageHeader *jmsg = NULL; + if (sizeof (*jreq) + sizeof (*jmsg) <= ntohs (jreq->header.size)) + jmsg = (const struct GNUNET_MessageHeader *) &jreq[1]; -/** - * Disconnect from the Multicast service. - * - * @param g Group handle to disconnect. - */ -static void -disconnect (void *g) -{ - struct GNUNET_MULTICAST_Group *grp = g; - - GNUNET_assert (NULL != grp); - if (grp->tmit_head != grp->tmit_tail) - { - LOG (GNUNET_ERROR_TYPE_ERROR, - "Disconnecting while there are still outstanding messages!\n"); - GNUNET_break (0); - } - if (grp->reconnect_task != GNUNET_SCHEDULER_NO_TASK) - { - GNUNET_SCHEDULER_cancel (grp->reconnect_task); - grp->reconnect_task = GNUNET_SCHEDULER_NO_TASK; - } - if (NULL != grp->th) - { - GNUNET_CLIENT_notify_transmit_ready_cancel (grp->th); - grp->th = NULL; - } - if (NULL != grp->client) - { - GNUNET_CLIENT_disconnect (grp->client); - grp->client = NULL; - } - if (NULL != grp->reconnect_msg) - { - GNUNET_free (grp->reconnect_msg); - grp->reconnect_msg = NULL; - } + if (NULL != grp->join_req_cb) + grp->join_req_cb (grp->cb_cls, &jreq->member_key, jmsg, jh); } /** - * Iterator callback for calling message callbacks for all groups. + * Receive multicast message from service. */ -static int -message_cb (void *cls, const struct GNUNET_HashCode *pub_key_hash, void *group) +static void +group_recv_message (void *cls, + struct GNUNET_CLIENT_MANAGER_Connection *client, + const struct GNUNET_MessageHeader *msg) { - const struct GNUNET_MessageHeader *msg = cls; - struct GNUNET_MULTICAST_Group *grp = group; + struct GNUNET_MULTICAST_Group * + grp = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*grp)); + struct GNUNET_MULTICAST_MessageHeader * + mmsg = (struct GNUNET_MULTICAST_MessageHeader *) msg; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Calling message callback with a message " - "of type %u and size %u.\n", - ntohs (msg->type), ntohs (msg->size)); + "Calling message callback with a message of size %u.\n", + ntohs (mmsg->header.size)); if (NULL != grp->message_cb) - grp->message_cb (grp->cb_cls, msg); - - return GNUNET_YES; + grp->message_cb (grp->cb_cls, mmsg); } /** - * Iterator callback for calling request callbacks of origins. + * Origin receives uniquest request from a member. */ -static int -request_cb (void *cls, const struct GNUNET_HashCode *pub_key_hash, void *origin) -{ - const struct GNUNET_MULTICAST_RequestHeader *req = cls; - struct GNUNET_MULTICAST_Origin *orig = origin; +static void +origin_recv_request (void *cls, + struct GNUNET_CLIENT_MANAGER_Connection *client, + const struct GNUNET_MessageHeader *msg) +{ + struct GNUNET_MULTICAST_Group *grp; + struct GNUNET_MULTICAST_Origin * + orig = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*grp)); + grp = &orig->grp; + struct GNUNET_MULTICAST_RequestHeader * + req = (struct GNUNET_MULTICAST_RequestHeader *) msg; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Calling request callback for a request of type %u and size %u.\n", - ntohs (req->header.type), ntohs (req->header.size)); + "Calling request callback with a request of size %u.\n", + ntohs (req->header.size)); if (NULL != orig->request_cb) - orig->request_cb (orig->grp.cb_cls, &req->member_key, - (const struct GNUNET_MessageHeader *) req, 0); - return GNUNET_YES; + orig->request_cb (grp->cb_cls, req); } /** - * Iterator callback for calling join request callbacks of origins. + * Member receives join decision. */ -static int -join_request_cb (void *cls, const struct GNUNET_HashCode *pub_key_hash, - void *group) +static void +member_recv_join_decision (void *cls, + struct GNUNET_CLIENT_MANAGER_Connection *client, + const struct GNUNET_MessageHeader *msg) { - const struct MulticastJoinRequestMessage *req = cls; - struct GNUNET_MULTICAST_Group *grp = group; - - struct GNUNET_MULTICAST_JoinHandle *jh = GNUNET_malloc (sizeof (*jh)); - jh->group = grp; - jh->member_key = req->member_key; - jh->member_peer = req->member_peer; - - const struct GNUNET_MessageHeader *msg = NULL; - if (sizeof (*req) + sizeof (*msg) <= ntohs (req->header.size)) - msg = (const struct GNUNET_MessageHeader *) &req[1]; - - if (NULL != grp->join_req_cb) - grp->join_req_cb (grp->cb_cls, &req->member_key, msg, jh); - return GNUNET_YES; -} - + struct GNUNET_MULTICAST_Group *grp; + struct GNUNET_MULTICAST_Member * + mem = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*grp)); + grp = &mem->grp; -/** - * Iterator callback for calling join decision callbacks of members. - */ -static int -join_decision_cb (void *cls, const struct GNUNET_HashCode *pub_key_hash, - void *member) -{ - const struct MulticastJoinDecisionMessageHeader *hdcsn = cls; + const struct MulticastJoinDecisionMessageHeader * + hdcsn = (const struct MulticastJoinDecisionMessageHeader *) msg; const struct MulticastJoinDecisionMessage * dcsn = (const struct MulticastJoinDecisionMessage *) &hdcsn[1]; - struct GNUNET_MULTICAST_Member *mem = member; - struct GNUNET_MULTICAST_Group *grp = &mem->grp; uint16_t dcsn_size = ntohs (dcsn->header.size); int is_admitted = ntohl (dcsn->is_admitted); @@ -549,116 +322,53 @@ join_decision_cb (void *cls, const struct GNUNET_HashCode *pub_key_hash, if (GNUNET_YES != is_admitted) GNUNET_MULTICAST_member_part (mem); - - return GNUNET_YES; } + /** - * Function called when we receive a message from the service. - * - * @param cls struct GNUNET_MULTICAST_Group - * @param msg Message received, NULL on timeout or fatal error. + * Message handlers for an origin. */ -static void -message_handler (void *cls, const struct GNUNET_MessageHeader *msg) +static struct GNUNET_CLIENT_MANAGER_MessageHandler origin_handlers[] = { - struct GNUNET_MULTICAST_Group *grp = cls; + { &group_recv_disconnect, NULL, 0, 0, GNUNET_NO }, - if (NULL == msg) - { - // timeout / disconnected from service, reconnect - reschedule_connect (grp); - return; - } + { &group_recv_message, NULL, + GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE, + sizeof (struct GNUNET_MULTICAST_MessageHeader), GNUNET_YES }, - uint16_t size_eq = 0; - uint16_t size_min = 0; - uint16_t size = ntohs (msg->size); - uint16_t type = ntohs (msg->type); + { &origin_recv_request, NULL, + GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST, + sizeof (struct GNUNET_MULTICAST_RequestHeader), GNUNET_YES }, - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Received message of type %d and size %u from Multicast service\n", - type, size); + { &group_recv_join_request, NULL, + GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_REQUEST, + sizeof (struct MulticastJoinRequestMessage), GNUNET_YES }, - switch (type) - { - case GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE: - size_min = sizeof (struct GNUNET_MULTICAST_MessageHeader); - break; - - case GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST: - size_min = sizeof (struct GNUNET_MULTICAST_RequestHeader); - break; + { NULL, NULL, 0, 0, GNUNET_NO } +}; - case GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_REQUEST: - size_min = sizeof (struct MulticastJoinRequestMessage); - break; - case GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_DECISION: - size_min = sizeof (struct MulticastJoinDecisionMessage); - break; +/** + * Message handlers for a member. + */ +static struct GNUNET_CLIENT_MANAGER_MessageHandler member_handlers[] = +{ + { &group_recv_disconnect, NULL, 0, 0, GNUNET_NO }, - default: - GNUNET_break_op (0); - type = 0; - } + { &group_recv_message, NULL, + GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE, + sizeof (struct GNUNET_MULTICAST_MessageHeader), GNUNET_YES }, - if (! ((0 < size_eq && size == size_eq) - || (0 < size_min && size_min <= size))) - { - GNUNET_break_op (0); - type = 0; - } + { &group_recv_join_request, NULL, + GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_REQUEST, + sizeof (struct MulticastJoinRequestMessage), GNUNET_YES }, - switch (type) - { - case GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE: - if (origins != NULL) - GNUNET_CONTAINER_multihashmap_get_multiple (origins, &grp->pub_key_hash, - message_cb, (void *) msg); - if (members != NULL) - GNUNET_CONTAINER_multihashmap_get_multiple (members, &grp->pub_key_hash, - message_cb, (void *) msg); - break; - - case GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST: - if (GNUNET_YES != grp->is_origin) - { - GNUNET_break (0); - break; - } - if (NULL != origins) - GNUNET_CONTAINER_multihashmap_get_multiple (origins, &grp->pub_key_hash, - request_cb, (void *) msg); - break; - - case GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_REQUEST: - if (NULL != origins) - GNUNET_CONTAINER_multihashmap_get_multiple (origins, &grp->pub_key_hash, - join_request_cb, (void *) msg); - if (NULL != members) - GNUNET_CONTAINER_multihashmap_get_multiple (members, &grp->pub_key_hash, - join_request_cb, (void *) msg); - break; - - case GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_DECISION: - if (GNUNET_NO != grp->is_origin) - { - GNUNET_break (0); - break; - } - if (NULL != members) - GNUNET_CONTAINER_multihashmap_get_multiple (members, &grp->pub_key_hash, - join_decision_cb, (void *) msg); - break; - } + { &member_recv_join_decision, NULL, + GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_DECISION, + sizeof (struct MulticastJoinDecisionMessage), GNUNET_YES }, - if (NULL != grp->client) - { - GNUNET_CLIENT_receive (grp->client, &message_handler, grp, - GNUNET_TIME_UNIT_FOREVER_REL); - } -} + { NULL, NULL, 0, 0, GNUNET_NO } +}; /** @@ -667,7 +377,7 @@ message_handler (void *cls, const struct GNUNET_MessageHeader *msg) * Must be called once and only once in response to an invocation of the * #GNUNET_MULTICAST_JoinRequestCallback. * - * @param jh Join request handle. + * @param join Join request handle. * @param is_admitted #GNUNET_YES if the join is approved, * #GNUNET_NO if it is disapproved, * #GNUNET_SYSERR if we cannot answer the request. @@ -685,27 +395,25 @@ message_handler (void *cls, const struct GNUNET_MessageHeader *msg) * peer that issued the request even if admission is denied. */ struct GNUNET_MULTICAST_ReplayHandle * -GNUNET_MULTICAST_join_decision (struct GNUNET_MULTICAST_JoinHandle *jh, +GNUNET_MULTICAST_join_decision (struct GNUNET_MULTICAST_JoinHandle *join, int is_admitted, uint16_t relay_count, const struct GNUNET_PeerIdentity *relays, const struct GNUNET_MessageHeader *join_resp) { - struct GNUNET_MULTICAST_Group *grp = jh->group; + struct GNUNET_MULTICAST_Group *grp = join->group; uint16_t join_resp_size = (NULL != join_resp) ? ntohs (join_resp->size) : 0; uint16_t relay_size = relay_count * sizeof (*relays); + struct MulticastJoinDecisionMessageHeader * hdcsn; struct MulticastJoinDecisionMessage *dcsn; - struct MessageQueue * - mq = GNUNET_malloc (sizeof (*mq) + sizeof (*hdcsn) + sizeof (*dcsn) - + relay_size + join_resp_size); - - hdcsn = (struct MulticastJoinDecisionMessageHeader *) &mq[1]; - hdcsn->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_DECISION); + hdcsn = GNUNET_malloc (sizeof (*hdcsn) + sizeof (*dcsn) + + relay_size + join_resp_size); hdcsn->header.size = htons (sizeof (*hdcsn) + sizeof (*dcsn) - + relay_size + join_resp_size); - hdcsn->member_key = jh->member_key; - hdcsn->peer = jh->member_peer; + + relay_size + join_resp_size); + hdcsn->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_DECISION); + hdcsn->member_key = join->member_key; + hdcsn->peer = join->member_peer; dcsn = (struct MulticastJoinDecisionMessage *) &hdcsn[1]; dcsn->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_DECISION); @@ -717,10 +425,8 @@ GNUNET_MULTICAST_join_decision (struct GNUNET_MULTICAST_JoinHandle *jh, if (0 < join_resp_size) memcpy (((char *) &dcsn[1]) + relay_size, join_resp, join_resp_size); - GNUNET_CONTAINER_DLL_insert_tail (grp->tmit_head, grp->tmit_tail, mq); - transmit_next (grp); - - GNUNET_free (jh); + GNUNET_CLIENT_MANAGER_transmit (grp->client, &hdcsn->header); + GNUNET_free (join); return NULL; } @@ -832,7 +538,7 @@ GNUNET_MULTICAST_origin_start (const struct GNUNET_CONFIGURATION_Handle *cfg, start->max_fragment_id = max_fragment_id; memcpy (&start->group_key, priv_key, sizeof (*priv_key)); - grp->reconnect_msg = (struct GNUNET_MessageHeader *) start; + grp->connect_msg = (struct GNUNET_MessageHeader *) start; grp->is_origin = GNUNET_YES; grp->cfg = cfg; @@ -844,20 +550,10 @@ GNUNET_MULTICAST_origin_start (const struct GNUNET_CONFIGURATION_Handle *cfg, grp->message_cb = message_cb; orig->request_cb = request_cb; - orig->priv_key = *priv_key; - - GNUNET_CRYPTO_eddsa_key_get_public (&orig->priv_key, &grp->pub_key); - GNUNET_CRYPTO_hash (&grp->pub_key, sizeof (grp->pub_key), - &grp->pub_key_hash); - - if (NULL == origins) - origins = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES); - - GNUNET_CONTAINER_multihashmap_put (origins, &grp->pub_key_hash, orig, - GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); - grp->reconnect_delay = GNUNET_TIME_UNIT_ZERO; - grp->reconnect_task = GNUNET_SCHEDULER_add_now (&reconnect, grp); + grp->client = GNUNET_CLIENT_MANAGER_connect (cfg, "multicast", origin_handlers); + GNUNET_CLIENT_MANAGER_set_user_context_ (grp->client, orig, sizeof (*grp)); + group_send_connect_msg (grp); return orig; } @@ -871,8 +567,7 @@ GNUNET_MULTICAST_origin_start (const struct GNUNET_CONFIGURATION_Handle *cfg, void GNUNET_MULTICAST_origin_stop (struct GNUNET_MULTICAST_Origin *orig) { - disconnect (&orig->grp); - GNUNET_CONTAINER_multihashmap_remove (origins, &orig->grp.pub_key_hash, orig); + GNUNET_CLIENT_MANAGER_disconnect (orig->grp.client, GNUNET_YES); GNUNET_free (orig); } @@ -885,26 +580,22 @@ origin_to_all (struct GNUNET_MULTICAST_Origin *orig) struct GNUNET_MULTICAST_OriginTransmitHandle *tmit = &orig->tmit; size_t buf_size = GNUNET_MULTICAST_FRAGMENT_MAX_SIZE; - struct MessageQueue *mq = GNUNET_malloc (sizeof (*mq) + buf_size); - GNUNET_CONTAINER_DLL_insert_tail (grp->tmit_head, grp->tmit_tail, mq); - - struct GNUNET_MULTICAST_MessageHeader * - msg = (struct GNUNET_MULTICAST_MessageHeader *) &mq[1]; + struct GNUNET_MULTICAST_MessageHeader *msg = GNUNET_malloc (buf_size); int ret = tmit->notify (tmit->notify_cls, &buf_size, &msg[1]); if (! (GNUNET_YES == ret || GNUNET_NO == ret) - || GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < buf_size) + || GNUNET_MULTICAST_FRAGMENT_MAX_SIZE < buf_size) { LOG (GNUNET_ERROR_TYPE_ERROR, "OriginTransmitNotify() returned error or invalid message size.\n"); /* FIXME: handle error */ - GNUNET_free (mq); + GNUNET_free (msg); return; } if (GNUNET_NO == ret && 0 == buf_size) { - GNUNET_free (mq); + GNUNET_free (msg); return; /* Transmission paused. */ } @@ -915,7 +606,7 @@ origin_to_all (struct GNUNET_MULTICAST_Origin *orig) msg->fragment_offset = GNUNET_htonll (tmit->fragment_offset); tmit->fragment_offset += sizeof (*msg) + buf_size; - transmit_next (grp); + GNUNET_CLIENT_MANAGER_transmit (grp->client, &msg->header); } @@ -939,6 +630,12 @@ GNUNET_MULTICAST_origin_to_all (struct GNUNET_MULTICAST_Origin *orig, GNUNET_MULTICAST_OriginTransmitNotify notify, void *notify_cls) { +/* FIXME + if (GNUNET_YES == orig->grp.in_transmit) + return NULL; + orig->grp.in_transmit = GNUNET_YES; +*/ + struct GNUNET_MULTICAST_OriginTransmitHandle *tmit = &orig->tmit; tmit->origin = orig; tmit->message_id = message_id; @@ -1047,10 +744,9 @@ GNUNET_MULTICAST_member_join (const struct GNUNET_CONFIGURATION_Handle *cfg, if (0 < join_msg_size) memcpy (((char *) &join[1]) + relay_size, join_msg, join_msg_size); - grp->reconnect_msg = (struct GNUNET_MessageHeader *) join; + grp->connect_msg = (struct GNUNET_MessageHeader *) join; grp->is_origin = GNUNET_NO; grp->cfg = cfg; - grp->pub_key = *group_key; mem->join_dcsn_cb = join_decision_cb; grp->join_req_cb = join_request_cb; @@ -1059,17 +755,9 @@ GNUNET_MULTICAST_member_join (const struct GNUNET_CONFIGURATION_Handle *cfg, grp->message_cb = message_cb; grp->cb_cls = cls; - GNUNET_CRYPTO_eddsa_key_get_public (member_key, &grp->pub_key); - GNUNET_CRYPTO_hash (&grp->pub_key, sizeof (grp->pub_key), &grp->pub_key_hash); - - if (NULL == members) - members = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES); - - GNUNET_CONTAINER_multihashmap_put (members, &grp->pub_key_hash, mem, - GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); - - grp->reconnect_delay = GNUNET_TIME_UNIT_ZERO; - grp->reconnect_task = GNUNET_SCHEDULER_add_now (&reconnect, grp); + grp->client = GNUNET_CLIENT_MANAGER_connect (cfg, "multicast", member_handlers); + GNUNET_CLIENT_MANAGER_set_user_context_ (grp->client, mem, sizeof (*grp)); + group_send_connect_msg (grp); return mem; } @@ -1088,8 +776,7 @@ GNUNET_MULTICAST_member_join (const struct GNUNET_CONFIGURATION_Handle *cfg, void GNUNET_MULTICAST_member_part (struct GNUNET_MULTICAST_Member *mem) { - disconnect (&mem->grp); - GNUNET_CONTAINER_multihashmap_remove (members, &mem->grp.pub_key_hash, mem); + GNUNET_CLIENT_MANAGER_disconnect (mem->grp.client, GNUNET_YES); GNUNET_free (mem); } @@ -1162,25 +849,26 @@ member_to_origin (struct GNUNET_MULTICAST_Member *mem) struct GNUNET_MULTICAST_Group *grp = &mem->grp; struct GNUNET_MULTICAST_MemberTransmitHandle *tmit = &mem->tmit; - size_t buf_size = GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD; - struct MessageQueue *mq = GNUNET_malloc (sizeof (*mq) + buf_size); - GNUNET_CONTAINER_DLL_insert_tail (grp->tmit_head, grp->tmit_tail, mq); - - struct GNUNET_MULTICAST_RequestHeader * - req = (struct GNUNET_MULTICAST_RequestHeader *) &mq[1]; + size_t buf_size = GNUNET_MULTICAST_FRAGMENT_MAX_SIZE; + struct GNUNET_MULTICAST_RequestHeader *req = GNUNET_malloc (buf_size); int ret = tmit->notify (tmit->notify_cls, &buf_size, &req[1]); if (! (GNUNET_YES == ret || GNUNET_NO == ret) - || GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < buf_size) + || GNUNET_MULTICAST_FRAGMENT_MAX_SIZE < buf_size) { LOG (GNUNET_ERROR_TYPE_ERROR, "MemberTransmitNotify() returned error or invalid message size.\n"); /* FIXME: handle error */ + GNUNET_free (req); return; } if (GNUNET_NO == ret && 0 == buf_size) - return; /* Transmission paused. */ + { + /* Transmission paused. */ + GNUNET_free (req); + return; + } req->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST); req->header.size = htons (sizeof (*req) + buf_size); @@ -1188,7 +876,7 @@ member_to_origin (struct GNUNET_MULTICAST_Member *mem) req->fragment_offset = GNUNET_ntohll (tmit->fragment_offset); tmit->fragment_offset += sizeof (*req) + buf_size; - transmit_next (grp); + GNUNET_CLIENT_MANAGER_transmit (grp->client, &req->header); } @@ -1207,6 +895,12 @@ GNUNET_MULTICAST_member_to_origin (struct GNUNET_MULTICAST_Member *mem, GNUNET_MULTICAST_MemberTransmitNotify notify, void *notify_cls) { +/* FIXME + if (GNUNET_YES == mem->grp.in_transmit) + return NULL; + mem->grp.in_transmit = GNUNET_YES; +*/ + struct GNUNET_MULTICAST_MemberTransmitHandle *tmit = &mem->tmit; tmit->member = mem; tmit->request_id = request_id; diff --git a/src/psyc/gnunet-service-psyc.c b/src/psyc/gnunet-service-psyc.c index 13f908b6cd..9dcf40e0f3 100644 --- a/src/psyc/gnunet-service-psyc.c +++ b/src/psyc/gnunet-service-psyc.c @@ -34,6 +34,7 @@ #include "gnunet_multicast_service.h" #include "gnunet_psycstore_service.h" #include "gnunet_psyc_service.h" +#include "gnunet_psyc_util_lib.h" #include "psyc.h" @@ -174,10 +175,10 @@ struct FragmentQueue /** * List of connected clients. */ -struct ClientList +struct ClientListItem { - struct ClientList *prev; - struct ClientList *next; + struct ClientListItem *prev; + struct ClientListItem *next; struct GNUNET_SERVER_Client *client; }; @@ -187,8 +188,8 @@ struct ClientList */ struct Channel { - struct ClientList *clients_head; - struct ClientList *clients_tail; + struct ClientListItem *clients_head; + struct ClientListItem *clients_tail; struct TransmitMessage *tmit_head; struct TransmitMessage *tmit_tail; @@ -282,7 +283,7 @@ struct Master /** * Channel struct common for Master and Slave */ - struct Channel ch; + struct Channel chn; /** * Private key of the channel. @@ -339,7 +340,7 @@ struct Slave /** * Channel struct common for Master and Slave */ - struct Channel ch; + struct Channel chn; /** * Private key of the slave. @@ -399,11 +400,11 @@ struct Slave static inline void -transmit_message (struct Channel *ch); +transmit_message (struct Channel *chn); static uint64_t -message_queue_drop (struct Channel *ch); +message_queue_drop (struct Channel *chn); /** @@ -434,12 +435,12 @@ shutdown_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) static void cleanup_master (struct Master *mst) { - struct Channel *ch = &mst->ch; + struct Channel *chn = &mst->chn; if (NULL != mst->origin) GNUNET_MULTICAST_origin_stop (mst->origin); GNUNET_CONTAINER_multihashmap_destroy (mst->join_reqs); - GNUNET_CONTAINER_multihashmap_remove (masters, &ch->pub_key_hash, ch); + GNUNET_CONTAINER_multihashmap_remove (masters, &chn->pub_key_hash, chn); } @@ -449,20 +450,20 @@ cleanup_master (struct Master *mst) static void cleanup_slave (struct Slave *slv) { - struct Channel *ch = &slv->ch; + struct Channel *chn = &slv->chn; struct GNUNET_CONTAINER_MultiHashMap * - ch_slv = GNUNET_CONTAINER_multihashmap_get (channel_slaves, - &ch->pub_key_hash); - GNUNET_assert (NULL != ch_slv); - GNUNET_CONTAINER_multihashmap_remove (ch_slv, &slv->pub_key_hash, slv); + chn_slv = GNUNET_CONTAINER_multihashmap_get (channel_slaves, + &chn->pub_key_hash); + GNUNET_assert (NULL != chn_slv); + GNUNET_CONTAINER_multihashmap_remove (chn_slv, &slv->pub_key_hash, slv); - if (0 == GNUNET_CONTAINER_multihashmap_size (ch_slv)) + if (0 == GNUNET_CONTAINER_multihashmap_size (chn_slv)) { - GNUNET_CONTAINER_multihashmap_remove (channel_slaves, &ch->pub_key_hash, - ch_slv); - GNUNET_CONTAINER_multihashmap_destroy (ch_slv); + GNUNET_CONTAINER_multihashmap_remove (channel_slaves, &chn->pub_key_hash, + chn_slv); + GNUNET_CONTAINER_multihashmap_destroy (chn_slv); } - GNUNET_CONTAINER_multihashmap_remove (slaves, &ch->pub_key_hash, slv); + GNUNET_CONTAINER_multihashmap_remove (slaves, &chn->pub_key_hash, slv); if (NULL != slv->join_req) GNUNET_free (slv->join_req); @@ -470,7 +471,7 @@ cleanup_slave (struct Slave *slv) GNUNET_free (slv->relays); if (NULL != slv->member) GNUNET_MULTICAST_member_part (slv->member); - GNUNET_CONTAINER_multihashmap_remove (slaves, &ch->pub_key_hash, ch); + GNUNET_CONTAINER_multihashmap_remove (slaves, &chn->pub_key_hash, chn); } @@ -478,18 +479,18 @@ cleanup_slave (struct Slave *slv) * Clean up channel data structures after a client disconnected. */ static void -cleanup_channel (struct Channel *ch) +cleanup_channel (struct Channel *chn) { - message_queue_drop (ch); - GNUNET_CONTAINER_multihashmap_remove_all (recv_cache, &ch->pub_key_hash); + message_queue_drop (chn); + GNUNET_CONTAINER_multihashmap_remove_all (recv_cache, &chn->pub_key_hash); - if (NULL != ch->store_op) - GNUNET_PSYCSTORE_operation_cancel (ch->store_op); + if (NULL != chn->store_op) + GNUNET_PSYCSTORE_operation_cancel (chn->store_op); - (GNUNET_YES == ch->is_master) - ? cleanup_master ((struct Master *) ch) - : cleanup_slave ((struct Slave *) ch); - GNUNET_free (ch); + (GNUNET_YES == chn->is_master) + ? cleanup_master ((struct Master *) chn) + : cleanup_slave ((struct Slave *) chn); + GNUNET_free (chn); } @@ -507,41 +508,41 @@ client_disconnect (void *cls, struct GNUNET_SERVER_Client *client) return; struct Channel * - ch = GNUNET_SERVER_client_get_user_context (client, struct Channel); - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "%p Client (%s) disconnected from channel %s\n", - ch, (GNUNET_YES == ch->is_master) ? "master" : "slave", - GNUNET_h2s (&ch->pub_key_hash)); + chn = GNUNET_SERVER_client_get_user_context (client, struct Channel); - if (NULL == ch) + if (NULL == chn) { - GNUNET_log (GNUNET_ERROR_TYPE_ERROR, - "%p User context is NULL in client_disconnect()\n", ch); - GNUNET_break (0); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "%p User context is NULL in client_disconnect()\n", chn); return; } - struct ClientList *cl = ch->clients_head; - while (NULL != cl) + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "%p Client (%s) disconnected from channel %s\n", + chn, (GNUNET_YES == chn->is_master) ? "master" : "slave", + GNUNET_h2s (&chn->pub_key_hash)); + + struct ClientListItem *cli = chn->clients_head; + while (NULL != cli) { - if (cl->client == client) + if (cli->client == client) { - GNUNET_CONTAINER_DLL_remove (ch->clients_head, ch->clients_tail, cl); - GNUNET_free (cl); + GNUNET_CONTAINER_DLL_remove (chn->clients_head, chn->clients_tail, cli); + GNUNET_free (cli); break; } - cl = cl->next; + cli = cli->next; } - if (NULL == ch->clients_head) + if (NULL == chn->clients_head) { /* Last client disconnected. */ - if (NULL != ch->tmit_head) + if (NULL != chn->tmit_head) { /* Send pending messages to multicast before cleanup. */ - transmit_message (ch); + transmit_message (chn); } else { - cleanup_channel (ch); + cleanup_channel (chn); } } } @@ -551,18 +552,18 @@ client_disconnect (void *cls, struct GNUNET_SERVER_Client *client) * Send message to all clients connected to the channel. */ static void -msg_to_clients (const struct Channel *ch, - const struct GNUNET_MessageHeader *msg) +client_send_msg (const struct Channel *chn, + const struct GNUNET_MessageHeader *msg) { GNUNET_log (GNUNET_ERROR_TYPE_WARNING, - "%p Sending message to clients.\n", ch); + "%p Sending message to clients.\n", chn); - struct ClientList *cl = ch->clients_head; - while (NULL != cl) + struct ClientListItem *cli = chn->clients_head; + while (NULL != cli) { - GNUNET_SERVER_notification_context_add (nc, cl->client); - GNUNET_SERVER_notification_context_unicast (nc, cl->client, msg, GNUNET_NO); - cl = cl->next; + GNUNET_SERVER_notification_context_add (nc, cli->client); + GNUNET_SERVER_notification_context_unicast (nc, cli->client, msg, GNUNET_NO); + cli = cli->next; } } @@ -573,7 +574,7 @@ msg_to_clients (const struct Channel *ch, struct JoinMemTestClosure { struct GNUNET_CRYPTO_EddsaPublicKey slave_key; - struct Channel *ch; + struct Channel *chn; struct GNUNET_MULTICAST_JoinHandle *jh; struct MasterJoinRequest *master_join_req; }; @@ -587,15 +588,15 @@ join_mem_test_cb (void *cls, int64_t result, const char *err_msg) { struct JoinMemTestClosure *jcls = cls; - if (GNUNET_NO == result && GNUNET_YES == jcls->ch->is_master) + if (GNUNET_NO == result && GNUNET_YES == jcls->chn->is_master) { /* Pass on join request to client if this is a master channel */ - struct Master *mst = (struct Master *) jcls->ch; + struct Master *mst = (struct Master *) jcls->chn; struct GNUNET_HashCode slave_key_hash; GNUNET_CRYPTO_hash (&jcls->slave_key, sizeof (jcls->slave_key), &slave_key_hash); GNUNET_CONTAINER_multihashmap_put (mst->join_reqs, &slave_key_hash, jcls->jh, GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); - msg_to_clients (jcls->ch, &jcls->master_join_req->header); + client_send_msg (jcls->chn, &jcls->master_join_req->header); } else { @@ -611,13 +612,13 @@ join_mem_test_cb (void *cls, int64_t result, const char *err_msg) * Incoming join request from multicast. */ static void -mcast_join_request_cb (void *cls, - const struct GNUNET_CRYPTO_EddsaPublicKey *slave_key, - const struct GNUNET_MessageHeader *join_msg, - struct GNUNET_MULTICAST_JoinHandle *jh) +mcast_recv_join_request (void *cls, + const struct GNUNET_CRYPTO_EddsaPublicKey *slave_key, + const struct GNUNET_MessageHeader *join_msg, + struct GNUNET_MULTICAST_JoinHandle *jh) { - struct Channel *ch = cls; - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p Got join request.\n", ch); + struct Channel *chn = cls; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p Got join request.\n", chn); uint16_t join_msg_size = 0; if (NULL != join_msg) @@ -630,7 +631,7 @@ mcast_join_request_cb (void *cls, { GNUNET_log (GNUNET_ERROR_TYPE_INFO, "%p Got join message with invalid type %u.\n", - ch, ntohs (join_msg->type)); + chn, ntohs (join_msg->type)); } } @@ -643,12 +644,12 @@ mcast_join_request_cb (void *cls, struct JoinMemTestClosure *jcls = GNUNET_malloc (sizeof (*jcls)); jcls->slave_key = *slave_key; - jcls->ch = ch; + jcls->chn = chn; jcls->jh = jh; jcls->master_join_req = req; - GNUNET_PSYCSTORE_membership_test (store, &ch->pub_key, slave_key, - ch->max_message_id, 0, + GNUNET_PSYCSTORE_membership_test (store, &chn->pub_key, slave_key, + chn->max_message_id, 0, &join_mem_test_cb, jcls); } @@ -657,14 +658,14 @@ mcast_join_request_cb (void *cls, * Join decision received from multicast. */ static void -mcast_join_decision_cb (void *cls, int is_admitted, +mcast_recv_join_decision (void *cls, int is_admitted, const struct GNUNET_PeerIdentity *peer, uint16_t relay_count, const struct GNUNET_PeerIdentity *relays, const struct GNUNET_MessageHeader *join_resp) { struct Slave *slv = cls; - struct Channel *ch = &slv->ch; + struct Channel *chn = &slv->chn; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p Got join decision: %d\n", slv, is_admitted); @@ -677,11 +678,11 @@ mcast_join_decision_cb (void *cls, int is_admitted, if (0 < join_resp_size) memcpy (&dcsn[1], join_resp, join_resp_size); - msg_to_clients (ch, &dcsn->header); + client_send_msg (chn, &dcsn->header); if (GNUNET_YES == is_admitted) { - ch->ready = GNUNET_YES; + chn->ready = GNUNET_YES; } else { @@ -691,20 +692,20 @@ mcast_join_decision_cb (void *cls, int is_admitted, static void -mcast_membership_test_cb (void *cls, - const struct GNUNET_CRYPTO_EddsaPublicKey *slave_key, - uint64_t message_id, uint64_t group_generation, - struct GNUNET_MULTICAST_MembershipTestHandle *mth) +mcast_recv_membership_test (void *cls, + const struct GNUNET_CRYPTO_EddsaPublicKey *slave_key, + uint64_t message_id, uint64_t group_generation, + struct GNUNET_MULTICAST_MembershipTestHandle *mth) { } static void -mcast_replay_fragment_cb (void *cls, - const struct GNUNET_CRYPTO_EddsaPublicKey *slave_key, - uint64_t fragment_id, uint64_t flags, - struct GNUNET_MULTICAST_ReplayHandle *rh) +mcast_recv_replay_fragment (void *cls, + const struct GNUNET_CRYPTO_EddsaPublicKey *slave_key, + uint64_t fragment_id, uint64_t flags, + struct GNUNET_MULTICAST_ReplayHandle *rh) { @@ -712,25 +713,17 @@ mcast_replay_fragment_cb (void *cls, static void -mcast_replay_message_cb (void *cls, - const struct GNUNET_CRYPTO_EddsaPublicKey *slave_key, - uint64_t message_id, - uint64_t fragment_offset, - uint64_t flags, - struct GNUNET_MULTICAST_ReplayHandle *rh) +mcast_recv_replay_message (void *cls, + const struct GNUNET_CRYPTO_EddsaPublicKey *slave_key, + uint64_t message_id, + uint64_t fragment_offset, + uint64_t flags, + struct GNUNET_MULTICAST_ReplayHandle *rh) { } -static void -fragment_store_result (void *cls, int64_t result, const char *err_msg) -{ - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "fragment_store() returned %l (%s)\n", result, err_msg); -} - - /** * Convert an uint64_t in network byte order to a HashCode * that can be used as key in a MultiHashMap @@ -772,17 +765,17 @@ hash_key_from_hll (struct GNUNET_HashCode *key, uint64_t n) * Send multicast message to all clients connected to the channel. */ static void -mmsg_to_clients (struct Channel *ch, - const struct GNUNET_MULTICAST_MessageHeader *mmsg) +client_send_mcast_msg (struct Channel *chn, + const struct GNUNET_MULTICAST_MessageHeader *mmsg) { - uint16_t size = ntohs (mmsg->header.size); struct GNUNET_PSYC_MessageHeader *pmsg; + uint16_t size = ntohs (mmsg->header.size); uint16_t psize = sizeof (*pmsg) + size - sizeof (*mmsg); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "%p Sending message to client. " + "%p Sending multicast message to client. " "fragment_id: %" PRIu64 ", message_id: %" PRIu64 "\n", - ch, GNUNET_ntohll (mmsg->fragment_id), + chn, GNUNET_ntohll (mmsg->fragment_id), GNUNET_ntohll (mmsg->message_id)); pmsg = GNUNET_malloc (psize); @@ -791,7 +784,38 @@ mmsg_to_clients (struct Channel *ch, pmsg->message_id = mmsg->message_id; memcpy (&pmsg[1], &mmsg[1], size - sizeof (*mmsg)); - msg_to_clients (ch, &pmsg->header); + client_send_msg (chn, &pmsg->header); + GNUNET_free (pmsg); +} + + +/** + * Send multicast request to all clients connected to the channel. + */ +static void +client_send_mcast_req (struct Master *mst, + const struct GNUNET_MULTICAST_RequestHeader *req) +{ + struct Channel *chn = &mst->chn; + + struct GNUNET_PSYC_MessageHeader *pmsg; + uint16_t size = ntohs (req->header.size); + uint16_t psize = sizeof (*pmsg) + size - sizeof (*req); + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "%p Sending multicast request to client. " + "fragment_id: %" PRIu64 ", message_id: %" PRIu64 "\n", + chn, GNUNET_ntohll (req->fragment_id), + GNUNET_ntohll (req->request_id)); + + pmsg = GNUNET_malloc (psize); + pmsg->header.size = htons (psize); + pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE); + pmsg->message_id = req->request_id; + pmsg->flags = htonl (GNUNET_PSYC_MESSAGE_REQUEST); + + memcpy (&pmsg[1], &req[1], size - sizeof (*req)); + client_send_msg (chn, &pmsg->header); GNUNET_free (pmsg); } @@ -799,14 +823,14 @@ mmsg_to_clients (struct Channel *ch, /** * Insert a multicast message fragment into the queue belonging to the message. * - * @param ch Channel. + * @param chn Channel. * @param mmsg Multicast message fragment. * @param msg_id_hash Message ID of @a mmsg in a struct GNUNET_HashCode. * @param first_ptype First PSYC message part type in @a mmsg. * @param last_ptype Last PSYC message part type in @a mmsg. */ static void -fragment_queue_insert (struct Channel *ch, +fragment_queue_insert (struct Channel *chn, const struct GNUNET_MULTICAST_MessageHeader *mmsg, uint16_t first_ptype, uint16_t last_ptype) { @@ -814,13 +838,13 @@ fragment_queue_insert (struct Channel *ch, const uint64_t frag_offset = GNUNET_ntohll (mmsg->fragment_offset); struct GNUNET_CONTAINER_MultiHashMap *chan_msgs = GNUNET_CONTAINER_multihashmap_get (recv_cache, - &ch->pub_key_hash); + &chn->pub_key_hash); struct GNUNET_HashCode msg_id_hash; hash_key_from_nll (&msg_id_hash, mmsg->message_id); struct FragmentQueue - *fragq = GNUNET_CONTAINER_multihashmap_get (ch->recv_frags, &msg_id_hash); + *fragq = GNUNET_CONTAINER_multihashmap_get (chn->recv_frags, &msg_id_hash); if (NULL == fragq) { @@ -829,13 +853,13 @@ fragment_queue_insert (struct Channel *ch, fragq->fragments = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN); - GNUNET_CONTAINER_multihashmap_put (ch->recv_frags, &msg_id_hash, fragq, + GNUNET_CONTAINER_multihashmap_put (chn->recv_frags, &msg_id_hash, fragq, GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST); if (NULL == chan_msgs) { chan_msgs = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO); - GNUNET_CONTAINER_multihashmap_put (recv_cache, &ch->pub_key_hash, chan_msgs, + GNUNET_CONTAINER_multihashmap_put (recv_cache, &chn->pub_key_hash, chan_msgs, GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST); } } @@ -850,7 +874,7 @@ fragment_queue_insert (struct Channel *ch, "%p Adding message fragment to cache. " "message_id: %" PRIu64 ", fragment_id: %" PRIu64 ", " "header_size: %" PRIu64 " + %u).\n", - ch, GNUNET_ntohll (mmsg->message_id), + chn, GNUNET_ntohll (mmsg->message_id), GNUNET_ntohll (mmsg->fragment_id), fragq->header_size, size); cache_entry = GNUNET_new (struct RecvCacheEntry); @@ -867,7 +891,7 @@ fragment_queue_insert (struct Channel *ch, "%p Message fragment is already in cache. " "message_id: %" PRIu64 ", fragment_id: %" PRIu64 ", ref_count: %u\n", - ch, GNUNET_ntohll (mmsg->message_id), + chn, GNUNET_ntohll (mmsg->message_id), GNUNET_ntohll (mmsg->fragment_id), cache_entry->ref_count); } @@ -890,11 +914,11 @@ fragment_queue_insert (struct Channel *ch, { /* header is now complete */ GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "%p Header of message %" PRIu64 " is complete.\n", - ch, GNUNET_ntohll (mmsg->message_id)); + chn, GNUNET_ntohll (mmsg->message_id)); GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "%p Adding message %" PRIu64 " to queue.\n", - ch, GNUNET_ntohll (mmsg->message_id)); + chn, GNUNET_ntohll (mmsg->message_id)); fragq->state = MSG_FRAG_STATE_DATA; } else @@ -902,7 +926,7 @@ fragment_queue_insert (struct Channel *ch, GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "%p Header of message %" PRIu64 " is NOT complete yet: " "%" PRIu64 " != %" PRIu64 "\n", - ch, GNUNET_ntohll (mmsg->message_id), frag_offset, + chn, GNUNET_ntohll (mmsg->message_id), frag_offset, fragq->header_size); } } @@ -916,7 +940,7 @@ fragment_queue_insert (struct Channel *ch, GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "%p Message %" PRIu64 " is NOT complete yet: " "%" PRIu64 " != %" PRIu64 "\n", - ch, GNUNET_ntohll (mmsg->message_id), frag_offset, + chn, GNUNET_ntohll (mmsg->message_id), frag_offset, fragq->size); break; @@ -935,7 +959,7 @@ fragment_queue_insert (struct Channel *ch, case MSG_FRAG_STATE_CANCEL: if (GNUNET_NO == fragq->queued) { - GNUNET_CONTAINER_heap_insert (ch->recv_msgs, NULL, + GNUNET_CONTAINER_heap_insert (chn->recv_msgs, NULL, GNUNET_ntohll (mmsg->message_id)); fragq->queued = GNUNET_YES; } @@ -953,24 +977,24 @@ fragment_queue_insert (struct Channel *ch, * Send fragments of a message in order to client, after all modifiers arrived * from multicast. * - * @param ch Channel. + * @param chn Channel. * @param msg_id ID of the message @a fragq belongs to. * @param fragq Fragment queue of the message. * @param drop Drop message without delivering to client? * #GNUNET_YES or #GNUNET_NO. */ static void -fragment_queue_run (struct Channel *ch, uint64_t msg_id, +fragment_queue_run (struct Channel *chn, uint64_t msg_id, struct FragmentQueue *fragq, uint8_t drop) { GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "%p Running message fragment queue for message %" PRIu64 " (state: %u).\n", - ch, msg_id, fragq->state); + chn, msg_id, fragq->state); struct GNUNET_CONTAINER_MultiHashMap *chan_msgs = GNUNET_CONTAINER_multihashmap_get (recv_cache, - &ch->pub_key_hash); + &chn->pub_key_hash); GNUNET_assert (NULL != chan_msgs); uint64_t frag_id; @@ -985,7 +1009,7 @@ fragment_queue_run (struct Channel *ch, uint64_t msg_id, { if (GNUNET_NO == drop) { - mmsg_to_clients (ch, cache_entry->mmsg); + client_send_mcast_msg (chn, cache_entry->mmsg); } if (cache_entry->ref_count <= 1) { @@ -1014,7 +1038,7 @@ fragment_queue_run (struct Channel *ch, uint64_t msg_id, struct GNUNET_HashCode msg_id_hash; hash_key_from_nll (&msg_id_hash, msg_id); - GNUNET_CONTAINER_multihashmap_remove (ch->recv_frags, &msg_id_hash, fragq); + GNUNET_CONTAINER_multihashmap_remove (chn->recv_frags, &msg_id_hash, fragq); GNUNET_CONTAINER_heap_destroy (fragq->fragments); GNUNET_free (fragq); } @@ -1034,33 +1058,33 @@ fragment_queue_run (struct Channel *ch, uint64_t msg_id, * - A stateful message is only sent if the previous stateful message * has already been delivered to the client. * - * @param ch Channel. + * @param chn Channel. * * @return Number of messages removed from queue and sent to client. */ static uint64_t -message_queue_run (struct Channel *ch) +message_queue_run (struct Channel *chn) { GNUNET_log (GNUNET_ERROR_TYPE_WARNING, - "%p Running message queue.\n", ch); + "%p Running message queue.\n", chn); uint64_t n = 0; uint64_t msg_id; - while (GNUNET_YES == GNUNET_CONTAINER_heap_peek2 (ch->recv_msgs, NULL, + while (GNUNET_YES == GNUNET_CONTAINER_heap_peek2 (chn->recv_msgs, NULL, &msg_id)) { GNUNET_log (GNUNET_ERROR_TYPE_WARNING, - "%p Processing message %" PRIu64 " in queue.\n", ch, msg_id); + "%p Processing message %" PRIu64 " in queue.\n", chn, msg_id); struct GNUNET_HashCode msg_id_hash; hash_key_from_hll (&msg_id_hash, msg_id); struct FragmentQueue * - fragq = GNUNET_CONTAINER_multihashmap_get (ch->recv_frags, &msg_id_hash); + fragq = GNUNET_CONTAINER_multihashmap_get (chn->recv_frags, &msg_id_hash); if (NULL == fragq || fragq->state <= MSG_FRAG_STATE_HEADER) { GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "%p No fragq (%p) or header not complete.\n", - ch, fragq); + chn, fragq); break; } @@ -1070,40 +1094,40 @@ message_queue_run (struct Channel *ch) if (GNUNET_PSYC_STATE_NOT_MODIFIED == fragq->state_delta) { if (!(fragq->flags & GNUNET_PSYC_MESSAGE_ORDER_ANY) - && msg_id - 1 != ch->max_message_id) + && msg_id - 1 != chn->max_message_id) { GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "%p Out of order message. " "(%" PRIu64 " - 1 != %" PRIu64 ")\n", - ch, msg_id, ch->max_message_id); + chn, msg_id, chn->max_message_id); break; } } else { - if (msg_id - fragq->state_delta != ch->max_state_message_id) + if (msg_id - fragq->state_delta != chn->max_state_message_id) { GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "%p Out of order stateful message. " "(%" PRIu64 " - %" PRIu64 " != %" PRIu64 ")\n", - ch, msg_id, fragq->state_delta, ch->max_state_message_id); + chn, msg_id, fragq->state_delta, chn->max_state_message_id); break; } #if TODO /* FIXME: apply modifiers to state in PSYCstore */ - GNUNET_PSYCSTORE_state_modify (store, &ch->pub_key, message_id, - state_modify_result_cb, cls); + GNUNET_PSYCSTORE_state_modify (store, &chn->pub_key, message_id, + store_recv_state_modify_result, cls); #endif - ch->max_state_message_id = msg_id; + chn->max_state_message_id = msg_id; } - ch->max_message_id = msg_id; + chn->max_message_id = msg_id; } - fragment_queue_run (ch, msg_id, fragq, MSG_FRAG_STATE_DROP == fragq->state); - GNUNET_CONTAINER_heap_remove_root (ch->recv_msgs); + fragment_queue_run (chn, msg_id, fragq, MSG_FRAG_STATE_DROP == fragq->state); + GNUNET_CONTAINER_heap_remove_root (chn->recv_msgs); n++; } GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "%p Removed %" PRIu64 " messages from queue.\n", ch, n); + "%p Removed %" PRIu64 " messages from queue.\n", chn, n); return n; } @@ -1113,107 +1137,88 @@ message_queue_run (struct Channel *ch) * * Remove all messages in queue without sending it to clients. * - * @param ch Channel. + * @param chn Channel. * * @return Number of messages removed from queue. */ static uint64_t -message_queue_drop (struct Channel *ch) +message_queue_drop (struct Channel *chn) { GNUNET_log (GNUNET_ERROR_TYPE_WARNING, - "%p Dropping message queue.\n", ch); + "%p Dropping message queue.\n", chn); uint64_t n = 0; uint64_t msg_id; - while (GNUNET_YES == GNUNET_CONTAINER_heap_peek2 (ch->recv_msgs, NULL, + while (GNUNET_YES == GNUNET_CONTAINER_heap_peek2 (chn->recv_msgs, NULL, &msg_id)) { GNUNET_log (GNUNET_ERROR_TYPE_WARNING, - "%p Dropping message %" PRIu64 " from queue.\n", ch, msg_id); + "%p Dropping message %" PRIu64 " from queue.\n", chn, msg_id); struct GNUNET_HashCode msg_id_hash; hash_key_from_hll (&msg_id_hash, msg_id); struct FragmentQueue * - fragq = GNUNET_CONTAINER_multihashmap_get (ch->recv_frags, &msg_id_hash); + fragq = GNUNET_CONTAINER_multihashmap_get (chn->recv_frags, &msg_id_hash); - fragment_queue_run (ch, msg_id, fragq, GNUNET_YES); - GNUNET_CONTAINER_heap_remove_root (ch->recv_msgs); + fragment_queue_run (chn, msg_id, fragq, GNUNET_YES); + GNUNET_CONTAINER_heap_remove_root (chn->recv_msgs); n++; } GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "%p Removed %" PRIu64 " messages from queue.\n", ch, n); + "%p Removed %" PRIu64 " messages from queue.\n", chn, n); return n; } /** - * Handle incoming message from multicast. - * - * @param ch Channel. - * @param mmsg Multicast message. - * - * @return #GNUNET_OK or #GNUNET_SYSERR + * Handle the result of a GNUNET_PSYCSTORE_fragment_store() operation. */ -static int -client_multicast_message (struct Channel *ch, - const struct GNUNET_MULTICAST_MessageHeader *mmsg) +static void +store_recv_fragment_store_result (void *cls, int64_t result, const char *err_msg) { - GNUNET_PSYCSTORE_fragment_store (store, &ch->pub_key, mmsg, 0, NULL, NULL); - - uint16_t size = ntohs (mmsg->header.size); - uint16_t first_ptype = 0, last_ptype = 0; - - if (GNUNET_SYSERR - == GNUNET_PSYC_check_message_parts (size - sizeof (*mmsg), - (const char *) &mmsg[1], - &first_ptype, &last_ptype)) - { - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, - "%p Received message with invalid parts from multicast. " - "Dropping message.\n", ch); - GNUNET_break_op (0); - return GNUNET_SYSERR; - } - + struct Channel *chn = cls; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Message parts: first: type %u, last: type %u\n", - first_ptype, last_ptype); - - fragment_queue_insert (ch, mmsg, first_ptype, last_ptype); - message_queue_run (ch); - - return GNUNET_OK; + "%p GNUNET_PSYCSTORE_fragment_store() returned %" PRId64 " (%s)\n", + chn, result, err_msg); } /** - * Incoming message fragment from multicast. + * Handle incoming message fragment from multicast. * - * Store it using PSYCstore and send it to the client of the channel. + * Store it using PSYCstore and send it to the clients of the channel in order. */ static void -mcast_message_cb (void *cls, const struct GNUNET_MessageHeader *msg) +mcast_recv_message (void *cls, const struct GNUNET_MULTICAST_MessageHeader *mmsg) { - struct Channel *ch = cls; - uint16_t type = ntohs (msg->type); - uint16_t size = ntohs (msg->size); + struct Channel *chn = cls; + uint16_t size = ntohs (mmsg->header.size); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "%p Received message of type %u and size %u from multicast.\n", - ch, type, size); + "%p Received multicast message of size %u.\n", + chn, size); - switch (type) - { - case GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE: + GNUNET_PSYCSTORE_fragment_store (store, &chn->pub_key, mmsg, 0, + &store_recv_fragment_store_result, chn); + + uint16_t first_ptype = 0, last_ptype = 0; + if (GNUNET_SYSERR + == GNUNET_PSYC_receive_check_parts (size - sizeof (*mmsg), + (const char *) &mmsg[1], + &first_ptype, &last_ptype)) { - client_multicast_message (ch, (const struct - GNUNET_MULTICAST_MessageHeader *) msg); - break; - } - default: GNUNET_log (GNUNET_ERROR_TYPE_WARNING, - "%p Dropping unknown message of type %u and size %u.\n", - ch, type, size); + "%p Dropping incoming multicast message with invalid parts.\n", + chn); + GNUNET_break_op (0); + return; } + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Message parts: first: type %u, last: type %u\n", + first_ptype, last_ptype); + + fragment_queue_insert (chn, mmsg, first_ptype, last_ptype); + message_queue_run (chn); } @@ -1226,59 +1231,35 @@ mcast_message_cb (void *cls, const struct GNUNET_MessageHeader *msg) * @param flags Request flags. */ static void -mcast_request_cb (void *cls, - const struct GNUNET_CRYPTO_EddsaPublicKey *slave_key, - const struct GNUNET_MessageHeader *msg, - enum GNUNET_MULTICAST_MessageFlags flags) +mcast_recv_request (void *cls, + const struct GNUNET_MULTICAST_RequestHeader *req) { struct Master *mst = cls; - struct Channel *ch = &mst->ch; - - uint16_t type = ntohs (msg->type); - uint16_t size = ntohs (msg->size); + uint16_t size = ntohs (req->header.size); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "%p Received request of type %u and size %u from multicast.\n", - ch, type, size); + "%p Received multicast request of size %u.\n", + mst, size); - switch (type) - { - case GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST: + uint16_t first_ptype = 0, last_ptype = 0; + if (GNUNET_SYSERR + == GNUNET_PSYC_receive_check_parts (size - sizeof (*req), + (const char *) &req[1], + &first_ptype, &last_ptype)) { - const struct GNUNET_MULTICAST_RequestHeader *req - = (const struct GNUNET_MULTICAST_RequestHeader *) msg; - - /* FIXME: see message_cb() */ - if (GNUNET_SYSERR == GNUNET_PSYC_check_message_parts (size - sizeof (*req), - (const char *) &req[1], - NULL, NULL)) - { - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, - "%p Dropping request with invalid parts " - "received from multicast.\n", ch); - GNUNET_break_op (0); - break; - } - - struct GNUNET_PSYC_MessageHeader *pmsg; - uint16_t psize = sizeof (*pmsg) + size - sizeof (*req); - pmsg = GNUNET_malloc (psize); - pmsg->header.size = htons (psize); - pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE); - pmsg->message_id = req->request_id; - pmsg->flags = htonl (GNUNET_PSYC_MESSAGE_REQUEST); - - memcpy (&pmsg[1], &req[1], size - sizeof (*req)); - msg_to_clients (ch, &pmsg->header); - GNUNET_free (pmsg); - break; - } - default: - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "%p Dropping unknown request of type %u and size %u.\n", - ch, type, size); + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "%p Dropping incoming multicast request with invalid parts.\n", + mst); GNUNET_break_op (0); + return; } + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Message parts: first: type %u, last: type %u\n", + first_ptype, last_ptype); + + /* FIXME: in-order delivery */ + client_send_mcast_req (mst, req); } @@ -1286,13 +1267,13 @@ mcast_request_cb (void *cls, * Response from PSYCstore with the current counter values for a channel master. */ static void -master_counters_cb (void *cls, int result, uint64_t max_fragment_id, - uint64_t max_message_id, uint64_t max_group_generation, - uint64_t max_state_message_id) +store_recv_master_counters (void *cls, int result, uint64_t max_fragment_id, + uint64_t max_message_id, uint64_t max_group_generation, + uint64_t max_state_message_id) { struct Master *mst = cls; - struct Channel *ch = &mst->ch; - ch->store_op = NULL; + struct Channel *chn = &mst->chn; + chn->store_op = NULL; struct CountersResult res; res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK); @@ -1303,28 +1284,28 @@ master_counters_cb (void *cls, int result, uint64_t max_fragment_id, if (GNUNET_OK == result || GNUNET_NO == result) { mst->max_message_id = max_message_id; - ch->max_message_id = max_message_id; - ch->max_state_message_id = max_state_message_id; + chn->max_message_id = max_message_id; + chn->max_state_message_id = max_state_message_id; mst->max_group_generation = max_group_generation; mst->origin = GNUNET_MULTICAST_origin_start (cfg, &mst->priv_key, max_fragment_id, - &mcast_join_request_cb, - &mcast_membership_test_cb, - &mcast_replay_fragment_cb, - &mcast_replay_message_cb, - &mcast_request_cb, - &mcast_message_cb, ch); - ch->ready = GNUNET_YES; + &mcast_recv_join_request, + &mcast_recv_membership_test, + &mcast_recv_replay_fragment, + &mcast_recv_replay_message, + &mcast_recv_request, + &mcast_recv_message, chn); + chn->ready = GNUNET_YES; } else { GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "%p GNUNET_PSYCSTORE_counters_get() " "returned %d for channel %s.\n", - ch, result, GNUNET_h2s (&ch->pub_key_hash)); + chn, result, GNUNET_h2s (&chn->pub_key_hash)); } - msg_to_clients (ch, &res.header); + client_send_msg (chn, &res.header); } @@ -1332,13 +1313,13 @@ master_counters_cb (void *cls, int result, uint64_t max_fragment_id, * Response from PSYCstore with the current counter values for a channel slave. */ void -slave_counters_cb (void *cls, int result, uint64_t max_fragment_id, - uint64_t max_message_id, uint64_t max_group_generation, - uint64_t max_state_message_id) +store_recv_slave_counters (void *cls, int result, uint64_t max_fragment_id, + uint64_t max_message_id, uint64_t max_group_generation, + uint64_t max_state_message_id) { struct Slave *slv = cls; - struct Channel *ch = &slv->ch; - ch->store_op = NULL; + struct Channel *chn = &slv->chn; + chn->store_op = NULL; struct CountersResult res; res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK); @@ -1348,38 +1329,38 @@ slave_counters_cb (void *cls, int result, uint64_t max_fragment_id, if (GNUNET_OK == result || GNUNET_NO == result) { - ch->max_message_id = max_message_id; - ch->max_state_message_id = max_state_message_id; + chn->max_message_id = max_message_id; + chn->max_state_message_id = max_state_message_id; slv->member - = GNUNET_MULTICAST_member_join (cfg, &ch->pub_key, &slv->priv_key, + = GNUNET_MULTICAST_member_join (cfg, &chn->pub_key, &slv->priv_key, &slv->origin, slv->relay_count, slv->relays, slv->join_req, - &mcast_join_request_cb, - &mcast_join_decision_cb, - &mcast_membership_test_cb, - &mcast_replay_fragment_cb, - &mcast_replay_message_cb, - &mcast_message_cb, ch); + &mcast_recv_join_request, + &mcast_recv_join_decision, + &mcast_recv_membership_test, + &mcast_recv_replay_fragment, + &mcast_recv_replay_message, + &mcast_recv_message, chn); } else { GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "%p GNUNET_PSYCSTORE_counters_get() " "returned %d for channel %s.\n", - ch, result, GNUNET_h2s (&ch->pub_key_hash)); + chn, result, GNUNET_h2s (&chn->pub_key_hash)); } - msg_to_clients (ch, &res.header); + client_send_msg (chn, &res.header); } static void -channel_init (struct Channel *ch) +channel_init (struct Channel *chn) { - ch->recv_msgs + chn->recv_msgs = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN); - ch->recv_frags = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO); + chn->recv_frags = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO); } @@ -1387,8 +1368,8 @@ channel_init (struct Channel *ch) * Handle a connecting client starting a channel master. */ static void -client_master_start (void *cls, struct GNUNET_SERVER_Client *client, - const struct GNUNET_MessageHeader *msg) +client_recv_master_start (void *cls, struct GNUNET_SERVER_Client *client, + const struct GNUNET_MessageHeader *msg) { const struct MasterStartRequest *req = (const struct MasterStartRequest *) msg; @@ -1401,7 +1382,7 @@ client_master_start (void *cls, struct GNUNET_SERVER_Client *client, struct Master * mst = GNUNET_CONTAINER_multihashmap_get (masters, &pub_key_hash); - struct Channel *ch; + struct Channel *chn; if (NULL == mst) { @@ -1410,20 +1391,20 @@ client_master_start (void *cls, struct GNUNET_SERVER_Client *client, mst->priv_key = req->channel_key; mst->join_reqs = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO); - ch = &mst->ch; - ch->is_master = GNUNET_YES; - ch->pub_key = pub_key; - ch->pub_key_hash = pub_key_hash; - channel_init (ch); + chn = &mst->chn; + chn->is_master = GNUNET_YES; + chn->pub_key = pub_key; + chn->pub_key_hash = pub_key_hash; + channel_init (chn); - GNUNET_CONTAINER_multihashmap_put (masters, &ch->pub_key_hash, ch, + GNUNET_CONTAINER_multihashmap_put (masters, &chn->pub_key_hash, chn, GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); - ch->store_op = GNUNET_PSYCSTORE_counters_get (store, &ch->pub_key, - master_counters_cb, mst); + chn->store_op = GNUNET_PSYCSTORE_counters_get (store, &chn->pub_key, + store_recv_master_counters, mst); } else { - ch = &mst->ch; + chn = &mst->chn; struct CountersResult res; res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK); @@ -1438,13 +1419,13 @@ client_master_start (void *cls, struct GNUNET_SERVER_Client *client, GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p Client connected as master to channel %s.\n", - mst, GNUNET_h2s (&ch->pub_key_hash)); + mst, GNUNET_h2s (&chn->pub_key_hash)); - struct ClientList *cl = GNUNET_new (struct ClientList); - cl->client = client; - GNUNET_CONTAINER_DLL_insert (ch->clients_head, ch->clients_tail, cl); + struct ClientListItem *cli = GNUNET_new (struct ClientListItem); + cli->client = client; + GNUNET_CONTAINER_DLL_insert (chn->clients_head, chn->clients_tail, cli); - GNUNET_SERVER_client_set_user_context (client, ch); + GNUNET_SERVER_client_set_user_context (client, chn); GNUNET_SERVER_receive_done (client, GNUNET_OK); } @@ -1453,8 +1434,8 @@ client_master_start (void *cls, struct GNUNET_SERVER_Client *client, * Handle a connecting client joining as a channel slave. */ static void -client_slave_join (void *cls, struct GNUNET_SERVER_Client *client, - const struct GNUNET_MessageHeader *msg) +client_recv_slave_join (void *cls, struct GNUNET_SERVER_Client *client, + const struct GNUNET_MessageHeader *msg) { const struct SlaveJoinRequest *req = (const struct SlaveJoinRequest *) msg; @@ -1467,13 +1448,13 @@ client_slave_join (void *cls, struct GNUNET_SERVER_Client *client, GNUNET_CRYPTO_hash (&req->channel_key, sizeof (req->channel_key), &pub_key_hash); struct GNUNET_CONTAINER_MultiHashMap * - ch_slv = GNUNET_CONTAINER_multihashmap_get (channel_slaves, &pub_key_hash); + chn_slv = GNUNET_CONTAINER_multihashmap_get (channel_slaves, &pub_key_hash); struct Slave *slv = NULL; - struct Channel *ch; + struct Channel *chn; - if (NULL != ch_slv) + if (NULL != chn_slv) { - slv = GNUNET_CONTAINER_multihashmap_get (ch_slv, &slv_pub_key_hash); + slv = GNUNET_CONTAINER_multihashmap_get (chn_slv, &slv_pub_key_hash); } if (NULL == slv) { @@ -1494,34 +1475,34 @@ client_slave_join (void *cls, struct GNUNET_SERVER_Client *client, memcpy (&slv->relays[i], &relays[i], sizeof (*relays)); } - ch = &slv->ch; - ch->is_master = GNUNET_NO; - ch->pub_key = req->channel_key; - ch->pub_key_hash = pub_key_hash; - channel_init (ch); + chn = &slv->chn; + chn->is_master = GNUNET_NO; + chn->pub_key = req->channel_key; + chn->pub_key_hash = pub_key_hash; + channel_init (chn); - if (NULL == ch_slv) + if (NULL == chn_slv) { - ch_slv = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES); - GNUNET_CONTAINER_multihashmap_put (channel_slaves, &ch->pub_key_hash, ch_slv, + chn_slv = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES); + GNUNET_CONTAINER_multihashmap_put (channel_slaves, &chn->pub_key_hash, chn_slv, GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); } - GNUNET_CONTAINER_multihashmap_put (ch_slv, &slv->pub_key_hash, ch, + GNUNET_CONTAINER_multihashmap_put (chn_slv, &slv->pub_key_hash, chn, GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST); - GNUNET_CONTAINER_multihashmap_put (slaves, &ch->pub_key_hash, ch, + GNUNET_CONTAINER_multihashmap_put (slaves, &chn->pub_key_hash, chn, GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); - ch->store_op = GNUNET_PSYCSTORE_counters_get (store, &ch->pub_key, - slave_counters_cb, slv); + chn->store_op = GNUNET_PSYCSTORE_counters_get (store, &chn->pub_key, + &store_recv_slave_counters, slv); } else { - ch = &slv->ch; + chn = &slv->chn; struct CountersResult res; res.header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK); res.header.size = htons (sizeof (res)); res.result_code = htonl (GNUNET_OK); - res.max_message_id = GNUNET_htonll (ch->max_message_id); + res.max_message_id = GNUNET_htonll (chn->max_message_id); GNUNET_SERVER_notification_context_add (nc, client); GNUNET_SERVER_notification_context_unicast (nc, client, &res.header, @@ -1530,16 +1511,16 @@ client_slave_join (void *cls, struct GNUNET_SERVER_Client *client, if (NULL == slv->member) { slv->member - = GNUNET_MULTICAST_member_join (cfg, &ch->pub_key, &slv->priv_key, + = GNUNET_MULTICAST_member_join (cfg, &chn->pub_key, &slv->priv_key, &slv->origin, slv->relay_count, slv->relays, slv->join_req, - &mcast_join_request_cb, - &mcast_join_decision_cb, - &mcast_membership_test_cb, - &mcast_replay_fragment_cb, - &mcast_replay_message_cb, - &mcast_message_cb, ch); + &mcast_recv_join_request, + &mcast_recv_join_decision, + &mcast_recv_membership_test, + &mcast_recv_replay_fragment, + &mcast_recv_replay_message, + &mcast_recv_message, chn); } else if (NULL != slv->join_dcsn) @@ -1553,13 +1534,13 @@ client_slave_join (void *cls, struct GNUNET_SERVER_Client *client, GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p Client connected as slave to channel %s.\n", - slv, GNUNET_h2s (&ch->pub_key_hash)); + slv, GNUNET_h2s (&chn->pub_key_hash)); - struct ClientList *cl = GNUNET_new (struct ClientList); - cl->client = client; - GNUNET_CONTAINER_DLL_insert (ch->clients_head, ch->clients_tail, cl); + struct ClientListItem *cli = GNUNET_new (struct ClientListItem); + cli->client = client; + GNUNET_CONTAINER_DLL_insert (chn->clients_head, chn->clients_tail, cli); - GNUNET_SERVER_client_set_user_context (client, &slv->ch); + GNUNET_SERVER_client_set_user_context (client, &slv->chn); GNUNET_SERVER_receive_done (client, GNUNET_OK); } @@ -1575,8 +1556,8 @@ struct JoinDecisionClosure * Iterator callback for responding to join requests of a slave. */ static int -send_join_decision_cb (void *cls, const struct GNUNET_HashCode *pub_key_hash, - void *jh) +mcast_send_join_decision (void *cls, const struct GNUNET_HashCode *pub_key_hash, + void *jh) { struct JoinDecisionClosure *jcls = cls; // FIXME: add relays @@ -1589,13 +1570,13 @@ send_join_decision_cb (void *cls, const struct GNUNET_HashCode *pub_key_hash, * Join decision from client. */ static void -client_join_decision (void *cls, struct GNUNET_SERVER_Client *client, - const struct GNUNET_MessageHeader *msg) +client_recv_join_decision (void *cls, struct GNUNET_SERVER_Client *client, + const struct GNUNET_MessageHeader *msg) { struct Channel * - ch = GNUNET_SERVER_client_get_user_context (client, struct Channel); - GNUNET_assert (GNUNET_YES == ch->is_master); - struct Master *mst = (struct Master *) ch; + chn = GNUNET_SERVER_client_get_user_context (client, struct Channel); + GNUNET_assert (GNUNET_YES == chn->is_master); + struct Master *mst = (struct Master *) chn; struct MasterJoinDecision *dcsn = (struct MasterJoinDecision *) msg; struct JoinDecisionClosure jcls; @@ -1612,13 +1593,13 @@ client_join_decision (void *cls, struct GNUNET_SERVER_Client *client, GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p Got join decision (%d) from client for channel %s..\n", - mst, jcls.is_admitted, GNUNET_h2s (&ch->pub_key_hash)); + mst, jcls.is_admitted, GNUNET_h2s (&chn->pub_key_hash)); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p ..and slave %s.\n", mst, GNUNET_h2s (&slave_key_hash)); GNUNET_CONTAINER_multihashmap_get_multiple (mst->join_reqs, &slave_key_hash, - &send_join_decision_cb, &jcls); + &mcast_send_join_decision, &jcls); GNUNET_CONTAINER_multihashmap_remove_all (mst->join_reqs, &slave_key_hash); GNUNET_SERVER_receive_done (client, GNUNET_OK); } @@ -1629,10 +1610,10 @@ client_join_decision (void *cls, struct GNUNET_SERVER_Client *client, * * Sent after a message fragment has been passed on to multicast. * - * @param ch The channel struct for the client. + * @param chn The channel struct for the client. */ static void -send_message_ack (struct Channel *ch, struct GNUNET_SERVER_Client *client) +send_message_ack (struct Channel *chn, struct GNUNET_SERVER_Client *client) { struct GNUNET_MessageHeader res; res.size = htons (sizeof (res)); @@ -1650,40 +1631,40 @@ send_message_ack (struct Channel *ch, struct GNUNET_SERVER_Client *client) static int transmit_notify (void *cls, size_t *data_size, void *data) { - struct Channel *ch = cls; - struct TransmitMessage *tmit_msg = ch->tmit_head; + struct Channel *chn = cls; + struct TransmitMessage *tmit_msg = chn->tmit_head; if (NULL == tmit_msg || *data_size < tmit_msg->size) { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "%p transmit_notify: nothing to send.\n", ch); + "%p transmit_notify: nothing to send.\n", chn); *data_size = 0; return GNUNET_NO; } GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "%p transmit_notify: sending %u bytes.\n", ch, tmit_msg->size); + "%p transmit_notify: sending %u bytes.\n", chn, tmit_msg->size); *data_size = tmit_msg->size; memcpy (data, &tmit_msg[1], *data_size); - int ret = (MSG_STATE_END < ch->tmit_state) ? GNUNET_NO : GNUNET_YES; + int ret = (MSG_STATE_END < chn->tmit_state) ? GNUNET_NO : GNUNET_YES; if (NULL != tmit_msg->client) - send_message_ack (ch, tmit_msg->client); + send_message_ack (chn, tmit_msg->client); - GNUNET_CONTAINER_DLL_remove (ch->tmit_head, ch->tmit_tail, tmit_msg); + GNUNET_CONTAINER_DLL_remove (chn->tmit_head, chn->tmit_tail, tmit_msg); GNUNET_free (tmit_msg); - if (0 == ch->tmit_task) + if (0 == chn->tmit_task) { - if (NULL != ch->tmit_head) + if (NULL != chn->tmit_head) { - transmit_message (ch); + transmit_message (chn); } - else if (ch->disconnected) + else if (chn->disconnected) { /* FIXME: handle partial message (when still in_transmit) */ - cleanup_channel (ch); + cleanup_channel (chn); } } @@ -1732,7 +1713,7 @@ static void master_transmit_message (struct Master *mst) { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p master_transmit_message()\n", mst); - mst->ch.tmit_task = 0; + mst->chn.tmit_task = 0; if (NULL == mst->tmit_handle) { mst->tmit_handle @@ -1753,7 +1734,7 @@ master_transmit_message (struct Master *mst) static void slave_transmit_message (struct Slave *slv) { - slv->ch.tmit_task = 0; + slv->chn.tmit_task = 0; if (NULL == slv->tmit_handle) { slv->tmit_handle @@ -1768,11 +1749,11 @@ slave_transmit_message (struct Slave *slv) static inline void -transmit_message (struct Channel *ch) +transmit_message (struct Channel *chn) { - ch->is_master - ? master_transmit_message ((struct Master *) ch) - : slave_transmit_message ((struct Slave *) ch); + chn->is_master + ? master_transmit_message ((struct Master *) chn) + : slave_transmit_message ((struct Slave *) chn); } @@ -1828,7 +1809,7 @@ slave_queue_message (struct Slave *slv, struct TransmitMessage *tmit_msg, /** * Queue PSYC message parts for sending to multicast. * - * @param ch Channel to send to. + * @param chn Channel to send to. * @param client Client the message originates from. * @param data_size Size of @a data. * @param data Concatenated message parts. @@ -1836,7 +1817,7 @@ slave_queue_message (struct Slave *slv, struct TransmitMessage *tmit_msg, * @param last_ptype Last message part type in @a data. */ static void -queue_message (struct Channel *ch, +queue_message (struct Channel *chn, struct GNUNET_SERVER_Client *client, size_t data_size, const void *data, @@ -1847,14 +1828,14 @@ queue_message (struct Channel *ch, memcpy (&tmit_msg[1], data, data_size); tmit_msg->client = client; tmit_msg->size = data_size; - tmit_msg->state = ch->tmit_state; + tmit_msg->state = chn->tmit_state; - GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, tmit_msg); + GNUNET_CONTAINER_DLL_insert_tail (chn->tmit_head, chn->tmit_tail, tmit_msg); - ch->is_master - ? master_queue_message ((struct Master *) ch, tmit_msg, + chn->is_master + ? master_queue_message ((struct Master *) chn, tmit_msg, first_ptype, last_ptype) - : slave_queue_message ((struct Slave *) ch, tmit_msg, + : slave_queue_message ((struct Slave *) chn, tmit_msg, first_ptype, last_ptype); } @@ -1862,11 +1843,11 @@ queue_message (struct Channel *ch, /** * Cancel transmission of current message. * - * @param ch Channel to send to. + * @param chn Channel to send to. * @param client Client the message originates from. */ static void -transmit_cancel (struct Channel *ch, struct GNUNET_SERVER_Client *client) +transmit_cancel (struct Channel *chn, struct GNUNET_SERVER_Client *client) { uint16_t type = GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL; @@ -1874,8 +1855,8 @@ transmit_cancel (struct Channel *ch, struct GNUNET_SERVER_Client *client) msg.size = htons (sizeof (msg)); msg.type = htons (type); - queue_message (ch, client, sizeof (msg), &msg, type, type); - transmit_message (ch); + queue_message (chn, client, sizeof (msg), &msg, type, type); + transmit_message (chn); /* FIXME: cleanup */ } @@ -1885,21 +1866,21 @@ transmit_cancel (struct Channel *ch, struct GNUNET_SERVER_Client *client) * Incoming message from a master or slave client. */ static void -client_psyc_message (void *cls, struct GNUNET_SERVER_Client *client, - const struct GNUNET_MessageHeader *msg) +client_recv_psyc_message (void *cls, struct GNUNET_SERVER_Client *client, + const struct GNUNET_MessageHeader *msg) { struct Channel * - ch = GNUNET_SERVER_client_get_user_context (client, struct Channel); - GNUNET_assert (NULL != ch); + chn = GNUNET_SERVER_client_get_user_context (client, struct Channel); + GNUNET_assert (NULL != chn); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "%p Received message from client.\n", ch); + "%p Received message from client.\n", chn); GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG, msg); - if (GNUNET_YES != ch->ready) + if (GNUNET_YES != chn->ready) { GNUNET_log (GNUNET_ERROR_TYPE_WARNING, - "%p Channel is not ready, dropping message from client.\n", ch); + "%p Channel is not ready, dropping message from client.\n", chn); GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); return; } @@ -1907,30 +1888,30 @@ client_psyc_message (void *cls, struct GNUNET_SERVER_Client *client, uint16_t size = ntohs (msg->size); if (GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < size - sizeof (*msg)) { - GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "%p Message payload too large.\n", ch); + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "%p Message payload too large.\n", chn); GNUNET_break (0); - transmit_cancel (ch, client); + transmit_cancel (chn, client); GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); return; } uint16_t first_ptype = 0, last_ptype = 0; if (GNUNET_SYSERR - == GNUNET_PSYC_check_message_parts (size - sizeof (*msg), + == GNUNET_PSYC_receive_check_parts (size - sizeof (*msg), (const char *) &msg[1], &first_ptype, &last_ptype)) { GNUNET_log (GNUNET_ERROR_TYPE_ERROR, - "%p Received invalid message part from client.\n", ch); + "%p Received invalid message part from client.\n", chn); GNUNET_break (0); - transmit_cancel (ch, client); + transmit_cancel (chn, client); GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); return; } - queue_message (ch, client, size - sizeof (*msg), &msg[1], + queue_message (chn, client, size - sizeof (*msg), &msg[1], first_ptype, last_ptype); - transmit_message (ch); + transmit_message (chn); GNUNET_SERVER_receive_done (client, GNUNET_OK); }; @@ -1940,8 +1921,8 @@ client_psyc_message (void *cls, struct GNUNET_SERVER_Client *client, * Client requests to add a slave to the membership database. */ static void -client_slave_add (void *cls, struct GNUNET_SERVER_Client *client, - const struct GNUNET_MessageHeader *msg) +client_recv_slave_add (void *cls, struct GNUNET_SERVER_Client *client, + const struct GNUNET_MessageHeader *msg) { } @@ -1951,8 +1932,8 @@ client_slave_add (void *cls, struct GNUNET_SERVER_Client *client, * Client requests to remove a slave from the membership database. */ static void -client_slave_remove (void *cls, struct GNUNET_SERVER_Client *client, - const struct GNUNET_MessageHeader *msg) +client_recv_slave_remove (void *cls, struct GNUNET_SERVER_Client *client, + const struct GNUNET_MessageHeader *msg) { } @@ -1962,8 +1943,8 @@ client_slave_remove (void *cls, struct GNUNET_SERVER_Client *client, * Client requests channel history from PSYCstore. */ static void -client_story_request (void *cls, struct GNUNET_SERVER_Client *client, - const struct GNUNET_MessageHeader *msg) +client_recv_story_request (void *cls, struct GNUNET_SERVER_Client *client, + const struct GNUNET_MessageHeader *msg) { } @@ -1973,8 +1954,8 @@ client_story_request (void *cls, struct GNUNET_SERVER_Client *client, * Client requests best matching state variable from PSYCstore. */ static void -client_state_get (void *cls, struct GNUNET_SERVER_Client *client, - const struct GNUNET_MessageHeader *msg) +client_recv_state_get (void *cls, struct GNUNET_SERVER_Client *client, + const struct GNUNET_MessageHeader *msg) { } @@ -1984,8 +1965,8 @@ client_state_get (void *cls, struct GNUNET_SERVER_Client *client, * Client requests state variables with a given prefix from PSYCstore. */ static void -client_state_get_prefix (void *cls, struct GNUNET_SERVER_Client *client, - const struct GNUNET_MessageHeader *msg) +client_recv_state_get_prefix (void *cls, struct GNUNET_SERVER_Client *client, + const struct GNUNET_MessageHeader *msg) { } @@ -2003,32 +1984,34 @@ run (void *cls, struct GNUNET_SERVER_Handle *server, const struct GNUNET_CONFIGURATION_Handle *c) { static const struct GNUNET_SERVER_MessageHandler handlers[] = { - { &client_master_start, NULL, + { &client_recv_master_start, NULL, GNUNET_MESSAGE_TYPE_PSYC_MASTER_START, 0 }, - { &client_slave_join, NULL, + { &client_recv_slave_join, NULL, GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN, 0 }, - { &client_join_decision, NULL, + { &client_recv_join_decision, NULL, GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION, 0 }, - { &client_psyc_message, NULL, + { &client_recv_psyc_message, NULL, GNUNET_MESSAGE_TYPE_PSYC_MESSAGE, 0 }, - { &client_slave_add, NULL, + { &client_recv_slave_add, NULL, GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_SLAVE_ADD, 0 }, - { &client_slave_remove, NULL, + { &client_recv_slave_remove, NULL, GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_SLAVE_RM, 0 }, - { &client_story_request, NULL, + { &client_recv_story_request, NULL, GNUNET_MESSAGE_TYPE_PSYC_STORY_REQUEST, 0 }, - { &client_state_get, NULL, + { &client_recv_state_get, NULL, GNUNET_MESSAGE_TYPE_PSYC_STATE_GET, 0 }, - { &client_state_get_prefix, NULL, - GNUNET_MESSAGE_TYPE_PSYC_STATE_GET_PREFIX, 0 } + { &client_recv_state_get_prefix, NULL, + GNUNET_MESSAGE_TYPE_PSYC_STATE_GET_PREFIX, 0 }, + + { NULL, NULL, 0, 0 } }; cfg = c; diff --git a/src/psyc/psyc_api.c b/src/psyc/psyc_api.c index 7ec9d21b72..bfb6f43fb2 100644 --- a/src/psyc/psyc_api.c +++ b/src/psyc/psyc_api.c @@ -37,29 +37,11 @@ #include "gnunet_env_lib.h" #include "gnunet_multicast_service.h" #include "gnunet_psyc_service.h" +#include "gnunet_psyc_util_lib.h" #include "psyc.h" #define LOG(kind,...) GNUNET_log_from (kind, "psyc-api",__VA_ARGS__) -struct MessageQueue -{ - struct MessageQueue *prev; - struct MessageQueue *next; - /* Followed by struct GNUNET_MessageHeader msg */ -}; - - -/** - * Handle for a pending PSYC transmission operation. - */ -struct GNUNET_PSYC_ChannelTransmitHandle -{ - struct GNUNET_PSYC_Channel *ch; - GNUNET_PSYC_TransmitNotifyModifier notify_mod; - GNUNET_PSYC_TransmitNotifyData notify_data; - void *notify_cls; - enum MessageState state; -}; /** * Handle to access PSYC channel operations for both the master and slaves. @@ -67,109 +49,29 @@ struct GNUNET_PSYC_ChannelTransmitHandle struct GNUNET_PSYC_Channel { /** - * Transmission handle; - */ - struct GNUNET_PSYC_ChannelTransmitHandle tmit; - - /** * Configuration to use. */ const struct GNUNET_CONFIGURATION_Handle *cfg; /** - * Socket (if available). - */ - struct GNUNET_CLIENT_Connection *client; - - /** - * Currently pending transmission request, or NULL for none. + * Client connection to the service. */ - struct GNUNET_CLIENT_TransmitHandle *th; + struct GNUNET_CLIENT_MANAGER_Connection *client; /** - * Head of messages to transmit to the service. - */ - struct MessageQueue *tmit_head; - - /** - * Tail of operations to transmit to the service. + * Transmission handle; */ - struct MessageQueue *tmit_tail; + struct GNUNET_PSYC_TransmitHandle *tmit; /** - * Message currently being transmitted to the service. + * Receipt handle; */ - struct MessageQueue *tmit_msg; + struct GNUNET_PSYC_ReceiveHandle *recv; /** * Message to send on reconnect. */ - struct GNUNET_MessageHeader *reconnect_msg; - - /** - * Task doing exponential back-off trying to reconnect. - */ - GNUNET_SCHEDULER_TaskIdentifier reconnect_task; - - /** - * Time for next connect retry. - */ - struct GNUNET_TIME_Relative reconnect_delay; - - /** - * Message part callback. - */ - GNUNET_PSYC_MessageCallback message_cb; - - /** - * Message part callback for historic message. - */ - GNUNET_PSYC_MessageCallback hist_message_cb; - - /** - * Closure for @a message_cb. - */ - void *cb_cls; - - /** - * ID of the message being received from the PSYC service. - */ - uint64_t recv_message_id; - - /** - * Public key of the slave from which a message is being received. - */ - struct GNUNET_CRYPTO_EddsaPublicKey recv_slave_key; - - /** - * State of the currently being received message from the PSYC service. - */ - enum MessageState recv_state; - - /** - * Flags for the currently being received message from the PSYC service. - */ - enum GNUNET_PSYC_MessageFlags recv_flags; - - /** - * Expected value size for the modifier being received from the PSYC service. - */ - uint32_t recv_mod_value_size_expected; - - /** - * Actual value size for the modifier being received from the PSYC service. - */ - uint32_t recv_mod_value_size; - - /** - * Is transmission paused? - */ - uint8_t tmit_paused; - - /** - * Are we still waiting for a PSYC_TRANSMIT_ACK? - */ - uint8_t tmit_ack_pending; + struct GNUNET_MessageHeader *connect_msg; /** * Are we polling for incoming messages right now? @@ -177,11 +79,6 @@ struct GNUNET_PSYC_Channel uint8_t in_receive; /** - * Are we currently transmitting a message? - */ - uint8_t in_transmit; - - /** * Is this a master or slave channel? */ uint8_t is_master; @@ -193,7 +90,7 @@ struct GNUNET_PSYC_Channel */ struct GNUNET_PSYC_Master { - struct GNUNET_PSYC_Channel ch; + struct GNUNET_PSYC_Channel chn; GNUNET_PSYC_MasterStartCallback start_cb; @@ -201,6 +98,11 @@ struct GNUNET_PSYC_Master * Join request callback. */ GNUNET_PSYC_JoinRequestCallback join_req_cb; + + /** + * Closure for the callbacks. + */ + void *cb_cls; }; @@ -209,11 +111,16 @@ struct GNUNET_PSYC_Master */ struct GNUNET_PSYC_Slave { - struct GNUNET_PSYC_Channel ch; + struct GNUNET_PSYC_Channel chn; GNUNET_PSYC_SlaveConnectCallback connect_cb; GNUNET_PSYC_JoinDecisionCallback join_dcsn_cb; + + /** + * Closure for the callbacks. + */ + void *cb_cls; }; @@ -258,934 +165,170 @@ struct GNUNET_PSYC_StateQuery static void -reconnect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc); - - -static void -channel_transmit_data (struct GNUNET_PSYC_Channel *ch); - - -/** - * Reschedule a connect attempt to the service. - * - * @param ch Channel to reconnect. - */ -static void -reschedule_connect (struct GNUNET_PSYC_Channel *ch) +channel_send_connect_msg (struct GNUNET_PSYC_Channel *chn) { - GNUNET_assert (ch->reconnect_task == GNUNET_SCHEDULER_NO_TASK); - - if (NULL != ch->th) - { - GNUNET_CLIENT_notify_transmit_ready_cancel (ch->th); - ch->th = NULL; - } - if (NULL != ch->client) - { - GNUNET_CLIENT_disconnect (ch->client); - ch->client = NULL; - } - ch->in_receive = GNUNET_NO; - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Scheduling task to reconnect to PSYC service in %s.\n", - GNUNET_STRINGS_relative_time_to_string (ch->reconnect_delay, GNUNET_YES)); - ch->reconnect_task = - GNUNET_SCHEDULER_add_delayed (ch->reconnect_delay, &reconnect, ch); - ch->reconnect_delay = GNUNET_TIME_STD_BACKOFF (ch->reconnect_delay); + uint16_t cmsg_size = ntohs (chn->connect_msg->size); + struct GNUNET_MessageHeader * cmsg = GNUNET_malloc (cmsg_size); + memcpy (cmsg, chn->connect_msg, cmsg_size); + GNUNET_CLIENT_MANAGER_transmit_now (chn->client, cmsg); } -/** - * Schedule transmission of the next message from our queue. - * - * @param ch PSYC channel handle - */ -static void -transmit_next (struct GNUNET_PSYC_Channel *ch); - - -/** - * Reset stored data related to the last received message. - */ static void -recv_reset (struct GNUNET_PSYC_Channel *ch) +channel_recv_disconnect (void *cls, + struct GNUNET_CLIENT_MANAGER_Connection *client, + const struct GNUNET_MessageHeader *msg) { - ch->recv_state = MSG_STATE_START; - ch->recv_flags = 0; - ch->recv_message_id = 0; - //FIXME: ch->recv_slave_key = { 0 }; - ch->recv_mod_value_size = 0; - ch->recv_mod_value_size_expected = 0; + struct GNUNET_PSYC_Channel * + chn = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*chn)); + GNUNET_CLIENT_MANAGER_reconnect (client); + channel_send_connect_msg (chn); } static void -recv_error (struct GNUNET_PSYC_Channel *ch) +channel_recv_message (void *cls, + struct GNUNET_CLIENT_MANAGER_Connection *client, + const struct GNUNET_MessageHeader *msg) { - GNUNET_PSYC_MessageCallback message_cb - = ch->recv_flags & GNUNET_PSYC_MESSAGE_HISTORIC - ? ch->hist_message_cb - : ch->message_cb; - - if (NULL != message_cb) - message_cb (ch->cb_cls, ch->recv_message_id, ch->recv_flags, NULL); - - recv_reset (ch); + struct GNUNET_PSYC_Channel * + chn = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*chn)); + GNUNET_PSYC_receive_message (chn->recv, + (const struct GNUNET_PSYC_MessageHeader *) msg); } -/** - * Queue a message part for transmission to the PSYC service. - * - * The message part is added to the current message buffer. - * When this buffer is full, it is added to the transmission queue. - * - * @param ch Channel struct for the client. - * @param msg Modifier message part, or NULL when there's no more modifiers. - * @param end End of message. - */ static void -queue_message (struct GNUNET_PSYC_Channel *ch, - const struct GNUNET_MessageHeader *msg, - uint8_t end) +channel_recv_message_ack (void *cls, + struct GNUNET_CLIENT_MANAGER_Connection *client, + const struct GNUNET_MessageHeader *msg) { - uint16_t size = msg ? ntohs (msg->size) : 0; - - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Queueing message of type %u and size %u (end: %u)).\n", - ntohs (msg->type), size, end); - - struct MessageQueue *mq = ch->tmit_msg; - struct GNUNET_MessageHeader *qmsg = NULL; - if (NULL != mq) - { - qmsg = (struct GNUNET_MessageHeader *) &mq[1]; - if (NULL == msg - || GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < qmsg->size + size) - { - /* End of message or buffer is full, add it to transmission queue - * and start with empty buffer */ - qmsg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE); - qmsg->size = htons (qmsg->size); - GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, mq); - ch->tmit_msg = mq = NULL; - ch->tmit_ack_pending++; - } - else - { - /* Message fits in current buffer, append */ - ch->tmit_msg - = mq = GNUNET_realloc (mq, sizeof (*mq) + qmsg->size + size); - qmsg = (struct GNUNET_MessageHeader *) &mq[1]; - memcpy ((char *) qmsg + qmsg->size, msg, size); - qmsg->size += size; - } - } - - if (NULL == mq && NULL != msg) - { - /* Empty buffer, copy over message. */ - ch->tmit_msg - = mq = GNUNET_malloc (sizeof (*mq) + sizeof (*qmsg) + size); - qmsg = (struct GNUNET_MessageHeader *) &mq[1]; - qmsg->size = sizeof (*qmsg) + size; - memcpy (&qmsg[1], msg, size); - } - - if (NULL != mq - && (GNUNET_YES == end - || (GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD - < qmsg->size + sizeof (struct GNUNET_MessageHeader)))) - { - /* End of message or buffer is full, add it to transmission queue. */ - qmsg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE); - qmsg->size = htons (qmsg->size); - GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, mq); - ch->tmit_msg = mq = NULL; - ch->tmit_ack_pending++; - } - - if (GNUNET_YES == end) - ch->in_transmit = GNUNET_NO; - - transmit_next (ch); + struct GNUNET_PSYC_Channel * + chn = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*chn)); + GNUNET_PSYC_transmit_got_ack (chn->tmit); } -/** - * Request a modifier from a client to transmit. - * - * @param mst Master handle. - */ static void -channel_transmit_mod (struct GNUNET_PSYC_Channel *ch) +master_recv_start_ack (void *cls, + struct GNUNET_CLIENT_MANAGER_Connection *client, + const struct GNUNET_MessageHeader *msg) { - uint16_t max_data_size, data_size; - char data[GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD] = ""; - struct GNUNET_MessageHeader *msg = (struct GNUNET_MessageHeader *) data; - int notify_ret; - - switch (ch->tmit.state) - { - case MSG_STATE_MODIFIER: - { - struct GNUNET_PSYC_MessageModifier *mod - = (struct GNUNET_PSYC_MessageModifier *) msg; - max_data_size = data_size = GNUNET_PSYC_MODIFIER_MAX_PAYLOAD; - msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER); - msg->size = sizeof (struct GNUNET_PSYC_MessageModifier); - notify_ret = ch->tmit.notify_mod (ch->tmit.notify_cls, &data_size, &mod[1], - &mod->oper, &mod->value_size); - mod->name_size = strnlen ((char *) &mod[1], data_size); - if (mod->name_size < data_size) - { - mod->value_size = htonl (mod->value_size); - mod->name_size = htons (mod->name_size); - } - else if (0 < data_size) - { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Got invalid modifier name.\n"); - notify_ret = GNUNET_SYSERR; - } - break; - } - case MSG_STATE_MOD_CONT: - { - max_data_size = data_size = GNUNET_PSYC_MOD_CONT_MAX_PAYLOAD; - msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT); - msg->size = sizeof (struct GNUNET_MessageHeader); - notify_ret = ch->tmit.notify_mod (ch->tmit.notify_cls, - &data_size, &msg[1], NULL, NULL); - break; - } - default: - GNUNET_assert (0); - } - - switch (notify_ret) - { - case GNUNET_NO: - if (0 == data_size) - { /* Transmission paused, nothing to send. */ - ch->tmit_paused = GNUNET_YES; - return; - } - ch->tmit.state = MSG_STATE_MOD_CONT; - break; - - case GNUNET_YES: - if (0 == data_size) - { - /* End of modifiers. */ - ch->tmit.state = MSG_STATE_DATA; - if (0 == ch->tmit_ack_pending) - channel_transmit_data (ch); - - return; - } - ch->tmit.state = MSG_STATE_MODIFIER; - break; - - default: - LOG (GNUNET_ERROR_TYPE_ERROR, - "MasterTransmitNotifyModifier returned error " - "when requesting a modifier.\n"); - - ch->tmit.state = MSG_STATE_CANCEL; - msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL); - msg->size = htons (sizeof (*msg)); - - queue_message (ch, msg, GNUNET_YES); - return; - } - - if (0 < data_size) - { - GNUNET_assert (data_size <= max_data_size); - msg->size = htons (msg->size + data_size); - queue_message (ch, msg, GNUNET_NO); - } - - channel_transmit_mod (ch); -} + struct GNUNET_PSYC_Master * + mst = GNUNET_CLIENT_MANAGER_get_user_context_ (client, + sizeof (struct GNUNET_PSYC_Channel)); - -/** - * Request data from a client to transmit. - * - * @param mst Master handle. - */ -static void -channel_transmit_data (struct GNUNET_PSYC_Channel *ch) -{ - uint16_t data_size = GNUNET_PSYC_DATA_MAX_PAYLOAD; - char data[GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD] = ""; - struct GNUNET_MessageHeader *msg = (struct GNUNET_MessageHeader *) data; - - msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA); - - int notify_ret = ch->tmit.notify_data (ch->tmit.notify_cls, - &data_size, &msg[1]); - switch (notify_ret) - { - case GNUNET_NO: - if (0 == data_size) - { - /* Transmission paused, nothing to send. */ - ch->tmit_paused = GNUNET_YES; - return; - } - break; - - case GNUNET_YES: - ch->tmit.state = MSG_STATE_END; - break; - - default: - LOG (GNUNET_ERROR_TYPE_ERROR, - "MasterTransmitNotify returned error when requesting data.\n"); - - ch->tmit.state = MSG_STATE_CANCEL; - msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL); - msg->size = htons (sizeof (*msg)); - queue_message (ch, msg, GNUNET_YES); - return; - } - - if (0 < data_size) - { - GNUNET_assert (data_size <= GNUNET_PSYC_DATA_MAX_PAYLOAD); - msg->size = htons (sizeof (*msg) + data_size); - queue_message (ch, msg, !notify_ret); - } - - /* End of message. */ - if (GNUNET_YES == notify_ret) - { - msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END); - msg->size = htons (sizeof (*msg)); - queue_message (ch, msg, GNUNET_YES); - } + struct CountersResult *cres = (struct CountersResult *) msg; + if (NULL != mst->start_cb) + mst->start_cb (mst->cb_cls, GNUNET_ntohll (cres->max_message_id)); } -/** - * Send a message to a channel. - * - * @param ch Handle to the PSYC channel. - * @param method_name Which method should be invoked. - * @param notify_mod Function to call to obtain modifiers. - * @param notify_data Function to call to obtain fragments of the data. - * @param notify_cls Closure for @a notify_mod and @a notify_data. - * @param flags Flags for the message being transmitted. - * - * @return Transmission handle, NULL on error (i.e. more than one request queued). - */ -static struct GNUNET_PSYC_ChannelTransmitHandle * -channel_transmit (struct GNUNET_PSYC_Channel *ch, - const char *method_name, - GNUNET_PSYC_TransmitNotifyModifier notify_mod, - GNUNET_PSYC_TransmitNotifyData notify_data, - void *notify_cls, - uint32_t flags) -{ - if (GNUNET_NO != ch->in_transmit) - return NULL; - ch->in_transmit = GNUNET_YES; - - size_t size = strlen (method_name) + 1; - struct GNUNET_PSYC_MessageMethod *pmeth; - struct GNUNET_MessageHeader *qmsg; - struct MessageQueue * - mq = ch->tmit_msg = GNUNET_malloc (sizeof (*mq) + sizeof (*qmsg) - + sizeof (*pmeth) + size); - qmsg = (struct GNUNET_MessageHeader *) &mq[1]; - qmsg->size = sizeof (*qmsg) + sizeof (*pmeth) + size; - - pmeth = (struct GNUNET_PSYC_MessageMethod *) &qmsg[1]; - pmeth->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD); - pmeth->header.size = htons (sizeof (*pmeth) + size); - pmeth->flags = htonl (flags); - memcpy (&pmeth[1], method_name, size); - - ch->tmit.ch = ch; - ch->tmit.notify_mod = notify_mod; - ch->tmit.notify_data = notify_data; - ch->tmit.notify_cls = notify_cls; - ch->tmit.state = MSG_STATE_MODIFIER; - - channel_transmit_mod (ch); - return &ch->tmit; -} - - -/** - * Resume transmission to the channel. - * - * @param th Handle of the request that is being resumed. - */ static void -channel_transmit_resume (struct GNUNET_PSYC_ChannelTransmitHandle *th) +master_recv_join_request (void *cls, + struct GNUNET_CLIENT_MANAGER_Connection *client, + const struct GNUNET_MessageHeader *msg) { - struct GNUNET_PSYC_Channel *ch = th->ch; - if (0 == ch->tmit_ack_pending) - { - ch->tmit_paused = GNUNET_NO; - channel_transmit_data (ch); - } -} + struct GNUNET_PSYC_Master * + mst = GNUNET_CLIENT_MANAGER_get_user_context_ (client, + sizeof (struct GNUNET_PSYC_Channel)); + const struct MasterJoinRequest *req = (const struct MasterJoinRequest *) msg; -/** - * Abort transmission request to channel. - * - * @param th Handle of the request that is being aborted. - */ -static void -channel_transmit_cancel (struct GNUNET_PSYC_ChannelTransmitHandle *th) -{ - struct GNUNET_PSYC_Channel *ch = th->ch; - if (GNUNET_NO == ch->in_transmit) - return; -} + struct GNUNET_PSYC_MessageHeader *pmsg = NULL; + if (ntohs (req->header.size) <= sizeof (*req) + sizeof (*pmsg)) + pmsg = (struct GNUNET_PSYC_MessageHeader *) &req[1]; + struct GNUNET_PSYC_JoinHandle *jh = GNUNET_malloc (sizeof (*jh)); + jh->mst = mst; + jh->slave_key = req->slave_key; -/** - * Handle incoming message from the PSYC service. - * - * @param ch The channel the message is sent to. - * @param pmsg The message. - */ -static void -handle_psyc_message (struct GNUNET_PSYC_Channel *ch, - const struct GNUNET_PSYC_MessageHeader *msg) -{ - uint16_t size = ntohs (msg->header.size); - uint32_t flags = ntohl (msg->flags); - - GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG, - (struct GNUNET_MessageHeader *) msg); - - if (MSG_STATE_START == ch->recv_state) - { - ch->recv_message_id = GNUNET_ntohll (msg->message_id); - ch->recv_flags = flags; - ch->recv_slave_key = msg->slave_key; - ch->recv_mod_value_size = 0; - ch->recv_mod_value_size_expected = 0; - } - else if (GNUNET_ntohll (msg->message_id) != ch->recv_message_id) - { - // FIXME - LOG (GNUNET_ERROR_TYPE_WARNING, - "Unexpected message ID. Got: %" PRIu64 ", expected: %" PRIu64 "\n", - GNUNET_ntohll (msg->message_id), ch->recv_message_id); - GNUNET_break_op (0); - recv_error (ch); - return; - } - else if (flags != ch->recv_flags) - { - LOG (GNUNET_ERROR_TYPE_WARNING, - "Unexpected message flags. Got: %lu, expected: %lu\n", - flags, ch->recv_flags); - GNUNET_break_op (0); - recv_error (ch); - return; - } - - uint16_t pos = 0, psize = 0, ptype, size_eq, size_min; - - for (pos = 0; sizeof (*msg) + pos < size; pos += psize) - { - const struct GNUNET_MessageHeader *pmsg - = (const struct GNUNET_MessageHeader *) ((char *) &msg[1] + pos); - psize = ntohs (pmsg->size); - ptype = ntohs (pmsg->type); - size_eq = size_min = 0; - - if (psize < sizeof (*pmsg) || sizeof (*msg) + pos + psize > size) - { - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, - "Dropping message of type %u with invalid size %u.\n", - ptype, psize); - recv_error (ch); - return; - } - - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Received message part from PSYC.\n"); - GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG, pmsg); - - switch (ptype) - { - case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD: - size_min = sizeof (struct GNUNET_PSYC_MessageMethod); - break; - case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER: - size_min = sizeof (struct GNUNET_PSYC_MessageModifier); - break; - case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT: - case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA: - size_min = sizeof (struct GNUNET_MessageHeader); - break; - case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END: - case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL: - size_eq = sizeof (struct GNUNET_MessageHeader); - break; - default: - GNUNET_break_op (0); - recv_error (ch); - return; - } - - if (! ((0 < size_eq && psize == size_eq) - || (0 < size_min && size_min <= psize))) - { - GNUNET_break_op (0); - recv_error (ch); - return; - } - - switch (ptype) - { - case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD: - { - struct GNUNET_PSYC_MessageMethod *meth - = (struct GNUNET_PSYC_MessageMethod *) pmsg; - - if (MSG_STATE_START != ch->recv_state) - { - LOG (GNUNET_ERROR_TYPE_WARNING, - "Dropping out of order message method (%u).\n", - ch->recv_state); - /* It is normal to receive an incomplete message right after connecting, - * but should not happen later. - * FIXME: add a check for this condition. - */ - GNUNET_break_op (0); - recv_error (ch); - return; - } - - if ('\0' != *((char *) meth + psize - 1)) - { - LOG (GNUNET_ERROR_TYPE_WARNING, - "Dropping message with malformed method. " - "Message ID: %" PRIu64 "\n", ch->recv_message_id); - GNUNET_break_op (0); - recv_error (ch); - return; - } - ch->recv_state = MSG_STATE_METHOD; - break; - } - case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER: - { - if (!(MSG_STATE_METHOD == ch->recv_state - || MSG_STATE_MODIFIER == ch->recv_state - || MSG_STATE_MOD_CONT == ch->recv_state)) - { - LOG (GNUNET_ERROR_TYPE_WARNING, - "Dropping out of order message modifier (%u).\n", - ch->recv_state); - GNUNET_break_op (0); - recv_error (ch); - return; - } - - struct GNUNET_PSYC_MessageModifier *mod - = (struct GNUNET_PSYC_MessageModifier *) pmsg; - - uint16_t name_size = ntohs (mod->name_size); - ch->recv_mod_value_size_expected = ntohl (mod->value_size); - ch->recv_mod_value_size = psize - sizeof (*mod) - name_size - 1; - - if (psize < sizeof (*mod) + name_size + 1 - || '\0' != *((char *) &mod[1] + name_size) - || ch->recv_mod_value_size_expected < ch->recv_mod_value_size) - { - LOG (GNUNET_ERROR_TYPE_WARNING, "Dropping malformed modifier.\n"); - GNUNET_break_op (0); - recv_error (ch); - return; - } - ch->recv_state = MSG_STATE_MODIFIER; - break; - } - case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT: - { - ch->recv_mod_value_size += psize - sizeof (*pmsg); - - if (!(MSG_STATE_MODIFIER == ch->recv_state - || MSG_STATE_MOD_CONT == ch->recv_state) - || ch->recv_mod_value_size_expected < ch->recv_mod_value_size) - { - LOG (GNUNET_ERROR_TYPE_WARNING, - "Dropping out of order message modifier continuation " - "!(%u == %u || %u == %u) || %lu < %lu.\n", - MSG_STATE_MODIFIER, ch->recv_state, - MSG_STATE_MOD_CONT, ch->recv_state, - ch->recv_mod_value_size_expected, ch->recv_mod_value_size); - GNUNET_break_op (0); - recv_error (ch); - return; - } - break; - } - case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA: - { - if (ch->recv_state < MSG_STATE_METHOD - || ch->recv_mod_value_size_expected != ch->recv_mod_value_size) - { - LOG (GNUNET_ERROR_TYPE_WARNING, - "Dropping out of order message data fragment " - "(%u < %u || %lu != %lu).\n", - ch->recv_state, MSG_STATE_METHOD, - ch->recv_mod_value_size_expected, ch->recv_mod_value_size); - - GNUNET_break_op (0); - recv_error (ch); - return; - } - ch->recv_state = MSG_STATE_DATA; - break; - } - } - - GNUNET_PSYC_MessageCallback message_cb - = ch->recv_flags & GNUNET_PSYC_MESSAGE_HISTORIC - ? ch->hist_message_cb - : ch->message_cb; - - if (NULL != message_cb) - message_cb (ch->cb_cls, ch->recv_message_id, ch->recv_flags, pmsg); - - switch (ptype) - { - case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END: - case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL: - recv_reset (ch); - break; - } - } + if (NULL != mst->join_req_cb) + mst->join_req_cb (mst->cb_cls, &req->slave_key, pmsg, jh); } -/** - * Handle incoming message acknowledgement from the PSYC service. - * - * @param ch The channel the acknowledgement is sent to. - */ static void -handle_psyc_message_ack (struct GNUNET_PSYC_Channel *ch) +slave_recv_join_ack (void *cls, + struct GNUNET_CLIENT_MANAGER_Connection *client, + const struct GNUNET_MessageHeader *msg) { - if (0 == ch->tmit_ack_pending) - { - LOG (GNUNET_ERROR_TYPE_WARNING, "Ignoring extraneous message ACK\n"); - GNUNET_break (0); - return; - } - ch->tmit_ack_pending--; - - switch (ch->tmit.state) - { - case MSG_STATE_MODIFIER: - case MSG_STATE_MOD_CONT: - if (GNUNET_NO == ch->tmit_paused) - channel_transmit_mod (ch); - break; - - case MSG_STATE_DATA: - if (GNUNET_NO == ch->tmit_paused) - channel_transmit_data (ch); - break; - - case MSG_STATE_END: - case MSG_STATE_CANCEL: - break; - - default: - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Ignoring message ACK in state %u.\n", ch->tmit.state); - } + struct GNUNET_PSYC_Slave * + slv = GNUNET_CLIENT_MANAGER_get_user_context_ (client, + sizeof (struct GNUNET_PSYC_Channel)); + struct CountersResult *cres = (struct CountersResult *) msg; + if (NULL != slv->connect_cb) + slv->connect_cb (slv->cb_cls, GNUNET_ntohll (cres->max_message_id)); } static void -handle_psyc_join_request (struct GNUNET_PSYC_Master *mst, - const struct MasterJoinRequest *req) +slave_recv_join_decision (void *cls, + struct GNUNET_CLIENT_MANAGER_Connection *client, + const struct GNUNET_MessageHeader *msg) { - struct GNUNET_PSYC_MessageHeader *msg = NULL; - if (ntohs (req->header.size) <= sizeof (*req) + sizeof (*msg)) - msg = (struct GNUNET_PSYC_MessageHeader *) &req[1]; + struct GNUNET_PSYC_Slave * + slv = GNUNET_CLIENT_MANAGER_get_user_context_ (client, + sizeof (struct GNUNET_PSYC_Channel)); + const struct SlaveJoinDecision * + dcsn = (const struct SlaveJoinDecision *) msg; - struct GNUNET_PSYC_JoinHandle *jh = GNUNET_malloc (sizeof (*jh)); - jh->mst = mst; - jh->slave_key = req->slave_key; + struct GNUNET_PSYC_MessageHeader *pmsg = NULL; + if (ntohs (dcsn->header.size) <= sizeof (*dcsn) + sizeof (*pmsg)) + pmsg = (struct GNUNET_PSYC_MessageHeader *) &dcsn[1]; - if (NULL != mst->join_req_cb) - mst->join_req_cb (mst->ch.cb_cls, &req->slave_key, msg, jh); + struct GNUNET_PSYC_JoinHandle *jh = GNUNET_malloc (sizeof (*jh)); + if (NULL != slv->join_dcsn_cb) + slv->join_dcsn_cb (slv->cb_cls, ntohl (dcsn->is_admitted), pmsg); } -static void -handle_psyc_join_decision (struct GNUNET_PSYC_Slave *slv, - const struct SlaveJoinDecision *dcsn) +static struct GNUNET_CLIENT_MANAGER_MessageHandler master_handlers[] = { - struct GNUNET_PSYC_MessageHeader *msg = NULL; - if (ntohs (dcsn->header.size) <= sizeof (*dcsn) + sizeof (*msg)) - msg = (struct GNUNET_PSYC_MessageHeader *) &dcsn[1]; + { &channel_recv_disconnect, NULL, 0, 0, GNUNET_NO }, - struct GNUNET_PSYC_JoinHandle *jh = GNUNET_malloc (sizeof (*jh)); - if (NULL != slv->join_dcsn_cb) - slv->join_dcsn_cb (slv->ch.cb_cls, ntohl (dcsn->is_admitted), msg); -} + { &channel_recv_message, NULL, + GNUNET_MESSAGE_TYPE_PSYC_MESSAGE, + sizeof (struct GNUNET_PSYC_MessageHeader), GNUNET_YES }, + { &channel_recv_message_ack, NULL, + GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_ACK, + sizeof (struct GNUNET_MessageHeader), GNUNET_NO }, -/** - * Type of a function to call when we receive a message - * from the service. - * - * @param cls closure - * @param msg message received, NULL on timeout or fatal error - */ -static void -message_handler (void *cls, - const struct GNUNET_MessageHeader *msg) -{ - struct GNUNET_PSYC_Channel *ch = cls; - struct GNUNET_PSYC_Master *mst = cls; - struct GNUNET_PSYC_Slave *slv = cls; - - if (NULL == msg) - { - // timeout / disconnected from service, reconnect - reschedule_connect (ch); - return; - } - uint16_t size_eq = 0; - uint16_t size_min = 0; - uint16_t size = ntohs (msg->size); - uint16_t type = ntohs (msg->type); - - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Received message of type %d and size %u from PSYC service\n", - type, size); - - switch (type) - { - case GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK: - case GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK: - size_eq = sizeof (struct CountersResult); - break; - case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE: - size_min = sizeof (struct GNUNET_PSYC_MessageHeader); - break; - case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_ACK: - size_eq = sizeof (struct GNUNET_MessageHeader); - break; - case GNUNET_MESSAGE_TYPE_PSYC_JOIN_REQUEST: - size_min = sizeof (struct MasterJoinRequest); - break; - case GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION: - size_min = sizeof (struct SlaveJoinDecision); - break; - default: - GNUNET_break_op (0); - return; - } - - if (! ((0 < size_eq && size == size_eq) - || (0 < size_min && size_min <= size))) - { - GNUNET_break_op (0); - return; - } - - switch (type) - { - case GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK: - { - struct CountersResult *cres = (struct CountersResult *) msg; - if (NULL != mst->start_cb) - mst->start_cb (ch->cb_cls, GNUNET_ntohll (cres->max_message_id)); - break; - } - case GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK: - { - struct CountersResult *cres = (struct CountersResult *) msg; - if (NULL != slv->connect_cb) - slv->connect_cb (ch->cb_cls, GNUNET_ntohll (cres->max_message_id)); - break; - } - case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_ACK: - { - handle_psyc_message_ack (ch); - break; - } - - case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE: - handle_psyc_message (ch, (const struct GNUNET_PSYC_MessageHeader *) msg); - break; - - case GNUNET_MESSAGE_TYPE_PSYC_JOIN_REQUEST: - handle_psyc_join_request ((struct GNUNET_PSYC_Master *) ch, - (const struct MasterJoinRequest *) msg); - break; - - case GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION: - handle_psyc_join_decision ((struct GNUNET_PSYC_Slave *) ch, - (const struct SlaveJoinDecision *) msg); - break; - } - - if (NULL != ch->client) - { - GNUNET_CLIENT_receive (ch->client, &message_handler, ch, - GNUNET_TIME_UNIT_FOREVER_REL); - } -} + { &master_recv_start_ack, NULL, + GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK, + sizeof (struct CountersResult), GNUNET_NO }, + { &master_recv_join_request, NULL, + GNUNET_MESSAGE_TYPE_PSYC_JOIN_REQUEST, + sizeof (struct MasterJoinRequest), GNUNET_YES }, -/** - * Transmit next message to service. - * - * @param cls The struct GNUNET_PSYC_Channel. - * @param size Number of bytes available in @a buf. - * @param buf Where to copy the message. - * - * @return Number of bytes copied to @a buf. - */ -static size_t -send_next_message (void *cls, size_t size, void *buf) -{ - LOG (GNUNET_ERROR_TYPE_DEBUG, "send_next_message()\n"); - struct GNUNET_PSYC_Channel *ch = cls; - struct MessageQueue *mq = ch->tmit_head; - if (NULL == mq) - return 0; - struct GNUNET_MessageHeader *qmsg = (struct GNUNET_MessageHeader *) &mq[1]; - size_t ret = ntohs (qmsg->size); - ch->th = NULL; - if (ret > size) - { - reschedule_connect (ch); - return 0; - } - memcpy (buf, qmsg, ret); - - GNUNET_CONTAINER_DLL_remove (ch->tmit_head, ch->tmit_tail, mq); - GNUNET_free (mq); - - if (NULL != ch->tmit_head) - transmit_next (ch); - - if (GNUNET_NO == ch->in_receive) - { - ch->in_receive = GNUNET_YES; - GNUNET_CLIENT_receive (ch->client, &message_handler, ch, - GNUNET_TIME_UNIT_FOREVER_REL); - } - return ret; -} + { NULL, NULL, 0, 0, GNUNET_NO } +}; -/** - * Schedule transmission of the next message from our queue. - * - * @param ch PSYC handle. - */ -static void -transmit_next (struct GNUNET_PSYC_Channel *ch) +static struct GNUNET_CLIENT_MANAGER_MessageHandler slave_handlers[] = { - LOG (GNUNET_ERROR_TYPE_DEBUG, "transmit_next()\n"); - if (NULL != ch->th || NULL == ch->client) - return; - - struct MessageQueue *mq = ch->tmit_head; - if (NULL == mq) - return; - struct GNUNET_MessageHeader *qmsg = (struct GNUNET_MessageHeader *) &mq[1]; - - ch->th = GNUNET_CLIENT_notify_transmit_ready (ch->client, - ntohs (qmsg->size), - GNUNET_TIME_UNIT_FOREVER_REL, - GNUNET_NO, - &send_next_message, - ch); -} + { &channel_recv_disconnect, NULL, 0, 0, GNUNET_NO }, + { &channel_recv_message, NULL, + GNUNET_MESSAGE_TYPE_PSYC_MESSAGE, + sizeof (struct GNUNET_PSYC_MessageHeader), GNUNET_YES }, -/** - * Try again to connect to the PSYC service. - * - * @param cls Channel handle. - * @param tc Scheduler context. - */ -static void -reconnect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) -{ - struct GNUNET_PSYC_Channel *ch = cls; - - recv_reset (ch); - ch->reconnect_task = GNUNET_SCHEDULER_NO_TASK; - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Connecting to PSYC service.\n"); - GNUNET_assert (NULL == ch->client); - ch->client = GNUNET_CLIENT_connect ("psyc", ch->cfg); - GNUNET_assert (NULL != ch->client); - uint16_t reconn_size = ntohs (ch->reconnect_msg->size); - - if (NULL == ch->tmit_head || - 0 != memcmp (&ch->tmit_head[1], ch->reconnect_msg, reconn_size)) - { - struct MessageQueue *mq = GNUNET_malloc (sizeof (*mq) + reconn_size); - memcpy (&mq[1], ch->reconnect_msg, reconn_size); - GNUNET_CONTAINER_DLL_insert (ch->tmit_head, ch->tmit_tail, mq); - } - transmit_next (ch); -} + { &channel_recv_message_ack, NULL, + GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_ACK, + sizeof (struct GNUNET_MessageHeader), GNUNET_NO }, + { &slave_recv_join_ack, NULL, + GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK, + sizeof (struct CountersResult), GNUNET_NO }, -/** - * Disconnect from the PSYC service. - * - * @param c Channel handle to disconnect. - */ -static void -disconnect (void *c) -{ - struct GNUNET_PSYC_Channel *ch = c; - - GNUNET_assert (NULL != ch); - if (ch->tmit_head != ch->tmit_tail) - { - LOG (GNUNET_ERROR_TYPE_ERROR, - "Disconnecting while there are still outstanding messages!\n"); - GNUNET_break (0); - } - if (ch->reconnect_task != GNUNET_SCHEDULER_NO_TASK) - { - GNUNET_SCHEDULER_cancel (ch->reconnect_task); - ch->reconnect_task = GNUNET_SCHEDULER_NO_TASK; - } - if (NULL != ch->th) - { - GNUNET_CLIENT_notify_transmit_ready_cancel (ch->th); - ch->th = NULL; - } - if (NULL != ch->client) - { - GNUNET_CLIENT_disconnect (ch->client); - ch->client = NULL; - } - if (NULL != ch->reconnect_msg) - { - GNUNET_free (ch->reconnect_msg); - ch->reconnect_msg = NULL; - } -} + { &slave_recv_join_decision, NULL, + GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION, + sizeof (struct SlaveJoinDecision), GNUNET_YES }, + + { NULL, NULL, 0, 0, GNUNET_NO } +}; /** @@ -1227,24 +370,29 @@ GNUNET_PSYC_master_start (const struct GNUNET_CONFIGURATION_Handle *cfg, void *cls) { struct GNUNET_PSYC_Master *mst = GNUNET_malloc (sizeof (*mst)); - struct GNUNET_PSYC_Channel *ch = &mst->ch; - struct MasterStartRequest *req = GNUNET_malloc (sizeof (*req)); + struct GNUNET_PSYC_Channel *chn = &mst->chn; + struct MasterStartRequest *req = GNUNET_malloc (sizeof (*req)); req->header.size = htons (sizeof (*req)); req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MASTER_START); req->channel_key = *channel_key; req->policy = policy; + chn->connect_msg = (struct GNUNET_MessageHeader *) req; + chn->cfg = cfg; + chn->is_master = GNUNET_YES; + mst->start_cb = start_cb; mst->join_req_cb = join_request_cb; - ch->message_cb = message_cb; - ch->cb_cls = cls; - ch->cfg = cfg; - ch->is_master = GNUNET_YES; - ch->reconnect_msg = (struct GNUNET_MessageHeader *) req; - ch->reconnect_delay = GNUNET_TIME_UNIT_ZERO; - ch->reconnect_task = GNUNET_SCHEDULER_add_now (&reconnect, mst); + mst->cb_cls = cls; + + chn->client = GNUNET_CLIENT_MANAGER_connect (cfg, "psyc", master_handlers); + GNUNET_CLIENT_MANAGER_set_user_context_ (chn->client, mst, sizeof (*chn)); + + chn->tmit = GNUNET_PSYC_transmit_create (chn->client); + chn->recv = GNUNET_PSYC_receive_create (message_cb, message_cb, cls); + channel_send_connect_msg (chn); return mst; } @@ -1253,12 +401,13 @@ GNUNET_PSYC_master_start (const struct GNUNET_CONFIGURATION_Handle *cfg, * Stop a PSYC master channel. * * @param master PSYC channel master to stop. + * @param keep_active FIXME */ void -GNUNET_PSYC_master_stop (struct GNUNET_PSYC_Master *master) +GNUNET_PSYC_master_stop (struct GNUNET_PSYC_Master *mst) { - disconnect (master); - GNUNET_free (master); + GNUNET_CLIENT_MANAGER_disconnect (mst->chn.client, GNUNET_YES); + GNUNET_free (mst); } @@ -1292,7 +441,7 @@ GNUNET_PSYC_join_decision (struct GNUNET_PSYC_JoinHandle *jh, const struct GNUNET_PeerIdentity *relays, const struct GNUNET_PSYC_MessageHeader *join_resp) { - struct GNUNET_PSYC_Channel *ch = &jh->mst->ch; + struct GNUNET_PSYC_Channel *chn = &jh->mst->chn; struct MasterJoinDecision *dcsn; uint16_t join_resp_size = (NULL != join_resp) ? ntohs (join_resp->header.size) : 0; @@ -1302,9 +451,7 @@ GNUNET_PSYC_join_decision (struct GNUNET_PSYC_JoinHandle *jh, < sizeof (*dcsn) + relay_size + join_resp_size) return GNUNET_SYSERR; - struct MessageQueue *mq = GNUNET_malloc (sizeof (*mq) + sizeof (*dcsn) - + relay_size + join_resp_size); - dcsn = (struct MasterJoinDecision *) &mq[1]; + dcsn = GNUNET_malloc (sizeof (*dcsn) + relay_size + join_resp_size); dcsn->header.size = htons (sizeof (*dcsn) + relay_size + join_resp_size); dcsn->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION); dcsn->is_admitted = htonl (is_admitted); @@ -1313,8 +460,7 @@ GNUNET_PSYC_join_decision (struct GNUNET_PSYC_JoinHandle *jh, if (0 < join_resp_size) memcpy (&dcsn[1], join_resp, join_resp_size); - GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, mq); - transmit_next (ch); + GNUNET_CLIENT_MANAGER_transmit (chn->client, &dcsn->header); return GNUNET_OK; } @@ -1332,40 +478,59 @@ GNUNET_PSYC_join_decision (struct GNUNET_PSYC_JoinHandle *jh, * @return Transmission handle, NULL on error (i.e. more than one request queued). */ struct GNUNET_PSYC_MasterTransmitHandle * -GNUNET_PSYC_master_transmit (struct GNUNET_PSYC_Master *master, +GNUNET_PSYC_master_transmit (struct GNUNET_PSYC_Master *mst, const char *method_name, GNUNET_PSYC_TransmitNotifyModifier notify_mod, GNUNET_PSYC_TransmitNotifyData notify_data, void *notify_cls, enum GNUNET_PSYC_MasterTransmitFlags flags) { - return (struct GNUNET_PSYC_MasterTransmitHandle *) - channel_transmit (&master->ch, method_name, notify_mod, notify_data, - notify_cls, flags); + if (GNUNET_OK + == GNUNET_PSYC_transmit_message (mst->chn.tmit, method_name, NULL, + notify_mod, notify_data, notify_cls, + flags)) + return (struct GNUNET_PSYC_MasterTransmitHandle *) mst->chn.tmit; + else + return NULL; } /** * Resume transmission to the channel. * - * @param th Handle of the request that is being resumed. + * @param tmit Handle of the request that is being resumed. */ void -GNUNET_PSYC_master_transmit_resume (struct GNUNET_PSYC_MasterTransmitHandle *th) +GNUNET_PSYC_master_transmit_resume (struct GNUNET_PSYC_MasterTransmitHandle *tmit) { - channel_transmit_resume ((struct GNUNET_PSYC_ChannelTransmitHandle *) th); + GNUNET_PSYC_transmit_resume ((struct GNUNET_PSYC_TransmitHandle *) tmit); } /** * Abort transmission request to the channel. * - * @param th Handle of the request that is being aborted. + * @param tmit Handle of the request that is being aborted. */ void -GNUNET_PSYC_master_transmit_cancel (struct GNUNET_PSYC_MasterTransmitHandle *th) +GNUNET_PSYC_master_transmit_cancel (struct GNUNET_PSYC_MasterTransmitHandle *tmit) { - channel_transmit_cancel ((struct GNUNET_PSYC_ChannelTransmitHandle *) th); + GNUNET_PSYC_transmit_cancel ((struct GNUNET_PSYC_TransmitHandle *) tmit); +} + + +/** + * Convert a channel @a master to a @e channel handle to access the @e channel + * APIs. + * + * @param master Channel master handle. + * + * @return Channel handle, valid for as long as @a master is valid. + */ +struct GNUNET_PSYC_Channel * +GNUNET_PSYC_master_get_channel (struct GNUNET_PSYC_Master *master) +{ + return &master->chn; } @@ -1420,7 +585,7 @@ GNUNET_PSYC_slave_join (const struct GNUNET_CONFIGURATION_Handle *cfg, uint16_t data_size) { struct GNUNET_PSYC_Slave *slv = GNUNET_malloc (sizeof (*slv)); - struct GNUNET_PSYC_Channel *ch = &slv->ch; + struct GNUNET_PSYC_Channel *chn = &slv->chn; struct SlaveJoinRequest *req = GNUNET_malloc (sizeof (*req) + relay_count * sizeof (*relays)); req->header.size = htons (sizeof (*req) @@ -1432,17 +597,21 @@ GNUNET_PSYC_slave_join (const struct GNUNET_CONFIGURATION_Handle *cfg, req->relay_count = htonl (relay_count); memcpy (&req[1], relays, relay_count * sizeof (*relays)); + chn->connect_msg = (struct GNUNET_MessageHeader *) req; + chn->cfg = cfg; + chn->is_master = GNUNET_NO; + slv->connect_cb = connect_cb; slv->join_dcsn_cb = join_decision_cb; - ch->message_cb = message_cb; - ch->cb_cls = cls; + slv->cb_cls = cls; - ch->cfg = cfg; - ch->is_master = GNUNET_NO; - ch->reconnect_msg = (struct GNUNET_MessageHeader *) req; - ch->reconnect_delay = GNUNET_TIME_UNIT_ZERO; - ch->reconnect_task = GNUNET_SCHEDULER_add_now (&reconnect, slv); + chn->client = GNUNET_CLIENT_MANAGER_connect (cfg, "psyc", slave_handlers); + GNUNET_CLIENT_MANAGER_set_user_context_ (chn->client, slv, sizeof (*chn)); + chn->recv = GNUNET_PSYC_receive_create (message_cb, message_cb, cls); + chn->tmit = GNUNET_PSYC_transmit_create (chn->client); + + channel_send_connect_msg (chn); return slv; } @@ -1456,10 +625,10 @@ GNUNET_PSYC_slave_join (const struct GNUNET_CONFIGURATION_Handle *cfg, * @param slave Slave handle. */ void -GNUNET_PSYC_slave_part (struct GNUNET_PSYC_Slave *slave) +GNUNET_PSYC_slave_part (struct GNUNET_PSYC_Slave *slv) { - disconnect (slave); - GNUNET_free (slave); + GNUNET_CLIENT_MANAGER_disconnect (slv->chn.client, GNUNET_YES); + GNUNET_free (slv); } @@ -1477,69 +646,59 @@ GNUNET_PSYC_slave_part (struct GNUNET_PSYC_Slave *slave) * queued). */ struct GNUNET_PSYC_SlaveTransmitHandle * -GNUNET_PSYC_slave_transmit (struct GNUNET_PSYC_Slave *slave, +GNUNET_PSYC_slave_transmit (struct GNUNET_PSYC_Slave *slv, const char *method_name, GNUNET_PSYC_TransmitNotifyModifier notify_mod, GNUNET_PSYC_TransmitNotifyData notify_data, void *notify_cls, enum GNUNET_PSYC_SlaveTransmitFlags flags) + { - return (struct GNUNET_PSYC_SlaveTransmitHandle *) - channel_transmit (&slave->ch, method_name, - notify_mod, notify_data, notify_cls, flags); + if (GNUNET_OK + == GNUNET_PSYC_transmit_message (slv->chn.tmit, method_name, NULL, + notify_mod, notify_data, notify_cls, + flags)) + return (struct GNUNET_PSYC_SlaveTransmitHandle *) slv->chn.tmit; + else + return NULL; } /** * Resume transmission to the master. * - * @param th Handle of the request that is being resumed. + * @param tmit Handle of the request that is being resumed. */ void -GNUNET_PSYC_slave_transmit_resume (struct GNUNET_PSYC_SlaveTransmitHandle *th) +GNUNET_PSYC_slave_transmit_resume (struct GNUNET_PSYC_SlaveTransmitHandle *tmit) { - channel_transmit_resume ((struct GNUNET_PSYC_ChannelTransmitHandle *) th); + GNUNET_PSYC_transmit_resume ((struct GNUNET_PSYC_TransmitHandle *) tmit); } /** * Abort transmission request to master. * - * @param th Handle of the request that is being aborted. + * @param tmit Handle of the request that is being aborted. */ void -GNUNET_PSYC_slave_transmit_cancel (struct GNUNET_PSYC_SlaveTransmitHandle *th) +GNUNET_PSYC_slave_transmit_cancel (struct GNUNET_PSYC_SlaveTransmitHandle *tmit) { - channel_transmit_cancel ((struct GNUNET_PSYC_ChannelTransmitHandle *) th); -} - - -/** - * Convert a channel @a master to a @e channel handle to access the @e channel - * APIs. - * - * @param master Channel master handle. - * - * @return Channel handle, valid for as long as @a master is valid. - */ -struct GNUNET_PSYC_Channel * -GNUNET_PSYC_master_get_channel (struct GNUNET_PSYC_Master *master) -{ - return &master->ch; + GNUNET_PSYC_transmit_cancel ((struct GNUNET_PSYC_TransmitHandle *) tmit); } /** * Convert @a slave to a @e channel handle to access the @e channel APIs. * - * @param slave Slave handle. + * @param slv Slave handle. * * @return Channel handle, valid for as long as @a slave is valid. */ struct GNUNET_PSYC_Channel * -GNUNET_PSYC_slave_get_channel (struct GNUNET_PSYC_Slave *slave) +GNUNET_PSYC_slave_get_channel (struct GNUNET_PSYC_Slave *slv) { - return &slave->ch; + return &slv->chn; } @@ -1565,23 +724,17 @@ GNUNET_PSYC_slave_get_channel (struct GNUNET_PSYC_Slave *slave) * @param effective_since Addition of slave is in effect since this message ID. */ void -GNUNET_PSYC_channel_slave_add (struct GNUNET_PSYC_Channel *channel, +GNUNET_PSYC_channel_slave_add (struct GNUNET_PSYC_Channel *chn, const struct GNUNET_CRYPTO_EddsaPublicKey *slave_key, uint64_t announced_at, uint64_t effective_since) { - struct ChannelSlaveAdd *slvadd; - struct MessageQueue *mq = GNUNET_malloc (sizeof (*mq) + sizeof (*slvadd)); - - slvadd = (struct ChannelSlaveAdd *) &mq[1]; - slvadd->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_SLAVE_ADD); - slvadd->header.size = htons (sizeof (*slvadd)); - slvadd->announced_at = GNUNET_htonll (announced_at); - slvadd->effective_since = GNUNET_htonll (effective_since); - GNUNET_CONTAINER_DLL_insert_tail (channel->tmit_head, - channel->tmit_tail, - mq); - transmit_next (channel); + struct ChannelSlaveAdd *add = GNUNET_malloc (sizeof (*add)); + add->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_SLAVE_ADD); + add->header.size = htons (sizeof (*add)); + add->announced_at = GNUNET_htonll (announced_at); + add->effective_since = GNUNET_htonll (effective_since); + GNUNET_CLIENT_MANAGER_transmit (chn->client, &add->header); } @@ -1607,21 +760,15 @@ GNUNET_PSYC_channel_slave_add (struct GNUNET_PSYC_Channel *channel, * @param announced_at ID of the message that announced the membership change. */ void -GNUNET_PSYC_channel_slave_remove (struct GNUNET_PSYC_Channel *channel, +GNUNET_PSYC_channel_slave_remove (struct GNUNET_PSYC_Channel *chn, const struct GNUNET_CRYPTO_EddsaPublicKey *slave_key, uint64_t announced_at) { - struct ChannelSlaveRemove *slvrm; - struct MessageQueue *mq = GNUNET_malloc (sizeof (*mq) + sizeof (*slvrm)); - - slvrm = (struct ChannelSlaveRemove *) &mq[1]; - slvrm->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_SLAVE_RM); - slvrm->header.size = htons (sizeof (*slvrm)); - slvrm->announced_at = GNUNET_htonll (announced_at); - GNUNET_CONTAINER_DLL_insert_tail (channel->tmit_head, - channel->tmit_tail, - mq); - transmit_next (channel); + struct ChannelSlaveRemove *rm = GNUNET_malloc (sizeof (*rm)); + rm->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_SLAVE_RM); + rm->header.size = htons (sizeof (*rm)); + rm->announced_at = GNUNET_htonll (announced_at); + GNUNET_CLIENT_MANAGER_transmit (chn->client, &rm->header); } diff --git a/src/psyc/psyc_util_lib.c b/src/psyc/psyc_util_lib.c index 6dd9681900..0104c93e85 100644 --- a/src/psyc/psyc_util_lib.c +++ b/src/psyc/psyc_util_lib.c @@ -28,6 +28,7 @@ #include "platform.h" #include "gnunet_util_lib.h" +#include "gnunet_env_lib.h" #include "gnunet_psyc_service.h" #include "gnunet_psyc_util_lib.h" diff --git a/src/psyc/test_psyc.c b/src/psyc/test_psyc.c index 35e80868c6..6468b8a2b8 100644 --- a/src/psyc/test_psyc.c +++ b/src/psyc/test_psyc.c @@ -68,6 +68,7 @@ struct TransmitClosure struct GNUNET_PSYC_MasterTransmitHandle *mst_tmit; struct GNUNET_PSYC_SlaveTransmitHandle *slv_tmit; struct GNUNET_ENV_Environment *env; + struct GNUNET_ENV_Modifier *mod; char *data[16]; const char *mod_value; size_t mod_value_size; @@ -79,7 +80,7 @@ struct TransmitClosure struct TransmitClosure *tmit; -static int join_req_count; +static uint8_t join_req_count; enum { @@ -183,7 +184,7 @@ master_message_cb (void *cls, uint64_t message_id, uint32_t flags, GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Master got message part of type %u and size %u " - "belonging to message ID %llu with flags %xu\n", + "belonging to message ID %llu with flags %x\n", type, size, message_id, flags); switch (test) @@ -227,7 +228,7 @@ slave_message_cb (void *cls, uint64_t message_id, uint32_t flags, GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Slave got message part of type %u and size %u " - "belonging to message ID %llu with flags %xu\n", + "belonging to message ID %llu with flags %x\n", type, size, message_id, flags); switch (test) @@ -256,6 +257,48 @@ transmit_resume (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) static int +tmit_notify_data (void *cls, uint16_t *data_size, void *data) +{ + struct TransmitClosure *tmit = cls; + if (0 == tmit->data_count) + { + *data_size = 0; + return GNUNET_YES; + } + + uint16_t size = strlen (tmit->data[tmit->n]); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Transmit notify data: %u bytes available, " + "processing fragment %u/%u (size %u).\n", + *data_size, tmit->n + 1, tmit->data_count, size); + if (*data_size < size) + { + *data_size = 0; + GNUNET_assert (0); + return GNUNET_SYSERR; + } + + if (GNUNET_YES != tmit->paused && 0 < tmit->data_delay[tmit->n]) + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Transmission paused.\n"); + tmit->paused = GNUNET_YES; + GNUNET_SCHEDULER_add_delayed ( + GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, + tmit->data_delay[tmit->n]), + &transmit_resume, tmit); + *data_size = 0; + return GNUNET_NO; + } + tmit->paused = GNUNET_NO; + + *data_size = size; + memcpy (data, tmit->data[tmit->n], size); + + return ++tmit->n < tmit->data_count ? GNUNET_NO : GNUNET_YES; +} + + +static int tmit_notify_mod (void *cls, uint16_t *data_size, void *data, uint8_t *oper, uint32_t *full_value_size) { @@ -265,41 +308,39 @@ tmit_notify_mod (void *cls, uint16_t *data_size, void *data, uint8_t *oper, "%u modifiers left to process.\n", *data_size, GNUNET_ENV_environment_get_count (tmit->env)); - enum GNUNET_ENV_Operator op = 0; - const char *name = NULL; - const char *value = NULL; uint16_t name_size = 0; size_t value_size = 0; + const char *value = NULL; - if (NULL != oper) + if (NULL != oper && NULL != tmit->mod) { /* New modifier */ - if (GNUNET_NO == GNUNET_ENV_environment_shift (tmit->env, &op, &name, - (void *) &value, &value_size)) + tmit->mod = tmit->mod->next; + if (NULL == tmit->mod) { /* No more modifiers, continue with data */ *data_size = 0; return GNUNET_YES; } - GNUNET_assert (value_size < UINT32_MAX); - *full_value_size = value_size; - *oper = op; - name_size = strlen (name); + GNUNET_assert (tmit->mod->value_size < UINT32_MAX); + *full_value_size = tmit->mod->value_size; + *oper = tmit->mod->oper; + name_size = strlen (tmit->mod->name); - if (name_size + 1 + value_size <= *data_size) + if (name_size + 1 + tmit->mod->value_size <= *data_size) { - *data_size = name_size + 1 + value_size; + *data_size = name_size + 1 + tmit->mod->value_size; } else { - tmit->mod_value_size = value_size; + tmit->mod_value_size = tmit->mod->value_size; value_size = *data_size - name_size - 1; tmit->mod_value_size -= value_size; - tmit->mod_value = value + value_size; + tmit->mod_value = tmit->mod->value + value_size; } - memcpy (data, name, name_size); + memcpy (data, tmit->mod->name, name_size); ((char *)data)[name_size] = '\0'; - memcpy ((char *)data + name_size + 1, value, value_size); + memcpy ((char *)data + name_size + 1, tmit->mod->value, value_size); } else if (NULL != tmit->mod_value && 0 < tmit->mod_value_size) { /* Modifier continuation */ @@ -333,48 +374,6 @@ tmit_notify_mod (void *cls, uint16_t *data_size, void *data, uint8_t *oper, } -static int -tmit_notify_data (void *cls, uint16_t *data_size, void *data) -{ - struct TransmitClosure *tmit = cls; - if (0 == tmit->data_count) - { - *data_size = 0; - return GNUNET_YES; - } - - uint16_t size = strlen (tmit->data[tmit->n]); - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Transmit notify data: %u bytes available, " - "processing fragment %u/%u (size %u).\n", - *data_size, tmit->n + 1, tmit->data_count, size); - if (*data_size < size) - { - *data_size = 0; - GNUNET_assert (0); - return GNUNET_SYSERR; - } - - if (GNUNET_YES != tmit->paused && 0 < tmit->data_delay[tmit->n]) - { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Transmission paused.\n"); - tmit->paused = GNUNET_YES; - GNUNET_SCHEDULER_add_delayed ( - GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, - tmit->data_delay[tmit->n]), - &transmit_resume, tmit); - *data_size = 0; - return GNUNET_NO; - } - tmit->paused = GNUNET_NO; - - *data_size = size; - memcpy (data, tmit->data[tmit->n], size); - - return ++tmit->n < tmit->data_count ? GNUNET_NO : GNUNET_YES; -} - - static void slave_join (); @@ -388,7 +387,7 @@ join_decision_cb (void *cls, int is_admitted, if (GNUNET_YES != is_admitted) { /* First join request is refused, retry. */ - //GNUNET_assert (1 == join_req_count); + GNUNET_assert (1 == join_req_count); slave_join (); return; } @@ -403,6 +402,7 @@ join_decision_cb (void *cls, int is_admitted, "_abc", "abc def", 7); GNUNET_ENV_environment_add (tmit->env, GNUNET_ENV_OP_ASSIGN, "_abc_def", "abc def ghi", 11); + tmit->mod = GNUNET_ENV_environment_head (tmit->env); tmit->n = 0; tmit->data[0] = "slave test"; tmit->data_count = 1; @@ -421,8 +421,8 @@ join_request_cb (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *slave_key struct GNUNET_HashCode slave_key_hash; GNUNET_CRYPTO_hash (slave_key, sizeof (*slave_key), &slave_key_hash); GNUNET_log (GNUNET_ERROR_TYPE_WARNING, - "Got join request from %s.\n", - GNUNET_h2s (&slave_key_hash)); + "Got join request #%u from %s.\n", + join_req_count, GNUNET_h2s (&slave_key_hash)); /* Reject first request */ int is_admitted = (0 < join_req_count++) ? GNUNET_YES : GNUNET_NO; @@ -493,6 +493,7 @@ master_transmit () name_cont, val_cont, GNUNET_PSYC_MODIFIER_MAX_PAYLOAD - name_cont_size + GNUNET_PSYC_MOD_CONT_MAX_PAYLOAD); + tmit->mod = GNUNET_ENV_environment_head (tmit->env); tmit->data[0] = "foo"; tmit->data[1] = GNUNET_malloc (GNUNET_PSYC_DATA_MAX_PAYLOAD + 1); for (i = 0; i < GNUNET_PSYC_DATA_MAX_PAYLOAD; i++) |