diff options
author | xrs <xrs@mail36.net> | 2018-01-02 17:12:39 +0100 |
---|---|---|
committer | xrs <xrs@mail36.net> | 2018-01-02 17:12:39 +0100 |
commit | 3611a5295f95fad5d9e1fdb3866e7db9ecf8f47e (patch) | |
tree | db191de206319d924f1bb49e65cdf8a9612ba85c /src/multicast | |
parent | cc90f1df695ccee3d35fe00fa179a856e4128009 (diff) | |
parent | 61f532f18450e0d7c72f0c17f4a20b5854cf53bf (diff) |
Merge branch 'master' of ssh://gnunet.org/gnunet
Diffstat (limited to 'src/multicast')
-rw-r--r-- | src/multicast/Makefile.am | 2 | ||||
-rw-r--r-- | src/multicast/gnunet-service-multicast.c | 248 | ||||
-rw-r--r-- | src/multicast/multicast_api.c | 51 | ||||
-rw-r--r-- | src/multicast/test_multicast_multipeer.c | 312 |
4 files changed, 342 insertions, 271 deletions
diff --git a/src/multicast/Makefile.am b/src/multicast/Makefile.am index 13212bca31..48185e1a42 100644 --- a/src/multicast/Makefile.am +++ b/src/multicast/Makefile.am @@ -19,7 +19,7 @@ endif lib_LTLIBRARIES = libgnunetmulticast.la libgnunetmulticast_la_SOURCES = \ - multicast_api.c + multicast_api.c multicast.h libgnunetmulticast_la_LIBADD = \ $(top_builddir)/src/util/libgnunetutil.la \ $(GN_LIBINTL) $(XLIB) diff --git a/src/multicast/gnunet-service-multicast.c b/src/multicast/gnunet-service-multicast.c index 2f4dc8a145..ba1086cc5b 100644 --- a/src/multicast/gnunet-service-multicast.c +++ b/src/multicast/gnunet-service-multicast.c @@ -137,6 +137,7 @@ struct Channel */ struct GNUNET_CADET_Channel *channel; + // FIXME: not used /** * CADET transmission handle. */ @@ -228,7 +229,7 @@ struct Group /** * Is the client disconnected? #GNUNET_YES or #GNUNET_NO */ - uint8_t disconnected; + uint8_t is_disconnected; /** * Is this an origin (#GNUNET_YES), or member (#GNUNET_NO)? @@ -365,6 +366,8 @@ client_send_join_decision (struct Member *mem, static void shutdown_task (void *cls) { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "shutting down\n"); if (NULL != cadet) { GNUNET_CADET_disconnect (cadet); @@ -420,6 +423,11 @@ cleanup_member (struct Member *mem) GNUNET_free (mem->join_dcsn); mem->join_dcsn = NULL; } + if (NULL != mem->origin_channel) + { + GNUNET_CADET_channel_destroy (mem->origin_channel->channel); + mem->origin_channel = NULL; + } GNUNET_CONTAINER_multihashmap_remove (members, &grp->pub_key_hash, mem); GNUNET_free (mem); } @@ -553,36 +561,47 @@ client_send (struct GNUNET_SERVICE_Client *client, * Send message to all clients connected to the group. */ static void -client_send_group (const struct Group *grp, - const struct GNUNET_MessageHeader *msg) +client_send_group_keep_envelope (const struct Group *grp, + struct GNUNET_MQ_Envelope *env) { - GNUNET_log (GNUNET_ERROR_TYPE_INFO, - "%p Sending message to all clients of the group.\n", grp); + struct ClientList *cli = grp->clients_head; - struct ClientList *cl = grp->clients_head; - while (NULL != cl) + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "%p Sending message to all clients of the group.\n", + grp); + while (NULL != cli) { - struct GNUNET_MQ_Envelope * - env = GNUNET_MQ_msg_copy (msg); - - GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (cl->client), - env); - cl = cl->next; + GNUNET_MQ_send_copy (GNUNET_SERVICE_client_get_mq (cli->client), + env); + cli = cli->next; } } /** + * Send message to all clients connected to the group and + * takes care of freeing @env. + */ +static void +client_send_group (const struct Group *grp, + struct GNUNET_MQ_Envelope *env) +{ + client_send_group_keep_envelope (grp, env); + GNUNET_MQ_discard (env); +} + + +/** * Iterator callback for sending a message to origin clients. */ static int client_send_origin_cb (void *cls, const struct GNUNET_HashCode *pub_key_hash, void *origin) { - const struct GNUNET_MessageHeader *msg = cls; + struct GNUNET_MQ_Envelope *env = cls; struct Member *orig = origin; - client_send_group (&orig->group, msg); + client_send_group_keep_envelope (&orig->group, env); return GNUNET_YES; } @@ -594,12 +613,12 @@ static int client_send_member_cb (void *cls, const struct GNUNET_HashCode *pub_key_hash, void *member) { - const struct GNUNET_MessageHeader *msg = cls; + struct GNUNET_MQ_Envelope *env = cls; struct Member *mem = member; if (NULL != mem->join_dcsn) { /* Only send message to admitted members */ - client_send_group (&mem->group, msg); + client_send_group_keep_envelope (&mem->group, env); } return GNUNET_YES; } @@ -615,15 +634,16 @@ client_send_member_cb (void *cls, const struct GNUNET_HashCode *pub_key_hash, */ static int client_send_all (struct GNUNET_HashCode *pub_key_hash, - const struct GNUNET_MessageHeader *msg) + struct GNUNET_MQ_Envelope *env) { int n = 0; n += GNUNET_CONTAINER_multihashmap_get_multiple (origins, pub_key_hash, client_send_origin_cb, - (void *) msg); + (void *) env); n += GNUNET_CONTAINER_multihashmap_get_multiple (members, pub_key_hash, client_send_member_cb, - (void *) msg); + (void *) env); + GNUNET_MQ_discard (env); return n; } @@ -636,14 +656,14 @@ client_send_all (struct GNUNET_HashCode *pub_key_hash, */ static int client_send_random (struct GNUNET_HashCode *pub_key_hash, - const struct GNUNET_MessageHeader *msg) + struct GNUNET_MQ_Envelope *env) { int n = 0; n = GNUNET_CONTAINER_multihashmap_get_random (origins, client_send_origin_cb, - (void *) msg); + (void *) env); if (n <= 0) n = GNUNET_CONTAINER_multihashmap_get_random (members, client_send_member_cb, - (void *) msg); + (void *) env); return n; } @@ -658,12 +678,12 @@ client_send_random (struct GNUNET_HashCode *pub_key_hash, */ static int client_send_origin (struct GNUNET_HashCode *pub_key_hash, - const struct GNUNET_MessageHeader *msg) + struct GNUNET_MQ_Envelope *env) { int n = 0; n += GNUNET_CONTAINER_multihashmap_get_multiple (origins, pub_key_hash, client_send_origin_cb, - (void *) msg); + (void *) env); return n; } @@ -677,17 +697,12 @@ client_send_origin (struct GNUNET_HashCode *pub_key_hash, static void client_send_ack (struct GNUNET_HashCode *pub_key_hash) { + struct GNUNET_MQ_Envelope *env; + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Sending message ACK to client.\n"); - - static struct GNUNET_MessageHeader *msg = NULL; - if (NULL == msg) - { - msg = GNUNET_malloc (sizeof (*msg)); - msg->type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_FRAGMENT_ACK); - msg->size = htons (sizeof (*msg)); - } - client_send_all (pub_key_hash, msg); + env = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_MULTICAST_FRAGMENT_ACK); + client_send_all (pub_key_hash, env); } @@ -983,7 +998,8 @@ handle_cadet_join_request (void *cls, chn->peer = req->peer; chn->join_status = JOIN_WAITING; - client_send_all (&group_pub_hash, &req->header); + client_send_all (&group_pub_hash, + GNUNET_MQ_msg_copy (&req->header)); } @@ -1102,7 +1118,8 @@ handle_cadet_message (void *cls, { struct Channel *chn = cls; GNUNET_CADET_receive_done (chn->channel); - client_send_all (&chn->group_pub_hash, &msg->header); + client_send_all (&chn->group_pub_hash, + GNUNET_MQ_msg_copy (&msg->header)); } @@ -1153,30 +1170,32 @@ handle_cadet_request (void *cls, { struct Channel *chn = cls; GNUNET_CADET_receive_done (chn->channel); - client_send_origin (&chn->group_pub_hash, &req->header); + client_send_origin (&chn->group_pub_hash, + GNUNET_MQ_msg_copy (&req->header)); } -static int -check_cadet_replay_request (void *cls, - const struct MulticastReplayRequestMessage *req) -{ - uint16_t size = ntohs (req->header.size); - if (size < sizeof (*req)) - { - GNUNET_break_op (0); - return GNUNET_SYSERR; - } - - struct Channel *chn = cls; - if (NULL == chn) - { - GNUNET_break_op (0); - return GNUNET_SYSERR; - } - - return GNUNET_OK; -} +// FIXME: do checks in handle_cadet_replay_request +//static int +//check_cadet_replay_request (void *cls, +// const struct MulticastReplayRequestMessage *req) +//{ +// uint16_t size = ntohs (req->header.size); +// if (size < sizeof (*req)) +// { +// GNUNET_break_op (0); +// return GNUNET_SYSERR; +// } +// +// struct Channel *chn = cls; +// if (NULL == chn) +// { +// GNUNET_break_op (0); +// return GNUNET_SYSERR; +// } +// +// return GNUNET_OK; +//} /** @@ -1187,6 +1206,7 @@ handle_cadet_replay_request (void *cls, const struct MulticastReplayRequestMessage *req) { struct Channel *chn = cls; + GNUNET_CADET_receive_done (chn->channel); struct MulticastReplayRequestMessage rep = *req; @@ -1203,12 +1223,16 @@ handle_cadet_replay_request (void *cls, GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST); } struct GNUNET_HashCode key_hash; - replay_key_hash (rep.fragment_id, rep.message_id, rep.fragment_offset, - rep.flags, &key_hash); + replay_key_hash (rep.fragment_id, + rep.message_id, + rep.fragment_offset, + rep.flags, + &key_hash); GNUNET_CONTAINER_multihashmap_put (grp_replay_req, &key_hash, chn, GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); - client_send_random (&chn->group_pub_hash, &rep.header); + client_send_random (&chn->group_pub_hash, + GNUNET_MQ_msg_copy (&rep.header)); } @@ -1290,10 +1314,10 @@ cadet_channel_create (struct Group *grp, struct GNUNET_PeerIdentity *peer) struct MulticastJoinDecisionMessageHeader, chn), - GNUNET_MQ_hd_var_size (cadet_replay_request, - GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST, - struct MulticastReplayRequestMessage, - chn), + GNUNET_MQ_hd_fixed_size (cadet_replay_request, + GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST, + struct MulticastReplayRequestMessage, + chn), GNUNET_MQ_hd_var_size (cadet_replay_response, GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_RESPONSE, @@ -1357,6 +1381,7 @@ handle_client_origin_start (void *cls, grp->is_origin = GNUNET_YES; grp->pub_key = pub_key; grp->pub_key_hash = pub_key_hash; + grp->is_disconnected = GNUNET_NO; GNUNET_CONTAINER_multihashmap_put (origins, &grp->pub_key_hash, orig, GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST); @@ -1379,10 +1404,10 @@ handle_client_origin_start (void *cls, struct MulticastJoinRequestMessage, grp), - GNUNET_MQ_hd_var_size (cadet_replay_request, - GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST, - struct MulticastReplayRequestMessage, - grp), + GNUNET_MQ_hd_fixed_size (cadet_replay_request, + GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST, + struct MulticastReplayRequestMessage, + grp), GNUNET_MQ_hd_var_size (cadet_replay_response, GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_RESPONSE, @@ -1484,6 +1509,7 @@ handle_client_member_join (void *cls, grp->is_origin = GNUNET_NO; grp->pub_key = msg->group_pub_key; grp->pub_key_hash = pub_key_hash; + grp->is_disconnected = GNUNET_NO; group_set_cadet_port_hash (grp); if (NULL == grp_mem) @@ -1494,7 +1520,8 @@ handle_client_member_join (void *cls, } GNUNET_CONTAINER_multihashmap_put (grp_mem, &mem->pub_key_hash, mem, GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST); - + + // FIXME: should the members hash map have option UNIQUE_FAST? GNUNET_CONTAINER_multihashmap_put (members, &grp->pub_key_hash, mem, GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); } @@ -1509,10 +1536,11 @@ handle_client_member_join (void *cls, char *str = GNUNET_CRYPTO_ecdsa_public_key_to_string (&mem->pub_key); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Client connected to group %s as member %s (%s).\n", + "Client connected to group %s as member %s (%s). size = %d\n", GNUNET_h2s (&grp->pub_key_hash), GNUNET_h2s2 (&mem->pub_key_hash), - str); + str, + GNUNET_CONTAINER_multihashmap_size (members)); GNUNET_free (str); if (NULL != mem->join_dcsn) @@ -1567,7 +1595,9 @@ handle_client_member_join (void *cls, GNUNET_free (mem->join_req); mem->join_req = req; - if (0 == client_send_origin (&grp->pub_key_hash, &mem->join_req->header)) + if (0 == + client_send_origin (&grp->pub_key_hash, + GNUNET_MQ_msg_copy (&mem->join_req->header))) { /* No local origins, send to remote origin */ cadet_send_join_request (mem); } @@ -1580,7 +1610,7 @@ static void client_send_join_decision (struct Member *mem, const struct MulticastJoinDecisionMessageHeader *hdcsn) { - client_send_group (&mem->group, &hdcsn->header); + client_send_group (&mem->group, GNUNET_MQ_msg_copy (&hdcsn->header)); const struct MulticastJoinDecisionMessage * dcsn = (const struct MulticastJoinDecisionMessage *) &hdcsn[1]; @@ -1621,8 +1651,9 @@ handle_client_join_decision (void *cls, GNUNET_SERVICE_client_drop (client); return; } + GNUNET_assert (GNUNET_NO == grp->is_disconnected); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "%p Got join decision from client for group %s..\n", + "%p got join decision from client for group %s..\n", grp, GNUNET_h2s (&grp->pub_key_hash)); struct GNUNET_CONTAINER_MultiHashMap * @@ -1652,6 +1683,32 @@ handle_client_join_decision (void *cls, } +static void +handle_client_part_request (void *cls, + const struct GNUNET_MessageHeader *msg) +{ + struct Client *c = cls; + struct GNUNET_SERVICE_Client *client = c->client; + struct Group *grp = c->group; + struct GNUNET_MQ_Envelope *env; + + if (NULL == grp) + { + GNUNET_break (0); + GNUNET_SERVICE_client_drop (client); + return; + } + GNUNET_assert (GNUNET_NO == grp->is_disconnected); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "%p got part request from client for group %s.\n", + grp, GNUNET_h2s (&grp->pub_key_hash)); + grp->is_disconnected = GNUNET_YES; + env = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_MULTICAST_PART_ACK); + client_send_group (grp, env); + GNUNET_SERVICE_client_continue (client); +} + + static int check_client_multicast_message (void *cls, const struct GNUNET_MULTICAST_MessageHeader *msg) @@ -1667,6 +1724,7 @@ static void handle_client_multicast_message (void *cls, const struct GNUNET_MULTICAST_MessageHeader *msg) { + // FIXME: what if GNUNET_YES == grp->is_disconnected? Do we allow sending messages? struct Client *c = cls; struct GNUNET_SERVICE_Client *client = c->client; struct Group *grp = c->group; @@ -1680,6 +1738,7 @@ handle_client_multicast_message (void *cls, GNUNET_assert (GNUNET_YES == grp->is_origin); struct Origin *orig = grp->origin; + // FIXME: use GNUNET_MQ_msg_copy /* FIXME: yucky, should use separate message structs for P2P and CS! */ struct GNUNET_MULTICAST_MessageHeader * out = (struct GNUNET_MULTICAST_MessageHeader *) GNUNET_copy_message (&msg->header); @@ -1696,7 +1755,7 @@ handle_client_multicast_message (void *cls, GNUNET_assert (0); } - client_send_all (&grp->pub_key_hash, &out->header); + client_send_all (&grp->pub_key_hash, GNUNET_MQ_msg_copy (&out->header)); cadet_send_children (&grp->pub_key_hash, &out->header); client_send_ack (&grp->pub_key_hash); GNUNET_free (out); @@ -1730,6 +1789,7 @@ handle_client_multicast_request (void *cls, GNUNET_SERVICE_client_drop (client); return; } + GNUNET_assert (GNUNET_NO == grp->is_disconnected); GNUNET_assert (GNUNET_NO == grp->is_origin); struct Member *mem = grp->member; @@ -1751,7 +1811,9 @@ handle_client_multicast_request (void *cls, } uint8_t send_ack = GNUNET_YES; - if (0 == client_send_origin (&grp->pub_key_hash, &out->header)) + if (0 == + client_send_origin (&grp->pub_key_hash, + GNUNET_MQ_msg_copy (&out->header))) { /* No local origins, send to remote origin */ if (NULL != mem->origin_channel) { @@ -1792,6 +1854,7 @@ handle_client_replay_request (void *cls, GNUNET_SERVICE_client_drop (client); return; } + GNUNET_assert (GNUNET_NO == grp->is_disconnected); GNUNET_assert (GNUNET_NO == grp->is_origin); struct Member *mem = grp->member; @@ -1812,7 +1875,9 @@ handle_client_replay_request (void *cls, GNUNET_CONTAINER_multihashmap_put (grp_replay_req, &key_hash, client, GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); - if (0 == client_send_origin (&grp->pub_key_hash, &rep->header)) + if (0 == + client_send_origin (&grp->pub_key_hash, + GNUNET_MQ_msg_copy (&rep->header))) { /* No local origin, replay from remote members / origin. */ if (NULL != mem->origin_channel) { @@ -1821,6 +1886,7 @@ handle_client_replay_request (void *cls, else { /* FIXME: not yet connected to origin */ + GNUNET_assert (0); GNUNET_SERVICE_client_drop (client); return; } @@ -1880,6 +1946,7 @@ handle_client_replay_response_end (void *cls, GNUNET_SERVICE_client_drop (client); return; } + GNUNET_assert (GNUNET_NO == grp->is_disconnected); struct GNUNET_HashCode key_hash; replay_key_hash (res->fragment_id, res->message_id, res->fragment_offset, @@ -1939,6 +2006,7 @@ handle_client_replay_response (void *cls, GNUNET_SERVICE_client_drop (client); return; } + GNUNET_assert (GNUNET_NO == grp->is_disconnected); const struct GNUNET_MessageHeader *msg = &res->header; if (GNUNET_MULTICAST_REC_OK == res->error_code) @@ -2033,9 +2101,14 @@ client_notify_disconnect (void *cls, grp, (GNUNET_YES == grp->is_origin) ? "origin" : "member", GNUNET_h2s (&grp->pub_key_hash)); + // FIXME (due to protocol change): here we must not remove all clients, + // only the one we were notified about! struct ClientList *cl = grp->clients_head; while (NULL != cl) { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "iterating clients for group %p\n", + grp); if (cl->client == client) { GNUNET_CONTAINER_DLL_remove (grp->clients_head, grp->clients_tail, cl); @@ -2049,16 +2122,7 @@ client_notify_disconnect (void *cls, if (NULL == grp->clients_head) { /* Last client disconnected. */ -#if FIXME - if (NULL != grp->tmit_head) - { /* Send pending messages via CADET before cleanup. */ - transmit_message (grp); - } - else -#endif - { - cleanup_group (grp); - } + cleanup_group (grp); } } @@ -2103,9 +2167,9 @@ run (void *cls, GNUNET_SERVICE_MAIN ("multicast", GNUNET_SERVICE_OPTION_NONE, - run, - client_notify_connect, - client_notify_disconnect, + &run, + &client_notify_connect, + &client_notify_disconnect, NULL, GNUNET_MQ_hd_fixed_size (client_origin_start, GNUNET_MESSAGE_TYPE_MULTICAST_ORIGIN_START, @@ -2119,6 +2183,10 @@ GNUNET_SERVICE_MAIN GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_DECISION, struct MulticastJoinDecisionMessageHeader, NULL), + GNUNET_MQ_hd_fixed_size (client_part_request, + GNUNET_MESSAGE_TYPE_MULTICAST_PART_REQUEST, + struct GNUNET_MessageHeader, + NULL), GNUNET_MQ_hd_var_size (client_multicast_message, GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE, struct GNUNET_MULTICAST_MessageHeader, diff --git a/src/multicast/multicast_api.c b/src/multicast/multicast_api.c index a8b1dee40e..3c911f48ac 100644 --- a/src/multicast/multicast_api.c +++ b/src/multicast/multicast_api.c @@ -542,31 +542,12 @@ group_cleanup (struct GNUNET_MULTICAST_Group *grp) static void -group_disconnect (struct GNUNET_MULTICAST_Group *grp, - GNUNET_ContinuationCallback cb, - void *cls) +handle_group_part_ack (void *cls, + const struct GNUNET_MessageHeader *msg) { - grp->is_disconnecting = GNUNET_YES; - grp->disconnect_cb = cb; - grp->disconnect_cls = cls; + struct GNUNET_MULTICAST_Group *grp = cls; - if (NULL != grp->mq) - { - struct GNUNET_MQ_Envelope *last = GNUNET_MQ_get_last_envelope (grp->mq); - if (NULL != last) - { - GNUNET_MQ_notify_sent (last, - (GNUNET_SCHEDULER_TaskCallback) group_cleanup, grp); - } - else - { - group_cleanup (grp); - } - } - else - { - group_cleanup (grp); - } + group_cleanup (grp); } @@ -779,6 +760,10 @@ origin_connect (struct GNUNET_MULTICAST_Origin *orig) GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_REQUEST, struct MulticastJoinRequestMessage, grp), + GNUNET_MQ_hd_fixed_size (group_part_ack, + GNUNET_MESSAGE_TYPE_MULTICAST_PART_ACK, + struct GNUNET_MessageHeader, + grp), GNUNET_MQ_hd_fixed_size (group_replay_request, GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST, struct MulticastReplayRequestMessage, @@ -879,8 +864,13 @@ GNUNET_MULTICAST_origin_stop (struct GNUNET_MULTICAST_Origin *orig, void *stop_cls) { struct GNUNET_MULTICAST_Group *grp = &orig->grp; + struct GNUNET_MQ_Envelope *env; - group_disconnect (grp, stop_cb, stop_cls); + grp->is_disconnecting = GNUNET_YES; + grp->disconnect_cb = stop_cb; + grp->disconnect_cls = stop_cls; + env = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_MULTICAST_PART_REQUEST); + GNUNET_MQ_send (grp->mq, env); } @@ -1065,6 +1055,10 @@ member_connect (struct GNUNET_MULTICAST_Member *mem) GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_DECISION, struct MulticastJoinDecisionMessageHeader, mem), + GNUNET_MQ_hd_fixed_size (group_part_ack, + GNUNET_MESSAGE_TYPE_MULTICAST_PART_ACK, + struct GNUNET_MessageHeader, + grp), GNUNET_MQ_hd_fixed_size (group_replay_request, GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST, struct MulticastReplayRequestMessage, @@ -1198,16 +1192,19 @@ GNUNET_MULTICAST_member_part (struct GNUNET_MULTICAST_Member *mem, GNUNET_ContinuationCallback part_cb, void *part_cls) { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p Member parting.\n", mem); struct GNUNET_MULTICAST_Group *grp = &mem->grp; + struct GNUNET_MQ_Envelope *env; mem->join_dcsn_cb = NULL; grp->join_req_cb = NULL; grp->message_cb = NULL; grp->replay_msg_cb = NULL; grp->replay_frag_cb = NULL; - - group_disconnect (grp, part_cb, part_cls); + grp->is_disconnecting = GNUNET_YES; + grp->disconnect_cb = part_cb; + grp->disconnect_cls = part_cls; + env = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_MULTICAST_PART_REQUEST); + GNUNET_MQ_send (grp->mq, env); } diff --git a/src/multicast/test_multicast_multipeer.c b/src/multicast/test_multicast_multipeer.c index 0fb515f543..65c51c2e38 100644 --- a/src/multicast/test_multicast_multipeer.c +++ b/src/multicast/test_multicast_multipeer.c @@ -35,9 +35,10 @@ #define PEERS_REQUESTED 12 -struct multicast_peer +struct MulticastPeerContext { int peer; /* peer number */ + struct GNUNET_CRYPTO_EcdsaPrivateKey *key; const struct GNUNET_PeerIdentity *id; struct GNUNET_TESTBED_Operation *op; /* not yet in use */ struct GNUNET_TESTBED_Operation *pi_op; /* not yet in use */ @@ -61,7 +62,7 @@ static void service_connect (void *cls, void *ca_result, const char *emsg); -static struct multicast_peer **mc_peers; +static struct MulticastPeerContext **multicast_peers; static struct GNUNET_TESTBED_Peer **peers; // FIXME: refactor @@ -69,18 +70,14 @@ static struct GNUNET_TESTBED_Operation *op[PEERS_REQUESTED]; static struct GNUNET_TESTBED_Operation *pi_op[PEERS_REQUESTED]; static struct GNUNET_MULTICAST_Origin *origin; -static struct GNUNET_MULTICAST_Member *member[PEERS_REQUESTED]; /* first element always empty */ +static struct GNUNET_MULTICAST_Member *members[PEERS_REQUESTED]; /* first element always empty */ static struct GNUNET_SCHEDULER_Task *timeout_tid; -static struct GNUNET_CRYPTO_EddsaPrivateKey group_key; +//static struct GNUNET_CRYPTO_EddsaPrivateKey *group_key; static struct GNUNET_CRYPTO_EddsaPublicKey group_pub_key; static struct GNUNET_HashCode group_pub_key_hash; -static struct GNUNET_CRYPTO_EcdsaPrivateKey *member_key[PEERS_REQUESTED]; -static struct GNUNET_CRYPTO_EcdsaPublicKey *member_pub_key[PEERS_REQUESTED]; - - /** * Global result for testcase. */ @@ -93,6 +90,8 @@ static int result; static void shutdown_task (void *cls) { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "shutdown_task!\n"); for (int i=0;i<PEERS_REQUESTED;i++) { if (NULL != op[i]) @@ -107,14 +106,16 @@ shutdown_task (void *cls) } } - if (NULL != mc_peers) + if (NULL != multicast_peers) { for (int i=0; i < PEERS_REQUESTED; i++) { - GNUNET_free (mc_peers[i]); - mc_peers[i] = NULL; + GNUNET_free (multicast_peers[i]->key); + GNUNET_free (multicast_peers[i]); + multicast_peers[i] = NULL; } - GNUNET_free (mc_peers); + GNUNET_free (multicast_peers); + multicast_peers = NULL; } if (NULL != timeout_tid) @@ -141,11 +142,11 @@ member_join_request (void *cls, const struct GNUNET_MessageHeader *join_msg, struct GNUNET_MULTICAST_JoinHandle *jh) { - struct multicast_peer *mc_peer = (struct multicast_peer*)cls; + struct MulticastPeerContext *mc_peer = (struct MulticastPeerContext*)cls; GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Peer #%u (%s) sent a join request.\n", mc_peer->peer, - GNUNET_i2s (mc_peers[mc_peer->peer]->id)); + GNUNET_i2s (multicast_peers[mc_peer->peer]->id)); } @@ -154,7 +155,7 @@ notify (void *cls, size_t *data_size, void *data) { - struct multicast_peer *mc_peer = (struct multicast_peer*)cls; + struct MulticastPeerContext *mc_peer = (struct MulticastPeerContext*)cls; struct pingpong_msg *pp_msg = GNUNET_new (struct pingpong_msg); pp_msg->peer = mc_peer->peer; @@ -178,18 +179,18 @@ member_join_decision (void *cls, const struct GNUNET_PeerIdentity *relays, const struct GNUNET_MessageHeader *join_msg) { - struct multicast_peer *mc_peer = (struct multicast_peer*)cls; + struct MulticastPeerContext *mc_peer = (struct MulticastPeerContext*)cls; struct GNUNET_MULTICAST_MemberTransmitHandle *req; GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Peer #%u (%s) received a decision from origin: %s\n", mc_peer->peer, - GNUNET_i2s (mc_peers[mc_peer->peer]->id), + GNUNET_i2s (multicast_peers[mc_peer->peer]->id), (GNUNET_YES == is_admitted)?"accepted":"rejected"); if (GNUNET_YES == is_admitted) { - req = GNUNET_MULTICAST_member_to_origin (member[mc_peer->peer], + req = GNUNET_MULTICAST_member_to_origin (members[mc_peer->peer], 0, notify, cls); @@ -215,10 +216,32 @@ member_replay_msg () static void +origin_disconnected_cb (void *cls) +{ + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Origin disconnected. Shutting down.\n"); + result = GNUNET_YES; + GNUNET_SCHEDULER_shutdown (); +} + + +static void +member_disconnected_cb (void *cls) +{ + for (int i = 1; i < PEERS_REQUESTED; ++i) + if (GNUNET_NO == multicast_peers[i]->test_ok) + return; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "All member disconnected. Stopping origin.\n"); + GNUNET_MULTICAST_origin_stop (origin, origin_disconnected_cb, cls); +} + + +static void member_message (void *cls, const struct GNUNET_MULTICAST_MessageHeader *msg) { - struct multicast_peer *mc_peer = (struct multicast_peer*)cls; + struct MulticastPeerContext *mc_peer = (struct MulticastPeerContext*)cls; struct pingpong_msg *pp_msg = (struct pingpong_msg*) &(msg[1]); if (PONG == pp_msg->msg && mc_peer->peer == pp_msg->peer) @@ -226,18 +249,15 @@ member_message (void *cls, GNUNET_log (GNUNET_ERROR_TYPE_INFO, "peer #%i (%s) receives a pong\n", mc_peer->peer, - GNUNET_i2s (mc_peers[mc_peer->peer]->id)); - + GNUNET_i2s (multicast_peers[mc_peer->peer]->id)); mc_peer->test_ok = GNUNET_OK; - } - - // Test for completeness of received PONGs - for (int i=1; i<PEERS_REQUESTED; i++) - if (GNUNET_NO == mc_peers[i]->test_ok) - return; + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "peer #%u (%s) parting from multicast group\n", + mc_peer->peer, + GNUNET_i2s (multicast_peers[mc_peer->peer]->id)); - result = GNUNET_YES; - GNUNET_SCHEDULER_shutdown(); + GNUNET_MULTICAST_member_part (members[mc_peer->peer], member_disconnected_cb, cls); + } } @@ -349,81 +369,53 @@ origin_message (void *cls, static void -multicast_da (void *cls, - void *op_result) +multicast_disconnect (void *cls, + void *op_result) { - struct multicast_peer *mc_peer = (struct multicast_peer*)cls; - if (0 == mc_peer->peer) - { - GNUNET_log (GNUNET_ERROR_TYPE_INFO, - "Origin closes multicast group\n"); - - GNUNET_MULTICAST_origin_stop (origin, NULL, cls); - } - else - { - GNUNET_log (GNUNET_ERROR_TYPE_INFO, - "peer #%u (%s) parting from multicast group\n", - mc_peer->peer, - GNUNET_i2s (mc_peers[mc_peer->peer]->id)); - - GNUNET_MULTICAST_member_part (member[mc_peer->peer], NULL, cls); - } } static void * -multicast_ca (void *cls, - const struct GNUNET_CONFIGURATION_Handle *cfg) +multicast_connect (void *cls, + const struct GNUNET_CONFIGURATION_Handle *cfg) { - struct multicast_peer *mc_peer = (struct multicast_peer*)cls; + struct MulticastPeerContext *multicast_peer = cls; struct GNUNET_MessageHeader *join_msg; char data[64]; - if (0 == mc_peer->peer) + multicast_peer->key = GNUNET_CRYPTO_ecdsa_key_create (); + if (0 == multicast_peer->peer) { - struct GNUNET_CRYPTO_EddsaPrivateKey *key = GNUNET_CRYPTO_eddsa_key_create (); - GNUNET_CRYPTO_eddsa_key_get_public (key, &group_pub_key); + GNUNET_CRYPTO_eddsa_key_get_public (multicast_peer->key, &group_pub_key); GNUNET_CRYPTO_hash (&group_pub_key, sizeof (group_pub_key), &group_pub_key_hash); - - group_key = *key; - origin = GNUNET_MULTICAST_origin_start (cfg, - &group_key, - 0, - origin_join_request, - origin_replay_frag, - origin_replay_msg, - origin_request, - origin_message, - cls); - - if (NULL == origin) { + multicast_peer->key, + 0, + origin_join_request, + origin_replay_frag, + origin_replay_msg, + origin_request, + origin_message, + cls); + if (NULL == origin) + { GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Peer #%u could not create a multicast group", - mc_peer->peer); + multicast_peer->peer); return NULL; } - GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Peer #%u connected as origin to group %s\n", - mc_peer->peer, + multicast_peer->peer, GNUNET_h2s (&group_pub_key_hash)); - return origin; } else { - // Get members keys - member_pub_key[mc_peer->peer] = GNUNET_new (struct GNUNET_CRYPTO_EcdsaPublicKey); - member_key[mc_peer->peer] = GNUNET_CRYPTO_ecdsa_key_create (); - GNUNET_CRYPTO_ecdsa_key_get_public (member_key[mc_peer->peer], - member_pub_key[mc_peer->peer]); - sprintf(data, "Hi, I am peer #%u (%s). Can I enter?", - mc_peer->peer, - GNUNET_i2s (mc_peers[mc_peer->peer]->id)); + multicast_peer->peer, + GNUNET_i2s (multicast_peers[multicast_peer->peer]->id)); uint8_t data_size = strlen (data) + 1; join_msg = GNUNET_malloc (sizeof (join_msg) + data_size); join_msg->size = htons (sizeof (join_msg) + data_size); @@ -432,24 +424,25 @@ multicast_ca (void *cls, GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Peer #%u (%s) tries to join multicast group %s\n", - mc_peer->peer, - GNUNET_i2s (mc_peers[mc_peer->peer]->id), + multicast_peer->peer, + GNUNET_i2s (multicast_peers[multicast_peer->peer]->id), GNUNET_h2s (&group_pub_key_hash)); - member[mc_peer->peer] = GNUNET_MULTICAST_member_join (cfg, - &group_pub_key, - member_key[mc_peer->peer], - mc_peers[0]->id, - 0, - NULL, - join_msg, /* join message */ - member_join_request, - member_join_decision, - member_replay_frag, - member_replay_msg, - member_message, - cls); - return member[mc_peer->peer]; + members[multicast_peer->peer] = + GNUNET_MULTICAST_member_join (cfg, + &group_pub_key, + multicast_peer->key, + multicast_peers[0]->id, + 0, + NULL, + join_msg, /* join message */ + member_join_request, + member_join_decision, + member_replay_frag, + member_replay_msg, + member_message, + cls); + return members[multicast_peer->peer]; } } @@ -460,7 +453,7 @@ peer_information_cb (void *cls, const struct GNUNET_TESTBED_PeerInformation *pinfo, const char *emsg) { - struct multicast_peer *mc_peer = (struct multicast_peer*)cls; + struct MulticastPeerContext *mc_peer = (struct MulticastPeerContext*)cls; if (NULL == pinfo) { GNUNET_log (GNUNET_ERROR_TYPE_INFO, "got no peer information\n"); @@ -468,7 +461,7 @@ peer_information_cb (void *cls, GNUNET_SCHEDULER_shutdown (); } - mc_peers[mc_peer->peer]->id = pinfo->result.id; + multicast_peers[mc_peer->peer]->id = pinfo->result.id; GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Got peer information of %s (%s)\n", @@ -478,22 +471,28 @@ peer_information_cb (void *cls, GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Create peer #%u (%s)\n", mc_peer->peer, - GNUNET_i2s (mc_peers[mc_peer->peer]->id)); + GNUNET_i2s (multicast_peers[mc_peer->peer]->id)); if (0 != mc_peer->peer) { /* connect to multicast service of members */ - op[mc_peer->peer] = GNUNET_TESTBED_service_connect (NULL, /* Closure for operation */ - peers[mc_peer->peer], /* The peer whose service to connect to */ - "multicast", /* The name of the service */ - service_connect, /* callback to call after a handle to service - is opened */ - cls, /* closure for the above callback */ - multicast_ca, /* callback to call with peer's configuration; - this should open the needed service connection */ - multicast_da, /* callback to be called when closing the - opened service connection */ - cls); /* closure for the above two callbacks */ + op[mc_peer->peer] = + GNUNET_TESTBED_service_connect (/* Closure for operation */ + NULL, + /* The peer whose service to connect to */ + peers[mc_peer->peer], + /* The name of the service */ + "multicast", + /* called after a handle to service is opened */ + service_connect, + /* closure for the above callback */ + cls, + /* called when opening the service connection */ + multicast_connect, + /* called when closing the service connection */ + multicast_disconnect, + /* closure for the above two callbacks */ + cls); } } @@ -504,14 +503,14 @@ service_connect (void *cls, void *ca_result, const char *emsg) { - struct multicast_peer *mc_peer = (struct multicast_peer*)cls; + struct MulticastPeerContext *mc_peer = (struct MulticastPeerContext*)cls; if (NULL == ca_result) { GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Connection adapter not created for peer #%u (%s)\n", mc_peer->peer, - GNUNET_i2s (mc_peers[mc_peer->peer]->id)); + GNUNET_i2s (multicast_peers[mc_peer->peer]->id)); result = GNUNET_SYSERR; GNUNET_SCHEDULER_shutdown(); @@ -525,7 +524,7 @@ service_connect (void *cls, pi_op[i] = GNUNET_TESTBED_peer_get_information (peers[i], GNUNET_TESTBED_PIT_IDENTITY, peer_information_cb, - mc_peers[i]); + multicast_peers[i]); } } } @@ -550,50 +549,51 @@ service_connect (void *cls, */ static void testbed_master (void *cls, - struct GNUNET_TESTBED_RunHandle *h, - unsigned int num_peers, - struct GNUNET_TESTBED_Peer **p, - unsigned int links_succeeded, - unsigned int links_failed) + struct GNUNET_TESTBED_RunHandle *h, + unsigned int num_peers, + struct GNUNET_TESTBED_Peer **p, + unsigned int links_succeeded, + unsigned int links_failed) { /* Testbed is ready with peers running and connected in a pre-defined overlay topology (FIXME) */ - - GNUNET_log (GNUNET_ERROR_TYPE_INFO, - "Connected to testbed_master()\n"); - peers = p; - - mc_peers = GNUNET_new_array (PEERS_REQUESTED, struct multicast_peer*); + multicast_peers = GNUNET_new_array (PEERS_REQUESTED, struct MulticastPeerContext*); // Create test contexts for members for (int i = 0; i<PEERS_REQUESTED; i++) { - mc_peers[i] = GNUNET_new (struct multicast_peer); - mc_peers[i]->peer = i; - mc_peers[i]->test_ok = GNUNET_NO; + multicast_peers[i] = GNUNET_new (struct MulticastPeerContext); + multicast_peers[i]->peer = i; + multicast_peers[i]->test_ok = GNUNET_NO; } - GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Create origin peer\n"); - - op[0] = GNUNET_TESTBED_service_connect (NULL, /* Closure for operation */ - peers[0], /* The peer whose service to connect to */ - "multicast", /* The name of the service */ - service_connect, /* callback to call after a handle to service - is opened */ - mc_peers[0], /* closure for the above callback */ - multicast_ca, /* callback to call with peer's configuration; - this should open the needed service connection */ - multicast_da, /* callback to be called when closing the - opened service connection */ - mc_peers[0]); /* closure for the above two callbacks */ - - GNUNET_SCHEDULER_add_shutdown (&shutdown_task, NULL); /* Schedule a new task on shutdown */ - + op[0] = + GNUNET_TESTBED_service_connect (/* Closure for operation */ + NULL, + /* The peer whose service to connect to */ + peers[0], + /* The name of the service */ + "multicast", + /* called after a handle to service is opened */ + service_connect, + /* closure for the above callback */ + multicast_peers[0], + /* called when opening the service connection */ + multicast_connect, + /* called when closing the service connection */ + multicast_disconnect, + /* closure for the above two callbacks */ + multicast_peers[0]); + /* Schedule a new task on shutdown */ + GNUNET_SCHEDULER_add_shutdown (&shutdown_task, NULL); /* Schedule the shutdown task with a delay of a few Seconds */ - timeout_tid = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 400), - &timeout_task, NULL); + timeout_tid = + GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply + (GNUNET_TIME_UNIT_SECONDS, 400), + &timeout_task, + NULL); } @@ -617,15 +617,21 @@ main (int argc, char *argv[]) } result = GNUNET_SYSERR; - ret = GNUNET_TESTBED_test_run - ("test-multicast-multipeer", /* test case name */ - config_file, /* template configuration */ - PEERS_REQUESTED, /* number of peers to start */ - 0LL, /* Event mask - set to 0 for no event notifications */ - NULL, /* Controller event callback */ - NULL, /* Closure for controller event callback */ - testbed_master, /* continuation callback to be called when testbed setup is complete */ - NULL); /* Closure for the test_master callback */ + ret = + GNUNET_TESTBED_test_run ("test-multicast-multipeer", + config_file, + /* number of peers to start */ + PEERS_REQUESTED, + /* Event mask - set to 0 for no event notifications */ + 0LL, + /* Controller event callback */ + NULL, + /* Closure for controller event callback */ + NULL, + /* called when testbed setup is complete */ + testbed_master, + /* Closure for the test_master callback */ + NULL); if ( (GNUNET_OK != ret) || (GNUNET_OK != result) ) return 1; return 0; |