diff options
author | Gabor X Toth <*@tg-x.net> | 2014-05-29 16:35:55 +0000 |
---|---|---|
committer | Gabor X Toth <*@tg-x.net> | 2014-05-29 16:35:55 +0000 |
commit | 9955561e1b204ccf23fbf841f409bd3ef79be88c (patch) | |
tree | 0271c23ae9f1dad72266a0e6073d696e5afca027 /src/multicast/multicast_api.c | |
parent | a5877668ba805c5e0efe622e6ce4c58ff5609bf9 (diff) |
psyc, multicast: reorg code, use new client manager & psyc util lib
Diffstat (limited to 'src/multicast/multicast_api.c')
-rw-r--r-- | src/multicast/multicast_api.c | 614 |
1 files changed, 154 insertions, 460 deletions
diff --git a/src/multicast/multicast_api.c b/src/multicast/multicast_api.c index 36d564d524..b6d51896d7 100644 --- a/src/multicast/multicast_api.c +++ b/src/multicast/multicast_api.c @@ -24,6 +24,7 @@ * @author Christian Grothoff * @author Gabor X Toth */ + #include "platform.h" #include "gnunet_util_lib.h" #include "gnunet_multicast_service.h" @@ -33,26 +34,6 @@ /** - * Started origins. - * Group's pub_key_hash -> struct GNUNET_MULTICAST_Origin - */ -static struct GNUNET_CONTAINER_MultiHashMap *origins; - -/** - * Joined members. - * group_key_hash -> struct GNUNET_MULTICAST_Member - */ -static struct GNUNET_CONTAINER_MultiHashMap *members; - - -struct MessageQueue -{ - struct MessageQueue *prev; - struct MessageQueue *next; -}; - - -/** * Handle for a request to send a message to all multicast group members * (from the origin). */ @@ -90,47 +71,14 @@ struct GNUNET_MULTICAST_Group const struct GNUNET_CONFIGURATION_Handle *cfg; /** - * Socket (if available). + * Client connection to the service. */ - struct GNUNET_CLIENT_Connection *client; - - /** - * Currently pending transmission request, or NULL for none. - */ - struct GNUNET_CLIENT_TransmitHandle *th; - - /** - * Head of operations to transmit. - */ - struct MessageQueue *tmit_head; - - /** - * Tail of operations to transmit. - */ - struct MessageQueue *tmit_tail; - - /** - * Message being transmitted to the Multicast service. - */ - struct MessageQueue *tmit_msg; + struct GNUNET_CLIENT_MANAGER_Connection *client; /** * Message to send on reconnect. */ - struct GNUNET_MessageHeader *reconnect_msg; - - /** - * Task doing exponential back-off trying to reconnect. - */ - GNUNET_SCHEDULER_TaskIdentifier reconnect_task; - - /** - * Time for next connect retry. - */ - struct GNUNET_TIME_Relative reconnect_delay; - - struct GNUNET_CRYPTO_EddsaPublicKey pub_key; - struct GNUNET_HashCode pub_key_hash; + struct GNUNET_MessageHeader *connect_msg; GNUNET_MULTICAST_JoinRequestCallback join_req_cb; GNUNET_MULTICAST_MembershipTestCallback member_test_cb; @@ -140,11 +88,6 @@ struct GNUNET_MULTICAST_Group void *cb_cls; /** - * Are we polling for incoming messages right now? - */ - uint8_t in_receive; - - /** * Are we currently transmitting a message? */ uint8_t in_transmit; @@ -163,7 +106,6 @@ struct GNUNET_MULTICAST_Origin { struct GNUNET_MULTICAST_Group grp; struct GNUNET_MULTICAST_OriginTransmitHandle tmit; - struct GNUNET_CRYPTO_EddsaPrivateKey priv_key; GNUNET_MULTICAST_RequestCallback request_cb; }; @@ -229,294 +171,125 @@ struct GNUNET_MULTICAST_MemberReplayHandle }; -static void -reconnect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc); - - -static void -reschedule_connect (struct GNUNET_MULTICAST_Group *grp); - - /** - * Schedule transmission of the next message from our queue. - * - * @param grp PSYC channel handle + * Send first message to the service after connecting. */ static void -transmit_next (struct GNUNET_MULTICAST_Group *grp); - - -static void -message_handler (void *cls, const struct GNUNET_MessageHeader *msg); - - -/** - * Reschedule a connect attempt to the service. - * - * @param c channel to reconnect - */ -static void -reschedule_connect (struct GNUNET_MULTICAST_Group *grp) +group_send_connect_msg (struct GNUNET_MULTICAST_Group *grp) { - GNUNET_assert (grp->reconnect_task == GNUNET_SCHEDULER_NO_TASK); - - if (NULL != grp->th) - { - GNUNET_CLIENT_notify_transmit_ready_cancel (grp->th); - grp->th = NULL; - } - if (NULL != grp->client) - { - GNUNET_CLIENT_disconnect (grp->client); - grp->client = NULL; - } - grp->in_receive = GNUNET_NO; - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Scheduling task to reconnect to Multicast service in %s.\n", - GNUNET_STRINGS_relative_time_to_string (grp->reconnect_delay, GNUNET_YES)); - grp->reconnect_task = - GNUNET_SCHEDULER_add_delayed (grp->reconnect_delay, &reconnect, grp); - grp->reconnect_delay = GNUNET_TIME_STD_BACKOFF (grp->reconnect_delay); + uint16_t cmsg_size = ntohs (grp->connect_msg->size); + struct GNUNET_MessageHeader * cmsg = GNUNET_malloc (cmsg_size); + memcpy (cmsg, grp->connect_msg, cmsg_size); + GNUNET_CLIENT_MANAGER_transmit_now (grp->client, cmsg); } /** - * Reset stored data related to the last received message. + * Got disconnected from service. Reconnect. */ static void -recv_reset (struct GNUNET_MULTICAST_Group *grp) -{ -} - - -static void -recv_error (struct GNUNET_MULTICAST_Group *grp) -{ - if (NULL != grp->message_cb) - grp->message_cb (grp->cb_cls, NULL); - - recv_reset (grp); -} - - -/** - * Transmit next message to service. - * - * @param cls The struct GNUNET_MULTICAST_Group. - * @param size Number of bytes available in @a buf. - * @param buf Where to copy the message. - * - * @return Number of bytes copied to @a buf. - */ -static size_t -send_next_message (void *cls, size_t size, void *buf) +group_recv_disconnect (void *cls, + struct GNUNET_CLIENT_MANAGER_Connection *client, + const struct GNUNET_MessageHeader *msg) { - LOG (GNUNET_ERROR_TYPE_DEBUG, "send_next_message()\n"); - struct GNUNET_MULTICAST_Group *grp = cls; - struct MessageQueue *mq = grp->tmit_head; - if (NULL == mq) - return 0; - struct GNUNET_MessageHeader *qmsg = (struct GNUNET_MessageHeader *) &mq[1]; - size_t ret = ntohs (qmsg->size); - grp->th = NULL; - if (ret > size) - { - reschedule_connect (grp); - return 0; - } - memcpy (buf, qmsg, ret); - - GNUNET_CONTAINER_DLL_remove (grp->tmit_head, grp->tmit_tail, mq); - GNUNET_free (mq); - - if (NULL != grp->tmit_head) - transmit_next (grp); - - if (GNUNET_NO == grp->in_receive) - { - grp->in_receive = GNUNET_YES; - GNUNET_CLIENT_receive (grp->client, &message_handler, grp, - GNUNET_TIME_UNIT_FOREVER_REL); - } - return ret; + struct GNUNET_MULTICAST_Group * + grp = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*grp)); + GNUNET_CLIENT_MANAGER_reconnect (client); + group_send_connect_msg (grp); } /** - * Schedule transmission of the next message from our queue. - * - * @param grp Multicast group handle. + * Receive join request from service. */ static void -transmit_next (struct GNUNET_MULTICAST_Group *grp) +group_recv_join_request (void *cls, + struct GNUNET_CLIENT_MANAGER_Connection *client, + const struct GNUNET_MessageHeader *msg) { - LOG (GNUNET_ERROR_TYPE_DEBUG, "transmit_next()\n"); - if (NULL != grp->th || NULL == grp->client) - return; - - struct MessageQueue *mq = grp->tmit_head; - if (NULL == mq) - return; - struct GNUNET_MessageHeader *qmsg = (struct GNUNET_MessageHeader *) &mq[1]; - - grp->th = GNUNET_CLIENT_notify_transmit_ready (grp->client, - ntohs (qmsg->size), - GNUNET_TIME_UNIT_FOREVER_REL, - GNUNET_NO, - &send_next_message, - grp); -} + struct GNUNET_MULTICAST_Group * + grp = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*grp)); + const struct MulticastJoinRequestMessage * + jreq = (const struct MulticastJoinRequestMessage *) msg; -/** - * Try again to connect to the Multicast service. - * - * @param cls Channel handle. - * @param tc Scheduler context. - */ -static void -reconnect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) -{ - struct GNUNET_MULTICAST_Group *grp = cls; - - recv_reset (grp); - grp->reconnect_task = GNUNET_SCHEDULER_NO_TASK; - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Connecting to Multicast service.\n"); - GNUNET_assert (NULL == grp->client); - grp->client = GNUNET_CLIENT_connect ("multicast", grp->cfg); - GNUNET_assert (NULL != grp->client); - uint16_t reconn_size = ntohs (grp->reconnect_msg->size); - - if (NULL == grp->tmit_head || - 0 != memcmp (&grp->tmit_head[1], grp->reconnect_msg, reconn_size)) - { - struct MessageQueue *mq = GNUNET_malloc (sizeof (*mq) + reconn_size); - memcpy (&mq[1], grp->reconnect_msg, reconn_size); - GNUNET_CONTAINER_DLL_insert (grp->tmit_head, grp->tmit_tail, mq); - } - transmit_next (grp); -} + struct GNUNET_MULTICAST_JoinHandle *jh = GNUNET_malloc (sizeof (*jh)); + jh->group = grp; + jh->member_key = jreq->member_key; + jh->member_peer = jreq->member_peer; + const struct GNUNET_MessageHeader *jmsg = NULL; + if (sizeof (*jreq) + sizeof (*jmsg) <= ntohs (jreq->header.size)) + jmsg = (const struct GNUNET_MessageHeader *) &jreq[1]; -/** - * Disconnect from the Multicast service. - * - * @param g Group handle to disconnect. - */ -static void -disconnect (void *g) -{ - struct GNUNET_MULTICAST_Group *grp = g; - - GNUNET_assert (NULL != grp); - if (grp->tmit_head != grp->tmit_tail) - { - LOG (GNUNET_ERROR_TYPE_ERROR, - "Disconnecting while there are still outstanding messages!\n"); - GNUNET_break (0); - } - if (grp->reconnect_task != GNUNET_SCHEDULER_NO_TASK) - { - GNUNET_SCHEDULER_cancel (grp->reconnect_task); - grp->reconnect_task = GNUNET_SCHEDULER_NO_TASK; - } - if (NULL != grp->th) - { - GNUNET_CLIENT_notify_transmit_ready_cancel (grp->th); - grp->th = NULL; - } - if (NULL != grp->client) - { - GNUNET_CLIENT_disconnect (grp->client); - grp->client = NULL; - } - if (NULL != grp->reconnect_msg) - { - GNUNET_free (grp->reconnect_msg); - grp->reconnect_msg = NULL; - } + if (NULL != grp->join_req_cb) + grp->join_req_cb (grp->cb_cls, &jreq->member_key, jmsg, jh); } /** - * Iterator callback for calling message callbacks for all groups. + * Receive multicast message from service. */ -static int -message_cb (void *cls, const struct GNUNET_HashCode *pub_key_hash, void *group) +static void +group_recv_message (void *cls, + struct GNUNET_CLIENT_MANAGER_Connection *client, + const struct GNUNET_MessageHeader *msg) { - const struct GNUNET_MessageHeader *msg = cls; - struct GNUNET_MULTICAST_Group *grp = group; + struct GNUNET_MULTICAST_Group * + grp = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*grp)); + struct GNUNET_MULTICAST_MessageHeader * + mmsg = (struct GNUNET_MULTICAST_MessageHeader *) msg; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Calling message callback with a message " - "of type %u and size %u.\n", - ntohs (msg->type), ntohs (msg->size)); + "Calling message callback with a message of size %u.\n", + ntohs (mmsg->header.size)); if (NULL != grp->message_cb) - grp->message_cb (grp->cb_cls, msg); - - return GNUNET_YES; + grp->message_cb (grp->cb_cls, mmsg); } /** - * Iterator callback for calling request callbacks of origins. + * Origin receives uniquest request from a member. */ -static int -request_cb (void *cls, const struct GNUNET_HashCode *pub_key_hash, void *origin) -{ - const struct GNUNET_MULTICAST_RequestHeader *req = cls; - struct GNUNET_MULTICAST_Origin *orig = origin; +static void +origin_recv_request (void *cls, + struct GNUNET_CLIENT_MANAGER_Connection *client, + const struct GNUNET_MessageHeader *msg) +{ + struct GNUNET_MULTICAST_Group *grp; + struct GNUNET_MULTICAST_Origin * + orig = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*grp)); + grp = &orig->grp; + struct GNUNET_MULTICAST_RequestHeader * + req = (struct GNUNET_MULTICAST_RequestHeader *) msg; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Calling request callback for a request of type %u and size %u.\n", - ntohs (req->header.type), ntohs (req->header.size)); + "Calling request callback with a request of size %u.\n", + ntohs (req->header.size)); if (NULL != orig->request_cb) - orig->request_cb (orig->grp.cb_cls, &req->member_key, - (const struct GNUNET_MessageHeader *) req, 0); - return GNUNET_YES; + orig->request_cb (grp->cb_cls, req); } /** - * Iterator callback for calling join request callbacks of origins. + * Member receives join decision. */ -static int -join_request_cb (void *cls, const struct GNUNET_HashCode *pub_key_hash, - void *group) +static void +member_recv_join_decision (void *cls, + struct GNUNET_CLIENT_MANAGER_Connection *client, + const struct GNUNET_MessageHeader *msg) { - const struct MulticastJoinRequestMessage *req = cls; - struct GNUNET_MULTICAST_Group *grp = group; - - struct GNUNET_MULTICAST_JoinHandle *jh = GNUNET_malloc (sizeof (*jh)); - jh->group = grp; - jh->member_key = req->member_key; - jh->member_peer = req->member_peer; - - const struct GNUNET_MessageHeader *msg = NULL; - if (sizeof (*req) + sizeof (*msg) <= ntohs (req->header.size)) - msg = (const struct GNUNET_MessageHeader *) &req[1]; - - if (NULL != grp->join_req_cb) - grp->join_req_cb (grp->cb_cls, &req->member_key, msg, jh); - return GNUNET_YES; -} - + struct GNUNET_MULTICAST_Group *grp; + struct GNUNET_MULTICAST_Member * + mem = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*grp)); + grp = &mem->grp; -/** - * Iterator callback for calling join decision callbacks of members. - */ -static int -join_decision_cb (void *cls, const struct GNUNET_HashCode *pub_key_hash, - void *member) -{ - const struct MulticastJoinDecisionMessageHeader *hdcsn = cls; + const struct MulticastJoinDecisionMessageHeader * + hdcsn = (const struct MulticastJoinDecisionMessageHeader *) msg; const struct MulticastJoinDecisionMessage * dcsn = (const struct MulticastJoinDecisionMessage *) &hdcsn[1]; - struct GNUNET_MULTICAST_Member *mem = member; - struct GNUNET_MULTICAST_Group *grp = &mem->grp; uint16_t dcsn_size = ntohs (dcsn->header.size); int is_admitted = ntohl (dcsn->is_admitted); @@ -549,116 +322,53 @@ join_decision_cb (void *cls, const struct GNUNET_HashCode *pub_key_hash, if (GNUNET_YES != is_admitted) GNUNET_MULTICAST_member_part (mem); - - return GNUNET_YES; } + /** - * Function called when we receive a message from the service. - * - * @param cls struct GNUNET_MULTICAST_Group - * @param msg Message received, NULL on timeout or fatal error. + * Message handlers for an origin. */ -static void -message_handler (void *cls, const struct GNUNET_MessageHeader *msg) +static struct GNUNET_CLIENT_MANAGER_MessageHandler origin_handlers[] = { - struct GNUNET_MULTICAST_Group *grp = cls; + { &group_recv_disconnect, NULL, 0, 0, GNUNET_NO }, - if (NULL == msg) - { - // timeout / disconnected from service, reconnect - reschedule_connect (grp); - return; - } + { &group_recv_message, NULL, + GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE, + sizeof (struct GNUNET_MULTICAST_MessageHeader), GNUNET_YES }, - uint16_t size_eq = 0; - uint16_t size_min = 0; - uint16_t size = ntohs (msg->size); - uint16_t type = ntohs (msg->type); + { &origin_recv_request, NULL, + GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST, + sizeof (struct GNUNET_MULTICAST_RequestHeader), GNUNET_YES }, - LOG (GNUNET_ERROR_TYPE_DEBUG, - "Received message of type %d and size %u from Multicast service\n", - type, size); + { &group_recv_join_request, NULL, + GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_REQUEST, + sizeof (struct MulticastJoinRequestMessage), GNUNET_YES }, - switch (type) - { - case GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE: - size_min = sizeof (struct GNUNET_MULTICAST_MessageHeader); - break; - - case GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST: - size_min = sizeof (struct GNUNET_MULTICAST_RequestHeader); - break; + { NULL, NULL, 0, 0, GNUNET_NO } +}; - case GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_REQUEST: - size_min = sizeof (struct MulticastJoinRequestMessage); - break; - case GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_DECISION: - size_min = sizeof (struct MulticastJoinDecisionMessage); - break; +/** + * Message handlers for a member. + */ +static struct GNUNET_CLIENT_MANAGER_MessageHandler member_handlers[] = +{ + { &group_recv_disconnect, NULL, 0, 0, GNUNET_NO }, - default: - GNUNET_break_op (0); - type = 0; - } + { &group_recv_message, NULL, + GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE, + sizeof (struct GNUNET_MULTICAST_MessageHeader), GNUNET_YES }, - if (! ((0 < size_eq && size == size_eq) - || (0 < size_min && size_min <= size))) - { - GNUNET_break_op (0); - type = 0; - } + { &group_recv_join_request, NULL, + GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_REQUEST, + sizeof (struct MulticastJoinRequestMessage), GNUNET_YES }, - switch (type) - { - case GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE: - if (origins != NULL) - GNUNET_CONTAINER_multihashmap_get_multiple (origins, &grp->pub_key_hash, - message_cb, (void *) msg); - if (members != NULL) - GNUNET_CONTAINER_multihashmap_get_multiple (members, &grp->pub_key_hash, - message_cb, (void *) msg); - break; - - case GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST: - if (GNUNET_YES != grp->is_origin) - { - GNUNET_break (0); - break; - } - if (NULL != origins) - GNUNET_CONTAINER_multihashmap_get_multiple (origins, &grp->pub_key_hash, - request_cb, (void *) msg); - break; - - case GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_REQUEST: - if (NULL != origins) - GNUNET_CONTAINER_multihashmap_get_multiple (origins, &grp->pub_key_hash, - join_request_cb, (void *) msg); - if (NULL != members) - GNUNET_CONTAINER_multihashmap_get_multiple (members, &grp->pub_key_hash, - join_request_cb, (void *) msg); - break; - - case GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_DECISION: - if (GNUNET_NO != grp->is_origin) - { - GNUNET_break (0); - break; - } - if (NULL != members) - GNUNET_CONTAINER_multihashmap_get_multiple (members, &grp->pub_key_hash, - join_decision_cb, (void *) msg); - break; - } + { &member_recv_join_decision, NULL, + GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_DECISION, + sizeof (struct MulticastJoinDecisionMessage), GNUNET_YES }, - if (NULL != grp->client) - { - GNUNET_CLIENT_receive (grp->client, &message_handler, grp, - GNUNET_TIME_UNIT_FOREVER_REL); - } -} + { NULL, NULL, 0, 0, GNUNET_NO } +}; /** @@ -667,7 +377,7 @@ message_handler (void *cls, const struct GNUNET_MessageHeader *msg) * Must be called once and only once in response to an invocation of the * #GNUNET_MULTICAST_JoinRequestCallback. * - * @param jh Join request handle. + * @param join Join request handle. * @param is_admitted #GNUNET_YES if the join is approved, * #GNUNET_NO if it is disapproved, * #GNUNET_SYSERR if we cannot answer the request. @@ -685,27 +395,25 @@ message_handler (void *cls, const struct GNUNET_MessageHeader *msg) * peer that issued the request even if admission is denied. */ struct GNUNET_MULTICAST_ReplayHandle * -GNUNET_MULTICAST_join_decision (struct GNUNET_MULTICAST_JoinHandle *jh, +GNUNET_MULTICAST_join_decision (struct GNUNET_MULTICAST_JoinHandle *join, int is_admitted, uint16_t relay_count, const struct GNUNET_PeerIdentity *relays, const struct GNUNET_MessageHeader *join_resp) { - struct GNUNET_MULTICAST_Group *grp = jh->group; + struct GNUNET_MULTICAST_Group *grp = join->group; uint16_t join_resp_size = (NULL != join_resp) ? ntohs (join_resp->size) : 0; uint16_t relay_size = relay_count * sizeof (*relays); + struct MulticastJoinDecisionMessageHeader * hdcsn; struct MulticastJoinDecisionMessage *dcsn; - struct MessageQueue * - mq = GNUNET_malloc (sizeof (*mq) + sizeof (*hdcsn) + sizeof (*dcsn) - + relay_size + join_resp_size); - - hdcsn = (struct MulticastJoinDecisionMessageHeader *) &mq[1]; - hdcsn->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_DECISION); + hdcsn = GNUNET_malloc (sizeof (*hdcsn) + sizeof (*dcsn) + + relay_size + join_resp_size); hdcsn->header.size = htons (sizeof (*hdcsn) + sizeof (*dcsn) - + relay_size + join_resp_size); - hdcsn->member_key = jh->member_key; - hdcsn->peer = jh->member_peer; + + relay_size + join_resp_size); + hdcsn->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_DECISION); + hdcsn->member_key = join->member_key; + hdcsn->peer = join->member_peer; dcsn = (struct MulticastJoinDecisionMessage *) &hdcsn[1]; dcsn->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_DECISION); @@ -717,10 +425,8 @@ GNUNET_MULTICAST_join_decision (struct GNUNET_MULTICAST_JoinHandle *jh, if (0 < join_resp_size) memcpy (((char *) &dcsn[1]) + relay_size, join_resp, join_resp_size); - GNUNET_CONTAINER_DLL_insert_tail (grp->tmit_head, grp->tmit_tail, mq); - transmit_next (grp); - - GNUNET_free (jh); + GNUNET_CLIENT_MANAGER_transmit (grp->client, &hdcsn->header); + GNUNET_free (join); return NULL; } @@ -832,7 +538,7 @@ GNUNET_MULTICAST_origin_start (const struct GNUNET_CONFIGURATION_Handle *cfg, start->max_fragment_id = max_fragment_id; memcpy (&start->group_key, priv_key, sizeof (*priv_key)); - grp->reconnect_msg = (struct GNUNET_MessageHeader *) start; + grp->connect_msg = (struct GNUNET_MessageHeader *) start; grp->is_origin = GNUNET_YES; grp->cfg = cfg; @@ -844,20 +550,10 @@ GNUNET_MULTICAST_origin_start (const struct GNUNET_CONFIGURATION_Handle *cfg, grp->message_cb = message_cb; orig->request_cb = request_cb; - orig->priv_key = *priv_key; - - GNUNET_CRYPTO_eddsa_key_get_public (&orig->priv_key, &grp->pub_key); - GNUNET_CRYPTO_hash (&grp->pub_key, sizeof (grp->pub_key), - &grp->pub_key_hash); - - if (NULL == origins) - origins = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES); - - GNUNET_CONTAINER_multihashmap_put (origins, &grp->pub_key_hash, orig, - GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); - grp->reconnect_delay = GNUNET_TIME_UNIT_ZERO; - grp->reconnect_task = GNUNET_SCHEDULER_add_now (&reconnect, grp); + grp->client = GNUNET_CLIENT_MANAGER_connect (cfg, "multicast", origin_handlers); + GNUNET_CLIENT_MANAGER_set_user_context_ (grp->client, orig, sizeof (*grp)); + group_send_connect_msg (grp); return orig; } @@ -871,8 +567,7 @@ GNUNET_MULTICAST_origin_start (const struct GNUNET_CONFIGURATION_Handle *cfg, void GNUNET_MULTICAST_origin_stop (struct GNUNET_MULTICAST_Origin *orig) { - disconnect (&orig->grp); - GNUNET_CONTAINER_multihashmap_remove (origins, &orig->grp.pub_key_hash, orig); + GNUNET_CLIENT_MANAGER_disconnect (orig->grp.client, GNUNET_YES); GNUNET_free (orig); } @@ -885,26 +580,22 @@ origin_to_all (struct GNUNET_MULTICAST_Origin *orig) struct GNUNET_MULTICAST_OriginTransmitHandle *tmit = &orig->tmit; size_t buf_size = GNUNET_MULTICAST_FRAGMENT_MAX_SIZE; - struct MessageQueue *mq = GNUNET_malloc (sizeof (*mq) + buf_size); - GNUNET_CONTAINER_DLL_insert_tail (grp->tmit_head, grp->tmit_tail, mq); - - struct GNUNET_MULTICAST_MessageHeader * - msg = (struct GNUNET_MULTICAST_MessageHeader *) &mq[1]; + struct GNUNET_MULTICAST_MessageHeader *msg = GNUNET_malloc (buf_size); int ret = tmit->notify (tmit->notify_cls, &buf_size, &msg[1]); if (! (GNUNET_YES == ret || GNUNET_NO == ret) - || GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < buf_size) + || GNUNET_MULTICAST_FRAGMENT_MAX_SIZE < buf_size) { LOG (GNUNET_ERROR_TYPE_ERROR, "OriginTransmitNotify() returned error or invalid message size.\n"); /* FIXME: handle error */ - GNUNET_free (mq); + GNUNET_free (msg); return; } if (GNUNET_NO == ret && 0 == buf_size) { - GNUNET_free (mq); + GNUNET_free (msg); return; /* Transmission paused. */ } @@ -915,7 +606,7 @@ origin_to_all (struct GNUNET_MULTICAST_Origin *orig) msg->fragment_offset = GNUNET_htonll (tmit->fragment_offset); tmit->fragment_offset += sizeof (*msg) + buf_size; - transmit_next (grp); + GNUNET_CLIENT_MANAGER_transmit (grp->client, &msg->header); } @@ -939,6 +630,12 @@ GNUNET_MULTICAST_origin_to_all (struct GNUNET_MULTICAST_Origin *orig, GNUNET_MULTICAST_OriginTransmitNotify notify, void *notify_cls) { +/* FIXME + if (GNUNET_YES == orig->grp.in_transmit) + return NULL; + orig->grp.in_transmit = GNUNET_YES; +*/ + struct GNUNET_MULTICAST_OriginTransmitHandle *tmit = &orig->tmit; tmit->origin = orig; tmit->message_id = message_id; @@ -1047,10 +744,9 @@ GNUNET_MULTICAST_member_join (const struct GNUNET_CONFIGURATION_Handle *cfg, if (0 < join_msg_size) memcpy (((char *) &join[1]) + relay_size, join_msg, join_msg_size); - grp->reconnect_msg = (struct GNUNET_MessageHeader *) join; + grp->connect_msg = (struct GNUNET_MessageHeader *) join; grp->is_origin = GNUNET_NO; grp->cfg = cfg; - grp->pub_key = *group_key; mem->join_dcsn_cb = join_decision_cb; grp->join_req_cb = join_request_cb; @@ -1059,17 +755,9 @@ GNUNET_MULTICAST_member_join (const struct GNUNET_CONFIGURATION_Handle *cfg, grp->message_cb = message_cb; grp->cb_cls = cls; - GNUNET_CRYPTO_eddsa_key_get_public (member_key, &grp->pub_key); - GNUNET_CRYPTO_hash (&grp->pub_key, sizeof (grp->pub_key), &grp->pub_key_hash); - - if (NULL == members) - members = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES); - - GNUNET_CONTAINER_multihashmap_put (members, &grp->pub_key_hash, mem, - GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); - - grp->reconnect_delay = GNUNET_TIME_UNIT_ZERO; - grp->reconnect_task = GNUNET_SCHEDULER_add_now (&reconnect, grp); + grp->client = GNUNET_CLIENT_MANAGER_connect (cfg, "multicast", member_handlers); + GNUNET_CLIENT_MANAGER_set_user_context_ (grp->client, mem, sizeof (*grp)); + group_send_connect_msg (grp); return mem; } @@ -1088,8 +776,7 @@ GNUNET_MULTICAST_member_join (const struct GNUNET_CONFIGURATION_Handle *cfg, void GNUNET_MULTICAST_member_part (struct GNUNET_MULTICAST_Member *mem) { - disconnect (&mem->grp); - GNUNET_CONTAINER_multihashmap_remove (members, &mem->grp.pub_key_hash, mem); + GNUNET_CLIENT_MANAGER_disconnect (mem->grp.client, GNUNET_YES); GNUNET_free (mem); } @@ -1162,25 +849,26 @@ member_to_origin (struct GNUNET_MULTICAST_Member *mem) struct GNUNET_MULTICAST_Group *grp = &mem->grp; struct GNUNET_MULTICAST_MemberTransmitHandle *tmit = &mem->tmit; - size_t buf_size = GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD; - struct MessageQueue *mq = GNUNET_malloc (sizeof (*mq) + buf_size); - GNUNET_CONTAINER_DLL_insert_tail (grp->tmit_head, grp->tmit_tail, mq); - - struct GNUNET_MULTICAST_RequestHeader * - req = (struct GNUNET_MULTICAST_RequestHeader *) &mq[1]; + size_t buf_size = GNUNET_MULTICAST_FRAGMENT_MAX_SIZE; + struct GNUNET_MULTICAST_RequestHeader *req = GNUNET_malloc (buf_size); int ret = tmit->notify (tmit->notify_cls, &buf_size, &req[1]); if (! (GNUNET_YES == ret || GNUNET_NO == ret) - || GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < buf_size) + || GNUNET_MULTICAST_FRAGMENT_MAX_SIZE < buf_size) { LOG (GNUNET_ERROR_TYPE_ERROR, "MemberTransmitNotify() returned error or invalid message size.\n"); /* FIXME: handle error */ + GNUNET_free (req); return; } if (GNUNET_NO == ret && 0 == buf_size) - return; /* Transmission paused. */ + { + /* Transmission paused. */ + GNUNET_free (req); + return; + } req->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST); req->header.size = htons (sizeof (*req) + buf_size); @@ -1188,7 +876,7 @@ member_to_origin (struct GNUNET_MULTICAST_Member *mem) req->fragment_offset = GNUNET_ntohll (tmit->fragment_offset); tmit->fragment_offset += sizeof (*req) + buf_size; - transmit_next (grp); + GNUNET_CLIENT_MANAGER_transmit (grp->client, &req->header); } @@ -1207,6 +895,12 @@ GNUNET_MULTICAST_member_to_origin (struct GNUNET_MULTICAST_Member *mem, GNUNET_MULTICAST_MemberTransmitNotify notify, void *notify_cls) { +/* FIXME + if (GNUNET_YES == mem->grp.in_transmit) + return NULL; + mem->grp.in_transmit = GNUNET_YES; +*/ + struct GNUNET_MULTICAST_MemberTransmitHandle *tmit = &mem->tmit; tmit->member = mem; tmit->request_id = request_id; |