aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/include/gnunet_multicast_service.h4
-rw-r--r--src/include/gnunet_protocols.h10
-rw-r--r--src/multicast/gnunet-service-multicast.c562
-rw-r--r--src/multicast/multicast.h114
-rw-r--r--src/multicast/multicast_api.c236
-rw-r--r--src/multicast/test_multicast.c211
-rw-r--r--src/util/client_manager.c4
7 files changed, 940 insertions, 201 deletions
diff --git a/src/include/gnunet_multicast_service.h b/src/include/gnunet_multicast_service.h
index 3b53a93da0..c618f3dd5f 100644
--- a/src/include/gnunet_multicast_service.h
+++ b/src/include/gnunet_multicast_service.h
@@ -750,7 +750,9 @@ struct GNUNET_MULTICAST_MemberReplayHandle;
struct GNUNET_MULTICAST_MemberReplayHandle *
GNUNET_MULTICAST_member_replay_fragment (struct GNUNET_MULTICAST_Member *member,
uint64_t fragment_id,
- uint64_t flags);
+ uint64_t flags,
+ GNUNET_MULTICAST_ResultCallback result_cb,
+ void *result_cb_cls);
/**
diff --git a/src/include/gnunet_protocols.h b/src/include/gnunet_protocols.h
index 0063e1ff7e..7e36ebfe99 100644
--- a/src/include/gnunet_protocols.h
+++ b/src/include/gnunet_protocols.h
@@ -2437,6 +2437,16 @@ extern "C"
*/
#define GNUNET_MESSAGE_TYPE_MULTICAST_MEMBERSHIP_TEST_RESULT 762
+/**
+ * C<->S<->T: Replay response from a group member to another member.
+ */
+#define GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_RESPONSE 763
+
+/**
+ * C<->S: End of replay response.
+ */
+#define GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_RESPONSE_END 764
+
/*******************************************************************************
diff --git a/src/multicast/gnunet-service-multicast.c b/src/multicast/gnunet-service-multicast.c
index dee5738484..e7ee92cdfa 100644
--- a/src/multicast/gnunet-service-multicast.c
+++ b/src/multicast/gnunet-service-multicast.c
@@ -88,18 +88,33 @@ static struct GNUNET_CONTAINER_MultiHashMap *members;
static struct GNUNET_CONTAINER_MultiHashMap *group_members;
/**
- * Incoming CADET channels.
+ * Incoming CADET channels with connected children in the tree.
* Group's pub_key_hash -> struct Channel * (multi)
*/
static struct GNUNET_CONTAINER_MultiHashMap *channels_in;
/**
- * Outgoing CADET channels.
+ * Outgoing CADET channels connecting to parents in the tree.
* Group's pub_key_hash -> struct Channel * (multi)
*/
static struct GNUNET_CONTAINER_MultiHashMap *channels_out;
/**
+ * Incoming replay requests from CADET.
+ * Group's pub_key_hash ->
+ * H(fragment_id, message_id, fragment_offset, flags) -> struct Channel *
+ */
+static struct GNUNET_CONTAINER_MultiHashMap *replay_req_cadet;
+
+/**
+ * Incoming replay requests from clients.
+ * Group's pub_key_hash ->
+ * H(fragment_id, message_id, fragment_offset, flags) -> struct GNUNET_SERVER_Client *
+ */
+static struct GNUNET_CONTAINER_MultiHashMap *replay_req_client;
+
+
+/**
* Join status of a remote peer.
*/
enum JoinStatus
@@ -294,6 +309,15 @@ struct Member
};
+struct ReplayRequestKey
+{
+ uint64_t fragment_id;
+ uint64_t message_id;
+ uint64_t fragment_offset;
+ uint64_t flags;
+};
+
+
/**
* Task run during shutdown.
*
@@ -375,6 +399,95 @@ cleanup_group (struct Group *grp)
}
+void
+replay_key_hash (uint64_t fragment_id, uint64_t message_id,
+ uint64_t fragment_offset, uint64_t flags,
+ struct GNUNET_HashCode *key_hash)
+{
+ struct ReplayRequestKey key = {
+ .fragment_id = fragment_id,
+ .message_id = message_id,
+ .fragment_offset = fragment_offset,
+ .flags = flags,
+ };
+ GNUNET_CRYPTO_hash (&key, sizeof (key), key_hash);
+}
+
+
+/**
+ * Remove channel from replay request hashmap.
+ *
+ * @param chn
+ * Channel to remove.
+ *
+ * @return #GNUNET_YES if there are more entries to process,
+ * #GNUNET_NO when reached end of hashmap.
+ */
+static int
+replay_req_remove_cadet (struct Channel *chn)
+{
+ struct GNUNET_CONTAINER_MultiHashMap *
+ grp_replay_req = GNUNET_CONTAINER_multihashmap_get (replay_req_cadet,
+ &chn->grp->pub_key_hash);
+ if (NULL == grp_replay_req)
+ return GNUNET_NO;
+
+ struct GNUNET_CONTAINER_MultiHashMapIterator *
+ it = GNUNET_CONTAINER_multihashmap_iterator_create (grp_replay_req);
+ struct GNUNET_HashCode key;
+ const struct Channel *c;
+ while (GNUNET_YES
+ == GNUNET_CONTAINER_multihashmap_iterator_next (it, &key,
+ (const void **) &c))
+ {
+ if (c == chn)
+ {
+ GNUNET_CONTAINER_multihashmap_remove (grp_replay_req, &key, chn);
+ return GNUNET_YES;
+ }
+ }
+ GNUNET_CONTAINER_multihashmap_iterator_destroy (it);
+ return GNUNET_NO;
+}
+
+
+/**
+ * Remove client from replay request hashmap.
+ *
+ * @param client
+ * Client to remove.
+ *
+ * @return #GNUNET_YES if there are more entries to process,
+ * #GNUNET_NO when reached end of hashmap.
+ */
+static int
+replay_req_remove_client (struct Group *grp, struct GNUNET_SERVER_Client *client)
+{
+ struct GNUNET_CONTAINER_MultiHashMap *
+ grp_replay_req = GNUNET_CONTAINER_multihashmap_get (replay_req_client,
+ &grp->pub_key_hash);
+ if (NULL == grp_replay_req)
+ return GNUNET_NO;
+
+ struct GNUNET_CONTAINER_MultiHashMapIterator *
+ it = GNUNET_CONTAINER_multihashmap_iterator_create (grp_replay_req);
+ struct GNUNET_HashCode key;
+ const struct GNUNET_SERVER_Client *c;
+ while (GNUNET_YES
+ == GNUNET_CONTAINER_multihashmap_iterator_next (it, &key,
+ (const void **) &c))
+ {
+ if (c == client)
+ {
+ GNUNET_CONTAINER_multihashmap_remove (replay_req_client, &key, client);
+ return GNUNET_YES;
+ }
+ }
+ GNUNET_CONTAINER_multihashmap_iterator_destroy (it);
+ return GNUNET_NO;
+}
+
+
/**
* Called whenever a client is disconnected.
*
@@ -417,6 +530,8 @@ client_notify_disconnect (void *cls, struct GNUNET_SERVER_Client *client)
cl = cl->next;
}
+ while (GNUNET_YES == replay_req_remove_client (grp, client));
+
if (NULL == grp->clients_head)
{ /* Last client disconnected. */
#if FIXME
@@ -434,14 +549,29 @@ client_notify_disconnect (void *cls, struct GNUNET_SERVER_Client *client)
/**
+ * Send message to a client.
+ */
+static void
+client_send (struct GNUNET_SERVER_Client *client,
+ const struct GNUNET_MessageHeader *msg)
+{
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "%p Sending message to client.\n", client);
+
+ GNUNET_SERVER_notification_context_add (nc, client);
+ GNUNET_SERVER_notification_context_unicast (nc, client, msg, GNUNET_NO);
+}
+
+
+/**
* Send message to all clients connected to the group.
*/
static void
-client_send_msg (const struct Group *grp,
- const struct GNUNET_MessageHeader *msg)
+client_send_group (const struct Group *grp,
+ const struct GNUNET_MessageHeader *msg)
{
GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
- "%p Sending message to clients.\n", grp);
+ "%p Sending message to all clients of the group.\n", grp);
struct ClientList *cl = grp->clients_head;
while (NULL != cl)
@@ -463,7 +593,7 @@ client_send_origin_cb (void *cls, const struct GNUNET_HashCode *pub_key_hash,
const struct GNUNET_MessageHeader *msg = cls;
struct Member *orig = origin;
- client_send_msg (&orig->grp, msg);
+ client_send_group (&orig->grp, msg);
return GNUNET_YES;
}
@@ -480,7 +610,7 @@ client_send_member_cb (void *cls, const struct GNUNET_HashCode *pub_key_hash,
if (NULL != mem->join_dcsn)
{ /* Only send message to admitted members */
- client_send_msg (&mem->grp, msg);
+ client_send_group (&mem->grp, msg);
}
return GNUNET_YES;
}
@@ -497,14 +627,32 @@ client_send_all (struct GNUNET_HashCode *pub_key_hash,
const struct GNUNET_MessageHeader *msg)
{
int n = 0;
- if (origins != NULL)
- n += GNUNET_CONTAINER_multihashmap_get_multiple (origins, pub_key_hash,
- client_send_origin_cb,
- (void *) msg);
- if (members != NULL)
- n += GNUNET_CONTAINER_multihashmap_get_multiple (members, pub_key_hash,
- client_send_member_cb,
- (void *) msg);
+ n += GNUNET_CONTAINER_multihashmap_get_multiple (origins, pub_key_hash,
+ client_send_origin_cb,
+ (void *) msg);
+ n += GNUNET_CONTAINER_multihashmap_get_multiple (members, pub_key_hash,
+ client_send_member_cb,
+ (void *) msg);
+ return n;
+}
+
+
+/**
+ * Send message to a random origin client or a random member client.
+ *
+ * @param grp The group to send @a msg to.
+ * @param msg Message to send.
+ */
+static int
+client_send_random (struct GNUNET_HashCode *pub_key_hash,
+ const struct GNUNET_MessageHeader *msg)
+{
+ int n = 0;
+ n = GNUNET_CONTAINER_multihashmap_get_random (origins, client_send_origin_cb,
+ (void *) msg);
+ if (n <= 0)
+ n = GNUNET_CONTAINER_multihashmap_get_random (members, client_send_member_cb,
+ (void *) msg);
return n;
}
@@ -520,10 +668,9 @@ client_send_origin (struct GNUNET_HashCode *pub_key_hash,
const struct GNUNET_MessageHeader *msg)
{
int n = 0;
- if (origins != NULL)
- n += GNUNET_CONTAINER_multihashmap_get_multiple (origins, pub_key_hash,
- client_send_origin_cb,
- (void *) msg);
+ n += GNUNET_CONTAINER_multihashmap_get_multiple (origins, pub_key_hash,
+ client_send_origin_cb,
+ (void *) msg);
return n;
}
@@ -554,7 +701,7 @@ cadet_notify_transmit_ready (void *cls, size_t buf_size, void *buf)
* @param msg Message.
*/
static void
-cadet_send_msg (struct Channel *chn, const struct GNUNET_MessageHeader *msg)
+cadet_send_channel (struct Channel *chn, const struct GNUNET_MessageHeader *msg)
{
chn->tmit_handle
= GNUNET_CADET_notify_transmit_ready (chn->channel, GNUNET_NO,
@@ -604,14 +751,14 @@ static void
cadet_send_join_request (struct Member *mem)
{
mem->origin_channel = cadet_channel_create (&mem->grp, &mem->origin);
- cadet_send_msg (mem->origin_channel, &mem->join_req->header);
+ cadet_send_channel (mem->origin_channel, &mem->join_req->header);
uint32_t i;
for (i = 0; i < mem->relay_count; i++)
{
struct Channel *
chn = cadet_channel_create (&mem->grp, &mem->relays[i]);
- cadet_send_msg (chn, &mem->join_req->header);
+ cadet_send_channel (chn, &mem->join_req->header);
}
}
@@ -627,7 +774,7 @@ cadet_send_join_decision_cb (void *cls,
if (0 == memcmp (&hdcsn->member_key, &chn->member_key, sizeof (chn->member_key))
&& 0 == memcmp (&hdcsn->peer, &chn->peer, sizeof (chn->peer)))
{
- cadet_send_msg (chn, &hdcsn->header);
+ cadet_send_channel (chn, &hdcsn->header);
return GNUNET_NO;
}
return GNUNET_YES;
@@ -651,29 +798,47 @@ cadet_send_join_decision (struct Group *grp,
* Iterator callback for sending a message to origin clients.
*/
static int
-cadet_send_members_cb (void *cls, const struct GNUNET_HashCode *pub_key_hash,
- void *channel)
+cadet_send_cb (void *cls, const struct GNUNET_HashCode *pub_key_hash,
+ void *channel)
{
const struct GNUNET_MessageHeader *msg = cls;
struct Channel *chn = channel;
if (JOIN_ADMITTED == chn->join_status)
- cadet_send_msg (chn, msg);
+ cadet_send_channel (chn, msg);
return GNUNET_YES;
}
+/**
+ * Send message to all connected children.
+ */
static int
-cadet_send_members (struct GNUNET_HashCode *pub_key_hash,
- const struct GNUNET_MessageHeader *msg)
+cadet_send_children (struct GNUNET_HashCode *pub_key_hash,
+ const struct GNUNET_MessageHeader *msg)
{
int n = 0;
if (channels_in != NULL)
n += GNUNET_CONTAINER_multihashmap_get_multiple (channels_in, pub_key_hash,
- cadet_send_members_cb,
- (void *) msg);
+ cadet_send_cb, (void *) msg);
+ return n;
+}
+
+
+/**
+ * Send message to all connected parents.
+ */
+static int
+cadet_send_parents (struct GNUNET_HashCode *pub_key_hash,
+ const struct GNUNET_MessageHeader *msg)
+{
+ int n = 0;
+ if (channels_in != NULL)
+ n += GNUNET_CONTAINER_multihashmap_get_multiple (channels_out, pub_key_hash,
+ cadet_send_cb, (void *) msg);
return n;
}
+
/**
* Handle a connecting client starting an origin.
*/
@@ -866,7 +1031,7 @@ static void
client_send_join_decision (struct Member *mem,
const struct MulticastJoinDecisionMessageHeader *hdcsn)
{
- client_send_msg (&mem->grp, &hdcsn->header);
+ client_send_group (&mem->grp, &hdcsn->header);
const struct MulticastJoinDecisionMessage *
dcsn = (const struct MulticastJoinDecisionMessage *) &hdcsn[1];
@@ -959,9 +1124,9 @@ client_recv_multicast_message (void *cls, struct GNUNET_SERVER_Client *client,
}
GNUNET_assert (GNUNET_YES == grp->is_origin);
orig = (struct Origin *) grp;
+
/* FIXME: yucky, should use separate message structs for P2P and CS! */
out = (struct GNUNET_MULTICAST_MessageHeader *) GNUNET_copy_message (m);
-
out->fragment_id = GNUNET_htonll (++orig->max_fragment_id);
out->purpose.size = htonl (ntohs (out->header.size)
- sizeof (out->header)
@@ -976,7 +1141,7 @@ client_recv_multicast_message (void *cls, struct GNUNET_SERVER_Client *client,
}
client_send_all (&grp->pub_key_hash, &out->header);
- cadet_send_members (&grp->pub_key_hash, &out->header);
+ cadet_send_children (&grp->pub_key_hash, &out->header);
GNUNET_free (out);
GNUNET_SERVER_receive_done (client, GNUNET_OK);
@@ -993,7 +1158,6 @@ client_recv_multicast_request (void *cls, struct GNUNET_SERVER_Client *client,
struct Group *grp = GNUNET_SERVER_client_get_user_context (client, struct Group);
struct Member *mem;
struct GNUNET_MULTICAST_RequestHeader *out;
-
if (NULL == grp)
{
GNUNET_break (0);
@@ -1002,9 +1166,9 @@ client_recv_multicast_request (void *cls, struct GNUNET_SERVER_Client *client,
}
GNUNET_assert (GNUNET_NO == grp->is_origin);
mem = (struct Member *) grp;
+
/* FIXME: yucky, should use separate message structs for P2P and CS! */
out = (struct GNUNET_MULTICAST_RequestHeader *) GNUNET_copy_message (m);
-
out->member_key = mem->pub_key;
out->fragment_id = GNUNET_ntohll (++mem->max_fragment_id);
out->purpose.size = htonl (ntohs (out->header.size)
@@ -1023,7 +1187,7 @@ client_recv_multicast_request (void *cls, struct GNUNET_SERVER_Client *client,
{ /* No local origins, send to remote origin */
if (NULL != mem->origin_channel)
{
- cadet_send_msg (mem->origin_channel, &out->header);
+ cadet_send_channel (mem->origin_channel, &out->header);
}
else
{
@@ -1039,6 +1203,207 @@ client_recv_multicast_request (void *cls, struct GNUNET_SERVER_Client *client,
/**
+ * Incoming replay request from a client.
+ */
+static void
+client_recv_replay_request (void *cls, struct GNUNET_SERVER_Client *client,
+ const struct GNUNET_MessageHeader *m)
+{
+ struct Group *grp = GNUNET_SERVER_client_get_user_context (client, struct Group);
+ struct Member *mem;
+ if (NULL == grp)
+ {
+ GNUNET_break (0);
+ GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
+ return;
+ }
+ GNUNET_assert (GNUNET_NO == grp->is_origin);
+ mem = (struct Member *) grp;
+
+ struct GNUNET_CONTAINER_MultiHashMap *
+ grp_replay_req = GNUNET_CONTAINER_multihashmap_get (replay_req_client,
+ &grp->pub_key_hash);
+ if (NULL == grp_replay_req)
+ {
+ grp_replay_req = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
+ GNUNET_CONTAINER_multihashmap_put (replay_req_client,
+ &grp->pub_key_hash, grp_replay_req,
+ GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
+ }
+ struct MulticastReplayRequestMessage *
+ rep = (struct MulticastReplayRequestMessage *) m;
+ struct GNUNET_HashCode key_hash;
+ replay_key_hash (rep->fragment_id, rep->message_id, rep->fragment_offset,
+ rep->flags, &key_hash);
+ GNUNET_CONTAINER_multihashmap_put (grp_replay_req, &key_hash, client,
+ GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
+
+ if (0 == client_send_origin (&grp->pub_key_hash, m))
+ { /* No local origin, replay from remote members / origin. */
+ if (NULL != mem->origin_channel)
+ {
+ cadet_send_channel (mem->origin_channel, m);
+ }
+ else
+ {
+ /* FIXME: not yet connected to origin */
+ GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
+ return;
+ }
+ }
+ GNUNET_SERVER_receive_done (client, GNUNET_OK);
+}
+
+
+static int
+cadet_send_replay_response_cb (void *cls,
+ const struct GNUNET_HashCode *key_hash,
+ void *value)
+{
+ struct Channel *chn = value;
+ struct GNUNET_MessageHeader *msg = cls;
+
+ cadet_send_channel (chn, msg);
+ return GNUNET_OK;
+}
+
+
+static int
+client_send_replay_response_cb (void *cls,
+ const struct GNUNET_HashCode *key_hash,
+ void *value)
+{
+ struct GNUNET_SERVER_Client *client = value;
+ struct GNUNET_MessageHeader *msg = cls;
+
+ client_send (client, msg);
+ return GNUNET_OK;
+}
+
+
+/**
+ * End of replay response from a client.
+ */
+static void
+client_recv_replay_response_end (void *cls, struct GNUNET_SERVER_Client *client,
+ const struct GNUNET_MessageHeader *m)
+{
+ struct Group *grp = GNUNET_SERVER_client_get_user_context (client, struct Group);
+ if (NULL == grp)
+ {
+ GNUNET_break (0);
+ GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
+ return;
+ }
+
+ struct MulticastReplayResponseMessage *
+ res = (struct MulticastReplayResponseMessage *) m;
+
+ struct GNUNET_HashCode key_hash;
+ replay_key_hash (res->fragment_id, res->message_id, res->fragment_offset,
+ res->flags, &key_hash);
+
+ struct GNUNET_CONTAINER_MultiHashMap *
+ grp_replay_req_cadet = GNUNET_CONTAINER_multihashmap_get (replay_req_cadet,
+ &grp->pub_key_hash);
+ if (NULL != grp_replay_req_cadet)
+ {
+ GNUNET_CONTAINER_multihashmap_remove_all (grp_replay_req_cadet, &key_hash);
+ }
+ struct GNUNET_CONTAINER_MultiHashMap *
+ grp_replay_req_client = GNUNET_CONTAINER_multihashmap_get (replay_req_client,
+ &grp->pub_key_hash);
+ if (NULL != grp_replay_req_client)
+ {
+ GNUNET_CONTAINER_multihashmap_remove_all (grp_replay_req_client, &key_hash);
+ }
+ GNUNET_SERVER_receive_done (client, GNUNET_OK);
+}
+
+
+/**
+ * Incoming replay response from a client.
+ *
+ * Respond with a multicast message on success, or otherwise with an error code.
+ */
+static void
+client_recv_replay_response (void *cls, struct GNUNET_SERVER_Client *client,
+ const struct GNUNET_MessageHeader *m)
+{
+ struct Group *grp = GNUNET_SERVER_client_get_user_context (client, struct Group);
+ if (NULL == grp)
+ {
+ GNUNET_break (0);
+ GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
+ return;
+ }
+
+ struct MulticastReplayResponseMessage *
+ res = (struct MulticastReplayResponseMessage *) m;
+
+ const struct GNUNET_MessageHeader *msg = m;
+ if (GNUNET_MULTICAST_REC_OK == res->error_code)
+ {
+ msg = (struct GNUNET_MessageHeader *) &res[1];
+ }
+
+ struct GNUNET_HashCode key_hash;
+ replay_key_hash (res->fragment_id, res->message_id, res->fragment_offset,
+ res->flags, &key_hash);
+
+ struct GNUNET_CONTAINER_MultiHashMap *
+ grp_replay_req_cadet = GNUNET_CONTAINER_multihashmap_get (replay_req_cadet,
+ &grp->pub_key_hash);
+ if (NULL != grp_replay_req_cadet)
+ {
+ GNUNET_CONTAINER_multihashmap_get_multiple (grp_replay_req_cadet, &key_hash,
+ cadet_send_replay_response_cb,
+ (void *) msg);
+ }
+ if (GNUNET_MULTICAST_REC_OK == res->error_code)
+ {
+ struct GNUNET_CONTAINER_MultiHashMap *
+ grp_replay_req_client = GNUNET_CONTAINER_multihashmap_get (replay_req_client,
+ &grp->pub_key_hash);
+ if (NULL != grp_replay_req_client)
+ {
+ GNUNET_CONTAINER_multihashmap_get_multiple (grp_replay_req_client, &key_hash,
+ client_send_replay_response_cb,
+ (void *) msg);
+ }
+ }
+ else
+ {
+ client_recv_replay_response_end (cls, client, m);
+ return;
+ }
+ GNUNET_SERVER_receive_done (client, GNUNET_OK);
+}
+
+
+/**
+ * Incoming replay request from a client.
+ */
+static void
+client_recv_replay_request_cancel (void *cls, struct GNUNET_SERVER_Client *client,
+ const struct GNUNET_MessageHeader *m)
+{
+ struct Group *grp = GNUNET_SERVER_client_get_user_context (client, struct Group);
+}
+
+
+/**
+ * Incoming replay request from a client.
+ */
+static void
+client_recv_membership_test_result (void *cls, struct GNUNET_SERVER_Client *client,
+ const struct GNUNET_MessageHeader *m)
+{
+ struct Group *grp = GNUNET_SERVER_client_get_user_context (client, struct Group);
+}
+
+
+/**
* A new client connected.
*/
static void
@@ -1053,22 +1418,37 @@ client_notify_connect (void *cls, struct GNUNET_SERVER_Client *client)
* Message handlers for the server.
*/
static const struct GNUNET_SERVER_MessageHandler server_handlers[] = {
- { &client_recv_origin_start, NULL,
+ { client_recv_origin_start, NULL,
GNUNET_MESSAGE_TYPE_MULTICAST_ORIGIN_START, 0 },
- { &client_recv_member_join, NULL,
+ { client_recv_member_join, NULL,
GNUNET_MESSAGE_TYPE_MULTICAST_MEMBER_JOIN, 0 },
- { &client_recv_join_decision, NULL,
+ { client_recv_join_decision, NULL,
GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_DECISION, 0 },
- { &client_recv_multicast_message, NULL,
+ { client_recv_multicast_message, NULL,
GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE, 0 },
- { &client_recv_multicast_request, NULL,
+ { client_recv_multicast_request, NULL,
GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST, 0 },
- {NULL, NULL, 0, 0}
+ { client_recv_replay_request, NULL,
+ GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST, 0 },
+
+ { client_recv_replay_request_cancel, NULL,
+ GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST_CANCEL, 0 },
+
+ { client_recv_replay_response, NULL,
+ GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_RESPONSE, 0 },
+
+ { client_recv_replay_response_end, NULL,
+ GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_RESPONSE_END, 0 },
+
+ { client_recv_membership_test_result, NULL,
+ GNUNET_MESSAGE_TYPE_MULTICAST_MEMBERSHIP_TEST_RESULT, 0 },
+
+ { NULL, NULL, 0, 0 }
};
@@ -1107,6 +1487,9 @@ cadet_notify_channel_end (void *cls,
mem->origin_channel = NULL;
}
}
+
+ while (GNUNET_YES == replay_req_remove_cadet (chn));
+
GNUNET_free (chn);
}
@@ -1320,12 +1703,99 @@ cadet_recv_request (void *cls,
/**
+ * Incoming multicast replay request from CADET.
+ */
+int
+cadet_recv_replay_request (void *cls,
+ struct GNUNET_CADET_Channel *channel,
+ void **ctx,
+ const struct GNUNET_MessageHeader *m)
+{
+ struct MulticastReplayRequestMessage rep;
+ uint16_t size = ntohs (m->size);
+ if (size < sizeof (rep))
+ {
+ GNUNET_break_op (0);
+ return GNUNET_SYSERR;
+ }
+ struct Channel *chn = *ctx;
+
+ memcpy (&rep, m, sizeof (rep));
+ memcpy (&rep.member_key, &chn->member_key, sizeof (chn->member_key));
+
+ struct GNUNET_CONTAINER_MultiHashMap *
+ grp_replay_req = GNUNET_CONTAINER_multihashmap_get (replay_req_cadet,
+ &chn->grp->pub_key_hash);
+ if (NULL == grp_replay_req)
+ {
+ grp_replay_req = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
+ GNUNET_CONTAINER_multihashmap_put (replay_req_cadet,
+ &chn->grp->pub_key_hash, grp_replay_req,
+ GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
+ }
+ struct GNUNET_HashCode key_hash;
+ replay_key_hash (rep.fragment_id, rep.message_id, rep.fragment_offset,
+ rep.flags, &key_hash);
+ GNUNET_CONTAINER_multihashmap_put (grp_replay_req, &key_hash, chn,
+ GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
+
+ client_send_random (&chn->group_key_hash, &rep.header);
+ return GNUNET_OK;
+}
+
+
+/**
+ * Incoming multicast replay request cancellation from CADET.
+ */
+int
+cadet_recv_replay_request_cancel (void *cls,
+ struct GNUNET_CADET_Channel *channel,
+ void **ctx,
+ const struct GNUNET_MessageHeader *m)
+{
+
+}
+
+
+/**
+ * Incoming multicast replay response from CADET.
+ */
+int
+cadet_recv_replay_response (void *cls,
+ struct GNUNET_CADET_Channel *channel,
+ void **ctx,
+ const struct GNUNET_MessageHeader *m)
+{
+ struct Channel *chn = *ctx;
+
+ /* @todo FIXME: got replay error response, send request to other members */
+
+ return GNUNET_OK;
+}
+
+
+/**
* Message handlers for CADET.
*/
static const struct GNUNET_CADET_MessageHandler cadet_handlers[] = {
- { &cadet_recv_join_request, GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_REQUEST, 0 },
- { &cadet_recv_message, GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE, 0 },
- { &cadet_recv_request, GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST, 0 },
+ { cadet_recv_join_request,
+ GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_REQUEST, 0 },
+
+ { cadet_recv_message,
+ GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE, 0 },
+
+ { cadet_recv_request,
+ GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST, 0 },
+
+ { cadet_recv_replay_request,
+ GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST, 0 },
+
+ { cadet_recv_replay_request_cancel,
+ GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST_CANCEL, 0 },
+
+ { cadet_recv_replay_response,
+ GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_RESPONSE, 0 },
+
{ NULL, 0, 0 }
};
@@ -1350,6 +1820,8 @@ core_connected_cb (void *cls, const struct GNUNET_PeerIdentity *my_identity)
group_members = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
channels_in = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
channels_out = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
+ replay_req_cadet = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
+ replay_req_client = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
cadet = GNUNET_CADET_connect (cfg, NULL,
&cadet_notify_channel_new,
diff --git a/src/multicast/multicast.h b/src/multicast/multicast.h
index 5dc803952a..497d67683c 100644
--- a/src/multicast/multicast.h
+++ b/src/multicast/multicast.h
@@ -27,6 +27,9 @@
#ifndef MULTICAST_H
#define MULTICAST_H
+#include "platform.h"
+#include "gnunet_multicast_service.h"
+
GNUNET_NETWORK_STRUCT_BEGIN
@@ -157,49 +160,91 @@ struct MulticastMembershipTestResultMessage
/**
- * Message sent from the client to the service to give the service
- * a replayed message.
+ * Message sent from the client to the service OR the service to the
+ * client asking for a message fragment to be replayed.
*/
-struct MulticastReplayResponseMessage
+struct MulticastReplayRequestMessage
{
/**
- *
+ * The message type can be either
+ * #GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST or
+ * #GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST_CANCEL.
*/
struct GNUNET_MessageHeader header;
/**
- * Unique ID that identifies the associated replay session.
+ * S->C: Public key of the member requesting replay.
+ * C->S: Unused.
*/
- uint32_t uid;
+ struct GNUNET_CRYPTO_EcdsaPublicKey member_key;
/**
- * An `enum GNUNET_MULTICAST_ReplayErrorCode` identifying issues (in NBO).
+ * ID of the message that is being requested.
*/
- int32_t error_code;
+ uint64_t fragment_id;
- /* followed by replayed message */
+ /**
+ * ID of the message that is being requested.
+ */
+ uint64_t message_id;
+
+ /**
+ * Offset of the fragment that is being requested.
+ */
+ uint64_t fragment_offset;
+ /**
+ * Additional flags for the request.
+ */
+ uint64_t flags;
+
+ /**
+ * Replay request ID.
+ */
+ uint32_t uid;
};
/**
- * Message sent from the client to the service to notify the service
- * about the end of a replay session.
+ * Message sent from the client to the service to give the service
+ * a replayed message.
*/
-struct MulticastReplayEndMessage
+struct MulticastReplayResponseMessage
{
/**
- *
+ * Type: GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_RESPONSE
+ * or GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_RESPONSE_END
*/
struct GNUNET_MessageHeader header;
/**
- * Unique ID that identifies the associated replay session.
+ * ID of the message that is being requested.
*/
- uint32_t uid;
+ uint64_t fragment_id;
+
+ /**
+ * ID of the message that is being requested.
+ */
+ uint64_t message_id;
+
+ /**
+ * Offset of the fragment that is being requested.
+ */
+ uint64_t fragment_offset;
+ /**
+ * Additional flags for the request.
+ */
+ uint64_t flags;
+
+ /**
+ * An `enum GNUNET_MULTICAST_ReplayErrorCode` identifying issues (in NBO).
+ */
+ int32_t error_code;
+
+ /* followed by replayed message */
};
@@ -253,6 +298,7 @@ struct MulticastMemberJoinMessage
/* Followed by struct GNUNET_MessageHeader join_msg */
};
+
#if NOT_USED
/**
* Message sent from the client to the service to broadcast to all group
@@ -323,44 +369,6 @@ struct MulticastJoinMessage
/**
- * Message sent from the client to the service OR the service to the
- * client asking for a message fragment to be replayed.
- */
-struct MulticastReplayRequestMessage
-{
-
- /**
- * The message type can be either
- * #GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST or
- * #GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST_CANCEL.
- */
- struct GNUNET_MessageHeader header;
-
- /**
- * Replay request ID.
- */
- uint32_t uid;
-
- /**
- * ID of the message that is being requested.
- */
- uint64_t message_id;
-
- /**
- * Offset of the fragment that is being requested.
- */
- uint64_t fragment_offset;
-
- /**
- * Additional flags for the request.
- */
- uint64_t flags;
-
-};
-
-
-
-/**
* Message sent from the client to the service to unicast to the group origin.
*/
struct MulticastUnicastToOriginMessage
diff --git a/src/multicast/multicast_api.c b/src/multicast/multicast_api.c
index ef4cc73e79..9f0c77f365 100644
--- a/src/multicast/multicast_api.c
+++ b/src/multicast/multicast_api.c
@@ -137,6 +137,11 @@ struct GNUNET_MULTICAST_Member
GNUNET_MULTICAST_JoinDecisionCallback join_dcsn_cb;
+ /**
+ * Replay fragment -> struct GNUNET_MULTICAST_MemberReplayHandle *
+ */
+ struct GNUNET_CONTAINER_MultiHashMap *replay_reqs;
+
uint64_t next_fragment_id;
};
@@ -176,6 +181,8 @@ struct GNUNET_MULTICAST_MembershipTestHandle
*/
struct GNUNET_MULTICAST_ReplayHandle
{
+ struct GNUNET_MULTICAST_Group *grp;
+ struct MulticastReplayRequestMessage req;
};
@@ -184,6 +191,9 @@ struct GNUNET_MULTICAST_ReplayHandle
*/
struct GNUNET_MULTICAST_MemberReplayHandle
{
+
+ GNUNET_MULTICAST_ResultCallback result_cb;
+ void *result_cls;
};
@@ -263,11 +273,14 @@ group_recv_message (void *cls,
struct GNUNET_MULTICAST_MessageHeader *
mmsg = (struct GNUNET_MULTICAST_MessageHeader *) msg;
+ if (GNUNET_YES == grp->is_disconnecting)
+ return;
+
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Calling message callback with a message of size %u.\n",
ntohs (mmsg->header.size));
- if (GNUNET_YES != grp->is_disconnecting && NULL != grp->message_cb)
+ if (NULL != grp->message_cb)
grp->message_cb (grp->cb_cls, mmsg);
}
@@ -297,6 +310,73 @@ origin_recv_request (void *cls,
/**
+ * Receive multicast replay request from service.
+ */
+static void
+group_recv_replay_request (void *cls,
+ struct GNUNET_CLIENT_MANAGER_Connection *client,
+ const struct GNUNET_MessageHeader *msg)
+{
+ struct