aboutsummaryrefslogtreecommitdiff
path: root/src/multicast
diff options
context:
space:
mode:
authorxrs <xrs@mail36.net>2018-01-02 17:12:39 +0100
committerxrs <xrs@mail36.net>2018-01-02 17:12:39 +0100
commit3611a5295f95fad5d9e1fdb3866e7db9ecf8f47e (patch)
treedb191de206319d924f1bb49e65cdf8a9612ba85c /src/multicast
parentcc90f1df695ccee3d35fe00fa179a856e4128009 (diff)
parent61f532f18450e0d7c72f0c17f4a20b5854cf53bf (diff)
Merge branch 'master' of ssh://gnunet.org/gnunet
Diffstat (limited to 'src/multicast')
-rw-r--r--src/multicast/Makefile.am2
-rw-r--r--src/multicast/gnunet-service-multicast.c248
-rw-r--r--src/multicast/multicast_api.c51
-rw-r--r--src/multicast/test_multicast_multipeer.c312
4 files changed, 342 insertions, 271 deletions
diff --git a/src/multicast/Makefile.am b/src/multicast/Makefile.am
index 13212bca31..48185e1a42 100644
--- a/src/multicast/Makefile.am
+++ b/src/multicast/Makefile.am
@@ -19,7 +19,7 @@ endif
lib_LTLIBRARIES = libgnunetmulticast.la
libgnunetmulticast_la_SOURCES = \
- multicast_api.c
+ multicast_api.c multicast.h
libgnunetmulticast_la_LIBADD = \
$(top_builddir)/src/util/libgnunetutil.la \
$(GN_LIBINTL) $(XLIB)
diff --git a/src/multicast/gnunet-service-multicast.c b/src/multicast/gnunet-service-multicast.c
index 2f4dc8a145..ba1086cc5b 100644
--- a/src/multicast/gnunet-service-multicast.c
+++ b/src/multicast/gnunet-service-multicast.c
@@ -137,6 +137,7 @@ struct Channel
*/
struct GNUNET_CADET_Channel *channel;
+ // FIXME: not used
/**
* CADET transmission handle.
*/
@@ -228,7 +229,7 @@ struct Group
/**
* Is the client disconnected? #GNUNET_YES or #GNUNET_NO
*/
- uint8_t disconnected;
+ uint8_t is_disconnected;
/**
* Is this an origin (#GNUNET_YES), or member (#GNUNET_NO)?
@@ -365,6 +366,8 @@ client_send_join_decision (struct Member *mem,
static void
shutdown_task (void *cls)
{
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "shutting down\n");
if (NULL != cadet)
{
GNUNET_CADET_disconnect (cadet);
@@ -420,6 +423,11 @@ cleanup_member (struct Member *mem)
GNUNET_free (mem->join_dcsn);
mem->join_dcsn = NULL;
}
+ if (NULL != mem->origin_channel)
+ {
+ GNUNET_CADET_channel_destroy (mem->origin_channel->channel);
+ mem->origin_channel = NULL;
+ }
GNUNET_CONTAINER_multihashmap_remove (members, &grp->pub_key_hash, mem);
GNUNET_free (mem);
}
@@ -553,36 +561,47 @@ client_send (struct GNUNET_SERVICE_Client *client,
* Send message to all clients connected to the group.
*/
static void
-client_send_group (const struct Group *grp,
- const struct GNUNET_MessageHeader *msg)
+client_send_group_keep_envelope (const struct Group *grp,
+ struct GNUNET_MQ_Envelope *env)
{
- GNUNET_log (GNUNET_ERROR_TYPE_INFO,
- "%p Sending message to all clients of the group.\n", grp);
+ struct ClientList *cli = grp->clients_head;
- struct ClientList *cl = grp->clients_head;
- while (NULL != cl)
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "%p Sending message to all clients of the group.\n",
+ grp);
+ while (NULL != cli)
{
- struct GNUNET_MQ_Envelope *
- env = GNUNET_MQ_msg_copy (msg);
-
- GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (cl->client),
- env);
- cl = cl->next;
+ GNUNET_MQ_send_copy (GNUNET_SERVICE_client_get_mq (cli->client),
+ env);
+ cli = cli->next;
}
}
/**
+ * Send message to all clients connected to the group and
+ * takes care of freeing @env.
+ */
+static void
+client_send_group (const struct Group *grp,
+ struct GNUNET_MQ_Envelope *env)
+{
+ client_send_group_keep_envelope (grp, env);
+ GNUNET_MQ_discard (env);
+}
+
+
+/**
* Iterator callback for sending a message to origin clients.
*/
static int
client_send_origin_cb (void *cls, const struct GNUNET_HashCode *pub_key_hash,
void *origin)
{
- const struct GNUNET_MessageHeader *msg = cls;
+ struct GNUNET_MQ_Envelope *env = cls;
struct Member *orig = origin;
- client_send_group (&orig->group, msg);
+ client_send_group_keep_envelope (&orig->group, env);
return GNUNET_YES;
}
@@ -594,12 +613,12 @@ static int
client_send_member_cb (void *cls, const struct GNUNET_HashCode *pub_key_hash,
void *member)
{
- const struct GNUNET_MessageHeader *msg = cls;
+ struct GNUNET_MQ_Envelope *env = cls;
struct Member *mem = member;
if (NULL != mem->join_dcsn)
{ /* Only send message to admitted members */
- client_send_group (&mem->group, msg);
+ client_send_group_keep_envelope (&mem->group, env);
}
return GNUNET_YES;
}
@@ -615,15 +634,16 @@ client_send_member_cb (void *cls, const struct GNUNET_HashCode *pub_key_hash,
*/
static int
client_send_all (struct GNUNET_HashCode *pub_key_hash,
- const struct GNUNET_MessageHeader *msg)
+ struct GNUNET_MQ_Envelope *env)
{
int n = 0;
n += GNUNET_CONTAINER_multihashmap_get_multiple (origins, pub_key_hash,
client_send_origin_cb,
- (void *) msg);
+ (void *) env);
n += GNUNET_CONTAINER_multihashmap_get_multiple (members, pub_key_hash,
client_send_member_cb,
- (void *) msg);
+ (void *) env);
+ GNUNET_MQ_discard (env);
return n;
}
@@ -636,14 +656,14 @@ client_send_all (struct GNUNET_HashCode *pub_key_hash,
*/
static int
client_send_random (struct GNUNET_HashCode *pub_key_hash,
- const struct GNUNET_MessageHeader *msg)
+ struct GNUNET_MQ_Envelope *env)
{
int n = 0;
n = GNUNET_CONTAINER_multihashmap_get_random (origins, client_send_origin_cb,
- (void *) msg);
+ (void *) env);
if (n <= 0)
n = GNUNET_CONTAINER_multihashmap_get_random (members, client_send_member_cb,
- (void *) msg);
+ (void *) env);
return n;
}
@@ -658,12 +678,12 @@ client_send_random (struct GNUNET_HashCode *pub_key_hash,
*/
static int
client_send_origin (struct GNUNET_HashCode *pub_key_hash,
- const struct GNUNET_MessageHeader *msg)
+ struct GNUNET_MQ_Envelope *env)
{
int n = 0;
n += GNUNET_CONTAINER_multihashmap_get_multiple (origins, pub_key_hash,
client_send_origin_cb,
- (void *) msg);
+ (void *) env);
return n;
}
@@ -677,17 +697,12 @@ client_send_origin (struct GNUNET_HashCode *pub_key_hash,
static void
client_send_ack (struct GNUNET_HashCode *pub_key_hash)
{
+ struct GNUNET_MQ_Envelope *env;
+
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"Sending message ACK to client.\n");
-
- static struct GNUNET_MessageHeader *msg = NULL;
- if (NULL == msg)
- {
- msg = GNUNET_malloc (sizeof (*msg));
- msg->type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_FRAGMENT_ACK);
- msg->size = htons (sizeof (*msg));
- }
- client_send_all (pub_key_hash, msg);
+ env = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_MULTICAST_FRAGMENT_ACK);
+ client_send_all (pub_key_hash, env);
}
@@ -983,7 +998,8 @@ handle_cadet_join_request (void *cls,
chn->peer = req->peer;
chn->join_status = JOIN_WAITING;
- client_send_all (&group_pub_hash, &req->header);
+ client_send_all (&group_pub_hash,
+ GNUNET_MQ_msg_copy (&req->header));
}
@@ -1102,7 +1118,8 @@ handle_cadet_message (void *cls,
{
struct Channel *chn = cls;
GNUNET_CADET_receive_done (chn->channel);
- client_send_all (&chn->group_pub_hash, &msg->header);
+ client_send_all (&chn->group_pub_hash,
+ GNUNET_MQ_msg_copy (&msg->header));
}
@@ -1153,30 +1170,32 @@ handle_cadet_request (void *cls,
{
struct Channel *chn = cls;
GNUNET_CADET_receive_done (chn->channel);
- client_send_origin (&chn->group_pub_hash, &req->header);
+ client_send_origin (&chn->group_pub_hash,
+ GNUNET_MQ_msg_copy (&req->header));
}
-static int
-check_cadet_replay_request (void *cls,
- const struct MulticastReplayRequestMessage *req)
-{
- uint16_t size = ntohs (req->header.size);
- if (size < sizeof (*req))
- {
- GNUNET_break_op (0);
- return GNUNET_SYSERR;
- }
-
- struct Channel *chn = cls;
- if (NULL == chn)
- {
- GNUNET_break_op (0);
- return GNUNET_SYSERR;
- }
-
- return GNUNET_OK;
-}
+// FIXME: do checks in handle_cadet_replay_request
+//static int
+//check_cadet_replay_request (void *cls,
+// const struct MulticastReplayRequestMessage *req)
+//{
+// uint16_t size = ntohs (req->header.size);
+// if (size < sizeof (*req))
+// {
+// GNUNET_break_op (0);
+// return GNUNET_SYSERR;
+// }
+//
+// struct Channel *chn = cls;
+// if (NULL == chn)
+// {
+// GNUNET_break_op (0);
+// return GNUNET_SYSERR;
+// }
+//
+// return GNUNET_OK;
+//}
/**
@@ -1187,6 +1206,7 @@ handle_cadet_replay_request (void *cls,
const struct MulticastReplayRequestMessage *req)
{
struct Channel *chn = cls;
+
GNUNET_CADET_receive_done (chn->channel);
struct MulticastReplayRequestMessage rep = *req;
@@ -1203,12 +1223,16 @@ handle_cadet_replay_request (void *cls,
GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
}
struct GNUNET_HashCode key_hash;
- replay_key_hash (rep.fragment_id, rep.message_id, rep.fragment_offset,
- rep.flags, &key_hash);
+ replay_key_hash (rep.fragment_id,
+ rep.message_id,
+ rep.fragment_offset,
+ rep.flags,
+ &key_hash);
GNUNET_CONTAINER_multihashmap_put (grp_replay_req, &key_hash, chn,
GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
- client_send_random (&chn->group_pub_hash, &rep.header);
+ client_send_random (&chn->group_pub_hash,
+ GNUNET_MQ_msg_copy (&rep.header));
}
@@ -1290,10 +1314,10 @@ cadet_channel_create (struct Group *grp, struct GNUNET_PeerIdentity *peer)
struct MulticastJoinDecisionMessageHeader,
chn),
- GNUNET_MQ_hd_var_size (cadet_replay_request,
- GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST,
- struct MulticastReplayRequestMessage,
- chn),
+ GNUNET_MQ_hd_fixed_size (cadet_replay_request,
+ GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST,
+ struct MulticastReplayRequestMessage,
+ chn),
GNUNET_MQ_hd_var_size (cadet_replay_response,
GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_RESPONSE,
@@ -1357,6 +1381,7 @@ handle_client_origin_start (void *cls,
grp->is_origin = GNUNET_YES;
grp->pub_key = pub_key;
grp->pub_key_hash = pub_key_hash;
+ grp->is_disconnected = GNUNET_NO;
GNUNET_CONTAINER_multihashmap_put (origins, &grp->pub_key_hash, orig,
GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
@@ -1379,10 +1404,10 @@ handle_client_origin_start (void *cls,
struct MulticastJoinRequestMessage,
grp),
- GNUNET_MQ_hd_var_size (cadet_replay_request,
- GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST,
- struct MulticastReplayRequestMessage,
- grp),
+ GNUNET_MQ_hd_fixed_size (cadet_replay_request,
+ GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST,
+ struct MulticastReplayRequestMessage,
+ grp),
GNUNET_MQ_hd_var_size (cadet_replay_response,
GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_RESPONSE,
@@ -1484,6 +1509,7 @@ handle_client_member_join (void *cls,
grp->is_origin = GNUNET_NO;
grp->pub_key = msg->group_pub_key;
grp->pub_key_hash = pub_key_hash;
+ grp->is_disconnected = GNUNET_NO;
group_set_cadet_port_hash (grp);
if (NULL == grp_mem)
@@ -1494,7 +1520,8 @@ handle_client_member_join (void *cls,
}
GNUNET_CONTAINER_multihashmap_put (grp_mem, &mem->pub_key_hash, mem,
GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
-
+
+ // FIXME: should the members hash map have option UNIQUE_FAST?
GNUNET_CONTAINER_multihashmap_put (members, &grp->pub_key_hash, mem,
GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
}
@@ -1509,10 +1536,11 @@ handle_client_member_join (void *cls,
char *str = GNUNET_CRYPTO_ecdsa_public_key_to_string (&mem->pub_key);
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Client connected to group %s as member %s (%s).\n",
+ "Client connected to group %s as member %s (%s). size = %d\n",
GNUNET_h2s (&grp->pub_key_hash),
GNUNET_h2s2 (&mem->pub_key_hash),
- str);
+ str,
+ GNUNET_CONTAINER_multihashmap_size (members));
GNUNET_free (str);
if (NULL != mem->join_dcsn)
@@ -1567,7 +1595,9 @@ handle_client_member_join (void *cls,
GNUNET_free (mem->join_req);
mem->join_req = req;
- if (0 == client_send_origin (&grp->pub_key_hash, &mem->join_req->header))
+ if (0 ==
+ client_send_origin (&grp->pub_key_hash,
+ GNUNET_MQ_msg_copy (&mem->join_req->header)))
{ /* No local origins, send to remote origin */
cadet_send_join_request (mem);
}
@@ -1580,7 +1610,7 @@ static void
client_send_join_decision (struct Member *mem,
const struct MulticastJoinDecisionMessageHeader *hdcsn)
{
- client_send_group (&mem->group, &hdcsn->header);
+ client_send_group (&mem->group, GNUNET_MQ_msg_copy (&hdcsn->header));
const struct MulticastJoinDecisionMessage *
dcsn = (const struct MulticastJoinDecisionMessage *) &hdcsn[1];
@@ -1621,8 +1651,9 @@ handle_client_join_decision (void *cls,
GNUNET_SERVICE_client_drop (client);
return;
}
+ GNUNET_assert (GNUNET_NO == grp->is_disconnected);
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "%p Got join decision from client for group %s..\n",
+ "%p got join decision from client for group %s..\n",
grp, GNUNET_h2s (&grp->pub_key_hash));
struct GNUNET_CONTAINER_MultiHashMap *
@@ -1652,6 +1683,32 @@ handle_client_join_decision (void *cls,
}
+static void
+handle_client_part_request (void *cls,
+ const struct GNUNET_MessageHeader *msg)
+{
+ struct Client *c = cls;
+ struct GNUNET_SERVICE_Client *client = c->client;
+ struct Group *grp = c->group;
+ struct GNUNET_MQ_Envelope *env;
+
+ if (NULL == grp)
+ {
+ GNUNET_break (0);
+ GNUNET_SERVICE_client_drop (client);
+ return;
+ }
+ GNUNET_assert (GNUNET_NO == grp->is_disconnected);
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "%p got part request from client for group %s.\n",
+ grp, GNUNET_h2s (&grp->pub_key_hash));
+ grp->is_disconnected = GNUNET_YES;
+ env = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_MULTICAST_PART_ACK);
+ client_send_group (grp, env);
+ GNUNET_SERVICE_client_continue (client);
+}
+
+
static int
check_client_multicast_message (void *cls,
const struct GNUNET_MULTICAST_MessageHeader *msg)
@@ -1667,6 +1724,7 @@ static void
handle_client_multicast_message (void *cls,
const struct GNUNET_MULTICAST_MessageHeader *msg)
{
+ // FIXME: what if GNUNET_YES == grp->is_disconnected? Do we allow sending messages?
struct Client *c = cls;
struct GNUNET_SERVICE_Client *client = c->client;
struct Group *grp = c->group;
@@ -1680,6 +1738,7 @@ handle_client_multicast_message (void *cls,
GNUNET_assert (GNUNET_YES == grp->is_origin);
struct Origin *orig = grp->origin;
+ // FIXME: use GNUNET_MQ_msg_copy
/* FIXME: yucky, should use separate message structs for P2P and CS! */
struct GNUNET_MULTICAST_MessageHeader *
out = (struct GNUNET_MULTICAST_MessageHeader *) GNUNET_copy_message (&msg->header);
@@ -1696,7 +1755,7 @@ handle_client_multicast_message (void *cls,
GNUNET_assert (0);
}
- client_send_all (&grp->pub_key_hash, &out->header);
+ client_send_all (&grp->pub_key_hash, GNUNET_MQ_msg_copy (&out->header));
cadet_send_children (&grp->pub_key_hash, &out->header);
client_send_ack (&grp->pub_key_hash);
GNUNET_free (out);
@@ -1730,6 +1789,7 @@ handle_client_multicast_request (void *cls,
GNUNET_SERVICE_client_drop (client);
return;
}
+ GNUNET_assert (GNUNET_NO == grp->is_disconnected);
GNUNET_assert (GNUNET_NO == grp->is_origin);
struct Member *mem = grp->member;
@@ -1751,7 +1811,9 @@ handle_client_multicast_request (void *cls,
}
uint8_t send_ack = GNUNET_YES;
- if (0 == client_send_origin (&grp->pub_key_hash, &out->header))
+ if (0 ==
+ client_send_origin (&grp->pub_key_hash,
+ GNUNET_MQ_msg_copy (&out->header)))
{ /* No local origins, send to remote origin */
if (NULL != mem->origin_channel)
{
@@ -1792,6 +1854,7 @@ handle_client_replay_request (void *cls,
GNUNET_SERVICE_client_drop (client);
return;
}
+ GNUNET_assert (GNUNET_NO == grp->is_disconnected);
GNUNET_assert (GNUNET_NO == grp->is_origin);
struct Member *mem = grp->member;
@@ -1812,7 +1875,9 @@ handle_client_replay_request (void *cls,
GNUNET_CONTAINER_multihashmap_put (grp_replay_req, &key_hash, client,
GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
- if (0 == client_send_origin (&grp->pub_key_hash, &rep->header))
+ if (0 ==
+ client_send_origin (&grp->pub_key_hash,
+ GNUNET_MQ_msg_copy (&rep->header)))
{ /* No local origin, replay from remote members / origin. */
if (NULL != mem->origin_channel)
{
@@ -1821,6 +1886,7 @@ handle_client_replay_request (void *cls,
else
{
/* FIXME: not yet connected to origin */
+ GNUNET_assert (0);
GNUNET_SERVICE_client_drop (client);
return;
}
@@ -1880,6 +1946,7 @@ handle_client_replay_response_end (void *cls,
GNUNET_SERVICE_client_drop (client);
return;
}
+ GNUNET_assert (GNUNET_NO == grp->is_disconnected);
struct GNUNET_HashCode key_hash;
replay_key_hash (res->fragment_id, res->message_id, res->fragment_offset,
@@ -1939,6 +2006,7 @@ handle_client_replay_response (void *cls,
GNUNET_SERVICE_client_drop (client);
return;
}
+ GNUNET_assert (GNUNET_NO == grp->is_disconnected);
const struct GNUNET_MessageHeader *msg = &res->header;
if (GNUNET_MULTICAST_REC_OK == res->error_code)
@@ -2033,9 +2101,14 @@ client_notify_disconnect (void *cls,
grp, (GNUNET_YES == grp->is_origin) ? "origin" : "member",
GNUNET_h2s (&grp->pub_key_hash));
+ // FIXME (due to protocol change): here we must not remove all clients,
+ // only the one we were notified about!
struct ClientList *cl = grp->clients_head;
while (NULL != cl)
{
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "iterating clients for group %p\n",
+ grp);
if (cl->client == client)
{
GNUNET_CONTAINER_DLL_remove (grp->clients_head, grp->clients_tail, cl);
@@ -2049,16 +2122,7 @@ client_notify_disconnect (void *cls,
if (NULL == grp->clients_head)
{ /* Last client disconnected. */
-#if FIXME
- if (NULL != grp->tmit_head)
- { /* Send pending messages via CADET before cleanup. */
- transmit_message (grp);
- }
- else
-#endif
- {
- cleanup_group (grp);
- }
+ cleanup_group (grp);
}
}
@@ -2103,9 +2167,9 @@ run (void *cls,
GNUNET_SERVICE_MAIN
("multicast",
GNUNET_SERVICE_OPTION_NONE,
- run,
- client_notify_connect,
- client_notify_disconnect,
+ &run,
+ &client_notify_connect,
+ &client_notify_disconnect,
NULL,
GNUNET_MQ_hd_fixed_size (client_origin_start,
GNUNET_MESSAGE_TYPE_MULTICAST_ORIGIN_START,
@@ -2119,6 +2183,10 @@ GNUNET_SERVICE_MAIN
GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_DECISION,
struct MulticastJoinDecisionMessageHeader,
NULL),
+ GNUNET_MQ_hd_fixed_size (client_part_request,
+ GNUNET_MESSAGE_TYPE_MULTICAST_PART_REQUEST,
+ struct GNUNET_MessageHeader,
+ NULL),
GNUNET_MQ_hd_var_size (client_multicast_message,
GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE,
struct GNUNET_MULTICAST_MessageHeader,
diff --git a/src/multicast/multicast_api.c b/src/multicast/multicast_api.c
index a8b1dee40e..3c911f48ac 100644
--- a/src/multicast/multicast_api.c
+++ b/src/multicast/multicast_api.c
@@ -542,31 +542,12 @@ group_cleanup (struct GNUNET_MULTICAST_Group *grp)
static void
-group_disconnect (struct GNUNET_MULTICAST_Group *grp,
- GNUNET_ContinuationCallback cb,
- void *cls)
+handle_group_part_ack (void *cls,
+ const struct GNUNET_MessageHeader *msg)
{
- grp->is_disconnecting = GNUNET_YES;
- grp->disconnect_cb = cb;
- grp->disconnect_cls = cls;
+ struct GNUNET_MULTICAST_Group *grp = cls;
- if (NULL != grp->mq)
- {
- struct GNUNET_MQ_Envelope *last = GNUNET_MQ_get_last_envelope (grp->mq);
- if (NULL != last)
- {
- GNUNET_MQ_notify_sent (last,
- (GNUNET_SCHEDULER_TaskCallback) group_cleanup, grp);
- }
- else
- {
- group_cleanup (grp);
- }
- }
- else
- {
- group_cleanup (grp);
- }
+ group_cleanup (grp);
}
@@ -779,6 +760,10 @@ origin_connect (struct GNUNET_MULTICAST_Origin *orig)
GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_REQUEST,
struct MulticastJoinRequestMessage,
grp),
+ GNUNET_MQ_hd_fixed_size (group_part_ack,
+ GNUNET_MESSAGE_TYPE_MULTICAST_PART_ACK,
+ struct GNUNET_MessageHeader,
+ grp),
GNUNET_MQ_hd_fixed_size (group_replay_request,
GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST,
struct MulticastReplayRequestMessage,
@@ -879,8 +864,13 @@ GNUNET_MULTICAST_origin_stop (struct GNUNET_MULTICAST_Origin *orig,
void *stop_cls)
{
struct GNUNET_MULTICAST_Group *grp = &orig->grp;
+ struct GNUNET_MQ_Envelope *env;
- group_disconnect (grp, stop_cb, stop_cls);
+ grp->is_disconnecting = GNUNET_YES;
+ grp->disconnect_cb = stop_cb;
+ grp->disconnect_cls = stop_cls;
+ env = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_MULTICAST_PART_REQUEST);
+ GNUNET_MQ_send (grp->mq, env);
}
@@ -1065,6 +1055,10 @@ member_connect (struct GNUNET_MULTICAST_Member *mem)
GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_DECISION,
struct MulticastJoinDecisionMessageHeader,
mem),
+ GNUNET_MQ_hd_fixed_size (group_part_ack,
+ GNUNET_MESSAGE_TYPE_MULTICAST_PART_ACK,
+ struct GNUNET_MessageHeader,
+ grp),
GNUNET_MQ_hd_fixed_size (group_replay_request,
GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST,
struct MulticastReplayRequestMessage,
@@ -1198,16 +1192,19 @@ GNUNET_MULTICAST_member_part (struct GNUNET_MULTICAST_Member *mem,
GNUNET_ContinuationCallback part_cb,
void *part_cls)
{
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p Member parting.\n", mem);
struct GNUNET_MULTICAST_Group *grp = &mem->grp;
+ struct GNUNET_MQ_Envelope *env;
mem->join_dcsn_cb = NULL;
grp->join_req_cb = NULL;
grp->message_cb = NULL;
grp->replay_msg_cb = NULL;
grp->replay_frag_cb = NULL;
-
- group_disconnect (grp, part_cb, part_cls);
+ grp->is_disconnecting = GNUNET_YES;
+ grp->disconnect_cb = part_cb;
+ grp->disconnect_cls = part_cls;
+ env = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_MULTICAST_PART_REQUEST);
+ GNUNET_MQ_send (grp->mq, env);
}
diff --git a/src/multicast/test_multicast_multipeer.c b/src/multicast/test_multicast_multipeer.c
index 0fb515f543..65c51c2e38 100644
--- a/src/multicast/test_multicast_multipeer.c
+++ b/src/multicast/test_multicast_multipeer.c
@@ -35,9 +35,10 @@
#define PEERS_REQUESTED 12
-struct multicast_peer
+struct MulticastPeerContext
{
int peer; /* peer number */
+ struct GNUNET_CRYPTO_EcdsaPrivateKey *key;
const struct GNUNET_PeerIdentity *id;
struct GNUNET_TESTBED_Operation *op; /* not yet in use */
struct GNUNET_TESTBED_Operation *pi_op; /* not yet in use */
@@ -61,7 +62,7 @@ static void service_connect (void *cls,
void *ca_result,
const char *emsg);
-static struct multicast_peer **mc_peers;
+static struct MulticastPeerContext **multicast_peers;
static struct GNUNET_TESTBED_Peer **peers;
// FIXME: refactor
@@ -69,18 +70,14 @@ static struct GNUNET_TESTBED_Operation *op[PEERS_REQUESTED];
static struct GNUNET_TESTBED_Operation *pi_op[PEERS_REQUESTED];
static struct GNUNET_MULTICAST_Origin *origin;
-static struct GNUNET_MULTICAST_Member *member[PEERS_REQUESTED]; /* first element always empty */
+static struct GNUNET_MULTICAST_Member *members[PEERS_REQUESTED]; /* first element always empty */
static struct GNUNET_SCHEDULER_Task *timeout_tid;
-static struct GNUNET_CRYPTO_EddsaPrivateKey group_key;
+//static struct GNUNET_CRYPTO_EddsaPrivateKey *group_key;
static struct GNUNET_CRYPTO_EddsaPublicKey group_pub_key;
static struct GNUNET_HashCode group_pub_key_hash;
-static struct GNUNET_CRYPTO_EcdsaPrivateKey *member_key[PEERS_REQUESTED];
-static struct GNUNET_CRYPTO_EcdsaPublicKey *member_pub_key[PEERS_REQUESTED];
-
-
/**
* Global result for testcase.
*/
@@ -93,6 +90,8 @@ static int result;
static void
shutdown_task (void *cls)
{
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "shutdown_task!\n");
for (int i=0;i<PEERS_REQUESTED;i++)
{
if (NULL != op[i])
@@ -107,14 +106,16 @@ shutdown_task (void *cls)
}
}
- if (NULL != mc_peers)
+ if (NULL != multicast_peers)
{
for (int i=0; i < PEERS_REQUESTED; i++)
{
- GNUNET_free (mc_peers[i]);
- mc_peers[i] = NULL;
+ GNUNET_free (multicast_peers[i]->key);
+ GNUNET_free (multicast_peers[i]);
+ multicast_peers[i] = NULL;
}
- GNUNET_free (mc_peers);
+ GNUNET_free (multicast_peers);
+ multicast_peers = NULL;
}
if (NULL != timeout_tid)
@@ -141,11 +142,11 @@ member_join_request (void *cls,
const struct GNUNET_MessageHeader *join_msg,
struct GNUNET_MULTICAST_JoinHandle *jh)
{
- struct multicast_peer *mc_peer = (struct multicast_peer*)cls;
+ struct MulticastPeerContext *mc_peer = (struct MulticastPeerContext*)cls;
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"Peer #%u (%s) sent a join request.\n",
mc_peer->peer,
- GNUNET_i2s (mc_peers[mc_peer->peer]->id));
+ GNUNET_i2s (multicast_peers[mc_peer->peer]->id));
}
@@ -154,7 +155,7 @@ notify (void *cls,
size_t *data_size,
void *data)
{
- struct multicast_peer *mc_peer = (struct multicast_peer*)cls;
+ struct MulticastPeerContext *mc_peer = (struct MulticastPeerContext*)cls;
struct pingpong_msg *pp_msg = GNUNET_new (struct pingpong_msg);
pp_msg->peer = mc_peer->peer;
@@ -178,18 +179,18 @@ member_join_decision (void *cls,
const struct GNUNET_PeerIdentity *relays,
const struct GNUNET_MessageHeader *join_msg)
{
- struct multicast_peer *mc_peer = (struct multicast_peer*)cls;
+ struct MulticastPeerContext *mc_peer = (struct MulticastPeerContext*)cls;
struct GNUNET_MULTICAST_MemberTransmitHandle *req;
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"Peer #%u (%s) received a decision from origin: %s\n",
mc_peer->peer,
- GNUNET_i2s (mc_peers[mc_peer->peer]->id),
+ GNUNET_i2s (multicast_peers[mc_peer->peer]->id),
(GNUNET_YES == is_admitted)?"accepted":"rejected");
if (GNUNET_YES == is_admitted)
{
- req = GNUNET_MULTICAST_member_to_origin (member[mc_peer->peer],
+ req = GNUNET_MULTICAST_member_to_origin (members[mc_peer->peer],
0,
notify,
cls);
@@ -215,10 +216,32 @@ member_replay_msg ()
static void
+origin_disconnected_cb (void *cls)
+{
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Origin disconnected. Shutting down.\n");
+ result = GNUNET_YES;
+ GNUNET_SCHEDULER_shutdown ();
+}
+
+
+static void
+member_disconnected_cb (void *cls)
+{
+ for (int i = 1; i < PEERS_REQUESTED; ++i)
+ if (GNUNET_NO == multicast_peers[i]->test_ok)
+ return;
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "All member disconnected. Stopping origin.\n");
+ GNUNET_MULTICAST_origin_stop (origin, origin_disconnected_cb, cls);
+}
+
+
+static void
member_message (void *cls,
const struct GNUNET_MULTICAST_MessageHeader *msg)
{
- struct multicast_peer *mc_peer = (struct multicast_peer*)cls;
+ struct MulticastPeerContext *mc_peer = (struct MulticastPeerContext*)cls;
struct pingpong_msg *pp_msg = (struct pingpong_msg*) &(msg[1]);
if (PONG == pp_msg->msg && mc_peer->peer == pp_msg->peer)
@@ -226,18 +249,15 @@ member_message (void *cls,
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"peer #%i (%s) receives a pong\n",
mc_peer->peer,
- GNUNET_i2s (mc_peers[mc_peer->peer]->id));
-
+ GNUNET_i2s (multicast_peers[mc_peer->peer]->id));
mc_peer->test_ok = GNUNET_OK;
- }
-
- // Test for completeness of received PONGs
- for (int i=1; i<PEERS_REQUESTED; i++)
- if (GNUNET_NO == mc_peers[i]->test_ok)
- return;
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "peer #%u (%s) parting from multicast group\n",
+ mc_peer->peer,
+ GNUNET_i2s (multicast_peers[mc_peer->peer]->id));
- result = GNUNET_YES;
- GNUNET_SCHEDULER_shutdown();
+ GNUNET_MULTICAST_member_part (members[mc_peer->peer], member_disconnected_cb, cls);
+ }
}
@@ -349,81 +369,53 @@ origin_message (void *cls,
static void
-multicast_da (void *cls,
- void *op_result)
+multicast_disconnect (void *cls,
+ void *op_result)
{
- struct multicast_peer *mc_peer = (struct multicast_peer*)cls;
- if (0 == mc_peer->peer)
- {
- GNUNET_log (GNUNET_ERROR_TYPE_INFO,
- "Origin closes multicast group\n");
-
- GNUNET_MULTICAST_origin_stop (origin, NULL, cls);
- }
- else
- {
- GNUNET_log (GNUNET_ERROR_TYPE_INFO,
- "peer #%u (%s) parting from multicast group\n",
- mc_peer->peer,
- GNUNET_i2s (mc_peers[mc_peer->peer]->id));
-
- GNUNET_MULTICAST_member_part (member[mc_peer->peer], NULL, cls);
- }
}
static void *
-multicast_ca (void *cls,
- const struct GNUNET_CONFIGURATION_Handle *cfg)
+multicast_connect (void *cls,
+ const struct GNUNET_CONFIGURATION_Handle *cfg)
{
- struct multicast_peer *mc_peer = (struct multicast_peer*)cls;
+ struct MulticastPeerContext *multicast_peer = cls;
struct GNUNET_MessageHeader *join_msg;
char data[64];
- if (0 == mc_peer->peer)
+ multicast_peer->key = GNUNET_CRYPTO_ecdsa_key_create ();
+ if (0 == multicast_peer->peer)
{
- struct GNUNET_CRYPTO_EddsaPrivateKey *key = GNUNET_CRYPTO_eddsa_key_create ();
- GNUNET_CRYPTO_eddsa_key_get_public (key, &group_pub_key);
+ GNUNET_CRYPTO_eddsa_key_get_public (multicast_peer->key, &group_pub_key);
GNUNET_CRYPTO_hash (&group_pub_key, sizeof (group_pub_key), &group_pub_key_hash);
-
- group_key = *key;
-
origin = GNUNET_MULTICAST_origin_start (cfg,
- &group_key,
- 0,
- origin_join_request,
- origin_replay_frag,
- origin_replay_msg,
- origin_request,
- origin_message,
- cls);
-
- if (NULL == origin) {
+ multicast_peer->key,
+ 0,
+ origin_join_request,
+ origin_replay_frag,
+ origin_replay_msg,
+ origin_request,
+ origin_message,
+ cls);
+ if (NULL == origin)
+ {
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"Peer #%u could not create a multicast group",
- mc_peer->peer);
+ multicast_peer->peer);
return NULL;
}
-
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"Peer #%u connected as origin to group %s\n",
- mc_peer->peer,
+ multicast_peer->peer,
GNUNET_h2s (&group_pub_key_hash));
-
return origin;
}
else
{
- // Get members keys
- member_pub_key[mc_peer->peer] = GNUNET_new (struct GNUNET_CRYPTO_EcdsaPublicKey);
- member_key[mc_peer->peer] = GNUNET_CRYPTO_ecdsa_key_create ();
- GNUNET_CRYPTO_ecdsa_key_get_public (member_key[mc_peer->peer],
- member_pub_key[mc_peer->peer]);
-
sprintf(data, "Hi, I am peer #%u (%s). Can I enter?",
- mc_peer->peer,
- GNUNET_i2s (mc_peers[mc_peer->peer]->id));
+ multicast_peer->peer,
+ GNUNET_i2s (multicast_peers[multicast_peer->peer]->id));
uint8_t data_size = strlen (data) + 1;
join_msg = GNUNET_malloc (sizeof (join_msg) + data_size);
join_msg->size = htons (sizeof (join_msg) + data_size);
@@ -432,24 +424,25 @@ multicast_ca (void *cls,
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"Peer #%u (%s) tries to join multicast group %s\n",
- mc_peer->peer,
- GNUNET_i2s (mc_peers[mc_peer->peer]->id),
+ multicast_peer->peer,
+ GNUNET_i2s (multicast_peers[multicast_peer->peer]->id),
GNUNET_h2s (&group_pub_key_hash));
- member[mc_peer->peer] = GNUNET_MULTICAST_member_join (cfg,
- &group_pub_key,
- member_key[mc_peer->peer],
- mc_peers[0]->id,
- 0,
- NULL,
- join_msg, /* join message */
- member_join_request,
- member_join_decision,
- member_replay_frag,
- member_replay_msg,
- member_message,
- cls);
- return member[mc_peer->peer];
+ members[multicast_peer->peer] =
+ GNUNET_MULTICAST_member_join (cfg,
+ &group_pub_key,
+ multicast_peer->key,
+ multicast_peers[0]->id,
+ 0,
+ NULL,
+ join_msg, /* join message */
+ member_join_request,
+ member_join_decision,
+ member_replay_frag,
+ member_replay_msg,
+ member_message,
+ cls);
+ return members[multicast_peer->peer];
}
}
@@ -460,7 +453,7 @@ peer_information_cb (void *cls,
const struct GNUNET_TESTBED_PeerInformation *pinfo,
const char *emsg)
{
- struct multicast_peer *mc_peer = (struct multicast_peer*)cls;
+ struct MulticastPeerContext *mc_peer = (struct MulticastPeerContext*)cls;
if (NULL == pinfo) {
GNUNET_log (GNUNET_ERROR_TYPE_INFO, "got no peer information\n");
@@ -468,7 +461,7 @@ peer_information_cb (void *cls,
GNUNET_SCHEDULER_shutdown ();
}
- mc_peers[mc_peer->peer]->id = pinfo->result.id;
+ multicast_peers[mc_peer->peer]->id = pinfo->result.id;
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"Got peer information of %s (%s)\n",
@@ -478,22 +471,28 @@ peer_information_cb (void *cls,
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"Create peer #%u (%s)\n",
mc_peer->peer,
- GNUNET_i2s (mc_peers[mc_peer->peer]->id));
+ GNUNET_i2s (multicast_peers[mc_peer->peer]->id));
if (0 != mc_peer->peer)
{
/* connect to multicast service of members */
- op[mc_peer->peer] = GNUNET_TESTBED_service_connect (NULL, /* Closure for operation */
- peers[mc_peer->peer], /* The peer whose service to connect to */
- "multicast", /* The name of the service */
- service_connect, /* callback to call after a handle to service
- is opened */
- cls, /* closure for the above callback */
- multicast_ca, /* callback to call with peer's configuration;
- this should open the needed service connection */
- multicast_da, /* callback to be called when closing the
- opened service connection */
- cls); /* closure for the above two callbacks */
+ op[mc_peer->peer] =
+ GNUNET_TESTBED_service_connect (/* Closure for operation */
+ NULL,
+ /* The peer whose service to connect to */
+ peers[mc_peer->peer],
+ /* The name of the service */
+ "multicast",
+ /* called after a handle to service is opened */
+ service_connect,
+ /* closure for the above callback */
+ cls,
+ /* called when opening the service connection */
+ multicast_connect,
+ /* called when closing the service connection */
+ multicast_disconnect,
+ /* closure for the above two callbacks */
+ cls);
}
}
@@ -504,14 +503,14 @@ service_connect (void *cls,
void *ca_result,
const char *emsg)
{
- struct multicast_peer *mc_peer = (struct multicast_peer*)cls;
+ struct MulticastPeerContext *mc_peer = (struct MulticastPeerContext*)cls;
if (NULL == ca_result)
{
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"Connection adapter not created for peer #%u (%s)\n",
mc_peer->peer,
- GNUNET_i2s (mc_peers[mc_peer->peer]->id));
+ GNUNET_i2s (multicast_peers[mc_peer->peer]->id));
result = GNUNET_SYSERR;
GNUNET_SCHEDULER_shutdown();
@@ -525,7 +524,7 @@ service_connect (void *cls,
pi_op[i] = GNUNET_TESTBED_peer_get_information (peers[i],
GNUNET_TESTBED_PIT_IDENTITY,
peer_information_cb,
- mc_peers[i]);
+ multicast_peers[i]);
}
}
}
@@ -550,50 +549,51 @@ service_connect (void *cls,
*/
static void
testbed_master (void *cls,
- struct GNUNET_TESTBED_RunHandle *h,
- unsigned int num_peers,
- struct GNUNET_TESTBED_Peer **p,
- unsigned int links_succeeded,
- unsigned int links_failed)
+ struct GNUNET_TESTBED_RunHandle *h,
+ unsigned int num_peers,
+ struct GNUNET_TESTBED_Peer **p,
+ unsigned int links_succeeded,
+ unsigned int links_failed)
{
/* Testbed is ready with peers running and connected in a pre-defined overlay
topology (FIXME) */
-
- GNUNET_log (GNUNET_ERROR_TYPE_INFO,
- "Connected to testbed_master()\n");
-
peers = p;
-
- mc_peers = GNUNET_new_array (PEERS_REQUESTED, struct multicast_peer*);
+ multicast_peers = GNUNET_new_array (PEERS_REQUESTED, struct MulticastPeerContext*);
// Create test contexts for members
for (int i = 0; i<PEERS_REQUESTED; i++)
{
- mc_peers[i] = GNUNET_new (struct multicast_peer);
- mc_peers[i]->peer = i;
- mc_peers[i]->test_ok = GNUNET_NO;
+ multicast_peers[i] = GNUNET_new (struct MulticastPeerContext);
+ multicast_peers[i]->peer = i;
+ multicast_peers[i]->test_ok = GNUNET_NO;
}
-
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"Create origin peer\n");
-
- op[0] = GNUNET_TESTBED_service_connect (NULL, /* Closure for operation */
- peers[0], /* The peer whose service to connect to */
- "multicast", /* The name of the service */
- service_connect, /* callback to call after a handle to service
- is opened */
- mc_peers[0], /* closure for the above callback */
- multicast_ca, /* callback to call with peer's configuration;
- this should open the needed service connection */
- multicast_da, /* callback to be called when closing the
- opened service connection */
- mc_peers[0]); /* closure for the above two callbacks */
-
- GNUNET_SCHEDULER_add_shutdown (&shutdown_task, NULL); /* Schedule a new task on shutdown */
-
+ op[0] =
+ GNUNET_TESTBED_service_connect (/* Closure for operation */
+ NULL,
+ /* The peer whose service to connect to */
+ peers[0],
+ /* The name of the service */
+ "multicast",
+ /* called after a handle to service is opened */
+ service_connect,
+ /* closure for the above callback */
+ multicast_peers[0],
+ /* called when opening the service connection */
+ multicast_connect,
+ /* called when closing the service connection */
+ multicast_disconnect,
+ /* closure for the above two callbacks */
+ multicast_peers[0]);
+ /* Schedule a new task on shutdown */
+ GNUNET_SCHEDULER_add_shutdown (&shutdown_task, NULL);
/* Schedule the shutdown task with a delay of a few Seconds */
- timeout_tid = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 400),
- &timeout_task, NULL);
+ timeout_tid =
+ GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply
+ (GNUNET_TIME_UNIT_SECONDS, 400),
+ &timeout_task,
+ NULL);
}
@@ -617,15 +617,21 @@ main (int argc, char *argv[])
}
result = GNUNET_SYSERR;
- ret = GNUNET_TESTBED_test_run
- ("test-multicast-multipeer", /* test case name */
- config_file, /* template configuration */
- PEERS_REQUESTED, /* number of peers to start */
- 0LL, /* Event mask - set to 0 for no event notifications */
- NULL, /* Controller event callback */
- NULL, /* Closure for controller event callback */
- testbed_master, /* continuation callback to be called when testbed setup is complete */
- NULL); /* Closure for the test_master callback */
+ ret =
+ GNUNET_TESTBED_test_run ("test-multicast-multipeer",
+ config_file,
+ /* number of peers to start */
+ PEERS_REQUESTED,
+ /* Event mask - set to 0 for no event notifications */
+ 0LL,
+ /* Controller event callback */
+ NULL,
+ /* Closure for controller event callback */
+ NULL,
+ /* called when testbed setup is complete */
+ testbed_master,
+ /* Closure for the test_master callback */
+ NULL);
if ( (GNUNET_OK != ret) || (GNUNET_OK != result) )
return 1;
return 0;