aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/include/gnunet_multicast_service.h67
-rw-r--r--src/include/gnunet_psyc_service.h119
-rw-r--r--src/multicast/Makefile.am2
-rw-r--r--src/multicast/gnunet-service-multicast.c160
-rw-r--r--src/multicast/multicast.h51
-rw-r--r--src/multicast/multicast_api.c120
-rw-r--r--src/psyc/gnunet-service-psyc.c261
-rw-r--r--src/psyc/psyc.h19
-rw-r--r--src/psyc/psyc_api.c147
-rw-r--r--src/psyc/test_psyc.c86
10 files changed, 665 insertions, 367 deletions
diff --git a/src/include/gnunet_multicast_service.h b/src/include/gnunet_multicast_service.h
index 7ceb2e352c..5079a087b6 100644
--- a/src/include/gnunet_multicast_service.h
+++ b/src/include/gnunet_multicast_service.h
@@ -230,7 +230,7 @@ GNUNET_NETWORK_STRUCT_END
/**
* Handle that identifies a join request.
*
- * Used to match calls to #GNUNET_MULTICAST_JoinCallback to the
+ * Used to match calls to #GNUNET_MULTICAST_JoinRequestCallback to the
* corresponding calls to #GNUNET_MULTICAST_join_decision().
*/
struct GNUNET_MULTICAST_JoinHandle;
@@ -240,11 +240,12 @@ struct GNUNET_MULTICAST_JoinHandle;
* Function to call with the decision made for a join request.
*
* Must be called once and only once in response to an invocation of the
- * #GNUNET_MULTICAST_JoinCallback.
+ * #GNUNET_MULTICAST_JoinRequestCallback.
*
* @param jh Join request handle.
- * @param is_admitted #GNUNET_YES if joining is approved,
- * #GNUNET_NO if it is disapproved
+ * @param is_admitted #GNUNET_YES if the join is approved,
+ * #GNUNET_NO if it is disapproved,
+ * #GNUNET_SYSERR if we cannot answer the request.
* @param relay_count Number of relays given.
* @param relays Array of suggested peers that might be useful relays to use
* when joining the multicast group (essentially a list of peers that
@@ -261,7 +262,7 @@ struct GNUNET_MULTICAST_JoinHandle;
struct GNUNET_MULTICAST_ReplayHandle *
GNUNET_MULTICAST_join_decision (struct GNUNET_MULTICAST_JoinHandle *jh,
int is_admitted,
- unsigned int relay_count,
+ uint16_t relay_count,
const struct GNUNET_PeerIdentity *relays,
const struct GNUNET_MessageHeader *join_resp);
@@ -273,16 +274,40 @@ GNUNET_MULTICAST_join_decision (struct GNUNET_MULTICAST_JoinHandle *jh,
* with the decision.
*
* @param cls Closure.
- * @param peer Identity of the member that wants to join.
- * @param member_key Requesting member's public key.
+ * @param member_key Public key of the member requesting join.
* @param join_msg Application-dependent join message from the new member.
* @param jh Join handle to pass to GNUNET_MULTICAST_join_decison().
*/
typedef void
-(*GNUNET_MULTICAST_JoinCallback) (void *cls,
- const struct GNUNET_CRYPTO_EddsaPublicKey *member_key,
- const struct GNUNET_MessageHeader *join_msg,
- struct GNUNET_MULTICAST_JoinHandle *jh);
+(*GNUNET_MULTICAST_JoinRequestCallback) (void *cls,
+ const struct GNUNET_CRYPTO_EddsaPublicKey *member_key,
+ const struct GNUNET_MessageHeader *join_msg,
+ struct GNUNET_MULTICAST_JoinHandle *jh);
+
+
+/**
+ * Method called to inform about the decision in response to a join request.
+ *
+ * If @a is_admitted is not #GNUNET_YES, then the multicast service disconnects
+ * the client and the multicast member handle returned by
+ * GNUNET_MULTICAST_member_join() is invalidated.
+ *
+ * @param cls Closure.
+ * @param is_admitted #GNUNET_YES or #GNUNET_NO or #GNUNET_SYSERR
+ * @param peer The peer we are connected to and the join decision is from.
+ * @param relay_count Number of peers in the @a relays array.
+ * @param relays Peer identities of members of the group, which serve as relays
+ * and can be used to join the group at. If empty, only the origin can
+ * be used to connect to the group.
+ * @param join_msg Application-dependent join message from the origin.
+ */
+typedef void
+(*GNUNET_MULTICAST_JoinDecisionCallback) (void *cls,
+ int is_admitted,
+ const struct GNUNET_PeerIdentity *peer,
+ uint16_t relay_count,
+ const struct GNUNET_PeerIdentity *relays,
+ const struct GNUNET_MessageHeader *join_msg);
/**
@@ -542,9 +567,9 @@ GNUNET_MULTICAST_replay_response2 (struct GNUNET_MULTICAST_ReplayHandle *rh,
* @param cfg Configuration to use.
* @param priv_key ECC key that will be used to sign messages for this
* multicast session; public key is used to identify the multicast group;
- * @param next_fragment_id Next fragment ID to continue counting fragments from
- * when restarting the origin. 1 for a new group.
- * @param join_cb Function called to approve / disapprove joining of a peer.
+ * @param max_fragment_id Maximum fragment ID already sent to the group.
+ * 0 for a new group.
+ * @param join_request_cb Function called to approve / disapprove joining of a peer.
* @param member_test_cb Function multicast can use to test group membership.
* @param replay_frag_cb Function that can be called to replay a message fragment.
* @param replay_msg_cb Function that can be called to replay a message.
@@ -558,8 +583,8 @@ GNUNET_MULTICAST_replay_response2 (struct GNUNET_MULTICAST_ReplayHandle *rh,
struct GNUNET_MULTICAST_Origin *
GNUNET_MULTICAST_origin_start (const struct GNUNET_CONFIGURATION_Handle *cfg,
const struct GNUNET_CRYPTO_EddsaPrivateKey *priv_key,
- uint64_t next_fragment_id,
- GNUNET_MULTICAST_JoinCallback join_cb,
+ uint64_t max_fragment_id,
+ GNUNET_MULTICAST_JoinRequestCallback join_request_cb,
GNUNET_MULTICAST_MembershipTestCallback member_test_cb,
GNUNET_MULTICAST_ReplayFragmentCallback replay_frag_cb,
GNUNET_MULTICAST_ReplayMessageCallback replay_msg_cb,
@@ -667,11 +692,12 @@ GNUNET_MULTICAST_origin_stop (struct GNUNET_MULTICAST_Origin *origin);
* of multicast messages.
* @param relay_count Number of peers in the @a relays array.
* @param relays Peer identities of members of the group, which serve as relays
- * and can be used to join the group at. and send the @a join_request to.
+ * and can be used to join the group at and send the @a join_request to.
* If empty, the @a join_request is sent directly to the @a origin.
* @param join_msg Application-dependent join message to be passed to the
* @a origin.
- * @param join_cb Function called to approve / disapprove joining of a peer.
+ * @param join_request_cb Function called to approve / disapprove joining of a peer.
+ * @param join_decision_cb Function called to inform about the join decision.
* @param mem_test_cb Function multicast can use to test group membership.
* @param replay_frag_cb Function that can be called to replay message fragments
* this peer already knows from this group. NULL if this
@@ -690,10 +716,11 @@ GNUNET_MULTICAST_member_join (const struct GNUNET_CONFIGURATION_Handle *cfg,
const struct GNUNET_CRYPTO_EddsaPublicKey *group_key,
const struct GNUNET_CRYPTO_EddsaPrivateKey *member_key,
const struct GNUNET_PeerIdentity *origin,
- uint32_t relay_count,
+ uint16_t relay_count,
const struct GNUNET_PeerIdentity *relays,
const struct GNUNET_MessageHeader *join_request,
- GNUNET_MULTICAST_JoinCallback join_cb,
+ GNUNET_MULTICAST_JoinRequestCallback join_request_cb,
+ GNUNET_MULTICAST_JoinDecisionCallback join_decision_cb,
GNUNET_MULTICAST_MembershipTestCallback mem_test_cb,
GNUNET_MULTICAST_ReplayFragmentCallback replay_frag_cb,
GNUNET_MULTICAST_ReplayMessageCallback replay_msg_cb,
diff --git a/src/include/gnunet_psyc_service.h b/src/include/gnunet_psyc_service.h
index 2ae20adc57..4f4c99c1f9 100644
--- a/src/include/gnunet_psyc_service.h
+++ b/src/include/gnunet_psyc_service.h
@@ -325,17 +325,18 @@ typedef void
/**
* Method called from PSYC upon receiving a join request.
*
- * @param cls Closure.
- * @param slave requesting to join.
+ * @param cls Closure.
+ * @param slave_key Public key of the slave requesting join.
* @param join_msg Join message sent along with the request.
- * @param jh Join handle to use with GNUNET_PSYC_join_decision()
+ * @param jh Join handle to use with GNUNET_PSYC_join_decision()
*/
typedef void
-(*GNUNET_PSYC_JoinCallback) (void *cls,
- const struct GNUNET_CRYPTO_EddsaPublicKey
- *slave_key,
- const struct GNUNET_PSYC_MessageHeader *join_msg,
- struct GNUNET_PSYC_JoinHandle *jh);
+(*GNUNET_PSYC_JoinRequestCallback) (void *cls,
+ const struct
+ GNUNET_CRYPTO_EddsaPublicKey *slave_key,
+ const struct
+ GNUNET_PSYC_MessageHeader *join_msg,
+ struct GNUNET_PSYC_JoinHandle *jh);
/**
@@ -344,32 +345,30 @@ typedef void
* Must be called once and only once in response to an invocation of the
* #GNUNET_PSYC_JoinCallback.
*
- * @param jh Join request handle.
- * @param is_admitted #GNUNET_YES if joining is approved,
- * #GNUNET_NO if it is disapproved.
- * @param relay_count Number of relays given.
- * @param relays Array of suggested peers that might be useful relays to use
+ * @param jh Join request handle.
+ * @param is_admitted #GNUNET_YES if the join is approved,
+ * #GNUNET_NO if it is disapproved,
+ * #GNUNET_SYSERR if we cannot answer the request.
+ * @param relay_count Number of relays given.
+ * @param relays Array of suggested peers that might be useful relays to use
* when joining the multicast group (essentially a list of peers that
* are already part of the multicast group and might thus be willing
* to help with routing). If empty, only this local peer (which must
* be the multicast origin) is a good candidate for building the
* multicast tree. Note that it is unnecessary to specify our own
* peer identity in this array.
- * @param method_name Method name for the message transmitted with the response.
- * @param env Environment containing transient variables for the message,
- * or NULL.
- * @param data Data of the message.
- * @param data_size Size of @a data.
+ * @param join_resp Application-dependent join response message to send along
+ * with the decision.
+ *
+ * @return #GNUNET_OK on success,
+ * #GNUNET_SYSERR if @a join_resp is too large.
*/
-void
+int
GNUNET_PSYC_join_decision (struct GNUNET_PSYC_JoinHandle *jh,
int is_admitted,
uint32_t relay_count,
const struct GNUNET_PeerIdentity *relays,
- const char *method_name,
- const struct GNUNET_ENV_Environment *env,
- const void *data,
- size_t data_size);
+ const struct GNUNET_PSYC_MessageHeader *join_resp);
/**
@@ -400,29 +399,31 @@ typedef void
* or part messages, the respective methods must call other PSYC functions to
* inform PSYC about the meaning of the respective events.
*
- * @param cfg Configuration to use (to connect to PSYC service).
- * @param channel_key ECC key that will be used to sign messages for this
+ * @param cfg Configuration to use (to connect to PSYC service).
+ * @param channel_key ECC key that will be used to sign messages for this
* PSYC session. The public key is used to identify the PSYC channel.
* Note that end-users will usually not use the private key directly, but
* rather look it up in GNS for places managed by other users, or select
* a file with the private key(s) when setting up their own channels
* FIXME: we'll likely want to use NOT the p521 curve here, but a cheaper
* one in the future.
- * @param policy Channel policy specifying join and history restrictions.
+ * @param policy Channel policy specifying join and history restrictions.
* Used to automate join decisions.
- * @param message_cb Function to invoke on message parts received from slaves.
- * @param join_cb Function to invoke when a peer wants to join.
- * @param master_started_cb Function to invoke after the channel master started.
- * @param cls Closure for @a method and @a join_cb.
+ * @param master_start_cb Function to invoke after the channel master started.
+ * @param join_request_cb Function to invoke when a slave wants to join.
+ * @param message_cb Function to invoke on message parts sent to the channel
+ * and received from slaves
+ * @param cls Closure for @a method and @a join_cb.
+ *
* @return Handle for the channel master, NULL on error.
*/
struct GNUNET_PSYC_Master *
GNUNET_PSYC_master_start (const struct GNUNET_CONFIGURATION_Handle *cfg,
const struct GNUNET_CRYPTO_EddsaPrivateKey *channel_key,
enum GNUNET_PSYC_Policy policy,
+ GNUNET_PSYC_MasterStartCallback master_start_cb,
+ GNUNET_PSYC_JoinRequestCallback join_request_cb,
GNUNET_PSYC_MessageCallback message_cb,
- GNUNET_PSYC_JoinCallback join_cb,
- GNUNET_PSYC_MasterStartCallback master_started_cb,
void *cls);
@@ -580,13 +581,30 @@ struct GNUNET_PSYC_Slave;
/**
- * Function called after the slave joined.
+ * Function called after the slave connected to the PSYC service.
*
* @param cls Closure.
* @param max_message_id Last message ID sent to the channel.
*/
typedef void
-(*GNUNET_PSYC_SlaveJoinCallback) (void *cls, uint64_t max_message_id);
+(*GNUNET_PSYC_SlaveConnectCallback) (void *cls, uint64_t max_message_id);
+
+
+/**
+ * Method called to inform about the decision in response to a join request.
+ *
+ * If @a is_admitted is not #GNUNET_YES, then sending messages to the channel is
+ * not possible, but earlier history can be still queried.
+ *
+ * @param cls Closure.
+ * @param is_admitted #GNUNET_YES or #GNUNET_NO or #GNUNET_SYSERR
+ * @param join_msg Application-dependent join message from the origin.
+ */
+typedef void
+(*GNUNET_PSYC_JoinDecisionCallback) (void *cls,
+ int is_admitted,
+ const struct
+ GNUNET_PSYC_MessageHeader *join_msg);
/**
@@ -599,24 +617,28 @@ typedef void
* notification on failure (as the channel may simply take days to approve,
* and disapproval is simply being ignored).
*
- * @param cfg Configuration to use.
- * @param channel_key ECC public key that identifies the channel we wish to join.
- * @param slave_key ECC private-public key pair that identifies the slave, and
+ * @param cfg Configuration to use.
+ * @param channel_key ECC public key that identifies the channel we wish to join.
+ * @param slave_key ECC private-public key pair that identifies the slave, and
* used by multicast to sign the join request and subsequent unicast
* requests sent to the master.
- * @param origin Peer identity of the origin.
- * @param relay_count Number of peers in the @a relays array.
- * @param relays Peer identities of members of the multicast group, which serve
+ * @param origin Peer identity of the origin.
+ * @param relay_count Number of peers in the @a relays array.
+ * @param relays Peer identities of members of the multicast group, which serve
* as relays and used to join the group at.
- * @param message_cb Function to invoke on message parts received from the
+ * @param message_cb Function to invoke on message parts received from the
* channel, typically at least contains method handlers for @e join and
* @e part.
- * @param slave_joined_cb Function invoked once we have joined the channel.
- * @param cls Closure for @a message_cb and @a slave_joined_cb.
- * @param method_name Method name for the join request.
- * @param env Environment containing transient variables for the request, or NULL.
- * @param data Payload for the join message.
- * @param data_size Number of bytes in @a data.
+ * @param slave_connect_cb Function invoked once we have connected to the
+ * PSYC service.
+ * @param join_decision_cb Function invoked once we have received a join
+ * decision.
+ * @param cls Closure for @a message_cb and @a slave_joined_cb.
+ * @param method_name Method name for the join request.
+ * @param env Environment containing transient variables for the request, or NULL.
+ * @param data Payload for the join message.
+ * @param data_size Number of bytes in @a data.
+ *
* @return Handle for the slave, NULL on error.
*/
struct GNUNET_PSYC_Slave *
@@ -627,7 +649,8 @@ GNUNET_PSYC_slave_join (const struct GNUNET_CONFIGURATION_Handle *cfg,
uint32_t relay_count,
const struct GNUNET_PeerIdentity *relays,
GNUNET_PSYC_MessageCallback message_cb,
- GNUNET_PSYC_SlaveJoinCallback slave_joined_cb,
+ GNUNET_PSYC_SlaveConnectCallback slave_connect_cb,
+ GNUNET_PSYC_JoinDecisionCallback join_decision_cb,
void *cls,
const char *method_name,
const struct GNUNET_ENV_Environment *env,
diff --git a/src/multicast/Makefile.am b/src/multicast/Makefile.am
index b2c1702e46..8ccc7b88a6 100644
--- a/src/multicast/Makefile.am
+++ b/src/multicast/Makefile.am
@@ -46,10 +46,12 @@ gnunet_service_multicast_SOURCES = \
gnunet-service-multicast.c
gnunet_service_multicast_LDADD = \
$(top_builddir)/src/util/libgnunetutil.la \
+ $(top_builddir)/src/core/libgnunetcore.la \
$(top_builddir)/src/statistics/libgnunetstatistics.la \
$(GN_LIBINTL)
gnunet_service_multicast_DEPENDENCIES = \
$(top_builddir)/src/util/libgnunetutil.la \
+ $(top_builddir)/src/core/libgnunetcore.la \
$(top_builddir)/src/statistics/libgnunetstatistics.la
diff --git a/src/multicast/gnunet-service-multicast.c b/src/multicast/gnunet-service-multicast.c
index d412d3f8ed..5421c1b2b0 100644
--- a/src/multicast/gnunet-service-multicast.c
+++ b/src/multicast/gnunet-service-multicast.c
@@ -27,6 +27,7 @@
#include "gnunet_util_lib.h"
#include "gnunet_signatures.h"
#include "gnunet_statistics_service.h"
+#include "gnunet_core_service.h"
#include "gnunet_multicast_service.h"
#include "multicast.h"
@@ -36,6 +37,22 @@
static const struct GNUNET_CONFIGURATION_Handle *cfg;
/**
+ * Server handle.
+ */
+static struct GNUNET_SERVER_Handle *server;
+
+/**
+ * Core handle.
+ * Only used during initialization.
+ */
+static struct GNUNET_CORE_Handle *core;
+
+/**
+ * Identity of this peer.
+ */
+static struct GNUNET_PeerIdentity this_peer;
+
+/**
* Handle to the statistics service.
*/
static struct GNUNET_STATISTICS_Handle *stats;
@@ -62,7 +79,6 @@ static struct GNUNET_CONTAINER_MultiHashMap *members;
*/
static struct GNUNET_CONTAINER_MultiHashMap *group_members;
-
/**
* List of connected clients.
*/
@@ -147,7 +163,7 @@ struct Member
/**
* Join request sent to the origin / members.
*/
- struct MulticastJoinRequestMessage *join_request;
+ struct MulticastJoinRequestMessage *join_req;
/**
* Join decision sent in reply to our request.
@@ -155,7 +171,7 @@ struct Member
* Only a positive decision is stored here, in case of a negative decision the
* client is disconnected.
*/
- struct MulticastJoinDecisionMessage *join_decision;
+ struct MulticastJoinDecisionMessageHeader *join_dcsn;
/**
* Last request fragment ID sent to the origin.
@@ -171,9 +187,19 @@ struct Member
* @param tc unused
*/
static void
-cleanup_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+shutdown_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
{
- /* FIXME: do clean up here */
+ if (NULL != core)
+ {
+ GNUNET_CORE_disconnect (core);
+ core = NULL;
+ }
+ if (NULL != stats)
+ {
+ GNUNET_STATISTICS_destroy (stats, GNUNET_YES);
+ stats = NULL;
+ }
+ /* FIXME: do more clean up here */
}
/**
@@ -206,8 +232,11 @@ cleanup_member (struct Member *mem)
grp_mem);
GNUNET_CONTAINER_multihashmap_destroy (grp_mem);
}
- if (NULL != mem->join_decision)
- GNUNET_free (mem->join_decision);
+ if (NULL != mem->join_dcsn)
+ {
+ GNUNET_free (mem->join_dcsn);
+ mem->join_dcsn = NULL;
+ }
GNUNET_CONTAINER_multihashmap_remove (members, &grp->pub_key_hash, mem);
}
@@ -329,7 +358,7 @@ member_message_cb (void *cls, const struct GNUNET_HashCode *pub_key_hash,
const struct GNUNET_MessageHeader *msg = cls;
struct Member *mem = member;
- if (NULL != mem->join_decision)
+ if (NULL != mem->join_dcsn)
{ /* Only send message to admitted members */
message_to_clients (&mem->grp, msg);
}
@@ -374,7 +403,7 @@ message_to_origin (struct Group *grp, const struct GNUNET_MessageHeader *msg)
* Handle a connecting client starting an origin.
*/
static void
-handle_origin_start (void *cls, struct GNUNET_SERVER_Client *client,
+client_origin_start (void *cls, struct GNUNET_SERVER_Client *client,
const struct GNUNET_MessageHeader *m)
{
const struct MulticastOriginStartMessage *
@@ -424,8 +453,8 @@ handle_origin_start (void *cls, struct GNUNET_SERVER_Client *client,
* Handle a connecting client joining a group.
*/
static void
-handle_member_join (void *cls, struct GNUNET_SERVER_Client *client,
- const struct GNUNET_MessageHeader *m)
+client_member_join (void *cls, struct GNUNET_SERVER_Client *client,
+ const struct GNUNET_MessageHeader *m)
{
const struct MulticastMemberJoinMessage *
msg = (const struct MulticastMemberJoinMessage *) m;
@@ -487,16 +516,16 @@ handle_member_join (void *cls, struct GNUNET_SERVER_Client *client,
GNUNET_SERVER_client_set_user_context (client, grp);
- if (NULL != mem->join_decision)
+ if (NULL != mem->join_dcsn)
{ /* Already got a join decision, send it to client. */
GNUNET_SERVER_notification_context_add (nc, client);
GNUNET_SERVER_notification_context_unicast (nc, client,
(struct GNUNET_MessageHeader *)
- mem->join_decision,
+ mem->join_dcsn,
GNUNET_NO);
}
else if (grp->clients_head == grp->clients_tail)
- { /* First client, send join request. */
+ { /* First client of the group, send join request. */
struct GNUNET_PeerIdentity *relays = (struct GNUNET_PeerIdentity *) &msg[1];
uint32_t relay_count = ntohs (msg->relay_count);
uint16_t relay_size = relay_count * sizeof (*relays);
@@ -515,6 +544,7 @@ handle_member_join (void *cls, struct GNUNET_SERVER_Client *client,
req->header.size = htons (sizeof (*req) + join_msg_size);
req->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_REQUEST);
req->group_key = grp->pub_key;
+ req->member_peer = this_peer;
GNUNET_CRYPTO_eddsa_key_get_public (&mem->priv_key, &req->member_key);
if (0 < join_msg_size)
memcpy (&req[1], join_msg, join_msg_size);
@@ -531,14 +561,14 @@ handle_member_join (void *cls, struct GNUNET_SERVER_Client *client,
GNUNET_assert (0);
}
- if (NULL != mem->join_request)
- GNUNET_free (mem->join_request);
- mem->join_request = req;
+ if (NULL != mem->join_req)
+ GNUNET_free (mem->join_req);
+ mem->join_req = req;
if (GNUNET_YES
== GNUNET_CONTAINER_multihashmap_contains (origins, &grp->pub_key_hash))
{ /* Local origin */
- message_to_origin (grp, (struct GNUNET_MessageHeader *) mem->join_request);
+ message_to_origin (grp, (struct GNUNET_MessageHeader *) mem->join_req);
}
else
{
@@ -553,34 +583,15 @@ handle_member_join (void *cls, struct GNUNET_SERVER_Client *client,
* Join decision from client.
*/
static void
-handle_join_decision (void *cls, struct GNUNET_SERVER_Client *client,
+client_join_decision (void *cls, struct GNUNET_SERVER_Client *client,
const struct GNUNET_MessageHeader *m)
{
struct Group *
grp = GNUNET_SERVER_client_get_user_context (client, struct Group);
- const struct MulticastClientJoinDecisionMessage *
- cl_dcsn = (const struct MulticastClientJoinDecisionMessage *) m;
-
- struct GNUNET_PeerIdentity *relays = (struct GNUNET_PeerIdentity *) &cl_dcsn[1];
- uint32_t relay_count = ntohs (cl_dcsn->relay_count);
- uint16_t relay_size = relay_count * sizeof (*relays);
-
- struct GNUNET_MessageHeader *join_msg = NULL;
- uint16_t join_msg_size = 0;
- if (sizeof (*cl_dcsn) + relay_size + sizeof (*m) <= ntohs (m->size))
- {
- join_msg = (struct GNUNET_MessageHeader *)
- (((char *) &cl_dcsn[1]) + relay_size);
- join_msg_size = ntohs (join_msg->size);
- }
-
- int keep_dcsn = GNUNET_NO;
- struct MulticastJoinDecisionMessage *
- dcsn = GNUNET_malloc (sizeof (*dcsn) + join_msg_size);
- dcsn->header.size = htons (sizeof (*dcsn) + join_msg_size);
- dcsn->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_DECISION);
- dcsn->is_admitted = cl_dcsn->is_admitted;
- memcpy (&dcsn[1], join_msg, join_msg_size);
+ const struct MulticastJoinDecisionMessageHeader *
+ hdcsn = (const struct MulticastJoinDecisionMessageHeader *) m;
+ const struct MulticastJoinDecisionMessage *
+ dcsn = (const struct MulticastJoinDecisionMessage *) &hdcsn[1];
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"%p Got join decision from client for group %s..\n",
@@ -595,7 +606,7 @@ handle_join_decision (void *cls, struct GNUNET_SERVER_Client *client,
if (NULL != grp_mem)
{
struct GNUNET_HashCode member_key_hash;
- GNUNET_CRYPTO_hash (&cl_dcsn->member_key, sizeof (cl_dcsn->member_key),
+ GNUNET_CRYPTO_hash (&hdcsn->member_key, sizeof (hdcsn->member_key),
&member_key_hash);
struct Member *
mem = GNUNET_CONTAINER_multihashmap_get (grp_mem, &member_key_hash);
@@ -604,19 +615,21 @@ handle_join_decision (void *cls, struct GNUNET_SERVER_Client *client,
grp, GNUNET_h2s (&member_key_hash), mem);
if (NULL != mem)
{
- message_to_clients (grp, (struct GNUNET_MessageHeader *) dcsn);
- if (GNUNET_YES == dcsn->is_admitted)
+ message_to_clients (&mem->grp, (struct GNUNET_MessageHeader *) hdcsn);
+ if (GNUNET_YES == ntohl (dcsn->is_admitted))
{ /* Member admitted, store join_decision. */
- mem->join_decision = dcsn;
- keep_dcsn = GNUNET_YES;
+ uint16_t dcsn_size = ntohs (dcsn->header.size);
+ mem->join_dcsn = GNUNET_malloc (dcsn_size);
+ memcpy (mem->join_dcsn, dcsn, dcsn_size);
}
else
{ /* Refused entry, disconnect clients. */
struct ClientList *cl = mem->grp.clients_head;
while (NULL != cl)
{
- GNUNET_SERVER_client_disconnect (cl->client);
+ struct GNUNET_SERVER_Client *client = cl->client;
cl = cl->next;
+ GNUNET_SERVER_client_disconnect (client);
}
}
}
@@ -624,10 +637,8 @@ handle_join_decision (void *cls, struct GNUNET_SERVER_Client *client,
}
else
{
- /* FIXME: send join decision to remote peers */
+ /* FIXME: send join decision to hdcsn->peer */
}
- if (GNUNET_NO == keep_dcsn)
- GNUNET_free (dcsn);
GNUNET_SERVER_receive_done (client, GNUNET_OK);
}
@@ -635,7 +646,7 @@ handle_join_decision (void *cls, struct GNUNET_SERVER_Client *client,
* Incoming message from a client.
*/
static void
-handle_multicast_message (void *cls, struct GNUNET_SERVER_Client *client,
+client_multicast_message (void *cls, struct GNUNET_SERVER_Client *client,
const struct GNUNET_MessageHeader *m)
{
struct Group *
@@ -670,7 +681,7 @@ handle_multicast_message (void *cls, struct GNUNET_SERVER_Client *client,
* Incoming request from a client.
*/
static void
-handle_multicast_request (void *cls, struct GNUNET_SERVER_Client *client,
+client_multicast_request (void *cls, struct GNUNET_SERVER_Client *client,
const struct GNUNET_MessageHeader *m)
{
struct Group *
@@ -708,37 +719,34 @@ handle_multicast_request (void *cls, struct GNUNET_SERVER_Client *client,
GNUNET_SERVER_receive_done (client, GNUNET_OK);
}
+
/**
- * Process multicast requests.
- *
- * @param cls closure
- * @param server the initialized server
- * @param cfg configuration to use
+ * Core connected.
*/
static void
-run (void *cls, struct GNUNET_SERVER_Handle *server,
- const struct GNUNET_CONFIGURATION_Handle *c)
+core_connected_cb (void *cls, const struct GNUNET_PeerIdentity *my_identity)
{
+ this_peer = *my_identity;
+
static const struct GNUNET_SERVER_MessageHandler handlers[] = {
- { &handle_origin_start, NULL,
+ { &client_origin_start, NULL,
GNUNET_MESSAGE_TYPE_MULTICAST_ORIGIN_START, 0 },
- { &handle_member_join, NULL,
+ { &client_member_join, NULL,
GNUNET_MESSAGE_TYPE_MULTICAST_MEMBER_JOIN, 0 },
- { &handle_join_decision, NULL,
+ { &client_join_decision, NULL,
GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_DECISION, 0 },
- { &handle_multicast_message, NULL,
+ { &client_multicast_message, NULL,
GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE, 0 },
- { &handle_multicast_request, NULL,
+ { &client_multicast_request, NULL,
GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST, 0 },
{NULL, NULL, 0, 0}
};
- cfg = c;
stats = GNUNET_STATISTICS_create ("multicast", cfg);
origins = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
members = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
@@ -747,12 +755,30 @@ run (void *cls, struct GNUNET_SERVER_Handle *server,
GNUNET_SERVER_add_handlers (server, handlers);
GNUNET_SERVER_disconnect_notify (server, &client_disconnect, NULL);
- GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL, &cleanup_task,
+ GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL, &shutdown_task,
NULL);
}
/**
+ * Service started.
+ *
+ * @param cls closure
+ * @param server the initialized server
+ * @param cfg configuration to use
+ */
+static void
+run (void *cls, struct GNUNET_SERVER_Handle *srv,
+ const struct GNUNET_CONFIGURATION_Handle *c)
+{
+ cfg = c;
+ server = srv;
+ core = GNUNET_CORE_connect (cfg, NULL, core_connected_cb, NULL, NULL,
+ NULL, GNUNET_NO, NULL, GNUNET_NO, NULL);
+}
+
+
+/**
* The main function for the multicast service.
*
* @param argc number of arguments from the command line
diff --git a/src/multicast/multicast.h b/src/multicast/multicast.h
index 85c5714e6e..76492e868e 100644
--- a/src/multicast/multicast.h
+++ b/src/multicast/multicast.h
@@ -77,7 +77,7 @@ struct MulticastJoinRequestMessage
/**
- * Header of a join decision sent to a remote peer.
+ * Header of a join decision message sent to a peer requesting join.
*/
struct MulticastJoinDecisionMessage
{
@@ -87,19 +87,28 @@ struct MulticastJoinDecisionMessage
struct GNUNET_MessageHeader header;
/**
- * #GNUNET_YES if the peer was admitted.
+ * #GNUNET_YES if the peer was admitted
+ * #GNUNET_NO if entry was refused,
+ * #GNUNET_SYSERR if the request could not be answered.
*/
- uint8_t is_admitted;
+ int32_t is_admitted;
+
+ /**
+ * Number of relays given.
+ */
+ uint32_t relay_count;
+
+ /* Followed by relay_count peer identities */
/* Followed by the join response message */
};
/**
- * Message sent from the client to the service to notify the service
- * about a join decision.
+ * Header added to a struct MulticastJoinDecisionMessage
+ * when sent between the client and service.
*/
-struct MulticastClientJoinDecisionMessage
+struct MulticastJoinDecisionMessageHeader
{
/**
* Type: GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_DECISION
@@ -107,29 +116,18 @@ struct MulticastClientJoinDecisionMessage
struct GNUNET_MessageHeader header;
/**
- * Number of relays given.
+ * C->S: Peer to send the join decision to.
+ * S->C: Peer we received the join decision from.
*/
- uint32_t relay_count;
+ struct GNUNET_PeerIdentity peer;
/**
- * Public key of the joining member.
+ * C->S: Public key of the member requesting join.
+ * S->C: Unused.
*/
struct GNUNET_CRYPTO_EddsaPublicKey member_key;
- /**
- * Peer identity of the joining member.
- */
- struct GNUNET_PeerIdentity member_peer;
-
- /**
- * #GNUNET_YES if the peer was admitted.
- */
- uint8_t is_admitted;
-
- /* Followed by relay_count peer identities */
-
- /* Followed by the join response message */
-
+ /* Followed by struct MulticastJoinDecisionMessage */
};
@@ -139,7 +137,6 @@ struct MulticastClientJoinDecisionMessage
*/
struct MulticastMembershipTestResultMessage
{
-
/**
* Type: GNUNET_MESSAGE_TYPE_MULTICAST_MEMBERSHIP_TEST_RESULT
*/
@@ -151,11 +148,11 @@ struct MulticastMembershipTestResultMessage
uint32_t uid;
/**
- * #GNUNET_YES if the peer is a member, #GNUNET_NO if peer was not a member,
- * #GNUNET_SYSERR if we cannot answer the test.
+ * #GNUNET_YES if the peer is a member
+ * #GNUNET_NO if peer is not a member,
+ * #GNUNET_SYSERR if the test could not be answered.
*/
int32_t is_admitted;
-
};
diff --git a/src/multicast/multicast_api.c b/src/multicast/multicast_api.c
index 501ff4b701..e568e77ee1 100644
--- a/src/multicast/multicast_api.c
+++ b/src/multicast/multicast_api.c
@@ -132,7 +132,7 @@ struct GNUNET_MULTICAST_Group
struct GNUNET_CRYPTO_EddsaPublicKey pub_key;
struct GNUNET_HashCode pub_key_hash;
- GNUNET_MULTICAST_JoinCallback join_cb;
+ GNUNET_MULTICAST_JoinRequestCallback join_req_cb;
GNUNET_MULTICAST_MembershipTestCallback member_test_cb;
GNUNET_MULTICAST_ReplayFragmentCallback replay_frag_cb;
GNUNET_MULTICAST_ReplayMessageCallback replay_msg_cb;
@@ -177,10 +177,7 @@ struct GNUNET_MULTICAST_Member
struct GNUNET_MULTICAST_Group grp;
struct GNUNET_MULTICAST_MemberTransmitHandle tmit;
- struct GNUNET_CRYPTO_EddsaPrivateKey priv_key;
- struct GNUNET_PeerIdentity origin;
- struct GNUNET_PeerIdentity relays;
- uint32_t relay_count;
+ GNUNET_MULTICAST_JoinDecisionCallback join_dcsn_cb;
uint64_t next_fragment_id;
};
@@ -197,12 +194,12 @@ struct GNUNET_MULTICAST_JoinHandle
struct GNUNET_MULTICAST_Group *group;
/**
- * Public key of the joining member.
+ * Public key of the member requesting join.
*/
struct GNUNET_CRYPTO_EddsaPublicKey member_key;
/**
- * Peer identity of the joining member.
+ * Peer identity of the member requesting join.
*/
struct GNUNET_PeerIdentity member_peer;
};
@@ -476,8 +473,9 @@ request_cb (void *cls, const struct GNUNET_HashCode *pub_key_hash, void *origin)
"Calling request callback for a request of type %u and size %u.\n",
ntohs (req->header.type), ntohs (req->header.size));
- orig->request_cb (orig->grp.cb_cls, &req->member_key,
- (const struct GNUNET_MessageHeader *) req, 0);
+ if (NULL != orig->request_cb)
+ orig->request_cb (orig->grp.cb_cls, &req->member_key,
+ (const struct GNUNET_MessageHeader *) req, 0);
return GNUNET_YES;
}
@@ -501,7 +499,8 @@ join_request_cb (void *cls, const struct GNUNET_HashCode *pub_key_hash,
if (sizeof (*req) + sizeof (*msg) <= ntohs (req->header.size))
msg = (const struct GNUNET_MessageHeader *) &req[1];
- grp->join_cb (grp->cb_cls, &req->member_key, msg, jh);
+ if (NULL != grp->join_req_cb)
+ grp->join_req_cb (grp->cb_cls, &req->member_key, msg, jh);
return GNUNET_YES;
}
@@ -513,15 +512,44 @@ static int
join_decision_cb (void *cls, const struct GNUNET_HashCode *pub_key_hash,
void *member)
{
- const struct MulticastJoinDecisionMessage *dcsn = cls;
+ const struct MulticastJoinDecisionMessageHeader *hdcsn = cls;
+ const struct MulticastJoinDecisionMessage *
+ dcsn = (const struct MulticastJoinDecisionMessage *) &hdcsn[1];
struct GNUNET_MULTICAST_Member *mem = member;
struct GNUNET_MULTICAST_Group *grp = &mem->grp;
- const struct GNUNET_MessageHeader *msg = NULL;
- if (sizeof (*dcsn) + sizeof (*msg) <= ntohs (dcsn->header.size))
- msg = (const struct GNUNET_MessageHeader *) &dcsn[1];
+ uint16_t dcsn_size = ntohs (dcsn->header.size);
+ int is_admitted = ntohl (dcsn->is_admitted);
+
+ const struct GNUNET_MessageHeader *join_resp = NULL;
+ uint16_t join_resp_size = 0;
+
+ uint16_t relay_count = ntohl (dcsn->relay_count);
+ const struct GNUNET_PeerIdentity *relays = NULL;
+ uint16_t relay_size = relay_count * sizeof (*relays);
+ if (0 < relay_count && dcsn_size < sizeof (*dcsn) + relay_size)
+ relays = (struct GNUNET_PeerIdentity *) &dcsn[1];
+
+ if (sizeof (*dcsn) + relay_size + sizeof (*join_resp) <= dcsn_size)
+ {
+ join_resp = (const struct GNUNET_MessageHeader *) &dcsn[1];
+ join_resp_size = ntohs (join_resp->size);
+ }
+ if (dcsn_size < sizeof (*dcsn) + relay_size + join_resp_size)
+ {
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Received invalid join decision message from multicast.\n");
+ GNUNET_break_op (0);
+ is_admitted = GNUNET_SYSERR;
+ }
+
+ if (NULL != mem->join_dcsn_cb)
+ mem->join_dcsn_cb (grp->cb_cls, is_admitted, &hdcsn->peer,
+ relay_count, relays, join_resp);
+
+ if (GNUNET_YES != is_admitted)
+ GNUNET_MULTICAST_member_part (mem);
- // FIXME: grp->join_decision_cb (grp->cb_cls, msg);
return GNUNET_YES;
}
@@ -599,7 +627,6 @@ message_handler (void *cls, const struct GNUNET_MessageHeader *msg)
GNUNET_break (0);
break;
}
-
if (NULL != origins)
GNUNET_CONTAINER_multihashmap_get_multiple (origins, &grp->pub_key_hash,
request_cb, (void *) msg);
@@ -615,9 +642,11 @@ message_handler (void *cls, const struct GNUNET_MessageHeader *msg)
break;
case GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_DECISION:
- if (NULL != origins)
- GNUNET_CONTAINER_multihashmap_get_multiple (origins, &grp->pub_key_hash,
- join_decision_cb, (void *) msg);
+ if (GNUNET_NO != grp->is_origin)
+ {
+ GNUNET_break (0);
+ break;
+ }
if (NULL != members)
GNUNET_CONTAINER_multihashmap_get_multiple (members, &grp->pub_key_hash,
join_decision_cb, (void *) msg);
@@ -636,11 +665,12 @@ message_handler (void *cls, const struct GNUNET_MessageHeader *msg)
* Function to call with the decision made for a join request.
*
* Must be called once and only once in response to an invocation of the
- * #GNUNET_MULTICAST_JoinCallback.
+ * #GNUNET_MULTICAST_JoinRequestCallback.
*
* @param jh Join request handle.
- * @param is_admitted #GNUNET_YES if joining is approved,
- * #GNUNET_NO if it is disapproved
+ * @param is_admitted #GNUNET_YES if the join is approved,
+ * #GNUNET_NO if it is disapproved,
+ * #GNUNET_SYSERR if we cannot answer the request.
* @param relay_count Number of relays given.
* @param relays Array of suggested peers that might be useful relays to use
* when joining the multicast group (essentially a list of peers that
@@ -657,25 +687,31 @@ message_handler (void *cls, const struct GNUNET_MessageHeader *msg)
struct GNUNET_MULTICAST_ReplayHandle *
GNUNET_MULTICAST_join_decision (struct GNUNET_MULTICAST_JoinHandle *jh,
int is_admitted,
- unsigned int relay_count,
+ uint16_t relay_count,
const struct GNUNET_PeerIdentity *relays,
const struct GNUNET_MessageHeader *join_resp)
{
struct GNUNET_MULTICAST_Group *grp = jh->group;
uint16_t join_resp_size = (NULL != join_resp) ? ntohs (join_resp->size) : 0;
uint16_t relay_size = relay_count * sizeof (*relays);
- struct MulticastClientJoinDecisionMessage * dcsn;
+ struct MulticastJoinDecisionMessageHeader * hdcsn;
+ struct MulticastJoinDecisionMessage *dcsn;
struct MessageQueue *
- mq = GNUNET_malloc (sizeof (*mq) + sizeof (*dcsn)
+ mq = GNUNET_malloc (sizeof (*mq) + sizeof (*hdcsn) + sizeof (*dcsn)
+ relay_size + join_resp_size);
- dcsn = (struct MulticastClientJoinDecisionMessage *) &mq[1];
+ hdcsn = (struct MulticastJoinDecisionMessageHeader *) &mq[1];
+ hdcsn->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_DECISION);
+ hdcsn->header.size = htons (sizeof (*hdcsn) + sizeof (*dcsn)
+ + relay_size + join_resp_size);
+ hdcsn->member_key = jh->member_key;
+ hdcsn->peer = jh->member_peer;
+
+ dcsn = (struct MulticastJoinDecisionMessage *) &hdcsn[1];
dcsn->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_DECISION);
dcsn->header.size = htons (sizeof (*dcsn) + relay_size + join_resp_size);
- dcsn->member_key = jh->member_key;
- dcsn->member_peer = jh->member_peer;
- dcsn->is_admitted = is_admitted;
- dcsn->relay_count = relay_count;
+ dcsn->is_admitted = htonl (is_admitted);
+ dcsn->relay_count = htonl (relay_count);
if (0 < relay_size)
memcpy (&dcsn[1], relays, relay_size);
if (0 < join_resp_size)
@@ -763,7 +799,7 @@ GNUNET_MULTICAST_replay_response2 (struct GNUNET_MULTICAST_ReplayHandle *rh,
* multicast session; public key is used to identify the multicast group;
* @param max_fragment_id Maximum fragment ID already sent to the group.
* 0 for a new group.
- * @param join_cb Function called to approve / disapprove joining of a peer.
+ * @param join_request_cb Function called to approve / disapprove joining of a peer.
* @param member_test_cb Function multicast can use to test group membership.
* @param replay_frag_cb Function that can be called to replay a message fragment.
* @param replay_msg_cb Function that can be called to replay a message.
@@ -779,7 +815,7 @@ struct GNUNET_MULTICAST_Origin *
GNUNET_MULTICAST_origin_start (const struct GNUNET_CONFIGURATION_Handle *cfg,
const struct GNUNET_CRYPTO_EddsaPrivateKey *priv_key,
uint64_t max_fragment_id,
- GNUNET_MULTICAST_JoinCallback join_cb,
+ GNUNET_MULTICAST_JoinRequestCallback join_request_cb,
GNUNET_MULTICAST_MembershipTestCallback member_test_cb,
GNUNET_MULTICAST_ReplayFragmentCallback replay_frag_cb,
GNUNET_MULTICAST_ReplayMessageCallback replay_msg_cb,
@@ -801,7 +837,7 @@ GNUNET_MULTICAST_origin_start (const struct GNUNET_CONFIGURATION_Handle *cfg,
grp->cfg = cfg;
grp->cb_cls = cls;
- grp->join_cb = join_cb;
+ grp->join_req_cb = join_request_cb;
grp->member_test_cb = member_test_cb;
grp->replay_frag_cb = replay_frag_cb;
grp->replay_msg_cb = replay_msg_cb;
@@ -963,7 +999,8 @@ GNUNET_MULTICAST_origin_to_all_cancel (struct GNUNET_MULTICAST_OriginTransmitHan
* If empty, the @a join_request is sent directly to the @a origin.
* @param join_msg Application-dependent join message to be passed to the peer
* @a origin.
- * @param join_cb Function called to approve / disapprove joining of a peer.
+ * @param join_request_cb Function called to approve / disapprove joining of a peer.
+ * @param join_decision_cb Function called to inform about the join decision.
* @param member_test_cb Function multicast can use to test group membership.
* @param replay_frag_cb Function that can be called to replay message fragments
* this peer already knows from this group. NULL if this
@@ -982,10 +1019,11 @@ GNUNET_MULTICAST_member_join (const struct GNUNET_CONFIGURATION_Handle *cfg,
const struct GNUNET_CRYPTO_EddsaPublicKey *group_key,
const struct GNUNET_CRYPTO_EddsaPrivateKey *member_key,
const struct GNUNET_PeerIdentity *origin,
- uint32_t relay_count,
+ uint16_t relay_count,
const struct GNUNET_PeerIdentity *relays,
const struct GNUNET_MessageHeader *join_msg,
- GNUNET_MULTICAST_JoinCallback join_cb,
+ GNUNET_MULTICAST_JoinRequestCallback join_request_cb,
+ GNUNET_MULTICAST_JoinDecisionCallback join_decision_cb,
GNUNET_MULTICAST_MembershipTestCallback member_test_cb,
GNUNET_MULTICAST_ReplayFragmentCallback replay_frag_cb,
GNUNET_MULTICAST_ReplayMessageCallback replay_msg_cb,
@@ -1014,18 +1052,14 @@ GNUNET_MULTICAST_member_join (const struct GNUNET_CONFIGURATION_Handle *cfg,
grp->cfg = cfg;
grp->pub_key = *group_key;
- grp->join_cb = join_cb;
+ mem->join_dcsn_cb = join_decision_cb;
+ grp->join_req_cb = join_request_cb;
grp->member_test_cb = member_test_cb;
grp->replay_frag_cb = replay_frag_cb;
grp->message_cb = message_cb;
grp->cb_cls = cls;
- mem->origin = *origin;
- mem->relay_count = relay_count;
- mem->relays = *relays;
- mem->priv_key = *member_key;
-
- GNUNET_CRYPTO_eddsa_key_get_public (&mem->priv_key, &grp->pub_key);
+ GNUNET_CRYPTO_eddsa_key_get_public (member_key, &grp->pub_key);
GNUNET_CRYPTO_hash (&grp->pub_key, sizeof (grp->pub_key), &grp->pub_key_hash);
if (NULL == members)
diff --git a/src/psyc/gnunet-service-psyc.c b/src/psyc/gnunet-service-psyc.c
index 2b9ee71354..cef89b8289 100644
--- a/src/psyc/gnunet-service-psyc.c
+++ b/src/psyc/gnunet-service-psyc.c
@@ -387,6 +387,11 @@ struct Slave
struct GNUNET_MessageHeader *join_req;
/**
+ * Join decision received from multicast.
+ */
+ struct SlaveJoinDecision *join_dcsn;
+
+ /**
* Maximum request ID for this channel.
*/
uint64_t max_request_id;
@@ -397,6 +402,10 @@ static inline void
transmit_message (struct Channel *ch);
+static uint64_t
+message_queue_drop (struct Channel *ch);
+
+
/**
* Task run during shutdown.
*
@@ -413,7 +422,7 @@ shutdown_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
}
if (NULL != stats)
{
- GNUNET_STATISTICS_destroy (stats, GNUNET_NO);
+ GNUNET_STATISTICS_destroy (stats, GNUNET_YES);
stats = NULL;
}
}
@@ -471,7 +480,8 @@ cleanup_slave (struct Slave *slv)
static void
cleanup_channel (struct Channel *ch)
{
- /* FIXME: fragment_cache_clear */
+ message_queue_drop (ch);
+ GNUNET_CONTAINER_multihashmap_remove_all (recv_cache, &ch->pub_key_hash);
if (NULL != ch->store_op)
GNUNET_PSYCSTORE_operation_cancel (ch->store_op);
@@ -570,7 +580,7 @@ struct JoinMemTestClosure
/**
- * Membership test result callback used for join requests.m
+ * Membership test result callback used for join requests.
*/
static void
join_mem_test_cb (void *cls, int64_t result, const char *err_msg)
@@ -585,8 +595,7 @@ join_mem_test_cb (void *cls, int64_t result, const char *err_msg)
&slave_key_hash);
GNUNET_CONTAINER_multihashmap_put (mst->join_reqs, &slave_key_hash, jcls->jh,
GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
- msg_to_clients (jcls->ch,
- (struct GNUNET_MessageHeader *) jcls->master_join_req);
+ msg_to_clients (jcls->ch, &jcls->master_join_req->header);
}
else
{
@@ -602,9 +611,10 @@ join_mem_test_cb (void *cls, int64_t result, const char *err_msg)
* Incoming join request from multicast.
*/
static void
-join_cb (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *slave_key,
- const struct GNUNET_MessageHeader *join_msg,
- struct GNUNET_MULTICAST_JoinHandle *jh)
+mcast_join_request_cb (void *cls,
+ const struct GNUNET_CRYPTO_EddsaPublicKey *slave_key,
+ const struct GNUNET_MessageHeader *join_msg,
+ struct GNUNET_MULTICAST_JoinHandle *jh)
{
struct Channel *ch = cls;
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%p Got join request.\n", ch);
@@ -643,21 +653,58 @@ join_cb (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *slave_key,
}
+/**
+ * Join decision received from multicast.
+ */
+static void
+mcast_join_decision_cb (void *cls, int is_admitted,
+ const struct GNUNET_PeerIdentity *peer,
+ uint16_t relay_count,
+ const struct GNUNET_PeerIdentity *relays,
+ const struct GNUNET_MessageHeader *join_resp)
+{
+ struct Slave *slv = cls;
+ struct Channel *ch = &slv->ch;
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "%p Got join decision: %d\n", slv, is_admitted);
+
+ uint16_t join_resp_size = (NULL != join_resp) ? ntohs (join_resp->size) : 0;
+ struct SlaveJoinDecision *
+ dcsn = slv->join_dcsn = GNUNET_malloc (sizeof (*dcsn) + join_resp_size);
+ dcsn->header.size = htons (sizeof (*dcsn) + join_resp_size);
+ dcsn->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION);
+ dcsn->is_admitted = htonl (is_admitted);
+ if (0 < join_resp_size)
+ memcpy (&dcsn[1], join_resp, join_resp_size);
+
+ msg_to_clients (ch, &dcsn->header);
+
+ if (GNUNET_YES == is_admitted)
+ {
+ ch->ready = GNUNET_YES;
+ }
+ else
+ {
+ slv->member = NULL;
+ }
+}
+
+
static void
-membership_test_cb (void *cls,
- const struct GNUNET_CRYPTO_EddsaPublicKey *slave_key,
- uint64_t message_id, uint64_t group_generation,
- struct GNUNET_MULTICAST_MembershipTestHandle *mth)
+mcast_membership_test_cb (void *cls,
+ const struct GNUNET_CRYPTO_EddsaPublicKey *slave_key,
+ uint64_t message_id, uint64_t group_generation,
+ struct GNUNET_MULTICAST_MembershipTestHandle *mth)
{
}
static void
-replay_fragment_cb (void *cls,
- const struct GNUNET_CRYPTO_EddsaPublicKey *slave_key,
- uint64_t fragment_id, uint64_t flags,
- struct GNUNET_MULTICAST_ReplayHandle *rh)
+mcast_replay_fragment_cb (void *cls,
+ const struct GNUNET_CRYPTO_EddsaPublicKey *slave_key,
+ uint64_t fragment_id, uint64_t flags,
+ struct GNUNET_MULTICAST_ReplayHandle *rh)
{
@@ -665,12 +712,12 @@ replay_fragment_cb (void *cls,
static void
-replay_message_cb (void *cls,
- const struct GNUNET_CRYPTO_EddsaPublicKey *slave_key,
- uint64_t message_id,
- uint64_t fragment_offset,
- uint64_t flags,
- struct GNUNET_MULTICAST_ReplayHandle *rh)
+mcast_replay_message_cb (void *cls,
+ const struct GNUNET_CRYPTO_EddsaPublicKey *slave_key,
+ uint64_t message_id,
+ uint64_t fragment_offset,
+ uint64_t flags,
+ struct GNUNET_MULTICAST_ReplayHandle *rh)
{
}
@@ -744,7 +791,7 @@ mmsg_to_clients (struct Channel *ch,
pmsg->message_id = mmsg->message_id;
memcpy (&pmsg[1], &mmsg[1], size - sizeof (*mmsg));
- msg_to_clients (ch, (const struct GNUNET_MessageHeader *) pmsg);
+ msg_to_clients (ch, &pmsg->header);
GNUNET_free (pmsg);
}
@@ -988,6 +1035,7 @@ fragment_queue_run (struct Channel *ch, uint64_t msg_id,
* has already been delivered to the client.
*
* @param ch Channel.
+ *
* @return Number of messages removed from queue and sent to client.
*/
static uint64_t
@@ -1061,6 +1109,43 @@ message_queue_run (struct Channel *ch)
/**
+ * Drop message queue of a channel.
+ *
+ * Remove all messages in queue without sending it to clients.
+ *
+ * @param ch Channel.
+ *
+ * @return Number of messages removed from queue.
+ */
+static uint64_t
+message_queue_drop (struct Channel *ch)
+{
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "%p Dropping message queue.\n", ch);
+ uint64_t n = 0;
+ uint64_t msg_id;
+ while (GNUNET_YES == GNUNET_CONTAINER_heap_peek2 (ch->recv_msgs, NULL,
+ &msg_id))
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "%p Dropping message %" PRIu64 " from queue.\n", ch, msg_id);
+ struct GNUNET_HashCode msg_id_hash;
+ hash_key_from_hll (&msg_id_hash, msg_id);
+
+ struct FragmentQueue *
+ fragq = GNUNET_CONTAINER_multihashmap_get (ch->recv_frags, &msg_id_hash);
+
+ fragment_queue_run (ch, msg_id, fragq, GNUNET_YES);
+ GNUNET_CONTAINER_heap_remove_root (ch->recv_msgs);
+ n++;
+ }
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "%p Removed %" PRIu64 " messages from queue.\n", ch, n);
+ return n;
+}
+
+
+/**
* Handle incoming message from multicast.
*
* @param ch Channel.
@@ -1069,7 +1154,7 @@ message_queue_run (struct Channel *ch)
* @return #GNUNET_OK or #GNUNET_SYSERR
*/
static int
-handle_multicast_message (struct Channel *ch,
+client_multicast_message (struct Channel *ch,
const struct GNUNET_MULTICAST_MessageHeader *mmsg)
{
GNUNET_PSYCSTORE_fragment_store (store, &ch->pub_key, mmsg, 0, NULL, NULL);
@@ -1106,7 +1191,7 @@ handle_multicast_message (struct Channel *ch,
* Store it using PSYCstore and send it to the client of the channel.
*/
static void
-message_cb (void *cls, const struct GNUNET_MessageHeader *msg)
+mcast_message_cb (void *cls, const struct GNUNET_MessageHeader *msg)
{
struct Channel *ch = cls;
uint16_t type = ntohs (msg->type);
@@ -1120,7 +1205,7 @@ message_cb (void *cls, const struct GNUNET_MessageHeader *msg)
{
case GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE:
{
- handle_multicast_message (ch, (const struct
+ client_multicast_message (ch, (const struct
GNUNET_MULTICAST_MessageHeader *) msg);
break;
}
@@ -1141,9 +1226,10 @@ message_cb (void *cls, const struct GNUNET_MessageHeader *msg)
* @param flags Request flags.
*/
static void
-request_cb (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *slave_key,
- const struct GNUNET_MessageHeader *msg,
- enum GNUNET_MULTICAST_MessageFlags flags)
+mcast_request_cb (void *cls,
+ const struct GNUNET_CRYPTO_EddsaPublicKey *slave_key,
+ const struct GNUNET_MessageHeader *msg,
+ enum GNUNET_MULTICAST_MessageFlags flags)
{
struct Master *mst = cls;
struct Channel *ch = &mst->ch;
@@ -1183,7 +1269,7 @@ request_cb (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *slave_key,
pmsg->flags = htonl (GNUNET_PSYC_MESSAGE_REQUEST);
memcpy (&pmsg[1], &req[1], size - sizeof (*req));
- msg_to_clients (ch, (const struct GNUNET_MessageHeader *) pmsg);
+ msg_to_clients (ch, &pmsg->header);
GNUNET_free (pmsg);
break;
}
@@ -1221,11 +1307,13 @@ master_counters_cb (void *cls, int result, uint64_t max_fragment_id,
ch->max_state_message_id = max_state_message_id;
mst->max_group_generation = max_group_generation;
mst->origin
- = GNUNET_MULTICAST_origin_start (cfg, &mst->priv_key,
- max_fragment_id,
- join_cb, membership_test_cb,
- replay_fragment_cb, replay_message_cb,
- request_cb, message_cb, ch);
+ = GNUNET_MULTICAST_origin_start (cfg, &mst->priv_key, max_fragment_id,
+ &mcast_join_request_cb,
+ &mcast_membership_test_cb,
+ &mcast_replay_fragment_cb,
+ &mcast_replay_message_cb,
+ &mcast_request_cb,
+ &mcast_message_cb, ch);
ch->ready = GNUNET_YES;
}
else
@@ -1266,11 +1354,13 @@ slave_counters_cb (void *cls, int result, uint64_t max_fragment_id,
= GNUNET_MULTICAST_member_join (cfg, &ch->pub_key, &slv->priv_key,
&slv->origin,
slv->relay_count, slv->relays,
- slv->join_req, join_cb,
- membership_test_cb,
- replay_fragment_cb, replay_message_cb,
- message_cb, ch);
- ch->ready = GNUNET_YES;
+ slv->join_req,
+ &mcast_join_request_cb,
+ &mcast_join_decision_cb,
+ &mcast_membership_test_cb,
+ &mcast_replay_fragment_cb,
+ &mcast_replay_message_cb,
+ &mcast_message_cb, ch);
}
else
{
@@ -1297,7 +1387,7 @@ channel_init (struct Channel *ch)
* Handle a connecting client starting a channel master.
*/
static void
-handle_master_start (void *cls, struct GNUNET_SERVER_Client *client,
+client_master_start (void *cls, struct GNUNET_SERVER_Client *client,
const struct GNUNET_MessageHeader *msg)
{
const struct MasterStartRequest *req
@@ -1363,7 +1453,7 @@ handle_master_start (void *cls, struct GNUNET_SERVER_Client *client,
* Handle a connecting client joining as a channel slave.
*/
static void
-handle_slave_join (void *cls, struct GNUNET_SERVER_Client *client,
+client_slave_join (void *cls, struct GNUNET_SERVER_Client *client,
const struct GNUNET_MessageHeader *msg)
{
const struct SlaveJoinRequest *req
@@ -1389,6 +1479,8 @@ handle_slave_join (void *cls, struct GNUNET_SERVER_Client *client,
{
slv = GNUNET_new (struct Slave);
slv->priv_key = req->slave_key;
+ slv->pub_key = slv_pub_key;
+ slv->pub_key_hash = slv_pub_key_hash;
slv->origin = req->origin;
slv->relay_count = ntohl (req->relay_count);
if (0 < slv->relay_count)
@@ -1411,10 +1503,10 @@ handle_slave_join (void *cls, struct GNUNET_SERVER_Client *client,
if (NULL == ch_slv)
{
ch_slv = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
- GNUNET_CONTAINER_multihashmap_put (channel_slaves, &pub_key_hash, ch_slv,
+ GNUNET_CONTAINER_multihashmap_put (channel_slaves, &ch->pub_key_hash, ch_slv,
GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
}
- GNUNET_CONTAINER_multihashmap_put (ch_slv, &slv_pub_key_hash, ch,
+ GNUNET_CONTAINER_multihashmap_put (ch_slv, &slv->pub_key_hash, ch,
GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_FAST);
GNUNET_CONTAINER_multihashmap_put (slaves, &ch->pub_key_hash, ch,
GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
@@ -1434,6 +1526,29 @@ handle_slave_join (void *cls, struct GNUNET_SERVER_Client *client,
GNUNET_SERVER_notification_context_add (nc, client);
GNUNET_SERVER_notification_context_unicast (nc, client, &res.header,
GNUNET_NO);
+
+ if (NULL == slv->member)
+ {
+ slv->member
+ = GNUNET_MULTICAST_member_join (cfg, &ch->pub_key, &slv->priv_key,
+ &slv->origin,
+ slv->relay_count, slv->relays,
+ slv->join_req,
+ &mcast_join_request_cb,
+ &mcast_join_decision_cb,
+ &mcast_membership_test_cb,
+ &mcast_replay_fragment_cb,
+ &mcast_replay_message_cb,
+ &mcast_message_cb, ch);
+
+ }
+ else if (NULL != slv->join_dcsn)
+ {
+ GNUNET_SERVER_notification_context_add (nc, client);
+ GNUNET_SERVER_notification_context_unicast (nc, client,
+ &slv->join_dcsn->header,
+ GNUNET_NO);
+ }
}
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
@@ -1451,7 +1566,7 @@ handle_slave_join (void *cls, struct GNUNET_SERVER_Client *client,
struct JoinDecisionClosure
{
- uint8_t is_admitted;
+ int32_t is_admitted;
struct GNUNET_MessageHeader *msg;
};
@@ -1460,8 +1575,8 @@ struct JoinDecisionClosure
* Iterator callback for responding to join requests of a slave.
*/
static int
-join_decision_cb (void *cls, const struct GNUNET_HashCode *pub_key_hash,
- void *jh)
+send_join_decision_cb (void *cls, const struct GNUNET_HashCode *pub_key_hash,
+ void *jh)
{
struct JoinDecisionClosure *jcls = cls;
// FIXME: add relays
@@ -1474,7 +1589,7 @@ join_decision_cb (void *cls, const struct GNUNET_HashCode *pub_key_hash,
* Join decision from client.
*/
static void
-handle_join_decision (void *cls, struct GNUNET_SERVER_Client *client,
+client_join_decision (void *cls, struct GNUNET_SERVER_Client *client,
const struct GNUNET_MessageHeader *msg)
{
struct Channel *
@@ -1484,7 +1599,7 @@ handle_join_decision (void *cls, struct GNUNET_SERVER_Client *client,
struct MasterJoinDecision *dcsn = (struct MasterJoinDecision *) msg;
struct JoinDecisionClosure jcls;
- jcls.is_admitted = dcsn->is_admitted;
+ jcls.is_admitted = ntohl (dcsn->is_admitted);
jcls.msg
= (sizeof (*dcsn) + sizeof (struct GNUNET_PSYC_MessageHeader)
<= ntohs (msg->size))
@@ -1494,8 +1609,17 @@ handle_join_decision (void *cls, struct GNUNET_SERVER_Client *client,
struct GNUNET_HashCode slave_key_hash;
GNUNET_CRYPTO_hash (&dcsn->slave_key, sizeof (dcsn->slave_key),
&slave_key_hash);
+
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "%p Got join decision (%d) from client for channel %s..\n",
+ mst, jcls.is_admitted, GNUNET_h2s (&ch->pub_key_hash));
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "%p ..and slave %s.\n",
+ mst, GNUNET_h2s (&slave_key_hash));
+
GNUNET_CONTAINER_multihashmap_get_multiple (mst->join_reqs, &slave_key_hash,
- &join_decision_cb, &jcls);
+ &send_join_decision_cb, &jcls);
+ GNUNET_CONTAINER_multihashmap_remove_all (mst->join_reqs, &slave_key_hash);
GNUNET_SERVER_receive_done (client, GNUNET_OK);
}
@@ -1758,10 +1882,10 @@ transmit_cancel (struct Channel *ch, struct GNUNET_SERVER_Client *client)
/**
- * Incoming message from a client.
+ * Incoming message from a master or slave client.
*/
static void
-handle_psyc_message (void *cls, struct GNUNET_SERVER_Client *client,
+client_psyc_message (void *cls, struct GNUNET_SERVER_Client *client,
const struct GNUNET_MessageHeader *msg)
{
struct Channel *
@@ -1775,8 +1899,7 @@ handle_psyc_message (void *cls, struct GNUNET_SERVER_Client *client,
if (GNUNET_YES != ch->ready)
{
GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
- "%p Dropping message from client, channel is not ready yet.\n",
- ch);
+ "%p Channel is not ready, dropping message from client.\n", ch);
GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
return;
}
@@ -1784,7 +1907,7 @@ handle_psyc_message (void *cls, struct GNUNET_SERVER_Client *client,
uint16_t size = ntohs (msg->size);
if (GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < size - sizeof (*msg))
{
- GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "%p Message payload too large\n", ch);
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "%p Message payload too large.\n", ch);
GNUNET_break (0);
transmit_cancel (ch, client);
GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
@@ -1817,7 +1940,7 @@ handle_psyc_message (void *cls, struct GNUNET_SERVER_Client *client,
* Client requests to add a slave to the membership database.
*/
static void
-handle_slave_add (void *cls, struct GNUNET_SERVER_Client *client,
+client_slave_add (void *cls, struct GNUNET_SERVER_Client *client,
const struct GNUNET_MessageHeader *msg)
{
@@ -1828,7 +1951,7 @@ handle_slave_add (void *cls, struct GNUNET_SERVER_Client *client,
* Client requests to remove a slave from the membership database.
*/
static void
-handle_slave_remove (void *cls, struct GNUNET_SERVER_Client *client,
+client_slave_remove (void *cls, struct GNUNET_SERVER_Client *client,
const struct GNUNET_MessageHeader *msg)
{
@@ -1839,7 +1962,7 @@ handle_slave_remove (void *cls, struct GNUNET_SERVER_Client *client,
* Client requests channel history from PSYCstore.
*/
static void
-handle_story_request (void *cls, struct GNUNET_SERVER_Client *client,
+client_story_request (void *cls, struct GNUNET_SERVER_Client *client,
const struct GNUNET_MessageHeader *msg)
{
@@ -1850,7 +1973,7 @@ handle_story_request (void *cls, struct GNUNET_SERVER_Client *client,
* Client requests best matching state variable from PSYCstore.
*/
static void
-handle_state_get (void *cls, struct GNUNET_SERVER_Client *client,
+client_state_get (void *cls, struct GNUNET_SERVER_Client *client,
const struct GNUNET_MessageHeader *msg)
{
@@ -1861,7 +1984,7 @@ handle_state_get (void *cls, struct GNUNET_SERVER_Client *client,
* Client requests state variables with a given prefix from PSYCstore.
*/
static void
-handle_state_get_prefix (void *cls, struct GNUNET_SERVER_Client *client,
+client_state_get_prefix (void *cls, struct GNUNET_SERVER_Client *client,
const struct GNUNET_MessageHeader *msg)
{
@@ -1880,31 +2003,31 @@ run (void *cls, struct GNUNET_SERVER_Handle *server,
const struct GNUNET_CONFIGURATION_Handle *c)
{
static const struct GNUNET_SERVER_MessageHandler handlers[] = {
- { &handle_master_start, NULL,
+ { &client_master_start, NULL,
GNUNET_MESSAGE_TYPE_PSYC_MASTER_START, 0 },
- { &handle_slave_join, NULL,
+ { &client_slave_join, NULL,
GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN, 0 },
- { &handle_join_decision, NULL,
+ { &client_join_decision, NULL,
GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION, 0 },
- { &handle_psyc_message, NULL,
+ { &client_psyc_message, NULL,
GNUNET_MESSAGE_TYPE_PSYC_MESSAGE, 0 },
- { &handle_slave_add, NULL,
+ { &client_slave_add, NULL,
GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_SLAVE_ADD, 0 },
- { &handle_slave_remove, NULL,
+ { &client_slave_remove, NULL,
GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_SLAVE_RM, 0 },
- { &handle_story_request, NULL,
+ { &client_story_request, NULL,
GNUNET_MESSAGE_TYPE_PSYC_STORY_REQUEST, 0 },
- { &handle_state_get, NULL,
+ { &client_state_get, NULL,
GNUNET_MESSAGE_TYPE_PSYC_STATE_GET, 0 },
- { &handle_state_get_prefix, NULL,
+ { &client_state_get_prefix, NULL,
GNUNET_MESSAGE_TYPE_PSYC_STATE_GET_PREFIX, 0 }
};
diff --git a/src/psyc/psyc.h b/src/psyc/psyc.h
index 47f2a01224..66c8de8983 100644
--- a/src/psyc/psyc.h
+++ b/src/psyc/psyc.h
@@ -250,19 +250,36 @@ struct MasterJoinDecision
struct GNUNET_MessageHeader header;
/**
+ * #GNUNET_YES if the slave was admitted.
+ */
+ int32_t is_admitted;
+
+ /**
* Public key of the joining slave.
*/
struct GNUNET_CRYPTO_EddsaPublicKey slave_key;
+ /* Followed by struct GNUNET_MessageHeader join_response */
+};
+
+
+struct SlaveJoinDecision
+{
+ /**
+ * Type: GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION
+ */
+ struct GNUNET_MessageHeader header;
+
/**
* #GNUNET_YES if the slave was admitted.
*/
- uint8_t is_admitted;
+ int32_t is_admitted;
/* Followed by struct GNUNET_MessageHeader join_response */
};
+
GNUNET_NETWORK_STRUCT_END
#endif
diff --git a/src/psyc/psyc_api.c b/src/psyc/psyc_api.c
index ee49a584fe..7ec9d21b72 100644
--- a/src/psyc/psyc_api.c
+++ b/src/psyc/psyc_api.c
@@ -198,9 +198,9 @@ struct GNUNET_PSYC_Master
GNUNET_PSYC_MasterStartCallback start_cb;
/**
- * Join handler callback.
+ * Join request callback.
*/
- GNUNET_PSYC_JoinCallback join_cb;
+ GNUNET_PSYC_JoinRequestCallback join_req_cb;
};
@@ -211,14 +211,16 @@ struct GNUNET_PSYC_Slave
{
struct GNUNET_PSYC_Channel ch;
- GNUNET_PSYC_SlaveJoinCallback join_cb;
+ GNUNET_PSYC_SlaveConnectCallback connect_cb;
+
+ GNUNET_PSYC_JoinDecisionCallback join_dcsn_cb;
};
/**
* Handle that identifies a join request.
*
- * Used to match calls to #GNUNET_PSYC_JoinCallback to the
+ * Used to match calls to #GNUNET_PSYC_JoinRequestCallback to the
* corresponding calls to GNUNET_PSYC_join_decision().
*/
struct GNUNET_PSYC_JoinHandle
@@ -922,7 +924,22 @@ handle_psyc_join_request (struct GNUNET_PSYC_Master *mst,
jh->mst = mst;
jh->slave_key = req->slave_key;
- mst->join_cb (mst->ch.cb_cls, &req->slave_key, msg, jh);
+ if (NULL != mst->join_req_cb)
+ mst->join_req_cb (mst->ch.cb_cls, &req->slave_key, msg, jh);
+}
+
+
+static void
+handle_psyc_join_decision (struct GNUNET_PSYC_Slave *slv,
+ const struct SlaveJoinDecision *dcsn)
+{
+ struct GNUNET_PSYC_MessageHeader *msg = NULL;
+ if (ntohs (dcsn->header.size) <= sizeof (*dcsn) + sizeof (*msg))
+ msg = (struct GNUNET_PSYC_MessageHeader *) &dcsn[1];
+
+ struct GNUNET_PSYC_JoinHandle *jh = GNUNET_malloc (sizeof (*jh));
+ if (NULL != slv->join_dcsn_cb)
+ slv->join_dcsn_cb (slv->ch.cb_cls, ntohl (dcsn->is_admitted), msg);
}
@@ -971,6 +988,9 @@ message_handler (void *cls,
case GNUNET_MESSAGE_TYPE_PSYC_JOIN_REQUEST:
size_min = sizeof (struct MasterJoinRequest);
break;
+ case GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION:
+ size_min = sizeof (struct SlaveJoinDecision);
+ break;
default:
GNUNET_break_op (0);
return;
@@ -995,8 +1015,8 @@ message_handler (void *cls,
case GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK:
{
struct CountersResult *cres = (struct CountersResult *) msg;
- if (NULL != slv->join_cb)
- slv->join_cb (ch->cb_cls, GNUNET_ntohll (cres->max_message_id));
+ if (NULL != slv->connect_cb)
+ slv->connect_cb (ch->cb_cls, GNUNET_ntohll (cres->max_message_id));
break;
}
case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_ACK:
@@ -1013,6 +1033,11 @@ message_handler (void *cls,
handle_psyc_join_request ((struct GNUNET_PSYC_Master *) ch,
(const struct MasterJoinRequest *) msg);
break;
+
+ case GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION:
+ handle_psyc_join_decision ((struct GNUNET_PSYC_Slave *) ch,
+ (const struct SlaveJoinDecision *) msg);
+ break;
}
if (NULL != ch->client)
@@ -1175,20 +1200,20 @@ disconnect (void *c)
* or part messages, the respective methods must call other PSYC functions to
* inform PSYC about the meaning of the respective events.
*
- * @param cfg Configuration to use (to connect to PSYC service).
- * @param channel_key ECC key that will be used to sign messages for this
+ * @param cfg Configuration to use (to connect to PSYC service).
+ * @param channel_key ECC key that will be used to sign messages for this
* PSYC session. The public key is used to identify the PSYC channel.
* Note that end-users will usually not use the private key directly, but
* rather look it up in GNS for places managed by other users, or select
* a file with the private key(s) when setting up their own channels
* FIXME: we'll likely want to use NOT the p521 curve here, but a cheaper
* one in the future.
- * @param policy Channel policy specifying join and history restrictions.
+ * @param policy Channel policy specifying join and history restrictions.
* Used to automate join decisions.
- * @param message_cb Function to invoke on message parts received from slaves.
- * @param join_cb Function to invoke when a peer wants to join.
- * @param master_started_cb Function to invoke after the channel master started.
- * @param cls Closure for @a master_started_cb and @a join_cb.
+ * @param message_cb Function to invoke on message parts received from slaves.
+ * @param join_request_cb Function to invoke when a slave wants to join.
+ * @param master_start_cb Function to invoke after the channel master started.
+ * @param cls Closure for @a method and @a join_cb.
*
* @return Handle for the channel master, NULL on error.
*/
@@ -1196,9 +1221,9 @@ struct GNUNET_PSYC_Master *
GNUNET_PSYC_master_start (const struct GNUNET_CONFIGURATION_Handle *cfg,
const struct GNUNET_CRYPTO_EddsaPrivateKey *channel_key,
enum GNUNET_PSYC_Policy policy,
+ GNUNET_PSYC_MasterStartCallback start_cb,
+ GNUNET_PSYC_JoinRequestCallback join_request_cb,
GNUNET_PSYC_MessageCallback message_cb,
- GNUNET_PSYC_JoinCallback join_cb,
- GNUNET_PSYC_MasterStartCallback master_started_cb,
void *cls)
{
struct GNUNET_PSYC_Master *mst = GNUNET_malloc (sizeof (*mst));
@@ -1210,8 +1235,8 @@ GNUNET_PSYC_master_start (const struct GNUNET_CONFIGURATION_Handle *cfg,
req->channel_key = *channel_key;
req->policy = policy;
- mst->start_cb = master_started_cb;
- mst->join_cb = join_cb;
+ mst->start_cb = start_cb;
+ mst->join_req_cb = join_request_cb;
ch->message_cb = message_cb;
ch->cb_cls = cls;
ch->cfg = cfg;
@@ -1244,8 +1269,9 @@ GNUNET_PSYC_master_stop (struct GNUNET_PSYC_Master *master)
* #GNUNET_PSYC_JoinCallback.
*
* @param jh Join request handle.
- * @param is_admitted #GNUNET_YES if joining is approved,
- * #GNUNET_NO if it is disapproved.
+ * @param is_admitted #GNUNET_YES if the join is approved,
+ * #GNUNET_NO if it is disapproved,
+ * #GNUNET_SYSERR if we cannot answer the request.
* @param relay_count Number of relays given.
* @param relays Array of suggested peers that might be useful relays to use
* when joining the multicast group (essentially a list of peers that
@@ -1254,48 +1280,42 @@ GNUNET_PSYC_master_stop (struct GNUNET_PSYC_Master *master)
* be the multicast origin) is a good candidate for building the
* multicast tree. Note that it is unnecessary to specify our own
* peer identity in this array.
- * @param method_name Method name for the message transmitted with the response.
- * @param env Environment containing transient variables for the message, or NULL.
- * @param data Data of the message.
- * @param data_size Size of @a data.
+ * @param join_resp Application-dependent join response message.
+ *
+ * @return #GNUNET_OK on success,
+ * #GNUNET_SYSERR if the message is too large.
*/
-void
+int
GNUNET_PSYC_join_decision (struct GNUNET_PSYC_JoinHandle *jh,
int is_admitted,
uint32_t relay_count,
const struct GNUNET_PeerIdentity *relays,
- const char *method_name,
- const struct GNUNET_ENV_Environment *env,
- const void *data,
- size_t data_size)
+ const struct GNUNET_PSYC_MessageHeader *join_resp)
{
struct GNUNET_PSYC_Channel *ch = &jh->mst->ch;
-
struct MasterJoinDecision *dcsn;
- struct GNUNET_PSYC_MessageHeader *pmsg = NULL;
- uint16_t pmsg_size = 0;
-/* FIXME:
- sizeof (*pmsg)
- + sizeof (struct GNUNET_PSYC_MessageMethod)
- + vars_size
- + sizeof (struct GNUNET_MessageHeader) + data_size
- + sizeof (struct GNUNET_MessageHeader);
-*/
+ uint16_t join_resp_size
+ = (NULL != join_resp) ? ntohs (join_resp->header.size) : 0;
uint16_t relay_size = relay_count * sizeof (*relays);
- struct MessageQueue *
- mq = GNUNET_malloc (sizeof (*mq) + sizeof (*dcsn) + relay_size + pmsg_size);
+
+ if (GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD
+ < sizeof (*dcsn) + relay_size + join_resp_size)
+ return GNUNET_SYSERR;
+
+ struct MessageQueue *mq = GNUNET_malloc (sizeof (*mq) + sizeof (*dcsn)
+ + relay_size + join_resp_size);
dcsn = (struct MasterJoinDecision *) &mq[1];
- dcsn->header.size = htons (sizeof (*dcsn) + relay_size + pmsg_size);
+ dcsn->header.size = htons (sizeof (*dcsn) + relay_size + join_resp_size);
dcsn->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_JOIN_DECISION);
- dcsn->is_admitted = (GNUNET_YES == is_admitted) ? GNUNET_YES : GNUNET_NO;
+ dcsn->is_admitted = htonl (is_admitted);
dcsn->slave_key = jh->slave_key;
- /* FIXME: add message parts to pmsg */
- if (0 < pmsg_size)
- memcpy (&dcsn[1], pmsg, pmsg_size);
+ if (0 < join_resp_size)
+ memcpy (&dcsn[1], join_resp, join_resp_size);
GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, mq);
transmit_next (ch);
+ return GNUNET_OK;
}
@@ -1359,24 +1379,27 @@ GNUNET_PSYC_master_transmit_cancel (struct GNUNET_PSYC_MasterTransmitHandle *th)
* notification on failure (as the channel may simply take days to approve,
* and disapproval is simply being ignored).
*
- * @param cfg Configuration to use.
- * @param channel_key ECC public key that identifies the channel we wish to join.
- * @param slave_key ECC private-public key pair that identifies the slave, and
+ * @param cfg Configuration to use.
+ * @param channel_key ECC public key that identifies the channel we wish to join.
+ * @param slave_key ECC private-public key pair that identifies the slave, and
* used by multicast to sign the join request and subsequent unicast
* requests sent to the master.
- * @param origin Peer identity of the origin.
- * @param relay_count Number of peers in the @a relays array.
- * @param relays Peer identities of members of the multicast group, which serve
+ * @param origin Peer identity of the origin.
+ * @param relay_count Number of peers in the @a relays array.
+ * @param relays Peer identities of members of the multicast group, which serve
* as relays and used to join the group at.
- * @param message_cb Function to invoke on message parts received from the
+ * @param message_cb Function to invoke on message parts received from the
* channel, typically at least contains method handlers for @e join and
* @e part.
- * @param slave_joined_cb Function invoked once we have joined the channel.
- * @param cls Closure for @a message_cb and @a slave_joined_cb.
- * @param method_name Method name for the join request.
- * @param env Environment containing transient variables for the request, or NULL.
- * @param data Payload for the join message.
- * @param data_size Number of bytes in @a data.
+ * @param slave_connect_cb Function invoked once we have connected to the
+ * PSYC service.
+ * @param join_decision_cb Function invoked once we have received a join
+ * decision.
+ * @param cls Closure for @a message_cb and @a slave_joined_cb.
+ * @param method_name Method name for the join request.
+ * @param env Environment containing transient variables for the request, or NULL.
+ * @param data Payload for the join message.
+ * @param data_size Number of bytes in @a data.
*
* @return Handle for the slave, NULL on error.
*/
@@ -1388,7 +1411,8 @@ GNUNET_PSYC_slave_join (const struct GNUNET_CONFIGURATION_Handle *cfg,
uint32_t relay_count,
const struct GNUNET_PeerIdentity *relays,
GNUNET_PSYC_MessageCallback message_cb,
- GNUNET_PSYC_SlaveJoinCallback slave_joined_cb,
+ GNUNET_PSYC_SlaveConnectCallback connect_cb,
+ GNUNET_PSYC_JoinDecisionCallback join_decision_cb,
void *cls,
const char *method_name,
const struct GNUNET_ENV_Environment *env,
@@ -1408,7 +1432,8 @@ GNUNET_PSYC_slave_join (const struct GNUNET_CONFIGURATION_Handle *cfg,
req->relay_count = htonl (relay_count);
memcpy (&req[1], relays, relay_count * sizeof (*relays));
- slv->join_cb = slave_joined_cb;
+ slv->connect_cb = connect_cb;
+ slv->join_dcsn_cb = join_decision_cb;
ch->message_cb = message_cb;
ch->cb_cls = cls;
diff --git a/src/psyc/test_psyc.c b/src/psyc/test_psyc.c
index 4195e464bf..8b4aad773d 100644
--- a/src/psyc/test_psyc.c
+++ b/src/psyc/test_psyc.c
@@ -35,9 +35,9 @@
#include "gnunet_env_lib.h"
#include "gnunet_psyc_service.h"
-#define TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 10)
+#define TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 30)
-#define DEBUG_SERVICE 0
+#define DEBUG_SERVICE 1
/**
@@ -79,6 +79,7 @@ struct TransmitClosure
struct TransmitClosure *tmit;
+static int join_req_count;
enum
{
@@ -167,8 +168,8 @@ end ()
static void
-master_message (void *cls, uint64_t message_id, uint32_t flags,
- const struct GNUNET_MessageHeader *msg)
+master_message_cb (void *cls, uint64_t message_id, uint32_t flags,
+ const struct GNUNET_MessageHeader *msg)
{
if (NULL == msg)
{
@@ -211,8 +212,8 @@ master_message (void *cls, uint64_t message_id, uint32_t flags,
static void
-slave_message (void *cls, uint64_t message_id, uint32_t flags,
- const struct GNUNET_MessageHeader *msg)
+slave_message_cb (void *cls, uint64_t message_id, uint32_t flags,
+ const struct GNUNET_MessageHeader *msg)
{
if (NULL == msg)
{
@@ -243,23 +244,6 @@ slave_message (void *cls, uint64_t message_id, uint32_t flags,
static void
-join_request (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *slave_key,
- const struct GNUNET_PSYC_MessageHeader *msg,
- struct GNUNET_PSYC_JoinHandle *jh)
-{
- struct GNUNET_HashCode slave_key_hash;
- GNUNET_CRYPTO_hash (slave_key, sizeof (*slave_key), &slave_key_hash);
- GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
- "Got join request from %s.\n",
- GNUNET_h2s (&slave_key_hash));
-
- GNUNET_PSYC_join_decision (jh, GNUNET_YES, 0, NULL, "_notice_join", NULL,
- "you're in", 9);
- // FIXME: also test refusing entry
-}
-
-
-static void
transmit_resume (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
{
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Transmission resumed.\n");
@@ -392,9 +376,23 @@ tmit_notify_data (void *cls, uint16_t *data_size, void *data)
static void
-slave_joined (void *cls, uint64_t max_message_id)
+slave_join ();
+
+
+static void
+join_decision_cb (void *cls, int is_admitted,
+ const struct GNUNET_PSYC_MessageHeader *join_msg)
{
- GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Slave joined: %lu\n", max_message_id);
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "Slave got join decision: %d\n", is_admitted);
+
+ if (GNUNET_YES != is_admitted)
+ { /* First join request is refused, retry. */
+ //GNUNET_assert (1 == join_req_count);
+ slave_join ();
+ return;
+ }
+
GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Slave sending request to master.\n");
test = TEST_SLAVE_TRANSMIT;
@@ -414,20 +412,46 @@ slave_joined (void *cls, uint64_t max_message_id)
GNUNET_PSYC_SLAVE_TRANSMIT_NONE);
}
+
+static void
+join_request_cb (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *slave_key,
+ const struct GNUNET_PSYC_MessageHeader *msg,
+ struct GNUNET_PSYC_JoinHandle *jh)
+{
+ struct GNUNET_HashCode slave_key_hash;
+ GNUNET_CRYPTO_hash (slave_key, sizeof (*slave_key), &slave_key_hash);
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "Got join request from %s.\n",
+ GNUNET_h2s (&slave_key_hash));
+
+ /* Reject first request */
+ int is_admitted = (0 < join_req_count++) ? GNUNET_YES : GNUNET_NO;
+ GNUNET_PSYC_join_decision (jh, is_admitted, 0, NULL, NULL);
+}
+
+
+static void
+slave_connect_cb (void *cls, uint64_t max_message_id)
+{
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "Slave connected: %lu\n", max_message_id);
+}
+
+
static void
slave_join ()
{
GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Joining slave.\n");
- struct GNUNET_PeerIdentity origin;
- struct GNUNET_PeerIdentity relays[16];
+ struct GNUNET_PeerIdentity origin; // FIXME: this peer
struct GNUNET_ENV_Environment *env = GNUNET_ENV_environment_create ();
GNUNET_ENV_environment_add (env, GNUNET_ENV_OP_ASSIGN,
"_foo", "bar baz", 7);
GNUNET_ENV_environment_add (env, GNUNET_ENV_OP_ASSIGN,
"_foo_bar", "foo bar baz", 11);
slv = GNUNET_PSYC_slave_join (cfg, &channel_pub_key, slave_key, &origin,
- 16, relays, &slave_message, &slave_joined, NULL,
+ 0, NULL, &slave_message_cb,
+ &slave_connect_cb, &join_decision_cb, NULL,
"_request_join", env, "some data", 9);
GNUNET_ENV_environment_destroy (env);
}
@@ -485,7 +509,7 @@ master_transmit ()
static void
-master_started (void *cls, uint64_t max_message_id)
+master_start_cb (void *cls, uint64_t max_message_id)
{
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Master started: %" PRIu64 "\n", max_message_id);
@@ -521,8 +545,8 @@ run (void *cls,
GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Starting master.\n");
mst = GNUNET_PSYC_master_start (cfg, channel_key, GNUNET_PSYC_CHANNEL_PRIVATE,
- &master_message, &join_request,
- &master_started, NULL);
+ &master_start_cb, &join_request_cb,
+ &master_message_cb, NULL);
}