diff options
-rw-r--r-- | src/include/gnunet_multicast_service.h | 67 | ||||
-rw-r--r-- | src/include/gnunet_psyc_service.h | 119 | ||||
-rw-r--r-- | src/multicast/Makefile.am | 2 | ||||
-rw-r--r-- | src/multicast/gnunet-service-multicast.c | 160 | ||||
-rw-r--r-- | src/multicast/multicast.h | 51 | ||||
-rw-r--r-- | src/multicast/multicast_api.c | 120 | ||||
-rw-r--r-- | src/psyc/gnunet-service-psyc.c | 261 | ||||
-rw-r--r-- | src/psyc/psyc.h | 19 | ||||
-rw-r--r-- | src/psyc/psyc_api.c | 147 | ||||
-rw-r--r-- | src/psyc/test_psyc.c | 86 |
10 files changed, 665 insertions, 367 deletions
diff --git a/src/include/gnunet_multicast_service.h b/src/include/gnunet_multicast_service.h index 7ceb2e352c..5079a087b6 100644 --- a/src/include/gnunet_multicast_service.h +++ b/src/include/gnunet_multicast_service.h @@ -230,7 +230,7 @@ GNUNET_NETWORK_STRUCT_END /** * Handle that identifies a join request. * - * Used to match calls to #GNUNET_MULTICAST_JoinCallback to the + * Used to match calls to #GNUNET_MULTICAST_JoinRequestCallback to the * corresponding calls to #GNUNET_MULTICAST_join_decision(). */ struct GNUNET_MULTICAST_JoinHandle; @@ -240,11 +240,12 @@ struct GNUNET_MULTICAST_JoinHandle; * Function to call with the decision made for a join request. * * Must be called once and only once in response to an invocation of the - * #GNUNET_MULTICAST_JoinCallback. + * #GNUNET_MULTICAST_JoinRequestCallback. * * @param jh Join request handle. - * @param is_admitted #GNUNET_YES if joining is approved, - * #GNUNET_NO if it is disapproved + * @param is_admitted #GNUNET_YES if the join is approved, + * #GNUNET_NO if it is disapproved, + * #GNUNET_SYSERR if we cannot answer the request. * @param relay_count Number of relays given. * @param relays Array of suggested peers that might be useful relays to use * when joining the multicast group (essentially a list of peers that @@ -261,7 +262,7 @@ struct GNUNET_MULTICAST_JoinHandle; struct GNUNET_MULTICAST_ReplayHandle * GNUNET_MULTICAST_join_decision (struct GNUNET_MULTICAST_JoinHandle *jh, int is_admitted, - unsigned int relay_count, + uint16_t relay_count, const struct GNUNET_PeerIdentity *relays, const struct GNUNET_MessageHeader *join_resp); @@ -273,16 +274,40 @@ GNUNET_MULTICAST_join_decision (struct GNUNET_MULTICAST_JoinHandle *jh, * with the decision. * * @param cls Closure. - * @param peer Identity of the member that wants to join. - * @param member_key Requesting member's public key. + * @param member_key Public key of the member requesting join. * @param join_msg Application-dependent join message from the new member. * @param jh Join handle to pass to GNUNET_MULTICAST_join_decison(). */ typedef void -(*GNUNET_MULTICAST_JoinCallback) (void *cls, - const struct GNUNET_CRYPTO_EddsaPublicKey *member_key, - const struct GNUNET_MessageHeader *join_msg, - struct GNUNET_MULTICAST_JoinHandle *jh); +(*GNUNET_MULTICAST_JoinRequestCallback) (void *cls, + const struct GNUNET_CRYPTO_EddsaPublicKey *member_key, + const struct GNUNET_MessageHeader *join_msg, + struct GNUNET_MULTICAST_JoinHandle *jh); + + +/** + * Method called to inform about the decision in response to a join request. + * + * If @a is_admitted is not #GNUNET_YES, then the multicast service disconnects + * the client and the multicast member handle returned by + * GNUNET_MULTICAST_member_join() is invalidated. + * + * @param cls Closure. + * @param is_admitted #GNUNET_YES or #GNUNET_NO or #GNUNET_SYSERR + * @param peer The peer we are connected to and the join decision is from. + * @param relay_count Number of peers in the @a relays array. + * @param relays Peer identities of members of the group, which serve as relays + * and can be used to join the group at. If empty, only the origin can + * be used to connect to the group. + * @param join_msg Application-dependent join message from the origin. + */ +typedef void +(*GNUNET_MULTICAST_JoinDecisionCallback) (void *cls, + int is_admitted, + const struct GNUNET_PeerIdentity *peer, + uint16_t relay_count, + const struct GNUNET_PeerIdentity *relays, + const struct GNUNET_MessageHeader *join_msg); /** @@ -542,9 +567,9 @@ GNUNET_MULTICAST_replay_response2 (struct GNUNET_MULTICAST_ReplayHandle *rh, * @param cfg Configuration to use. * @param priv_key ECC key that will be used to sign messages for this * multicast session; public key is used to identify the multicast group; - * @param next_fragment_id Next fragment ID to continue counting fragments from - * when restarting the origin. 1 for a new group. - * @param join_cb Function called to approve / disapprove joining of a peer. + * @param max_fragment_id Maximum fragment ID already sent to the group. + * 0 for a new group. + * @param join_request_cb Function called to approve / disapprove joining of a peer. * @param member_test_cb Function multicast can use to test group membership. * @param replay_frag_cb Function that can be called to replay a message fragment. * @param replay_msg_cb Function that can be called to replay a message. @@ -558,8 +583,8 @@ GNUNET_MULTICAST_replay_response2 (struct GNUNET_MULTICAST_ReplayHandle *rh, struct GNUNET_MULTICAST_Origin * GNUNET_MULTICAST_origin_start (const struct GNUNET_CONFIGURATION_Handle *cfg, const struct GNUNET_CRYPTO_EddsaPrivateKey *priv_key, - uint64_t next_fragment_id, - GNUNET_MULTICAST_JoinCallback join_cb, + uint64_t max_fragment_id, + GNUNET_MULTICAST_JoinRequestCallback join_request_cb, GNUNET_MULTICAST_MembershipTestCallback member_test_cb, GNUNET_MULTICAST_ReplayFragmentCallback replay_frag_cb, GNUNET_MULTICAST_ReplayMessageCallback replay_msg_cb, @@ -667,11 +692,12 @@ GNUNET_MULTICAST_origin_stop (struct GNUNET_MULTICAST_Origin *origin); * of multicast messages. * @param relay_count Number of peers in the @a relays array. * @param relays Peer identities of members of the group, which serve as relays - * and can be used to join the group at. and send the @a join_request to. + * and can be used to join the group at and send the @a join_request to. * If empty, the @a join_request is sent directly to the @a origin. * @param join_msg Application-dependent join message to be passed to the * @a origin. - * @param join_cb Function called to approve / disapprove joining of a peer. + * @param join_request_cb Function called to approve / disapprove joining of a peer. + * @param join_decision_cb Function called to inform about the join decision. * @param mem_test_cb Function multicast can use to test group membership. * @param replay_frag_cb Function that can be called to replay message fragments * this peer already knows from this group. NULL if this @@ -690,10 +716,11 @@ GNUNET_MULTICAST_member_join (const struct GNUNET_CONFIGURATION_Handle *cfg, const struct GNUNET_CRYPTO_EddsaPublicKey *group_key, const struct GNUNET_CRYPTO_EddsaPrivateKey *member_key, const struct GNUNET_PeerIdentity *origin, - uint32_t relay_count, + uint16_t relay_count, const struct GNUNET_PeerIdentity *relays, const struct GNUNET_MessageHeader *join_request, - GNUNET_MULTICAST_JoinCallback join_cb, + GNUNET_MULTICAST_JoinRequestCallback join_request_cb, + GNUNET_MULTICAST_JoinDecisionCallback join_decision_cb, GNUNET_MULTICAST_MembershipTestCallback mem_test_cb, GNUNET_MULTICAST_ReplayFragmentCallback replay_frag_cb, GNUNET_MULTICAST_ReplayMessageCallback replay_msg_cb, diff --git a/src/include/gnunet_psyc_service.h b/src/include/gnunet_psyc_service.h index 2ae20adc57..4f4c99c1f9 100644 --- a/src/include/gnunet_psyc_service.h +++ b/src/include/gnunet_psyc_service.h @@ -325,17 +325,18 @@ typedef void /** * Method called from PSYC upon receiving a join request. * - * @param cls Closure. - * @param slave requesting to join. + * @param cls Closure. + * @param slave_key Public key of the slave requesting join. * @param join_msg Join message sent along with the request. - * @param jh Join handle to use with GNUNET_PSYC_join_decision() + * @param jh Join handle to use with GNUNET_PSYC_join_decision() */ typedef void -(*GNUNET_PSYC_JoinCallback) (void *cls, - const struct GNUNET_CRYPTO_EddsaPublicKey - *slave_key, - const struct GNUNET_PSYC_MessageHeader *join_msg, - struct GNUNET_PSYC_JoinHandle *jh); +(*GNUNET_PSYC_JoinRequestCallback) (void *cls, + const struct + GNUNET_CRYPTO_EddsaPublicKey *slave_key, + const struct + GNUNET_PSYC_MessageHeader *join_msg, + struct GNUNET_PSYC_JoinHandle *jh); /** @@ -344,32 +345,30 @@ typedef void * Must be called once and only once in response to an invocation of the * #GNUNET_PSYC_JoinCallback. * - * @param jh Join request handle. - * @param is_admitted #GNUNET_YES if joining is approved, - * #GNUNET_NO if it is disapproved. - * @param relay_count Number of relays given. - * @param relays Array of suggested peers that might be useful relays to use + * @param jh Join request handle. + * @param is_admitted #GNUNET_YES if the join is approved, + * #GNUNET_NO if it is disapproved, + * #GNUNET_SYSERR if we cannot answer the request. + * @param relay_count Number of relays given. + * @param relays Array of suggested peers that might be useful relays to use * when joining the multicast group (essentially a list of peers that * are already part of the multicast group and might thus be willing * to help with routing). If empty, only this local peer (which must * be the multicast origin) is a good candidate for building the * multicast tree. Note that it is unnecessary to specify our own * peer identity in this array. - * @param method_name Method name for the message transmitted with the response. - * @param env Environment containing transient variables for the message, - * or NULL. - * @param data Data of the message. - * @param data_size Size of @a data. + * @param join_resp Application-dependent join response message to send along + * with the decision. + * + * @return #GNUNET_OK on success, + * #GNUNET_SYSERR if @a join_resp is too large. */ -void +int GNUNET_PSYC_join_decision (struct GNUNET_PSYC_JoinHandle *jh, int is_admitted, uint32_t relay_count, const struct GNUNET_PeerIdentity *relays, - const char *method_name, - const struct GNUNET_ENV_Environment *env, - const void *data, - size_t data_size); + const struct GNUNET_PSYC_MessageHeader *join_resp); /** @@ -400,29 +399,31 @@ typedef void * or part messages, the respective methods must call other PSYC functions to * inform PSYC about the meaning of the respective events. * - * @param cfg Configuration to use (to connect to PSYC service). - * @param channel_key ECC key that will be used to sign messages for this + * @param cfg Configuration to use (to connect to PSYC service). + * @param channel_key ECC key that will be used to sign messages for this * PSYC session. The public key is used to identify the PSYC channel. * Note that end-users will usually not use the private key directly, but * rather look it up in GNS for places managed by other users, or select * a file with the private key(s) when setting up their own channels * FIXME: we'll likely want to use NOT the p521 curve here, but a cheaper * one in the future. - * @param policy Channel policy specifying join and history restrictions. + * @param policy Channel policy specifying join and history restrictions. * Used to automate join decisions. - * @param message_cb Function to invoke on message parts received from slaves. - * @param join_cb Function to invoke when a peer wants to join. - * @param master_started_cb Function to invoke after the channel master started. - * @param cls Closure for @a method and @a join_cb. + * @param master_start_cb Function to invoke after the channel master started. + * @param join_request_cb Function to invoke when a slave wants to join. + * @param message_cb Function to invoke on message parts sent to the channel + * and received from slaves + * @param cls Closure for @a method and @a join_cb. + * * @return Handle for the channel master, NULL on error. */ struct GNUNET_PSYC_Master * GNUNET_PSYC_master_start (const struct GNUNET_CONFIGURATION_Handle *cfg, const struct GNUNET_CRYPTO_EddsaPrivateKey *channel_key, enum GNUNET_PSYC_Policy policy, + GNUNET_PSYC_MasterStartCallback master_start_cb, + GNUNET_PSYC_JoinRequestCallback join_request_cb, GNUNET_PSYC_MessageCallback message_cb, - GNUNET_PSYC_JoinCallback join_cb, - GNUNET_PSYC_MasterStartCallback master_started_cb, void *cls); @@ -580,13 +581,30 @@ struct GNUNET_PSYC_Slave; /** - * Function called after the slave joined. + * Function called after the slave connected to the PSYC service. * * @param cls Closure. * @param max_message_id Last message ID sent to the channel. */ typedef void -(*GNUNET_PSYC_SlaveJoinCallback) (void *cls, uint64_t max_message_id); +(*GNUNET_PSYC_SlaveConnectCallback) (void *cls, uint64_t max_message_id); + + +/** + * Method called to inform about the decision in response to a join request. + * + * If @a is_admitted is not #GNUNET_YES, then sending messages to the channel is + * not possible, but earlier history can be still queried. + * + * @param cls Closure. + * @param is_admitted #GNUNET_YES or #GNUNET_NO or #GNUNET_SYSERR + * @param join_msg Application-dependent join message from the origin. + */ +typedef void +(*GNUNET_PSYC_JoinDecisionCallback) (void *cls, + int is_admitted, + const struct + GNUNET_PSYC_MessageHeader *join_msg); /** @@ -599,24 +617,28 @@ typedef void * notification on failure (as the channel may simply take days to approve, * and disapproval is simply being ignored). * - * @param cfg Configuration to use. - * @param channel_key ECC public key that identifies the channel we wish to join. - * @param slave_key ECC private-public key pair that identifies the slave, and + * @param cfg Configuration to use. + * @param channel_key ECC public key that identifies the channel we wish to join. + * @param slave_key ECC private-public key pair that identifies the slave, and * used by multicast to sign the join request and subsequent unicast * requests sent to the master. - * @param origin Peer identity of the origin. - * @param relay_count Number of peers in the @a relays array. - * @param relays Peer identities of members of the multicast group, which serve + * @param origin Peer identity of the origin. + * @param relay_count Number of peers in the @a relays array. + * @param relays Peer identities of members of the multicast group, which serve * as relays and used to join the group at. - * @param message_cb Function to invoke on message parts received from the + * @param message_cb Function to invoke on message parts received from the * channel, typically at least contains method handlers for @e join and * @e part. - * @param slave_joined_cb Function invoked once we have joined the channel. - * @param cls Closure for @a message_cb and @a slave_joined_cb. - * @param method_name Method name for the join request. - * @param env Environment containing transient variables for the request, or NULL. - * @param data Payload for the join message. - * @param data_size Number of bytes in @a data. + * @param slave_connect_cb Function invoked once we have connected to the + * PSYC service. + * @param join_decision_cb Function invoked once we have received a join + * decision. + * @param cls Closure for @a message_cb and @a slave_joined_cb. + * @param method_name Method name for the join request. + * @param env Environment containing transient variables for the request, or NULL. + * @param data Payload for the join message. + * @param data_size Number of bytes in @a data. + * * @return Handle for the slave, NULL on error. */ struct GNUNET_PSYC_Slave * @@ -627,7 +649,8 @@ GNUNET_PSYC_slave_join (const struct GNUNET_CONFIGURATION_Handle *cfg, uint32_t relay_count, const struct GNUNET_PeerIdentity *relays, GNUNET_PSYC_MessageCallback message_cb, - GNUNET_PSYC_SlaveJoinCallback slave_joined_cb, + GNUNET_PSYC_SlaveConnectCallback slave_connect_cb, + GNUNET_PSYC_JoinDecisionCallback join_decision_cb, void *cls, const char *method_name, const struct GNUNET_ENV_Environment *env, diff --git a/src/multicast/Makefile.am b/src/multicast/Makefile.am index b2c1702e46..8ccc7b88a6 100644 --- a/src/multicast/Makefile.am +++ b/src/multicast/Makefile.am @@ -46,10 +46,12 @@ gnunet_service_multicast_SOURCES = \ gnunet-service-multicast.c gnunet_service_multicast_LDADD = \ $(top_builddir)/src/util/libgnunetutil.la \ + $(top_builddir)/src/core/libgnunetcore.la \ $(top_builddir)/src/statistics/libgnunetstatistics.la \ $(GN_LIBINTL) gnunet_service_multicast_DEPENDENCIES = \ $(top_builddir)/src/util/libgnunetutil.la \ + $(top_builddir)/src/core/libgnunetcore.la \ $(top_builddir)/src/statistics/libgnunetstatistics.la diff --git a/src/multicast/gnunet-service-multicast.c b/src/multicast/gnunet-service-multicast.c index d412d3f8ed..5421c1b2b0 100644 --- a/src/multicast/gnunet-service-multicast.c +++ b/src/multicast/gnunet-service-multicast.c @@ -27,6 +27,7 @@ #include "gnunet_util_lib.h" #include "gnunet_signatures.h" #include "gnunet_statistics_service.h" +#include "gnunet_core_service.h" #include "gnunet_multicast_service.h" #include "multicast.h" @@ -36,6 +37,22 @@ static const struct GNUNET_CONFIGURATION_Handle *cfg; /** + * Server handle. + */ +static struct GNUNET_SERVER_Handle *server; + +/** + * Core handle. + * Only used during initialization. + */ +static struct GNUNET_CORE_Handle *core; + +/** + * Identity of this peer. + */ +static struct GNUNET_PeerIdentity this_peer; + +/** * Handle to the statistics service. */ static struct GNUNET_STATISTICS_Handle *stats; @@ -62,7 +79,6 @@ static struct GNUNET_CONTAINER_MultiHashMap *members; */ static struct GNUNET_CONTAINER_MultiHashMap *group_members; - /** * List of connected clients. */ @@ -147,7 +163,7 @@ struct Member /** * Join request sent to the origin / members. */ - struct MulticastJoinRequestMessage *join_request; + struct MulticastJoinRequestMessage *join_req; /** * Join decision sent in reply to our request. @@ -155,7 +171,7 @@ struct Member * Only a positive decision is stored here, in case of a negative decision the * client is disconnected. */ - struct MulticastJoinDecisionMessage *join_decision; + struct MulticastJoinDecisionMessageHeader *join_dcsn; /** * Last request fragment ID sent to the origin. @@ -171,9 +187,19 @@ struct Member * @param tc unused */ static void -cleanup_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) +shutdown_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) { - /* FIXME: do clean up here */ + if (NULL != core) + { + GNUNET_CORE_disconnect (core); + core = NULL; + } + if (NULL != stats) + { + GNUNET_STATISTICS_destroy (stats, GNUNET_YES); + stats = NULL; + } + /* FIXME: do more clean up here */ } /** @@ -206,8 +232,11 @@ cleanup_member (struct Member *mem) grp_mem); GNUNET_CONTAINER_multihashmap_destroy (grp_mem); } - if (NULL != mem->join_decision) - GNUNET_free (mem->join_decision); + if (NULL != mem->join_dcsn) + { + GNUNET_free (mem->join_dcsn); + mem->join_dcsn = NULL; + } GNUNET_CONTAINER_multihashmap_remove (members, &grp->pub_key_hash, mem); } @@ -329,7 +358,7 @@ member_message_cb (void *cls, const struct GNUNET_HashCode *pub_key_hash, const struct GNUNET_MessageHeader *msg = cls; struct Member *mem = member; - if (NULL != mem->join_decision) + if (NULL != mem->join_dcsn) { /* Only send message to admitted members */ message_to_clients (&mem->grp, msg); } @@ -374,7 +403,7 @@ message_to_origin (struct Group *grp, const struct GNUNET_MessageHeader *msg) * Handle a connecting client starting an origin. */ static void -handle_origin_start (void *cls, struct GNUNET_SERVER_Client *client, +client_origin_start (void *cls, struct GNUNET_SERVER_Client *client, const struct GNUNET_MessageHeader *m) { const struct MulticastOriginStartMessage * @@ -424,8 +453,8 @@ handle_origin_start (void *cls, struct GNUNET_SERVER_Client *client, * Handle a connecting client joining a group. */ static void -handle_member_join (void *cls, struct GNUNET_SERVER_Client *client, - const struct GNUNET_MessageHeader *m) +client_member_join (void *cls, struct GNUNET_SERVER_Client *client, + const struct GNUNET_MessageHeader *m) { const struct MulticastMemberJoinMessage * msg = (const struct MulticastMemberJoinMessage *) m; @@ -487,16 +516,16 @@ handle_member_join (void *cls, struct GNUNET_SERVER_Client *client, GNUNET_SERVER_client_set_user_context (client, grp); - if (NULL != mem->join_decision) + if (NULL != mem->join_dcsn) { /* Already got a join decision, send it to client. */ GNUNET_SERVER_notification_context_add (nc, client); GNUNET_SERVER_notification_context_unicast (nc, client, (struct GNUNET_MessageHeader *) - mem->join_decision, + mem->join_dcsn, GNUNET_NO); } else if (grp->clients_head == grp->clients_tail) - { /* First client, send join request. */ + { /* First client of the group, send join request. */ struct GNUNET_PeerIdentity *relays = (struct GNUNET_PeerIdentity *) &msg[1]; uint32_t relay_count = ntohs (msg->relay_count); uint16_t relay_size = relay_count * sizeof (*relays); @@ -515,6 +544,7 @@ handle_member_join (void *cls, struct GNUNET_SERVER_Client *client, req->header.size = htons (sizeof (*req) + join_msg_size); req->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_REQUEST); req->group_key = grp->pub_key; + req->member_peer = this_peer; GNUNET_CRYPTO_eddsa_key_get_public (&mem->priv_key, &req->member_key); if (0 < join_msg_size) memcpy (&req[1], join_msg, join_msg_size); @@ -531,14 +561,14 @@ handle_member_join (void *cls, struct GNUNET_SERVER_Client *client, GNUNET_assert (0); } - if (NULL != mem->join_request) - GNUNET_free (mem->join_request); - mem->join_request = req; + if (NULL != mem->join_req) + GNUNET_free (mem->join_req); + mem->join_req = req; if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains (origins, &grp->pub_key_hash)) { /* Local origin */ - message_to_origin (grp, (struct GNUNET_MessageHeader *) mem->join_request); + message_to_origin (grp, (struct GNUNET_MessageHeader *) mem->join_req); } else { @@ -553,34 +583,15 @@ handle_member_join (void *cls, struct GNUNET_SERVER_Client *client, * Join decision from client. */ static void -handle_join_decision (void *cls, struct GNUNET_SERVER_Client *client, +client_join_decision (void *cls, struct GNUNET_SERVER_Client *client, const struct GNUNET_MessageHeader *m) { struct Group * grp = GNUNET_SERVER_client_get_user_context (client, struct Group); - const struct MulticastClientJoinDecisionMessage * - cl_dcsn = (const struct MulticastClientJoinDecisionMessage *) m; - - struct GNUNET_PeerIdentity *relays = (struct GNUNET_PeerIdentity *) &cl_dcsn[1]; - uint32_t relay_count = ntohs (cl_dcsn->relay_count); - uint16_t relay_size = relay_count * sizeof (*relays); - - struct GNUNET_MessageHeader *join_msg = NULL; - uint16_t join_msg_size = 0; - if (sizeof (*cl_dcsn) + relay_size + sizeof (*m) <= ntohs (m->size)) - { - join_msg = (struct GNUNET_MessageHeader *) - (((char *) &cl_dcsn[1]) + relay_size); - join_msg_size = ntohs (join_msg->size); - } - - int keep_dcsn = GNUNET_NO; - struct MulticastJoinDecisionMessage * - dcsn = GNUNET_malloc (sizeof (*dcsn) + join_msg_size); - dcsn->header.size = htons (sizeof (*dcsn) + join_msg_size); - dcsn->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_DECISION); - dcsn->is_admitted = cl_dcsn->is_admitted; - memcpy (&dcsn[1], join_msg, join_msg_size); + const struct MulticastJoinDecisionMessageHeader * + hdcsn = (const struct MulticastJoinDecisionMessageHeader *) m; + const struct MulticastJoinDecisionMessage * + dcsn = (const struct MulticastJoinDecisionMessage *) &hdcsn[1]; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p Got join decision from client for group %s..\n", @@ -595,7 +606,7 @@ handle_join_decision (void *cls, struct GNUNET_SERVER_Client *client, if (NULL != grp_mem) { struct GNUNET_HashCode member_key_hash; - GNUNET_CRYPTO_hash (&cl_dcsn->member_key, sizeof (cl_dcsn->member_key), + GNUNET_CRYPTO_hash (&hdcsn->member_key, sizeof (hdcsn->member_key), &member_key_hash); struct Member * mem = GNUNET_CONTAINER_multihashmap_get (grp_mem, &member_key_hash); @@ -604,19 +615,21 @@ handle_join_decision (void *cls, struct GNUNET_SERVER_Client *client, grp, GNUNET_h2s (&member_key_hash), mem); if (NULL != mem) { - message_to_clients (grp, (struct GNUNET_MessageHeader *) dcsn); - if (GNUNET_YES == dcsn->is_admitted) + message_to_clients (&mem->grp, (struct GNUNET_MessageHeader *) hdcsn); + if (GNUNET_YES == ntohl (dcsn->is_admitted)) { /* Member admitted, store join_decision. */ - mem->join_decision = dcsn; - keep_dcsn = GNUNET_YES; + uint16_t dcsn_size = ntohs (dcsn->header.size); + mem->join_dcsn = GNUNET_malloc (dcsn_size); + memcpy (mem->join_dcsn, dcsn, dcsn_size); } else { /* Refused entry, disconnect clients. */ struct ClientList *cl = mem->grp.clients_head; while (NULL != cl) { - GNUNET_SERVER_client_disconnect (cl->client); + struct GNUNET_SERVER_Client *client = cl->client; cl = cl->next; + GNUNET_SERVER_client_disconnect (client); } } } @@ -624,10 +637,8 @@ handle_join_decision (void *cls, struct GNUNET_SERVER_Client *client, } else { - /* FIXME: send join decision to remote peers */ + /* FIXME: send join decision to hdcsn->peer */ } - if (GNUNET_NO == keep_dcsn) - GNUNET_free (dcsn); GNUNET_SERVER_receive_done (client, GNUNET_OK); } @@ -635,7 +646,7 @@ handle_join_decision (void *cls, struct GNUNET_SERVER_Client *client, * Incoming message from a client. */ static void -handle_multicast_message (void *cls, struct GNUNET_SERVER_Client *client, +client_multicast_message (void *cls, struct GNUNET_SERVER_Client *client, const struct GNUNET_MessageHeader *m) { struct Group * @@ -670,7 +681,7 @@ handle_multicast_message (void *cls, struct GNUNET_SERVER_Client *client, * Incoming request from a client. */ static void -handle_multicast_request (void *cls, struct GNUNET_SERVER_Client *client, +client_multicast_request (void *cls, struct GNUNET_SERVER_Client *client, const struct GNUNET_MessageHeader *m) { struct Group * @@ -708,37 +719,34 @@ handle_multicast_request (void *cls, struct GNUNET_SERVER_Client *client, GNUNET_SERVER_receive_done (client, GNUNET_OK); } + /** - * Process multicast requests. - * - * @param cls closure - * @param server the initialized server - * @param cfg configuration to use + * Core connected. */ static void -run (void *cls, struct GNUNET_SERVER_Handle *server, - const struct GNUNET_CONFIGURATION_Handle *c) +core_connected_cb (void *cls, const struct GNUNET_PeerIdentity *my_identity) { + this_peer = *my_identity; + static const struct GNUNET_SERVER_MessageHandler handlers[] = { - { &handle_origin_start, NULL, + { &client_origin_start, NULL, GNUNET_MESSAGE_TYPE_MULTICAST_ORIGIN_START, 0 }, - { &handle_member_join, NULL, + { &client_member_join, NULL, GNUNET_MESSAGE_TYPE_MULTICAST_MEMBER_JOIN, 0 }, - { &handle_join_decision, NULL, + { &client_join_decision, NULL, GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_DECISION, 0 }, - { &handle_multicast_message, NULL, + { &client_multicast_message, NULL, GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE, 0 }, - { &handle_multicast_request, NULL, + { &client_multicast_request, NULL, GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST, 0 }, {NULL, NULL, 0, 0} }; - cfg = c; stats = GNUNET_STATISTICS_create ("multicast", cfg); origins = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES); members = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES); @@ -747,12 +755,30 @@ run (void *cls, struct GNUNET_SERVER_Handle *server, GNUNET_SERVER_add_handlers (server, handlers); GNUNET_SERVER_disconnect_notify (server, &client_disconnect, NULL); - GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL, &cleanup_task, + GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL, &shutdown_task, NULL); } /** + * Service started. + * + * @param cls closure + * @param server the initialized server + * @param cfg configuration to use + */ +static void +run (void *cls, struct GNUNET_SERVER_Handle *srv, + const struct GNUNET_CONFIGURATION_Handle *c) +{ + cfg = c; + server = srv; + core = GNUNET_CORE_connect (cfg, NULL, core_connected_cb, NULL, NULL, + NULL, GNUNET_NO, NULL, GNUNET_NO, NULL); +} + + +/** * The main function for the multicast service. * * @param argc number of arguments from the command line diff --git a/src/multicast/multicast.h b/src/multicast/multicast.h index 85c5714e6e..76492e868e 100644 --- a/src/multicast/multicast.h +++ b/src/multicast/multicast.h @@ -77,7 +77,7 @@ struct MulticastJoinRequestMessage /** - * Header of a join decision sent to a remote peer. + * Header of a join decision message sent to a peer requesting join. */ struct MulticastJoinDecisionMessage { @@ -87,19 +87,28 @@ struct MulticastJoinDecisionMessage struct GNUNET_MessageHeader header; /** - * #GNUNET_YES if the peer was admitted. + * #GNUNET_YES if the peer was admitted + * #GNUNET_NO if entry was refused, + * #GNUNET_SYSERR if the request could not be answered. */ - uint8_t is_admitted; + int32_t is_admitted; + + /** + * Number of relays given. + */ + uint32_t relay_count; + + /* Followed by relay_count peer identities */ /* Followed by the join response message */ }; /** - * Message sent from the client to the service to notify the service - * about a join decision. + * Header added to a struct MulticastJoinDecisionMessage + * when sent between the client and service. */ -struct MulticastClientJoinDecisionMessage +struct MulticastJoinDecisionMessageHeader { /** * Type: GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_DECISION @@ -107,29 +116,18 @@ struct MulticastClientJoinDecisionMessage struct GNUNET_MessageHeader header; /** - * Number of relays given. + * C->S: Peer to send the join decision to. + * S->C: Peer we received the join decision from. */ - uint32_t relay_count; + struct GNUNET_PeerIdentity peer; /** - * Public key of the joining member. + * C->S: Public key of the member requesting join. + * S->C: Unused. */ struct GNUNET_CRYPTO_EddsaPublicKey member_key; - /** - * Peer identity of the joining member. - */ - struct GNUNET_PeerIdentity member_peer; - - /** - * #GNUNET_YES if the peer was admitted. - */ - uint8_t is_admitted; - - /* Followed by relay_count peer identities */ - - /* Followed by the join response message */ - + /* Followed by struct MulticastJoinDecisionMessage */ }; @@ -139,7 +137,6 @@ struct MulticastClientJoinDecisionMessage */ struct MulticastMembershipTestResultMessage { - /** * Type: GNUNET_MESSAGE_TYPE_MULTICAST_MEMBERSHIP_TEST_RESULT */ @@ -151,11 +148,11 @@ struct MulticastMembershipTestResultMessage uint32_t uid; /** - * #GNUNET_YES if the peer is a member, #GNUNET_NO if peer was not a member, - * #GNUNET_SYSERR if we cannot answer the test. + * #GNUNET_YES if the peer is a member + * #GNUNET_NO if peer is not a member, + * #GNUNET_SYSERR if the test could not be answered. */ int32_t is_admitted; - }; diff --git a/src/multicast/multicast_api.c b/src/multicast/multicast_api.c index 501ff4b701..e568e77ee1 100644 --- a/src/multicast/multicast_api.c +++ b/src/multicast/multicast_api.c @@ -132,7 +132,7 @@ struct GNUNET_MULTICAST_Group struct GNUNET_CRYPTO_EddsaPublicKey pub_key; struct GNUNET_HashCode pub_key_hash; - GNUNET_MULTICAST_JoinCallback join_cb; + GNUNET_MULTICAST_JoinRequestCallback join_req_cb; GNUNET_MULTICAST_MembershipTestCallback member_test_cb; GNUNET_MULTICAST_ReplayFragmentCallback replay_frag_cb; GNUNET_MULTICAST_ReplayMessageCallback replay_msg_cb; @@ -177,10 +177,7 @@ struct GNUNET_MULTICAST_Member struct GNUNET_MULTICAST_Group grp; struct GNUNET_MULTICAST_MemberTransmitHandle tmit; - struct GNUNET_CRYPTO_EddsaPrivateKey priv_key; - struct GNUNET_PeerIdentity origin; - struct GNUNET_PeerIdentity relays; - uint32_t relay_count; + GNUNET_MULTICAST_JoinDecisionCallback join_dcsn_cb; uint64_t next_fragment_id; }; @@ -197,12 +194,12 @@ struct GNUNET_MULTICAST_JoinHandle struct GNUNET_MULTICAST_Group *group; /** - * Public key of the joining member. + * Public key of the member requesting join. */ struct GNUNET_CRYPTO_EddsaPublicKey member_key; /** - * Peer identity of the joining member. + * Peer identity of the member requesting join. */ struct GNUNET_PeerIdentity member_peer; }; @@ -476,8 +473,9 @@ request_cb (void *cls, const struct GNUNET_HashCode *pub_key_hash, void *origin) "Calling request callback for a request of type %u and size %u.\n", ntohs (req->header.type), ntohs (req->header.size)); - orig->request_cb (orig->grp.cb_cls, &req->member_key, - (const struct GNUNET_MessageHeader *) req, 0); + if (NULL != orig->request_cb) + orig->request_cb (orig->grp.cb_cls, &req->member_key, + (const struct GNUNET_MessageHeader *) req, 0); return GNUNET_YES; } @@ -501,7 +499,8 @@ join_request_cb (void *cls, const struct GNUNET_HashCode *pub_key_hash, if (sizeof (*req) + sizeof (*msg) <= ntohs (req->header.size)) msg = (const struct GNUNET_MessageHeader *) &req[1]; - grp->join_cb (grp->cb_cls, &req->member_key, msg, jh); + if (NULL != grp->join_req_cb) + grp->join_req_cb (grp->cb_cls, &req->member_key, msg, jh); return GNUNET_YES; } @@ -513,15 +512,44 @@ static int join_decision_cb (void *cls, const struct GNUNET_HashCode *pub_key_hash, void *member) { - const struct MulticastJoinDecisionMessage *dcsn = cls; + const struct MulticastJoinDecisionMessageHeader *hdcsn = cls; + const struct MulticastJoinDecisionMessage * + dcsn = (const struct MulticastJoinDecisionMessage *) &hdcsn[1]; struct GNUNET_MULTICAST_Member *mem = member; struct GNUNET_MULTICAST_Group *grp = &mem->grp; - const struct GNUNET_MessageHeader *msg = NULL; - if (sizeof (*dcsn) + sizeof (*msg) <= ntohs (dcsn->header.size)) - msg = (const struct GNUNET_MessageHeader *) &dcsn[1]; + uint16_t dcsn_size = ntohs (dcsn->header.size); + int is_admitted = ntohl (dcsn->is_admitted); + + const struct GNUNET_MessageHeader *join_resp = NULL; + uint16_t join_resp_size = 0; + + uint16_t relay_count = ntohl (dcsn->relay_count); + const struct GNUNET_PeerIdentity *relays = NULL; + uint16_t relay_size = relay_count * sizeof (*relays); + if (0 < relay_count && dcsn_size < sizeof (*dcsn) + relay_size) + relays = (struct GNUNET_PeerIdentity *) &dcsn[1]; + + if (sizeof (*dcsn) + relay_size + sizeof (*join_resp) <= dcsn_size) + { + join_resp = (const struct GNUNET_MessageHeader *) &dcsn[1]; + join_resp_size = ntohs (join_resp->size); + } + if (dcsn_size < sizeof (*dcsn) + relay_size + join_resp_size) + { + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Received invalid join decision message from multicast.\n"); + GNUNET_break_op (0); + is_admitted = GNUNET_SYSERR; + } + + if (NULL != mem->join_dcsn_cb) + mem->join_dcsn_cb (grp->cb_cls, is_admitted, &hdcsn->peer, + relay_count, relays, join_resp); + + if (GNUNET_YES != is_admitted) + GNUNET_MULTICAST_member_part (mem); - // FIXME: grp->join_decision_cb (grp->cb_cls, msg); return GNUNET_YES; } @@ -599,7 +627,6 @@ message_handler (void *cls, const struct GNUNET_MessageHeader *msg) GNUNET_break (0); break; } - if (NULL != origins) GNUNET_CONTAINER_multihashmap_get_multiple (origins, &grp->pub_key_hash, request_cb, (void *) msg); @@ -615,9 +642,11 @@ message_handler (void *cls, const struct GNUNET_MessageHeader *msg) break; case GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_DECISION: - if (NULL != origins) - GNUNET_CONTAINER_multihashmap_get_multiple (origins, &grp->pub_key_hash, - join_decision_cb, (void *) msg); + if (GNUNET_NO != grp->is_origin) + { + GNUNET_break (0); + break; + } if (NULL != members) GNUNET_CONTAINER_multihashmap_get_multiple (members, &grp->pub_key_hash, join_decision_cb, (void *) msg); @@ -636,11 +665,12 @@ message_handler (void *cls, const struct GNUNET_MessageHeader *msg) * Function to call with the decision made for a join request. * * Must be called once and only once in response to an invocation of the - * #GNUNET_MULTICAST_JoinCallback. + * #GNUNET_MULTICAST_JoinRequestCallback. * * @param jh Join request handle. - * @param is_admitted #GNUNET_YES if joining is approved, - * #GNUNET_NO if it is disapproved + * @param is_admitted #GNUNET_YES if the join is approved, + * #GNUNET_NO if it is disapproved, + * #GNUNET_SYSERR if we cannot answer the request. * @param relay_count Number of relays given. * @param relays Array of suggested peers that might be useful relays to use * when joining the multicast group (essentially a list of peers that @@ -657,25 +687,31 @@ message_handler (void *cls, const struct GNUNET_MessageHeader *msg) struct GNUNET_MULTICAST_ReplayHandle * GNUNET_MULTICAST_join_decision (struct GNUNET_MULTICAST_JoinHandle *jh, int is_admitted, - unsigned int relay_count, + uint16_t relay_count, const struct GNUNET_PeerIdentity *relays, const struct GNUNET_MessageHeader *join_resp) { struct GNUNET_MULTICAST_Group *grp = jh->group; uint16_t join_resp_size = (NULL != join_resp) ? ntohs (join_resp->size) : 0; uint16_t relay_size = relay_count * sizeof (*relays); - struct MulticastClientJoinDecisionMessage * dcsn; + struct MulticastJoinDecisionMessageHeader * hdcsn; + struct MulticastJoinDecisionMessage *dcsn; struct MessageQueue * - mq = GNUNET_malloc (sizeof (*mq) + sizeof (*dcsn) + mq = GNUNET_malloc (sizeof (*mq) + sizeof (*hdcsn) + sizeof (*dcsn) + relay_size + join_resp_size); - dcsn = (struct MulticastClientJoinDecisionMessage *) &mq[1]; + hdcsn = (struct MulticastJoinDecisionMessageHeader *) &mq[1]; + hdcsn->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_DECISION); + hdcsn->header.size = htons (sizeof (*hdcsn) + sizeof (*dcsn) + + relay_size + join_resp_size); + hdcsn->member_key = jh->member_key; + hdcsn->peer = jh->member_peer; + + dcsn = (struct MulticastJoinDecisionMessage *) &hdcsn[1]; dcsn->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_DECISION); dcsn->header.size = htons (sizeof (*dcsn) + relay_size + join_resp_size); - dcsn->member_key = jh->member_key; - dcsn->member_peer = jh->member_peer; - dcsn->is_admitted = is_admitted; - dcsn->relay_count = relay_count; + dcsn->is_admitted = htonl (is_admitted); + dcsn->relay_count = htonl (relay_count); if (0 < relay_size) memcpy (&dcsn[1], relays, relay_size); if (0 < join_resp_size) @@ -763,7 +799,7 @@ GNUNET_MULTICAST_replay_response2 (struct GNUNET_MULTICAST_ReplayHandle *rh, * multicast session; public key is used to identify the multicast group; * @param max_fragment_id Maximum fragment ID already sent to the group. * 0 for a new group. - * @param join_cb Function called to approve / disapprove joining of a peer. + * @param join_request_cb Function called to approve / disapprove joining of a peer. * @param member_test_cb Function multicast can use to test group membership. * @param replay_frag_cb Function that can be called to replay a message fragment. * @param replay_msg_cb Function that can be called to replay a message. @@ -779,7 +815,7 @@ struct GNUNET_MULTICAST_Origin * GNUNET_MULTICAST_origin_start (const struct GNUNET_CONFIGURATION_Handle *cfg, const struct GNUNET_CRYPTO_EddsaPrivateKey *priv_key, uint64_t max_fragment_id, - GNUNET_MULTICAST_JoinCallback join_cb, + GNUNET_MULTICAST_JoinRequestCallback join_request_cb, GNUNET_MULTICAST_MembershipTestCallback member_test_cb, GNUNET_MULTICAST_ReplayFragmentCallback replay_frag_cb, GNUNET_MULTICAST_ReplayMessageCallback replay_msg_cb, @@ -801,7 +837,7 @@ GNUNET_MULTICAST_origin_start (const struct GNUNET_CONFIGURATION_Handle *cfg, grp->cfg = cfg; grp->cb_cls = cls; - grp->join_cb = join_cb; + grp->join_req_cb = join_request_cb; grp->member_test_cb = member_test_cb; grp->replay_frag_cb = replay_frag_cb; grp->replay_msg_cb = replay_msg_cb; @@ -963,7 +999,8 @@ GNUNET_MULTICAST_origin_to_all_cancel (struct GNUNET_MULTICAST_OriginTransmitHan * If empty, the @a join_request is sent directly to the @a origin. * @param join_msg Application-dependent join message to be passed to the peer * @a origin. - * @param join_cb Function called to approve / disapprove joining of a peer. + * @param join_request_cb Function called to approve / disapprove joining of a peer. + * @param join_decision_cb Function called to inform about the join decision. * @param member_test_cb Function multicast can use to test group membership. * @param replay_frag_cb Function that can be called to replay message fragments * this peer already knows from this group. NULL if this @@ -982,10 +1019,11 @@ GNUNET_MULTICAST_member_join (const struct GNUNET_CONFIGURATION_Handle *cfg, const struct GNUNET_CRYPTO_EddsaPublicKey *group_key, const struct GNUNET_CRYPTO_EddsaPrivateKey *member_key, const struct GNUNET_PeerIdentity *origin, - uint32_t relay_count, + uint16_t relay_count, const struct GNUNET_PeerIdentity *relays, const struct GNUNET_MessageHeader *join_msg, - GNUNET_MULTICAST_JoinCallback join_cb, + GNUNET_MULTICAST_JoinRequestCallback join_request_cb, + GNUNET_MULTICAST_JoinDecisionCallback join_decision_cb, GNUNET_MULTICAST_MembershipTestCallback member_test_cb, GNUNET_MULTICAST_ReplayFragmentCallback replay_frag_cb, GNUNET_MULTICAST_ReplayMessageCallback replay_msg_cb, @@ -1014,18 +1052,14 @@ GNUNET_MULTICAST_member_join (const struct GNUNET_CONFIGURATION_Handle *cfg, grp->cfg = cfg; grp->pub_key = *group_key; - grp->join_cb = join_cb; + mem->join_dcsn_cb = join_decision_cb; + grp->join_req_cb = join_request_cb; grp->member_test_cb = member_test_cb; grp->replay_frag_cb = replay_frag_cb; grp->message_cb = message_cb; grp->cb_cls = cls; - mem->origin = *origin; - mem->relay_count = relay_count; - mem->relays = *relays; - mem->priv_key = *member_key; - - GNUNET_CRYPTO_eddsa_key_get_public (&mem->priv_key, &grp->pub_key); + GNUNET_CRYPTO_eddsa_key_get_public (member_key, &grp->pub_key); GNUNET_CRYPTO_hash (&grp->pub_key, sizeof (grp->pub_key), &grp->pub_key_hash); if (NULL == members) diff --git a/src/psyc/gnunet-service-psyc.c b/src/psyc/gnunet-service-psyc.c index 2b9ee71354..cef89b8289 100644 --- a/src/psyc/gnunet-service-psyc.c +++ b/src/psyc/gnunet-service-psyc.c @@ -387,6 +387,11 @@ struct Slave struct GNUNET_MessageHeader *join_req; /** + * Join decision received from multicast. + */ + struct SlaveJoinDecision *join_dcsn; + + /** * Maximum request ID for this channel. */ uint64_t max_request_id; @@ -397,6 +402,10 @@ static inline void transmit_message (struct Channel *ch); +static uint64_t +message_queue_drop (struct Channel *ch); + + /** * Task run during shutdown. * @@ -413,7 +422,7 @@ shutdown_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) } if (NULL != stats) { - GNUNET_STATISTICS_destroy (stats, GNUNET_NO); + GNUNET_STATISTICS_destroy (stats, GNUNET_YES); stats = NULL; } } @@ -471,7 +480,8 @@ cleanup_slave (struct Slave *slv) static void cleanup_channel (struct Channel *ch) { - /* FIXME: fragment_cache_clear */ + message_queue_drop (ch); + GNUNET_CONTAINER_multihashmap_remove_all (recv_cache, &ch->pub_key_hash); if (NULL != ch->store_op) GNUNET_PSYCSTORE_operation_cancel (ch->store_op); @@ -570,7 +580,7 @@ struct JoinMemTestClosure /** - * Membership test result callback used for join requests.m + * Membership test result callback used for join requests. */ static void join_mem_test_cb (void *cls, int64_t result, const char *err_msg) @@ -585,8 +595,7 @@ join_mem_test_cb (void *cls, int64_t result, const char *err_msg) &slave_key_hash); GNUNET_CONTAINER_multihashmap_put (mst->join_reqs, &slave_key_hash, jcls->jh, GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); - msg_to_clients (jcls->ch, - (struct GNUNET_MessageHeader *) jcls->master_join_req); + msg_to_clients (jcls->ch, &jcls->master_join_req->header); } else { @@ -602,9 +611,10 @@ join_mem_test_cb (void *cls, int64_t result, const char *err_msg) * Incoming join request from multicast. */ static void -join_cb (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *slave_key, - const struct GNUNET_MessageHeader *join_msg, - struct GNUNET_MULTICAST_JoinHandle *jh) +mcast_join_request_cb (void *cls, + const struct GNUNET_CRYPTO_EddsaPublicKey *slave_key, + const struct GNUNET_MessageHeader *join_msg, + struct GNUNET_MULTICAST_JoinHandle *jh) { struct Channel *ch = cls; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p Got join request.\n", ch); @@ -643,21 +653,58 @@ join_cb (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *slave_key, } +/** + * Join decision received from multicast. + */ +static void +mcast_join_decision_cb (void *cls, int is_admitted, + const struct GNUNET_PeerIdentity *peer, + uint16_t relay_count, + const struct GNUNET_PeerIdentity *relays, + const struct GNUNET_MessageHeader *join_resp) +{ + struct Slave *slv = cls; + struct Channel *ch = &slv->ch; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "%p Got join decision: %d\n", slv, is_admitted); + + uint16_t join_resp_size = (NULL != join_resp) ? ntohs (join_resp->size) : 0; + struct SlaveJoinDecision * + dcsn = slv->join_dcsn = GNUNET_malloc (sizeof (*dcsn) + join_resp_size); + dcsn->header.size = htons (sizeof (*dcsn) + join_resp_size); + dcsn->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION); + dcsn->is_admitted = htonl (is_admitted); + if (0 < join_resp_size) + memcpy (&dcsn[1], join_resp, join_resp_size); + + msg_to_clients (ch, &dcsn->header); + + if (GNUNET_YES == is_admitted) + { + ch->ready = GNUNET_YES; + } + else + { + slv->member = NULL; + } +} + + static void -membership_test_cb (void *cls, - const struct GNUNET_CRYPTO_EddsaPublicKey *slave_key, - uint64_t message_id, uint64_t group_generation, - struct GNUNET_MULTICAST_MembershipTestHandle *mth) +mcast_membership_test_cb (void *cls, + const struct GNUNET_CRYPTO_EddsaPublicKey *slave_key, + uint64_t message_id, uint64_t group_generation, + struct GNUNET_MULTICAST_MembershipTestHandle *mth) { } static void -replay_fragment_cb (void *cls, - const struct GNUNET_CRYPTO_EddsaPublicKey *slave_key, - uint64_t fragment_id, uint64_t flags, - struct GNUNET_MULTICAST_ReplayHandle *rh) +mcast_replay_fragment_cb (void *cls, + const struct GNUNET_CRYPTO_EddsaPublicKey *slave_key, + uint64_t fragment_id, uint64_t flags, + struct GNUNET_MULTICAST_ReplayHandle *rh) { @@ -665,12 +712,12 @@ replay_fragment_cb (void *cls, static void -replay_message_cb (void *cls, - const struct GNUNET_CRYPTO_EddsaPublicKey *slave_key, - uint64_t message_id, - uint64_t fragment_offset, - uint64_t flags, - struct GNUNET_MULTICAST_ReplayHandle *rh) +mcast_replay_message_cb (void *cls, + const struct GNUNET_CRYPTO_EddsaPublicKey *slave_key, + uint64_t message_id, + uint64_t fragment_offset, + uint64_t flags, + struct GNUNET_MULTICAST_ReplayHandle *rh) { } @@ -744,7 +791,7 @@ mmsg_to_clients (struct Channel *ch, pmsg->message_id = mmsg->message_id; memcpy (&pmsg[1], &mmsg[1], size - sizeof (*mmsg)); - msg_to_clients (ch, (const struct GNUNET_MessageHeader *) pmsg); + msg_to_clients (ch, &pmsg->header); GNUNET_free (pmsg); } @@ -988,6 +1035,7 @@ fragment_queue_run (struct Channel *ch, uint64_t msg_id, * has already been delivered to the client. * * @param ch Channel. + * * @return Number of messages removed from queue and sent to client. */ static uint64_t @@ -1061,6 +1109,43 @@ message_queue_run (struct Channel *ch) /** + * Drop message queue of a channel. + * + * Remove all messages in queue without sending it to clients. + * + * @param ch Channel. + * + * @return Number of messages removed from queue. + */ +static uint64_t +message_queue_drop (struct Channel *ch) +{ + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "%p Dropping message queue.\n", ch); + uint64_t n = 0; + uint64_t msg_id; + while (GNUNET_YES == GNUNET_CONTAINER_heap_peek2 (ch->recv_msgs, NULL, + &msg_id)) + { + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "%p Dropping message %" PRIu64 " from queue.\n", ch, msg_id); + struct GNUNET_HashCode msg_id_hash; + hash_key_from_hll (&msg_id_hash, msg_id); + + struct FragmentQueue * + fragq = GNUNET_CONTAINER_multihashmap_get (ch->recv_frags, &msg_id_hash); + + fragment_queue_run (ch, msg_id, fragq, GNUNET_YES); + GNUNET_CONTAINER_heap_remove_root (ch->recv_msgs); + n++; + } + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "%p Removed %" PRIu64 " messages from queue.\n", ch, n); + return n; +} + + +/** * Handle incoming message from multicast. * * @param ch Channel. @@ -1069,7 +1154,7 @@ message_queue_run (struct Channel *ch) * @return #GNUNET_OK or #GNUNET_SYSERR */ static int -handle_multicast_message (struct Channel *ch, +client_multicast_message (struct Channel *ch, const struct GNUNET_MULTICAST_MessageHeader *mmsg) { GNUNET_PSYCSTORE_fragment_store (store, &ch->pub_key, mmsg, 0, NULL, NULL); @@ -1106,7 +1191,7 @@ handle_multicast_message (struct Channel *ch, * Store it using PSYCstore and send it to the client of the channel. */ static void -message_cb (void *cls, const struct GNUNET_MessageHeader *msg) +mcast_message_cb (void *cls, const struct GNUNET_MessageHeader *msg) { struct Channel *ch = cls; uint16_t type = ntohs (msg->type); @@ -1120,7 +1205,7 @@ message_cb (void *cls, const struct GNUNET_MessageHeader *msg) { case GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE: { - handle_multicast_message (ch, (const struct + client_multicast_message (ch, (const struct GNUNET_MULTICAST_MessageHeader *) msg); break; } @@ -1141,9 +1226,10 @@ message_cb (void *cls, const struct GNUNET_MessageHeader *msg) * @param flags Request flags. */ static void -request_cb (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *slave_key, - const struct GNUNET_MessageHeader *msg, - enum GNUNET_MULTICAST_MessageFlags flags) +mcast_request_cb (void *cls, + const struct GNUNET_CRYPTO_EddsaPublicKey *slave_key, + const struct GNUNET_MessageHeader *msg, + enum GNUNET_MULTICAST_MessageFlags flags) { struct Master *mst = cls; struct Channel *ch = &mst->ch; @@ -1183,7 +1269,7 @@ request_cb (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *slave_key, pmsg->flags = htonl (GNUNET_PSYC_MESSAGE_REQUEST); memcpy (&pmsg[1], &req[1], size - sizeof (*req)); - msg_to_clients (ch, (const struct GNUNET_MessageHeader *) pmsg); + msg_to_clients (ch, &pmsg->header); GNUNET_free (pmsg); break; } @@ -1221,11 +1307,13 @@ master_counters_cb (void *cls, int result, uint64_t max_fragment_id, ch->max_state_message_id = max_state_message_id; mst->max_group_generation = max_group_generation; mst->origin - = GNUNET_MULTICAST_origin_start (cfg, &mst->priv_key, - max_fragment_id, - join_cb, membership_test_cb, - replay_fragment_cb, replay_message_cb, - request_cb, message_cb, ch); + = GNUNET_MULTICAST_origin_start (cfg, &mst->priv_key, max_fragment_id, + &mcast_join_request_cb, + &mcast_membership_test_cb, + &mcast_replay_fragment_cb, + &mcast_replay_message_cb, + &mcast_request_cb, + &mcast_message_cb, ch); ch->ready = GNUNET_YES; } else @@ -1266,11 +1354,13 @@ slave_counters_cb (void *cls, int result, uint64_t max_fragment_id, = GNUNET_MULTICAST_member_join (cfg, &ch->pub_key, &slv->priv_key, &slv->origin, slv->relay_count, slv->relays, - slv->join_req, join_cb, - membership_test_cb, - replay_fragment_cb, replay_message_cb, - message_cb, ch); - ch->ready = GNUNET_YES; + slv->join_req, + &mcast_join_request_cb, + &mcast_join_decision_cb, + &mcast_membership_test_cb, + &mcast_replay_fragment_cb, + &mcast_replay_message_cb, + &mcast_message_cb, ch); } else { @@ -1297,7 +1387,7 @@ channel_init (struct Channel *ch) * Handle a connecting client starting a channel master. */ static void -handle_master_start (void *cls, struct GNUNET_SERVER_Client *client, +client_master_start (void *cls, struct GNUNET_SERVER_Client *client, const struct GNUNET_MessageHeader *msg) { const struct MasterStartRequest *req @@ -1363,7 +1453,7 @@ handle_master_start (void *cls, struct GNUNET_SERVER_Client *client, * Handle a connecting client joining as a channel slave. */ static void -handle_slave_join (void *cls, struct GNUNET_SERVER_Client *client, +client_slave_join (void *cls, struct GNUNET_SERVER_Client *client, const struct GNUNET_MessageHeader *msg) { const struct SlaveJoinRequest *req @@ -1389,6 +1479,8 @@ handle_slave_join (void *cls, struct GNUNET_SERVER_Client *client, { slv = GNUNET_new (struct Slave); slv->priv_key = req->slave_key; + slv->pub_key = slv_pub_key; + slv->pub_key_hash = slv_pub_key_hash; slv->origin = req->origin; slv->relay_count = ntohl (req->relay_count); if (0 < slv->relay_count) @@ -1411,10 +1503,10 @@ handle_slave_join (void *cls, struct GNUNET_SERVER_Client *client, if (NULL == ch_slv) { ch_slv = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES); - GNUNET_CONTAINER_multihashmap_put (channel_slaves, &pub_key_hash, ch_slv, + GNUNET_CONTAINER_multihashmap_put (channel_slaves, &ch->pub_key_hash, ch_slv, GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); } - GNUNET_CONTAINER_multihashmap_put (ch_slv, &slv_pub_key_hash, ch, + GNUNET_CONTAINER_multihashmap_put (ch_slv, &slv->pub_key_hash, ch, GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST); GNUNET_CONTAINER_multihashmap_put (slaves, &ch->pub_key_hash, ch, GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); @@ -1434,6 +1526,29 @@ handle_slave_join (void *cls, struct GNUNET_SERVER_Client *client, GNUNET_SERVER_notification_context_add (nc, client); GNUNET_SERVER_notification_context_unicast (nc, client, &res.header, GNUNET_NO); + + if (NULL == slv->member) + { + slv->member + = GNUNET_MULTICAST_member_join (cfg, &ch->pub_key, &slv->priv_key, + &slv->origin, + slv->relay_count, slv->relays, + slv->join_req, + &mcast_join_request_cb, + &mcast_join_decision_cb, + &mcast_membership_test_cb, + &mcast_replay_fragment_cb, + &mcast_replay_message_cb, + &mcast_message_cb, ch); + + } + else if (NULL != slv->join_dcsn) + { + GNUNET_SERVER_notification_context_add (nc, client); + GNUNET_SERVER_notification_context_unicast (nc, client, + &slv->join_dcsn->header, + GNUNET_NO); + } } GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, @@ -1451,7 +1566,7 @@ handle_slave_join (void *cls, struct GNUNET_SERVER_Client *client, struct JoinDecisionClosure { - uint8_t is_admitted; + int32_t is_admitted; struct GNUNET_MessageHeader *msg; }; @@ -1460,8 +1575,8 @@ struct JoinDecisionClosure * Iterator callback for responding to join requests of a slave. */ static int -join_decision_cb (void *cls, const struct GNUNET_HashCode *pub_key_hash, - void *jh) +send_join_decision_cb (void *cls, const struct GNUNET_HashCode *pub_key_hash, + void *jh) { struct JoinDecisionClosure *jcls = cls; // FIXME: add relays @@ -1474,7 +1589,7 @@ join_decision_cb (void *cls, const struct GNUNET_HashCode *pub_key_hash, * Join decision from client. */ static void -handle_join_decision (void *cls, struct GNUNET_SERVER_Client *client, +client_join_decision (void *cls, struct GNUNET_SERVER_Client *client, const struct GNUNET_MessageHeader *msg) { struct Channel * @@ -1484,7 +1599,7 @@ handle_join_decision (void *cls, struct GNUNET_SERVER_Client *client, struct MasterJoinDecision *dcsn = (struct MasterJoinDecision *) msg; struct JoinDecisionClosure jcls; - jcls.is_admitted = dcsn->is_admitted; + jcls.is_admitted = ntohl (dcsn->is_admitted); jcls.msg = (sizeof (*dcsn) + sizeof (struct GNUNET_PSYC_MessageHeader) <= ntohs (msg->size)) @@ -1494,8 +1609,17 @@ handle_join_decision (void *cls, struct GNUNET_SERVER_Client *client, struct GNUNET_HashCode slave_key_hash; GNUNET_CRYPTO_hash (&dcsn->slave_key, sizeof (dcsn->slave_key), &slave_key_hash); + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "%p Got join decision (%d) from client for channel %s..\n", + mst, jcls.is_admitted, GNUNET_h2s (&ch->pub_key_hash)); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "%p ..and slave %s.\n", + mst, GNUNET_h2s (&slave_key_hash)); + GNUNET_CONTAINER_multihashmap_get_multiple (mst->join_reqs, &slave_key_hash, - &join_decision_cb, &jcls); + &send_join_decision_cb, &jcls); + GNUNET_CONTAINER_multihashmap_remove_all (mst->join_reqs, &slave_key_hash); GNUNET_SERVER_receive_done (client, GNUNET_OK); } @@ -1758,10 +1882,10 @@ transmit_cancel (struct Channel *ch, struct GNUNET_SERVER_Client *client) /** - * Incoming message from a client. + * Incoming message from a master or slave client. */ static void -handle_psyc_message (void *cls, struct GNUNET_SERVER_Client *client, +client_psyc_message (void *cls, struct GNUNET_SERVER_Client *client, const struct GNUNET_MessageHeader *msg) { struct Channel * @@ -1775,8 +1899,7 @@ handle_psyc_message (void *cls, struct GNUNET_SERVER_Client *client, if (GNUNET_YES != ch->ready) { GNUNET_log (GNUNET_ERROR_TYPE_WARNING, - "%p Dropping message from client, channel is not ready yet.\n", - ch); + "%p Channel is not ready, dropping message from client.\n", ch); GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); return; } @@ -1784,7 +1907,7 @@ handle_psyc_message (void *cls, struct GNUNET_SERVER_Client *client, uint16_t size = ntohs (msg->size); if (GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < size - sizeof (*msg)) { - GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "%p Message payload too large\n", ch); + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "%p Message payload too large.\n", ch); GNUNET_break (0); transmit_cancel (ch, client); GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); @@ -1817,7 +1940,7 @@ handle_psyc_message (void *cls, struct GNUNET_SERVER_Client *client, * Client requests to add a slave to the membership database. */ static void -handle_slave_add (void *cls, struct GNUNET_SERVER_Client *client, +client_slave_add (void *cls, struct GNUNET_SERVER_Client *client, const struct GNUNET_MessageHeader *msg) { @@ -1828,7 +1951,7 @@ handle_slave_add (void *cls, struct GNUNET_SERVER_Client *client, * Client requests to remove a slave from the membership database. */ static void -handle_slave_remove (void *cls, struct GNUNET_SERVER_Client *client, +client_slave_remove (void *cls, struct GNUNET_SERVER_Client *client, const struct GNUNET_MessageHeader *msg) { @@ -1839,7 +1962,7 @@ handle_slave_remove (void *cls, struct GNUNET_SERVER_Client *client, * Client requests channel history from PSYCstore. */ static void -handle_story_request (void *cls, struct GNUNET_SERVER_Client *client, +client_story_request (void *cls, struct GNUNET_SERVER_Client *client, const struct GNUNET_MessageHeader *msg) { @@ -1850,7 +1973,7 @@ handle_story_request (void *cls, struct GNUNET_SERVER_Client *client, * Client requests best matching state variable from PSYCstore. */ static void -handle_state_get (void *cls, struct GNUNET_SERVER_Client *client, +client_state_get (void *cls, struct GNUNET_SERVER_Client *client, const struct GNUNET_MessageHeader *msg) { @@ -1861,7 +1984,7 @@ handle_state_get (void *cls, struct GNUNET_SERVER_Client *client, * Client requests state variables with a given prefix from PSYCstore. */ static void -handle_state_get_prefix (void *cls, struct GNUNET_SERVER_Client *client, +client_state_get_prefix (void *cls, struct GNUNET_SERVER_Client *client, const struct GNUNET_MessageHeader *msg) { @@ -1880,31 +2003,31 @@ run (void *cls, struct GNUNET_SERVER_Handle *server, const struct GNUNET_CONFIGURATION_Handle *c) { static const struct GNUNET_SERVER_MessageHandler handlers[] = { - { &handle_master_start, NULL, + { &client_master_start, NULL, GNUNET_MESSAGE_TYPE_PSYC_MASTER_START, 0 }, - { &handle_slave_join, NULL, + { &client_slave_join, NULL, GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN, 0 }, - { &handle_join_decision, NULL, + { &client_join_decision, NULL, GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION, 0 }, - { &handle_psyc_message, NULL, + { &client_psyc_message, NULL, GNUNET_MESSAGE_TYPE_PSYC_MESSAGE, 0 }, - { &handle_slave_add, NULL, + { &client_slave_add, NULL, GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_SLAVE_ADD, 0 }, - { &handle_slave_remove, NULL, + { &client_slave_remove, NULL, GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_SLAVE_RM, 0 }, - { &handle_story_request, NULL, + { &client_story_request, NULL, GNUNET_MESSAGE_TYPE_PSYC_STORY_REQUEST, 0 }, - { &handle_state_get, NULL, + { &client_state_get, NULL, GNUNET_MESSAGE_TYPE_PSYC_STATE_GET, 0 }, - { &handle_state_get_prefix, NULL, + { &client_state_get_prefix, NULL, GNUNET_MESSAGE_TYPE_PSYC_STATE_GET_PREFIX, 0 } }; diff --git a/src/psyc/psyc.h b/src/psyc/psyc.h index 47f2a01224..66c8de8983 100644 --- a/src/psyc/psyc.h +++ b/src/psyc/psyc.h @@ -250,19 +250,36 @@ struct MasterJoinDecision struct GNUNET_MessageHeader header; /** + * #GNUNET_YES if the slave was admitted. + */ + int32_t is_admitted; + + /** * Public key of the joining slave. */ struct GNUNET_CRYPTO_EddsaPublicKey slave_key; + /* Followed by struct GNUNET_MessageHeader join_response */ +}; + + +struct SlaveJoinDecision +{ + /** + * Type: GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION + */ + struct GNUNET_MessageHeader header; + /** * #GNUNET_YES if the slave was admitted. */ - uint8_t is_admitted; + int32_t is_admitted; /* Followed by struct GNUNET_MessageHeader join_response */ }; + GNUNET_NETWORK_STRUCT_END #endif diff --git a/src/psyc/psyc_api.c b/src/psyc/psyc_api.c index ee49a584fe..7ec9d21b72 100644 --- a/src/psyc/psyc_api.c +++ b/src/psyc/psyc_api.c @@ -198,9 +198,9 @@ struct GNUNET_PSYC_Master GNUNET_PSYC_MasterStartCallback start_cb; /** - * Join handler callback. + * Join request callback. */ - GNUNET_PSYC_JoinCallback join_cb; + GNUNET_PSYC_JoinRequestCallback join_req_cb; }; @@ -211,14 +211,16 @@ struct GNUNET_PSYC_Slave { struct GNUNET_PSYC_Channel ch; - GNUNET_PSYC_SlaveJoinCallback join_cb; + GNUNET_PSYC_SlaveConnectCallback connect_cb; + + GNUNET_PSYC_JoinDecisionCallback join_dcsn_cb; }; /** * Handle that identifies a join request. * - * Used to match calls to #GNUNET_PSYC_JoinCallback to the + * Used to match calls to #GNUNET_PSYC_JoinRequestCallback to the * corresponding calls to GNUNET_PSYC_join_decision(). */ struct GNUNET_PSYC_JoinHandle @@ -922,7 +924,22 @@ handle_psyc_join_request (struct GNUNET_PSYC_Master *mst, jh->mst = mst; jh->slave_key = req->slave_key; - mst->join_cb (mst->ch.cb_cls, &req->slave_key, msg, jh); + if (NULL != mst->join_req_cb) + mst->join_req_cb (mst->ch.cb_cls, &req->slave_key, msg, jh); +} + + +static void +handle_psyc_join_decision (struct GNUNET_PSYC_Slave *slv, + const struct SlaveJoinDecision *dcsn) +{ + struct GNUNET_PSYC_MessageHeader *msg = NULL; + if (ntohs (dcsn->header.size) <= sizeof (*dcsn) + sizeof (*msg)) + msg = (struct GNUNET_PSYC_MessageHeader *) &dcsn[1]; + + struct GNUNET_PSYC_JoinHandle *jh = GNUNET_malloc (sizeof (*jh)); + if (NULL != slv->join_dcsn_cb) + slv->join_dcsn_cb (slv->ch.cb_cls, ntohl (dcsn->is_admitted), msg); } @@ -971,6 +988,9 @@ message_handler (void *cls, case GNUNET_MESSAGE_TYPE_PSYC_JOIN_REQUEST: size_min = sizeof (struct MasterJoinRequest); break; + case GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION: + size_min = sizeof (struct SlaveJoinDecision); + break; default: GNUNET_break_op (0); return; @@ -995,8 +1015,8 @@ message_handler (void *cls, case GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK: { struct CountersResult *cres = (struct CountersResult *) msg; - if (NULL != slv->join_cb) - slv->join_cb (ch->cb_cls, GNUNET_ntohll (cres->max_message_id)); + if (NULL != slv->connect_cb) + slv->connect_cb (ch->cb_cls, GNUNET_ntohll (cres->max_message_id)); break; } case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_ACK: @@ -1013,6 +1033,11 @@ message_handler (void *cls, handle_psyc_join_request ((struct GNUNET_PSYC_Master *) ch, (const struct MasterJoinRequest *) msg); break; + + case GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION: + handle_psyc_join_decision ((struct GNUNET_PSYC_Slave *) ch, + (const struct SlaveJoinDecision *) msg); + break; } if (NULL != ch->client) @@ -1175,20 +1200,20 @@ disconnect (void *c) * or part messages, the respective methods must call other PSYC functions to * inform PSYC about the meaning of the respective events. * - * @param cfg Configuration to use (to connect to PSYC service). - * @param channel_key ECC key that will be used to sign messages for this + * @param cfg Configuration to use (to connect to PSYC service). + * @param channel_key ECC key that will be used to sign messages for this * PSYC session. The public key is used to identify the PSYC channel. * Note that end-users will usually not use the private key directly, but * rather look it up in GNS for places managed by other users, or select * a file with the private key(s) when setting up their own channels * FIXME: we'll likely want to use NOT the p521 curve here, but a cheaper * one in the future. - * @param policy Channel policy specifying join and history restrictions. + * @param policy Channel policy specifying join and history restrictions. * Used to automate join decisions. - * @param message_cb Function to invoke on message parts received from slaves. - * @param join_cb Function to invoke when a peer wants to join. - * @param master_started_cb Function to invoke after the channel master started. - * @param cls Closure for @a master_started_cb and @a join_cb. + * @param message_cb Function to invoke on message parts received from slaves. + * @param join_request_cb Function to invoke when a slave wants to join. + * @param master_start_cb Function to invoke after the channel master started. + * @param cls Closure for @a method and @a join_cb. * * @return Handle for the channel master, NULL on error. */ @@ -1196,9 +1221,9 @@ struct GNUNET_PSYC_Master * GNUNET_PSYC_master_start (const struct GNUNET_CONFIGURATION_Handle *cfg, const struct GNUNET_CRYPTO_EddsaPrivateKey *channel_key, enum GNUNET_PSYC_Policy policy, + GNUNET_PSYC_MasterStartCallback start_cb, + GNUNET_PSYC_JoinRequestCallback join_request_cb, GNUNET_PSYC_MessageCallback message_cb, - GNUNET_PSYC_JoinCallback join_cb, - GNUNET_PSYC_MasterStartCallback master_started_cb, void *cls) { struct GNUNET_PSYC_Master *mst = GNUNET_malloc (sizeof (*mst)); @@ -1210,8 +1235,8 @@ GNUNET_PSYC_master_start (const struct GNUNET_CONFIGURATION_Handle *cfg, req->channel_key = *channel_key; req->policy = policy; - mst->start_cb = master_started_cb; - mst->join_cb = join_cb; + mst->start_cb = start_cb; + mst->join_req_cb = join_request_cb; ch->message_cb = message_cb; ch->cb_cls = cls; ch->cfg = cfg; @@ -1244,8 +1269,9 @@ GNUNET_PSYC_master_stop (struct GNUNET_PSYC_Master *master) * #GNUNET_PSYC_JoinCallback. * * @param jh Join request handle. - * @param is_admitted #GNUNET_YES if joining is approved, - * #GNUNET_NO if it is disapproved. + * @param is_admitted #GNUNET_YES if the join is approved, + * #GNUNET_NO if it is disapproved, + * #GNUNET_SYSERR if we cannot answer the request. * @param relay_count Number of relays given. * @param relays Array of suggested peers that might be useful relays to use * when joining the multicast group (essentially a list of peers that @@ -1254,48 +1280,42 @@ GNUNET_PSYC_master_stop (struct GNUNET_PSYC_Master *master) * be the multicast origin) is a good candidate for building the * multicast tree. Note that it is unnecessary to specify our own * peer identity in this array. - * @param method_name Method name for the message transmitted with the response. - * @param env Environment containing transient variables for the message, or NULL. - * @param data Data of the message. - * @param data_size Size of @a data. + * @param join_resp Application-dependent join response message. + * + * @return #GNUNET_OK on success, + * #GNUNET_SYSERR if the message is too large. */ -void +int GNUNET_PSYC_join_decision (struct GNUNET_PSYC_JoinHandle *jh, int is_admitted, uint32_t relay_count, const struct GNUNET_PeerIdentity *relays, - const char *method_name, - const struct GNUNET_ENV_Environment *env, - const void *data, - size_t data_size) + const struct GNUNET_PSYC_MessageHeader *join_resp) { struct GNUNET_PSYC_Channel *ch = &jh->mst->ch; - struct MasterJoinDecision *dcsn; - struct GNUNET_PSYC_MessageHeader *pmsg = NULL; - uint16_t pmsg_size = 0; -/* FIXME: - sizeof (*pmsg) - + sizeof (struct GNUNET_PSYC_MessageMethod) - + vars_size - + sizeof (struct GNUNET_MessageHeader) + data_size - + sizeof (struct GNUNET_MessageHeader); -*/ + uint16_t join_resp_size + = (NULL != join_resp) ? ntohs (join_resp->header.size) : 0; uint16_t relay_size = relay_count * sizeof (*relays); - struct MessageQueue * - mq = GNUNET_malloc (sizeof (*mq) + sizeof (*dcsn) + relay_size + pmsg_size); + + if (GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD + < sizeof (*dcsn) + relay_size + join_resp_size) + return GNUNET_SYSERR; + + struct MessageQueue *mq = GNUNET_malloc (sizeof (*mq) + sizeof (*dcsn) + + relay_size + join_resp_size); dcsn = (struct MasterJoinDecision *) &mq[1]; - dcsn->header.size = htons (sizeof (*dcsn) + relay_size + pmsg_size); + dcsn->header.size = htons (sizeof (*dcsn) + relay_size + join_resp_size); dcsn->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION); - dcsn->is_admitted = (GNUNET_YES == is_admitted) ? GNUNET_YES : GNUNET_NO; + dcsn->is_admitted = htonl (is_admitted); dcsn->slave_key = jh->slave_key; - /* FIXME: add message parts to pmsg */ - if (0 < pmsg_size) - memcpy (&dcsn[1], pmsg, pmsg_size); + if (0 < join_resp_size) + memcpy (&dcsn[1], join_resp, join_resp_size); GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, mq); transmit_next (ch); + return GNUNET_OK; } @@ -1359,24 +1379,27 @@ GNUNET_PSYC_master_transmit_cancel (struct GNUNET_PSYC_MasterTransmitHandle *th) * notification on failure (as the channel may simply take days to approve, * and disapproval is simply being ignored). * - * @param cfg Configuration to use. - * @param channel_key ECC public key that identifies the channel we wish to join. - * @param slave_key ECC private-public key pair that identifies the slave, and + * @param cfg Configuration to use. + * @param channel_key ECC public key that identifies the channel we wish to join. + * @param slave_key ECC private-public key pair that identifies the slave, and * used by multicast to sign the join request and subsequent unicast * requests sent to the master. - * @param origin Peer identity of the origin. - * @param relay_count Number of peers in the @a relays array. - * @param relays Peer identities of members of the multicast group, which serve + * @param origin Peer identity of the origin. + * @param relay_count Number of peers in the @a relays array. + * @param relays Peer identities of members of the multicast group, which serve * as relays and used to join the group at. - * @param message_cb Function to invoke on message parts received from the + * @param message_cb Function to invoke on message parts received from the * channel, typically at least contains method handlers for @e join and * @e part. - * @param slave_joined_cb Function invoked once we have joined the channel. - * @param cls Closure for @a message_cb and @a slave_joined_cb. - * @param method_name Method name for the join request. - * @param env Environment containing transient variables for the request, or NULL. - * @param data Payload for the join message. - * @param data_size Number of bytes in @a data. + * @param slave_connect_cb Function invoked once we have connected to the + * PSYC service. + * @param join_decision_cb Function invoked once we have received a join + * decision. + * @param cls Closure for @a message_cb and @a slave_joined_cb. + * @param method_name Method name for the join request. + * @param env Environment containing transient variables for the request, or NULL. + * @param data Payload for the join message. + * @param data_size Number of bytes in @a data. * * @return Handle for the slave, NULL on error. */ @@ -1388,7 +1411,8 @@ GNUNET_PSYC_slave_join (const struct GNUNET_CONFIGURATION_Handle *cfg, uint32_t relay_count, const struct GNUNET_PeerIdentity *relays, GNUNET_PSYC_MessageCallback message_cb, - GNUNET_PSYC_SlaveJoinCallback slave_joined_cb, + GNUNET_PSYC_SlaveConnectCallback connect_cb, + GNUNET_PSYC_JoinDecisionCallback join_decision_cb, void *cls, const char *method_name, const struct GNUNET_ENV_Environment *env, @@ -1408,7 +1432,8 @@ GNUNET_PSYC_slave_join (const struct GNUNET_CONFIGURATION_Handle *cfg, req->relay_count = htonl (relay_count); memcpy (&req[1], relays, relay_count * sizeof (*relays)); - slv->join_cb = slave_joined_cb; + slv->connect_cb = connect_cb; + slv->join_dcsn_cb = join_decision_cb; ch->message_cb = message_cb; ch->cb_cls = cls; diff --git a/src/psyc/test_psyc.c b/src/psyc/test_psyc.c index 4195e464bf..8b4aad773d 100644 --- a/src/psyc/test_psyc.c +++ b/src/psyc/test_psyc.c @@ -35,9 +35,9 @@ #include "gnunet_env_lib.h" #include "gnunet_psyc_service.h" -#define TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 10) +#define TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 30) -#define DEBUG_SERVICE 0 +#define DEBUG_SERVICE 1 /** @@ -79,6 +79,7 @@ struct TransmitClosure struct TransmitClosure *tmit; +static int join_req_count; enum { @@ -167,8 +168,8 @@ end () static void -master_message (void *cls, uint64_t message_id, uint32_t flags, - const struct GNUNET_MessageHeader *msg) +master_message_cb (void *cls, uint64_t message_id, uint32_t flags, + const struct GNUNET_MessageHeader *msg) { if (NULL == msg) { @@ -211,8 +212,8 @@ master_message (void *cls, uint64_t message_id, uint32_t flags, static void -slave_message (void *cls, uint64_t message_id, uint32_t flags, - const struct GNUNET_MessageHeader *msg) +slave_message_cb (void *cls, uint64_t message_id, uint32_t flags, + const struct GNUNET_MessageHeader *msg) { if (NULL == msg) { @@ -243,23 +244,6 @@ slave_message (void *cls, uint64_t message_id, uint32_t flags, static void -join_request (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *slave_key, - const struct GNUNET_PSYC_MessageHeader *msg, - struct GNUNET_PSYC_JoinHandle *jh) -{ - struct GNUNET_HashCode slave_key_hash; - GNUNET_CRYPTO_hash (slave_key, sizeof (*slave_key), &slave_key_hash); - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, - "Got join request from %s.\n", - GNUNET_h2s (&slave_key_hash)); - - GNUNET_PSYC_join_decision (jh, GNUNET_YES, 0, NULL, "_notice_join", NULL, - "you're in", 9); - // FIXME: also test refusing entry -} - - -static void transmit_resume (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Transmission resumed.\n"); @@ -392,9 +376,23 @@ tmit_notify_data (void *cls, uint16_t *data_size, void *data) static void -slave_joined (void *cls, uint64_t max_message_id) +slave_join (); + + +static void +join_decision_cb (void *cls, int is_admitted, + const struct GNUNET_PSYC_MessageHeader *join_msg) { - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Slave joined: %lu\n", max_message_id); + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "Slave got join decision: %d\n", is_admitted); + + if (GNUNET_YES != is_admitted) + { /* First join request is refused, retry. */ + //GNUNET_assert (1 == join_req_count); + slave_join (); + return; + } + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Slave sending request to master.\n"); test = TEST_SLAVE_TRANSMIT; @@ -414,20 +412,46 @@ slave_joined (void *cls, uint64_t max_message_id) GNUNET_PSYC_SLAVE_TRANSMIT_NONE); } + +static void +join_request_cb (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *slave_key, + const struct GNUNET_PSYC_MessageHeader *msg, + struct GNUNET_PSYC_JoinHandle *jh) +{ + struct GNUNET_HashCode slave_key_hash; + GNUNET_CRYPTO_hash (slave_key, sizeof (*slave_key), &slave_key_hash); + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "Got join request from %s.\n", + GNUNET_h2s (&slave_key_hash)); + + /* Reject first request */ + int is_admitted = (0 < join_req_count++) ? GNUNET_YES : GNUNET_NO; + GNUNET_PSYC_join_decision (jh, is_admitted, 0, NULL, NULL); +} + + +static void +slave_connect_cb (void *cls, uint64_t max_message_id) +{ + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "Slave connected: %lu\n", max_message_id); +} + + static void slave_join () { GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Joining slave.\n"); - struct GNUNET_PeerIdentity origin; - struct GNUNET_PeerIdentity relays[16]; + struct GNUNET_PeerIdentity origin; // FIXME: this peer struct GNUNET_ENV_Environment *env = GNUNET_ENV_environment_create (); GNUNET_ENV_environment_add (env, GNUNET_ENV_OP_ASSIGN, "_foo", "bar baz", 7); GNUNET_ENV_environment_add (env, GNUNET_ENV_OP_ASSIGN, "_foo_bar", "foo bar baz", 11); slv = GNUNET_PSYC_slave_join (cfg, &channel_pub_key, slave_key, &origin, - 16, relays, &slave_message, &slave_joined, NULL, + 0, NULL, &slave_message_cb, + &slave_connect_cb, &join_decision_cb, NULL, "_request_join", env, "some data", 9); GNUNET_ENV_environment_destroy (env); } @@ -485,7 +509,7 @@ master_transmit () static void -master_started (void *cls, uint64_t max_message_id) +master_start_cb (void *cls, uint64_t max_message_id) { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Master started: %" PRIu64 "\n", max_message_id); @@ -521,8 +545,8 @@ run (void *cls, GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Starting master.\n"); mst = GNUNET_PSYC_master_start (cfg, channel_key, GNUNET_PSYC_CHANNEL_PRIVATE, - &master_message, &join_request, - &master_started, NULL); + &master_start_cb, &join_request_cb, + &master_message_cb, NULL); } |