diff options
author | Gabor X Toth <*@tg-x.net> | 2015-09-26 17:09:57 +0000 |
---|---|---|
committer | Gabor X Toth <*@tg-x.net> | 2015-09-26 17:09:57 +0000 |
commit | 5a042e00e06de726a21d9db05ddeb2ac16ca7c0c (patch) | |
tree | 9b2f504796b72a50d061eb812be4aa87efbf6584 | |
parent | 0c5cea1c26efb8b53f643a6ce6b162d9c9fe2b8f (diff) |
multicast: replay
-rw-r--r-- | src/include/gnunet_multicast_service.h | 4 | ||||
-rw-r--r-- | src/include/gnunet_protocols.h | 10 | ||||
-rw-r--r-- | src/multicast/gnunet-service-multicast.c | 562 | ||||
-rw-r--r-- | src/multicast/multicast.h | 114 | ||||
-rw-r--r-- | src/multicast/multicast_api.c | 236 | ||||
-rw-r--r-- | src/multicast/test_multicast.c | 211 | ||||
-rw-r--r-- | src/util/client_manager.c | 4 |
7 files changed, 940 insertions, 201 deletions
diff --git a/src/include/gnunet_multicast_service.h b/src/include/gnunet_multicast_service.h index 3b53a93da0..c618f3dd5f 100644 --- a/src/include/gnunet_multicast_service.h +++ b/src/include/gnunet_multicast_service.h @@ -750,7 +750,9 @@ struct GNUNET_MULTICAST_MemberReplayHandle; struct GNUNET_MULTICAST_MemberReplayHandle * GNUNET_MULTICAST_member_replay_fragment (struct GNUNET_MULTICAST_Member *member, uint64_t fragment_id, - uint64_t flags); + uint64_t flags, + GNUNET_MULTICAST_ResultCallback result_cb, + void *result_cb_cls); /** diff --git a/src/include/gnunet_protocols.h b/src/include/gnunet_protocols.h index 0063e1ff7e..7e36ebfe99 100644 --- a/src/include/gnunet_protocols.h +++ b/src/include/gnunet_protocols.h @@ -2437,6 +2437,16 @@ extern "C" */ #define GNUNET_MESSAGE_TYPE_MULTICAST_MEMBERSHIP_TEST_RESULT 762 +/** + * C<->S<->T: Replay response from a group member to another member. + */ +#define GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_RESPONSE 763 + +/** + * C<->S: End of replay response. + */ +#define GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_RESPONSE_END 764 + /******************************************************************************* diff --git a/src/multicast/gnunet-service-multicast.c b/src/multicast/gnunet-service-multicast.c index dee5738484..e7ee92cdfa 100644 --- a/src/multicast/gnunet-service-multicast.c +++ b/src/multicast/gnunet-service-multicast.c @@ -88,18 +88,33 @@ static struct GNUNET_CONTAINER_MultiHashMap *members; static struct GNUNET_CONTAINER_MultiHashMap *group_members; /** - * Incoming CADET channels. + * Incoming CADET channels with connected children in the tree. * Group's pub_key_hash -> struct Channel * (multi) */ static struct GNUNET_CONTAINER_MultiHashMap *channels_in; /** - * Outgoing CADET channels. + * Outgoing CADET channels connecting to parents in the tree. * Group's pub_key_hash -> struct Channel * (multi) */ static struct GNUNET_CONTAINER_MultiHashMap *channels_out; /** + * Incoming replay requests from CADET. + * Group's pub_key_hash -> + * H(fragment_id, message_id, fragment_offset, flags) -> struct Channel * + */ +static struct GNUNET_CONTAINER_MultiHashMap *replay_req_cadet; + +/** + * Incoming replay requests from clients. + * Group's pub_key_hash -> + * H(fragment_id, message_id, fragment_offset, flags) -> struct GNUNET_SERVER_Client * + */ +static struct GNUNET_CONTAINER_MultiHashMap *replay_req_client; + + +/** * Join status of a remote peer. */ enum JoinStatus @@ -294,6 +309,15 @@ struct Member }; +struct ReplayRequestKey +{ + uint64_t fragment_id; + uint64_t message_id; + uint64_t fragment_offset; + uint64_t flags; +}; + + /** * Task run during shutdown. * @@ -375,6 +399,95 @@ cleanup_group (struct Group *grp) } +void +replay_key_hash (uint64_t fragment_id, uint64_t message_id, + uint64_t fragment_offset, uint64_t flags, + struct GNUNET_HashCode *key_hash) +{ + struct ReplayRequestKey key = { + .fragment_id = fragment_id, + .message_id = message_id, + .fragment_offset = fragment_offset, + .flags = flags, + }; + GNUNET_CRYPTO_hash (&key, sizeof (key), key_hash); +} + + +/** + * Remove channel from replay request hashmap. + * + * @param chn + * Channel to remove. + * + * @return #GNUNET_YES if there are more entries to process, + * #GNUNET_NO when reached end of hashmap. + */ +static int +replay_req_remove_cadet (struct Channel *chn) +{ + struct GNUNET_CONTAINER_MultiHashMap * + grp_replay_req = GNUNET_CONTAINER_multihashmap_get (replay_req_cadet, + &chn->grp->pub_key_hash); + if (NULL == grp_replay_req) + return GNUNET_NO; + + struct GNUNET_CONTAINER_MultiHashMapIterator * + it = GNUNET_CONTAINER_multihashmap_iterator_create (grp_replay_req); + struct GNUNET_HashCode key; + const struct Channel *c; + while (GNUNET_YES + == GNUNET_CONTAINER_multihashmap_iterator_next (it, &key, + (const void **) &c)) + { + if (c == chn) + { + GNUNET_CONTAINER_multihashmap_remove (grp_replay_req, &key, chn); + return GNUNET_YES; + } + } + GNUNET_CONTAINER_multihashmap_iterator_destroy (it); + return GNUNET_NO; +} + + +/** + * Remove client from replay request hashmap. + * + * @param client + * Client to remove. + * + * @return #GNUNET_YES if there are more entries to process, + * #GNUNET_NO when reached end of hashmap. + */ +static int +replay_req_remove_client (struct Group *grp, struct GNUNET_SERVER_Client *client) +{ + struct GNUNET_CONTAINER_MultiHashMap * + grp_replay_req = GNUNET_CONTAINER_multihashmap_get (replay_req_client, + &grp->pub_key_hash); + if (NULL == grp_replay_req) + return GNUNET_NO; + + struct GNUNET_CONTAINER_MultiHashMapIterator * + it = GNUNET_CONTAINER_multihashmap_iterator_create (grp_replay_req); + struct GNUNET_HashCode key; + const struct GNUNET_SERVER_Client *c; + while (GNUNET_YES + == GNUNET_CONTAINER_multihashmap_iterator_next (it, &key, + (const void **) &c)) + { + if (c == client) + { + GNUNET_CONTAINER_multihashmap_remove (replay_req_client, &key, client); + return GNUNET_YES; + } + } + GNUNET_CONTAINER_multihashmap_iterator_destroy (it); + return GNUNET_NO; +} + + /** * Called whenever a client is disconnected. * @@ -417,6 +530,8 @@ client_notify_disconnect (void *cls, struct GNUNET_SERVER_Client *client) cl = cl->next; } + while (GNUNET_YES == replay_req_remove_client (grp, client)); + if (NULL == grp->clients_head) { /* Last client disconnected. */ #if FIXME @@ -434,14 +549,29 @@ client_notify_disconnect (void *cls, struct GNUNET_SERVER_Client *client) /** + * Send message to a client. + */ +static void +client_send (struct GNUNET_SERVER_Client *client, + const struct GNUNET_MessageHeader *msg) +{ + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "%p Sending message to client.\n", client); + + GNUNET_SERVER_notification_context_add (nc, client); + GNUNET_SERVER_notification_context_unicast (nc, client, msg, GNUNET_NO); +} + + +/** * Send message to all clients connected to the group. */ static void -client_send_msg (const struct Group *grp, - const struct GNUNET_MessageHeader *msg) +client_send_group (const struct Group *grp, + const struct GNUNET_MessageHeader *msg) { GNUNET_log (GNUNET_ERROR_TYPE_WARNING, - "%p Sending message to clients.\n", grp); + "%p Sending message to all clients of the group.\n", grp); struct ClientList *cl = grp->clients_head; while (NULL != cl) @@ -463,7 +593,7 @@ client_send_origin_cb (void *cls, const struct GNUNET_HashCode *pub_key_hash, const struct GNUNET_MessageHeader *msg = cls; struct Member *orig = origin; - client_send_msg (&orig->grp, msg); + client_send_group (&orig->grp, msg); return GNUNET_YES; } @@ -480,7 +610,7 @@ client_send_member_cb (void *cls, const struct GNUNET_HashCode *pub_key_hash, if (NULL != mem->join_dcsn) { /* Only send message to admitted members */ - client_send_msg (&mem->grp, msg); + client_send_group (&mem->grp, msg); } return GNUNET_YES; } @@ -497,14 +627,32 @@ client_send_all (struct GNUNET_HashCode *pub_key_hash, const struct GNUNET_MessageHeader *msg) { int n = 0; - if (origins != NULL) - n += GNUNET_CONTAINER_multihashmap_get_multiple (origins, pub_key_hash, - client_send_origin_cb, - (void *) msg); - if (members != NULL) - n += GNUNET_CONTAINER_multihashmap_get_multiple (members, pub_key_hash, - client_send_member_cb, - (void *) msg); + n += GNUNET_CONTAINER_multihashmap_get_multiple (origins, pub_key_hash, + client_send_origin_cb, + (void *) msg); + n += GNUNET_CONTAINER_multihashmap_get_multiple (members, pub_key_hash, + client_send_member_cb, + (void *) msg); + return n; +} + + +/** + * Send message to a random origin client or a random member client. + * + * @param grp The group to send @a msg to. + * @param msg Message to send. + */ +static int +client_send_random (struct GNUNET_HashCode *pub_key_hash, + const struct GNUNET_MessageHeader *msg) +{ + int n = 0; + n = GNUNET_CONTAINER_multihashmap_get_random (origins, client_send_origin_cb, + (void *) msg); + if (n <= 0) + n = GNUNET_CONTAINER_multihashmap_get_random (members, client_send_member_cb, + (void *) msg); return n; } @@ -520,10 +668,9 @@ client_send_origin (struct GNUNET_HashCode *pub_key_hash, const struct GNUNET_MessageHeader *msg) { int n = 0; - if (origins != NULL) - n += GNUNET_CONTAINER_multihashmap_get_multiple (origins, pub_key_hash, - client_send_origin_cb, - (void *) msg); + n += GNUNET_CONTAINER_multihashmap_get_multiple (origins, pub_key_hash, + client_send_origin_cb, + (void *) msg); return n; } @@ -554,7 +701,7 @@ cadet_notify_transmit_ready (void *cls, size_t buf_size, void *buf) * @param msg Message. */ static void -cadet_send_msg (struct Channel *chn, const struct GNUNET_MessageHeader *msg) +cadet_send_channel (struct Channel *chn, const struct GNUNET_MessageHeader *msg) { chn->tmit_handle = GNUNET_CADET_notify_transmit_ready (chn->channel, GNUNET_NO, @@ -604,14 +751,14 @@ static void cadet_send_join_request (struct Member *mem) { mem->origin_channel = cadet_channel_create (&mem->grp, &mem->origin); - cadet_send_msg (mem->origin_channel, &mem->join_req->header); + cadet_send_channel (mem->origin_channel, &mem->join_req->header); uint32_t i; for (i = 0; i < mem->relay_count; i++) { struct Channel * chn = cadet_channel_create (&mem->grp, &mem->relays[i]); - cadet_send_msg (chn, &mem->join_req->header); + cadet_send_channel (chn, &mem->join_req->header); } } @@ -627,7 +774,7 @@ cadet_send_join_decision_cb (void *cls, if (0 == memcmp (&hdcsn->member_key, &chn->member_key, sizeof (chn->member_key)) && 0 == memcmp (&hdcsn->peer, &chn->peer, sizeof (chn->peer))) { - cadet_send_msg (chn, &hdcsn->header); + cadet_send_channel (chn, &hdcsn->header); return GNUNET_NO; } return GNUNET_YES; @@ -651,29 +798,47 @@ cadet_send_join_decision (struct Group *grp, * Iterator callback for sending a message to origin clients. */ static int -cadet_send_members_cb (void *cls, const struct GNUNET_HashCode *pub_key_hash, - void *channel) +cadet_send_cb (void *cls, const struct GNUNET_HashCode *pub_key_hash, + void *channel) { const struct GNUNET_MessageHeader *msg = cls; struct Channel *chn = channel; if (JOIN_ADMITTED == chn->join_status) - cadet_send_msg (chn, msg); + cadet_send_channel (chn, msg); return GNUNET_YES; } +/** + * Send message to all connected children. + */ static int -cadet_send_members (struct GNUNET_HashCode *pub_key_hash, - const struct GNUNET_MessageHeader *msg) +cadet_send_children (struct GNUNET_HashCode *pub_key_hash, + const struct GNUNET_MessageHeader *msg) { int n = 0; if (channels_in != NULL) n += GNUNET_CONTAINER_multihashmap_get_multiple (channels_in, pub_key_hash, - cadet_send_members_cb, - (void *) msg); + cadet_send_cb, (void *) msg); + return n; +} + + +/** + * Send message to all connected parents. + */ +static int +cadet_send_parents (struct GNUNET_HashCode *pub_key_hash, + const struct GNUNET_MessageHeader *msg) +{ + int n = 0; + if (channels_in != NULL) + n += GNUNET_CONTAINER_multihashmap_get_multiple (channels_out, pub_key_hash, + cadet_send_cb, (void *) msg); return n; } + /** * Handle a connecting client starting an origin. */ @@ -866,7 +1031,7 @@ static void client_send_join_decision (struct Member *mem, const struct MulticastJoinDecisionMessageHeader *hdcsn) { - client_send_msg (&mem->grp, &hdcsn->header); + client_send_group (&mem->grp, &hdcsn->header); const struct MulticastJoinDecisionMessage * dcsn = (const struct MulticastJoinDecisionMessage *) &hdcsn[1]; @@ -959,9 +1124,9 @@ client_recv_multicast_message (void *cls, struct GNUNET_SERVER_Client *client, } GNUNET_assert (GNUNET_YES == grp->is_origin); orig = (struct Origin *) grp; + /* FIXME: yucky, should use separate message structs for P2P and CS! */ out = (struct GNUNET_MULTICAST_MessageHeader *) GNUNET_copy_message (m); - out->fragment_id = GNUNET_htonll (++orig->max_fragment_id); out->purpose.size = htonl (ntohs (out->header.size) - sizeof (out->header) @@ -976,7 +1141,7 @@ client_recv_multicast_message (void *cls, struct GNUNET_SERVER_Client *client, } client_send_all (&grp->pub_key_hash, &out->header); - cadet_send_members (&grp->pub_key_hash, &out->header); + cadet_send_children (&grp->pub_key_hash, &out->header); GNUNET_free (out); GNUNET_SERVER_receive_done (client, GNUNET_OK); @@ -993,7 +1158,6 @@ client_recv_multicast_request (void *cls, struct GNUNET_SERVER_Client *client, struct Group *grp = GNUNET_SERVER_client_get_user_context (client, struct Group); struct Member *mem; struct GNUNET_MULTICAST_RequestHeader *out; - if (NULL == grp) { GNUNET_break (0); @@ -1002,9 +1166,9 @@ client_recv_multicast_request (void *cls, struct GNUNET_SERVER_Client *client, } GNUNET_assert (GNUNET_NO == grp->is_origin); mem = (struct Member *) grp; + /* FIXME: yucky, should use separate message structs for P2P and CS! */ out = (struct GNUNET_MULTICAST_RequestHeader *) GNUNET_copy_message (m); - out->member_key = mem->pub_key; out->fragment_id = GNUNET_ntohll (++mem->max_fragment_id); out->purpose.size = htonl (ntohs (out->header.size) @@ -1023,7 +1187,7 @@ client_recv_multicast_request (void *cls, struct GNUNET_SERVER_Client *client, { /* No local origins, send to remote origin */ if (NULL != mem->origin_channel) { - cadet_send_msg (mem->origin_channel, &out->header); + cadet_send_channel (mem->origin_channel, &out->header); } else { @@ -1039,6 +1203,207 @@ client_recv_multicast_request (void *cls, struct GNUNET_SERVER_Client *client, /** + * Incoming replay request from a client. + */ +static void +client_recv_replay_request (void *cls, struct GNUNET_SERVER_Client *client, + const struct GNUNET_MessageHeader *m) +{ + struct Group *grp = GNUNET_SERVER_client_get_user_context (client, struct Group); + struct Member *mem; + if (NULL == grp) + { + GNUNET_break (0); + GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); + return; + } + GNUNET_assert (GNUNET_NO == grp->is_origin); + mem = (struct Member *) grp; + + struct GNUNET_CONTAINER_MultiHashMap * + grp_replay_req = GNUNET_CONTAINER_multihashmap_get (replay_req_client, + &grp->pub_key_hash); + if (NULL == grp_replay_req) + { + grp_replay_req = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO); + GNUNET_CONTAINER_multihashmap_put (replay_req_client, + &grp->pub_key_hash, grp_replay_req, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST); + } + struct MulticastReplayRequestMessage * + rep = (struct MulticastReplayRequestMessage *) m; + struct GNUNET_HashCode key_hash; + replay_key_hash (rep->fragment_id, rep->message_id, rep->fragment_offset, + rep->flags, &key_hash); + GNUNET_CONTAINER_multihashmap_put (grp_replay_req, &key_hash, client, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); + + if (0 == client_send_origin (&grp->pub_key_hash, m)) + { /* No local origin, replay from remote members / origin. */ + if (NULL != mem->origin_channel) + { + cadet_send_channel (mem->origin_channel, m); + } + else + { + /* FIXME: not yet connected to origin */ + GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); + return; + } + } + GNUNET_SERVER_receive_done (client, GNUNET_OK); +} + + +static int +cadet_send_replay_response_cb (void *cls, + const struct GNUNET_HashCode *key_hash, + void *value) +{ + struct Channel *chn = value; + struct GNUNET_MessageHeader *msg = cls; + + cadet_send_channel (chn, msg); + return GNUNET_OK; +} + + +static int +client_send_replay_response_cb (void *cls, + const struct GNUNET_HashCode *key_hash, + void *value) +{ + struct GNUNET_SERVER_Client *client = value; + struct GNUNET_MessageHeader *msg = cls; + + client_send (client, msg); + return GNUNET_OK; +} + + +/** + * End of replay response from a client. + */ +static void +client_recv_replay_response_end (void *cls, struct GNUNET_SERVER_Client *client, + const struct GNUNET_MessageHeader *m) +{ + struct Group *grp = GNUNET_SERVER_client_get_user_context (client, struct Group); + if (NULL == grp) + { + GNUNET_break (0); + GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); + return; + } + + struct MulticastReplayResponseMessage * + res = (struct MulticastReplayResponseMessage *) m; + + struct GNUNET_HashCode key_hash; + replay_key_hash (res->fragment_id, res->message_id, res->fragment_offset, + res->flags, &key_hash); + + struct GNUNET_CONTAINER_MultiHashMap * + grp_replay_req_cadet = GNUNET_CONTAINER_multihashmap_get (replay_req_cadet, + &grp->pub_key_hash); + if (NULL != grp_replay_req_cadet) + { + GNUNET_CONTAINER_multihashmap_remove_all (grp_replay_req_cadet, &key_hash); + } + struct GNUNET_CONTAINER_MultiHashMap * + grp_replay_req_client = GNUNET_CONTAINER_multihashmap_get (replay_req_client, + &grp->pub_key_hash); + if (NULL != grp_replay_req_client) + { + GNUNET_CONTAINER_multihashmap_remove_all (grp_replay_req_client, &key_hash); + } + GNUNET_SERVER_receive_done (client, GNUNET_OK); +} + + +/** + * Incoming replay response from a client. + * + * Respond with a multicast message on success, or otherwise with an error code. + */ +static void +client_recv_replay_response (void *cls, struct GNUNET_SERVER_Client *client, + const struct GNUNET_MessageHeader *m) +{ + struct Group *grp = GNUNET_SERVER_client_get_user_context (client, struct Group); + if (NULL == grp) + { + GNUNET_break (0); + GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); + return; + } + + struct MulticastReplayResponseMessage * + res = (struct MulticastReplayResponseMessage *) m; + + const struct GNUNET_MessageHeader *msg = m; + if (GNUNET_MULTICAST_REC_OK == res->error_code) + { + msg = (struct GNUNET_MessageHeader *) &res[1]; + } + + struct GNUNET_HashCode key_hash; + replay_key_hash (res->fragment_id, res->message_id, res->fragment_offset, + res->flags, &key_hash); + + struct GNUNET_CONTAINER_MultiHashMap * + grp_replay_req_cadet = GNUNET_CONTAINER_multihashmap_get (replay_req_cadet, + &grp->pub_key_hash); + if (NULL != grp_replay_req_cadet) + { + GNUNET_CONTAINER_multihashmap_get_multiple (grp_replay_req_cadet, &key_hash, + cadet_send_replay_response_cb, + (void *) msg); + } + if (GNUNET_MULTICAST_REC_OK == res->error_code) + { + struct GNUNET_CONTAINER_MultiHashMap * + grp_replay_req_client = GNUNET_CONTAINER_multihashmap_get (replay_req_client, + &grp->pub_key_hash); + if (NULL != grp_replay_req_client) + { + GNUNET_CONTAINER_multihashmap_get_multiple (grp_replay_req_client, &key_hash, + client_send_replay_response_cb, + (void *) msg); + } + } + else + { + client_recv_replay_response_end (cls, client, m); + return; + } + GNUNET_SERVER_receive_done (client, GNUNET_OK); +} + + +/** + * Incoming replay request from a client. + */ +static void +client_recv_replay_request_cancel (void *cls, struct GNUNET_SERVER_Client *client, + const struct GNUNET_MessageHeader *m) +{ + struct Group *grp = GNUNET_SERVER_client_get_user_context (client, struct Group); +} + + +/** + * Incoming replay request from a client. + */ +static void +client_recv_membership_test_result (void *cls, struct GNUNET_SERVER_Client *client, + const struct GNUNET_MessageHeader *m) +{ + struct Group *grp = GNUNET_SERVER_client_get_user_context (client, struct Group); +} + + +/** * A new client connected. */ static void @@ -1053,22 +1418,37 @@ client_notify_connect (void *cls, struct GNUNET_SERVER_Client *client) * Message handlers for the server. */ static const struct GNUNET_SERVER_MessageHandler server_handlers[] = { - { &client_recv_origin_start, NULL, + { client_recv_origin_start, NULL, GNUNET_MESSAGE_TYPE_MULTICAST_ORIGIN_START, 0 }, - { &client_recv_member_join, NULL, + { client_recv_member_join, NULL, GNUNET_MESSAGE_TYPE_MULTICAST_MEMBER_JOIN, 0 }, - { &client_recv_join_decision, NULL, + { client_recv_join_decision, NULL, GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_DECISION, 0 }, - { &client_recv_multicast_message, NULL, + { client_recv_multicast_message, NULL, GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE, 0 }, - { &client_recv_multicast_request, NULL, + { client_recv_multicast_request, NULL, GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST, 0 }, - {NULL, NULL, 0, 0} + { client_recv_replay_request, NULL, + GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST, 0 }, + + { client_recv_replay_request_cancel, NULL, + GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST_CANCEL, 0 }, + + { client_recv_replay_response, NULL, + GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_RESPONSE, 0 }, + + { client_recv_replay_response_end, NULL, + GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_RESPONSE_END, 0 }, + + { client_recv_membership_test_result, NULL, + GNUNET_MESSAGE_TYPE_MULTICAST_MEMBERSHIP_TEST_RESULT, 0 }, + + { NULL, NULL, 0, 0 } }; @@ -1107,6 +1487,9 @@ cadet_notify_channel_end (void *cls, mem->origin_channel = NULL; } } + + while (GNUNET_YES == replay_req_remove_cadet (chn)); + GNUNET_free (chn); } @@ -1320,12 +1703,99 @@ cadet_recv_request (void *cls, /** + * Incoming multicast replay request from CADET. + */ +int +cadet_recv_replay_request (void *cls, + struct GNUNET_CADET_Channel *channel, + void **ctx, + const struct GNUNET_MessageHeader *m) +{ + struct MulticastReplayRequestMessage rep; + uint16_t size = ntohs (m->size); + if (size < sizeof (rep)) + { + GNUNET_break_op (0); + return GNUNET_SYSERR; + } + struct Channel *chn = *ctx; + + memcpy (&rep, m, sizeof (rep)); + memcpy (&rep.member_key, &chn->member_key, sizeof (chn->member_key)); + + struct GNUNET_CONTAINER_MultiHashMap * + grp_replay_req = GNUNET_CONTAINER_multihashmap_get (replay_req_cadet, + &chn->grp->pub_key_hash); + if (NULL == grp_replay_req) + { + grp_replay_req = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO); + GNUNET_CONTAINER_multihashmap_put (replay_req_cadet, + &chn->grp->pub_key_hash, grp_replay_req, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST); + } + struct GNUNET_HashCode key_hash; + replay_key_hash (rep.fragment_id, rep.message_id, rep.fragment_offset, + rep.flags, &key_hash); + GNUNET_CONTAINER_multihashmap_put (grp_replay_req, &key_hash, chn, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); + + client_send_random (&chn->group_key_hash, &rep.header); + return GNUNET_OK; +} + + +/** + * Incoming multicast replay request cancellation from CADET. + */ +int +cadet_recv_replay_request_cancel (void *cls, + struct GNUNET_CADET_Channel *channel, + void **ctx, + const struct GNUNET_MessageHeader *m) +{ + +} + + +/** + * Incoming multicast replay response from CADET. + */ +int +cadet_recv_replay_response (void *cls, + struct GNUNET_CADET_Channel *channel, + void **ctx, + const struct GNUNET_MessageHeader *m) +{ + struct Channel *chn = *ctx; + + /* @todo FIXME: got replay error response, send request to other members */ + + return GNUNET_OK; +} + + +/** * Message handlers for CADET. */ static const struct GNUNET_CADET_MessageHandler cadet_handlers[] = { - { &cadet_recv_join_request, GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_REQUEST, 0 }, - { &cadet_recv_message, GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE, 0 }, - { &cadet_recv_request, GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST, 0 }, + { cadet_recv_join_request, + GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_REQUEST, 0 }, + + { cadet_recv_message, + GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE, 0 }, + + { cadet_recv_request, + GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST, 0 }, + + { cadet_recv_replay_request, + GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST, 0 }, + + { cadet_recv_replay_request_cancel, + GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST_CANCEL, 0 }, + + { cadet_recv_replay_response, + GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_RESPONSE, 0 }, + { NULL, 0, 0 } }; @@ -1350,6 +1820,8 @@ core_connected_cb (void *cls, const struct GNUNET_PeerIdentity *my_identity) group_members = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO); channels_in = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES); channels_out = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES); + replay_req_cadet = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO); + replay_req_client = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO); cadet = GNUNET_CADET_connect (cfg, NULL, &cadet_notify_channel_new, diff --git a/src/multicast/multicast.h b/src/multicast/multicast.h index 5dc803952a..497d67683c 100644 --- a/src/multicast/multicast.h +++ b/src/multicast/multicast.h @@ -27,6 +27,9 @@ #ifndef MULTICAST_H #define MULTICAST_H +#include "platform.h" +#include "gnunet_multicast_service.h" + GNUNET_NETWORK_STRUCT_BEGIN @@ -157,49 +160,91 @@ struct MulticastMembershipTestResultMessage /** - * Message sent from the client to the service to give the service - * a replayed message. + * Message sent from the client to the service OR the service to the + * client asking for a message fragment to be replayed. */ -struct MulticastReplayResponseMessage +struct MulticastReplayRequestMessage { /** - * + * The message type can be either + * #GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST or + * #GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST_CANCEL. */ struct GNUNET_MessageHeader header; /** - * Unique ID that identifies the associated replay session. + * S->C: Public key of the member requesting replay. + * C->S: Unused. */ - uint32_t uid; + struct GNUNET_CRYPTO_EcdsaPublicKey member_key; /** - * An `enum GNUNET_MULTICAST_ReplayErrorCode` identifying issues (in NBO). + * ID of the message that is being requested. */ - int32_t error_code; + uint64_t fragment_id; - /* followed by replayed message */ + /** + * ID of the message that is being requested. + */ + uint64_t message_id; + + /** + * Offset of the fragment that is being requested. + */ + uint64_t fragment_offset; + /** + * Additional flags for the request. + */ + uint64_t flags; + + /** + * Replay request ID. + */ + uint32_t uid; }; /** - * Message sent from the client to the service to notify the service - * about the end of a replay session. + * Message sent from the client to the service to give the service + * a replayed message. */ -struct MulticastReplayEndMessage +struct MulticastReplayResponseMessage { /** - * + * Type: GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_RESPONSE + * or GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_RESPONSE_END */ struct GNUNET_MessageHeader header; /** - * Unique ID that identifies the associated replay session. + * ID of the message that is being requested. */ - uint32_t uid; + uint64_t fragment_id; + + /** + * ID of the message that is being requested. + */ + uint64_t message_id; + + /** + * Offset of the fragment that is being requested. + */ + uint64_t fragment_offset; + /** + * Additional flags for the request. + */ + uint64_t flags; + + /** + * An `enum GNUNET_MULTICAST_ReplayErrorCode` identifying issues (in NBO). + */ + int32_t error_code; + + /* followed by replayed message */ }; @@ -253,6 +298,7 @@ struct MulticastMemberJoinMessage /* Followed by struct GNUNET_MessageHeader join_msg */ }; + #if NOT_USED /** * Message sent from the client to the service to broadcast to all group @@ -323,44 +369,6 @@ struct MulticastJoinMessage /** - * Message sent from the client to the service OR the service to the - * client asking for a message fragment to be replayed. - */ -struct MulticastReplayRequestMessage -{ - - /** - * The message type can be either - * #GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST or - * #GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST_CANCEL. - */ - struct GNUNET_MessageHeader header; - - /** - * Replay request ID. - */ - uint32_t uid; - - /** - * ID of the message that is being requested. - */ - uint64_t message_id; - - /** - * Offset of the fragment that is being requested. - */ - uint64_t fragment_offset; - - /** - * Additional flags for the request. - */ - uint64_t flags; - -}; - - - -/** * Message sent from the client to the service to unicast to the group origin. */ struct MulticastUnicastToOriginMessage diff --git a/src/multicast/multicast_api.c b/src/multicast/multicast_api.c index ef4cc73e79..9f0c77f365 100644 --- a/src/multicast/multicast_api.c +++ b/src/multicast/multicast_api.c @@ -137,6 +137,11 @@ struct GNUNET_MULTICAST_Member GNUNET_MULTICAST_JoinDecisionCallback join_dcsn_cb; + /** + * Replay fragment -> struct GNUNET_MULTICAST_MemberReplayHandle * + */ + struct GNUNET_CONTAINER_MultiHashMap *replay_reqs; + uint64_t next_fragment_id; }; @@ -176,6 +181,8 @@ struct GNUNET_MULTICAST_MembershipTestHandle */ struct GNUNET_MULTICAST_ReplayHandle { + struct GNUNET_MULTICAST_Group *grp; + struct MulticastReplayRequestMessage req; }; @@ -184,6 +191,9 @@ struct GNUNET_MULTICAST_ReplayHandle */ struct GNUNET_MULTICAST_MemberReplayHandle { + + GNUNET_MULTICAST_ResultCallback result_cb; + void *result_cls; }; @@ -263,11 +273,14 @@ group_recv_message (void *cls, struct GNUNET_MULTICAST_MessageHeader * mmsg = (struct GNUNET_MULTICAST_MessageHeader *) msg; + if (GNUNET_YES == grp->is_disconnecting) + return; |