aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/Makefile.am2
-rw-r--r--src/consensus/Makefile.am4
-rw-r--r--src/consensus/consensus_api.c6
-rw-r--r--src/consensus/consensus_protocol.h11
-rw-r--r--src/consensus/gnunet-consensus.c6
-rw-r--r--src/consensus/gnunet-service-consensus.c407
-rw-r--r--src/consensus/test_consensus.conf8
-rw-r--r--src/dv/gnunet-service-dv.c4
-rw-r--r--src/include/gnunet_consensus_service.h27
-rw-r--r--src/include/gnunet_mq_lib.h89
-rw-r--r--src/include/gnunet_protocols.h34
-rw-r--r--src/include/gnunet_set_service.h27
-rw-r--r--src/set/gnunet-service-set.c166
-rw-r--r--src/set/gnunet-service-set.h8
-rw-r--r--src/set/gnunet-service-set_union.c116
-rw-r--r--src/set/gnunet-set.c14
-rw-r--r--src/set/set.h62
-rw-r--r--src/set/set_api.c156
-rw-r--r--src/set/test_set.conf2
-rw-r--r--src/set/test_set_api.c23
-rw-r--r--src/stream/stream_api.c69
-rw-r--r--src/util/crypto_hash.c2
-rw-r--r--src/util/mq.c81
-rw-r--r--src/util/test_mq.c30
24 files changed, 773 insertions, 581 deletions
diff --git a/src/Makefile.am b/src/Makefile.am
index 6819ec90b8..2f4397dbef 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -3,7 +3,7 @@
#endif
if HAVE_EXPERIMENTAL
- EXP_DIR = gns consensus dv set experimentation
+ EXP_DIR = gns set dv consensus experimentation
endif
if LINUX
diff --git a/src/consensus/Makefile.am b/src/consensus/Makefile.am
index a0edb1d656..914fbdef81 100644
--- a/src/consensus/Makefile.am
+++ b/src/consensus/Makefile.am
@@ -61,6 +61,8 @@ gnunet_service_consensus_LDADD = \
$(top_builddir)/src/mesh/libgnunetmesh.la \
$(top_builddir)/src/set/libgnunetset.la \
$(GN_LIBINTL)
+gnunet_service_consensus_DEPENDENCIES = \
+ $(top_builddir)/src/set/libgnunetset.la
gnunet_service_evil_consensus_SOURCES = \
gnunet-service-consensus.c
@@ -71,6 +73,8 @@ gnunet_service_evil_consensus_LDADD = \
$(top_builddir)/src/mesh/libgnunetmesh.la \
$(top_builddir)/src/set/libgnunetset.la \
$(GN_LIBINTL)
+gnunet_service_evil_consensus_DEPENDENCIES = \
+ $(top_builddir)/src/set/libgnunetset.la
gnunet_service_evil_consensus_CFLAGS = -DEVIL
libgnunetconsensus_la_SOURCES = \
diff --git a/src/consensus/consensus_api.c b/src/consensus/consensus_api.c
index 635a610ca5..e3ddb4913d 100644
--- a/src/consensus/consensus_api.c
+++ b/src/consensus/consensus_api.c
@@ -236,9 +236,9 @@ send_next (struct GNUNET_CONSENSUS_Handle *consensus)
*/
static void
handle_new_element (struct GNUNET_CONSENSUS_Handle *consensus,
- struct GNUNET_CONSENSUS_ElementMessage *msg)
+ struct GNUNET_CONSENSUS_ElementMessage *msg)
{
- struct GNUNET_CONSENSUS_Element element;
+ struct GNUNET_SET_Element element;
LOG (GNUNET_ERROR_TYPE_DEBUG, "received new element\n");
@@ -424,7 +424,7 @@ GNUNET_CONSENSUS_create (const struct GNUNET_CONFIGURATION_Handle *cfg,
*/
void
GNUNET_CONSENSUS_insert (struct GNUNET_CONSENSUS_Handle *consensus,
- const struct GNUNET_CONSENSUS_Element *element,
+ const struct GNUNET_SET_Element *element,
GNUNET_CONSENSUS_InsertDoneCallback idc,
void *idc_cls)
{
diff --git a/src/consensus/consensus_protocol.h b/src/consensus/consensus_protocol.h
index af4d74100e..4e0673c7d1 100644
--- a/src/consensus/consensus_protocol.h
+++ b/src/consensus/consensus_protocol.h
@@ -38,12 +38,15 @@ GNUNET_NETWORK_STRUCT_BEGIN
/**
* Sent as context message for set reconciliation.
*/
-struct ConsensusRoundMessage
+struct GNUNET_CONSENSUS_RoundContextMessage
{
+ /**
+ * Type: GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ROUND_CONTEXT
+ */
struct GNUNET_MessageHeader header;
- uint8_t round;
- uint8_t exp_round;
- uint8_t exp_subround;
+ uint32_t round;
+ uint32_t exp_round;
+ uint32_t exp_subround;
};
GNUNET_NETWORK_STRUCT_END
diff --git a/src/consensus/gnunet-consensus.c b/src/consensus/gnunet-consensus.c
index d8c1b14eee..60db8c61af 100644
--- a/src/consensus/gnunet-consensus.c
+++ b/src/consensus/gnunet-consensus.c
@@ -133,7 +133,7 @@ do_consensus ()
{
int j;
struct GNUNET_HashCode *val;
- struct GNUNET_CONSENSUS_Element *element;
+ struct GNUNET_SET_Element *element;
generate_indices(unique_indices);
val = GNUNET_malloc (sizeof *val);
@@ -151,6 +151,8 @@ do_consensus ()
}
}
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO, "all elements inserted, calling conclude\n");
+
for (i = 0; i < num_peers; i++)
GNUNET_CONSENSUS_conclude (consensus_handles[i], conclude_timeout, conclude_cb, consensus_handles[i]);
}
@@ -194,7 +196,7 @@ connect_complete (void *cls,
static void
new_element_cb (void *cls,
- const struct GNUNET_CONSENSUS_Element *element)
+ const struct GNUNET_SET_Element *element)
{
GNUNET_log (GNUNET_ERROR_TYPE_INFO, "received new element\n");
}
diff --git a/src/consensus/gnunet-service-consensus.c b/src/consensus/gnunet-service-consensus.c
index 44edeb2159..21123b24fb 100644
--- a/src/consensus/gnunet-service-consensus.c
+++ b/src/consensus/gnunet-service-consensus.c
@@ -1,6 +1,6 @@
/*
This file is part of GNUnet
- (C) 2012 Christian Grothoff (and other contributing authors)
+ (C) 2012, 2013 Christian Grothoff (and other contributing authors)
GNUnet is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published
@@ -43,9 +43,9 @@
/**
- * Number of exponential rounds, used in the inventory and completion round.
+ * Number of exponential rounds, used in the exp and completion round.
*/
-#define NUM_EXP_ROUNDS (4)
+#define NUM_EXP_ROUNDS 4
/* forward declarations */
@@ -155,17 +155,17 @@ struct ConsensusSession
* Permutation of peers for the current round,
* maps logical index (for current round) to physical index (location in info array)
*/
- int *shuffle;
+ uint32_t *shuffle;
/**
* Current round of the exponential scheme.
*/
- int exp_round;
+ uint32_t exp_round;
/**
* Current sub-round of the exponential scheme.
*/
- int exp_subround;
+ uint32_t exp_subround;
/**
* The partner for the current exp-round
@@ -201,17 +201,6 @@ struct ConsensusPeerInformation
struct GNUNET_PeerIdentity peer_id;
/**
- * Do we connect to the peer, or does the peer connect to us?
- * Only valid for all-to-all phases
- */
- int is_outgoing;
-
- /**
- * Did we receive/send a consensus hello?
- */
- int hello;
-
- /**
* Back-reference to the consensus session,
* to that ConsensusPeerInformation can be used as a closure
*/
@@ -223,22 +212,14 @@ struct ConsensusPeerInformation
int exp_subround_finished;
/**
- * GNUNET_YES if we synced inventory with this peer;
- * GNUNET_NO otherwise.
- */
- int inventory_synced;
-
- /**
- * Round this peer seems to be in, according to the last SE we got.
- * Necessary to store this, as we sometimes need to respond to a request from an
- * older round, while we are already in the next round.
+ * Set operation we are currently executing with this peer.
*/
- enum ConsensusRound apparent_round;
+ struct GNUNET_SET_OperationHandle *set_op;
/**
- * Set operation we are currently executing with this peer.
+ * Has conclude been called on the set_op?
*/
- struct GNUNET_SET_OperationHandle *set_op;
+ int set_op_concluded;
};
@@ -268,9 +249,8 @@ static struct GNUNET_SERVER_Handle *srv;
static struct GNUNET_PeerIdentity my_peer;
-/*
static int
-exp_subround_finished (const struct ConsensusSession *session)
+have_exp_subround_finished (const struct ConsensusSession *session)
{
int not_finished;
not_finished = 0;
@@ -284,25 +264,9 @@ exp_subround_finished (const struct ConsensusSession *session)
return GNUNET_YES;
return GNUNET_NO;
}
-*/
-/*
-static int
-inventory_round_finished (struct ConsensusSession *session)
-{
- int i;
- int finished;
- finished = 0;
- for (i = 0; i < session->num_peers; i++)
- if (GNUNET_YES == session->info[i].inventory_synced)
- finished++;
- if (finished >= (session->num_peers / 2))
- return GNUNET_YES;
- return GNUNET_NO;
-}
-*/
/**
* Destroy a session, free all resources associated with it.
@@ -341,39 +305,6 @@ destroy_session (struct ConsensusSession *session)
/**
- * Start the inventory round, contact all peers we are supposed to contact.
- *
- * @param session the current session
- */
-static void
-start_inventory (struct ConsensusSession *session)
-{
- int i;
- int last;
-
- for (i = 0; i < session->num_peers; i++)
- session->info[i].is_outgoing = GNUNET_NO;
-
- last = (session->local_peer_idx + ((session->num_peers - 1) / 2) + 1) % session->num_peers;
- i = (session->local_peer_idx + 1) % session->num_peers;
- while (i != last)
- {
- GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d contacting P%d in all-to-all\n", session->local_peer_idx, i);
- session->info[i].is_outgoing = GNUNET_YES;
- // embrace_peer (&session->info[i]);
- i = (i + 1) % session->num_peers;
- }
- // tie-breaker for even number of peers
- if (((session->num_peers % 2) == 0) && (session->local_peer_idx < last))
- {
- GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d contacting P%d in all-to-all (tie-breaker)\n", session->local_peer_idx, i);
- session->info[last].is_outgoing = GNUNET_YES;
- // embrace_peer (&session->info[last]);
- }
-}
-
-
-/**
* Start the next round.
* This function can be invoked as a timeout task, or called manually (tc will be NULL then).
*
@@ -407,27 +338,8 @@ round_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
subround_over (session, NULL);
break;
case CONSENSUS_ROUND_EXCHANGE:
- /* handle two peers specially */
- if (session->num_peers <= 2)
- {
- GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: 2-peer consensus done\n", session->local_peer_idx);
- //GNUNET_CONTAINER_multihashmap_iterate (session->values, send_client_elements_iter, session);
- //send_client_conclude_done (session);
- session->current_round = CONSENSUS_ROUND_FINISH;
- return;
- }
- session->current_round = CONSENSUS_ROUND_INVENTORY;
- start_inventory (session);
- break;
- case CONSENSUS_ROUND_INVENTORY:
- session->current_round = CONSENSUS_ROUND_COMPLETION;
- session->exp_round = 0;
- subround_over (session, NULL);
- break;
- case CONSENSUS_ROUND_COMPLETION:
+ /* FIXME: send all elements to client */
session->current_round = CONSENSUS_ROUND_FINISH;
- //send_client_conclude_done (session);
- break;
default:
GNUNET_assert (0);
}
@@ -435,68 +347,91 @@ round_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
/**
- * Adapt the shuffle of the session for the current round.
+ * Create a new permutation for the session's peers in session->shuffle.
+ * Uses a Fisher-Yates shuffle with pseudo-randomness coming from
+ * both the global session id and the current round index.
+ *
+ * @param session the session to create the new permutation for
*/
static void
shuffle (struct ConsensusSession *session)
{
- /* adapted from random_permute in util/crypto_random.c */
- /* FIXME
- unsigned int *ret;
- unsigned int i;
- unsigned int tmp;
- uint32_t x;
+ uint32_t i;
+ uint32_t randomness[session->num_peers-1];
+
+ if (NULL == session->shuffle)
+ session->shuffle = GNUNET_malloc (session->num_peers * sizeof (*session->shuffle));
+
+ GNUNET_CRYPTO_kdf (randomness, sizeof (randomness), &session->exp_round, sizeof (uint32_t),
+ &session->global_id, sizeof (struct GNUNET_HashCode));
+
+ for (i = 0; i < session->num_peers; i++)
+ session->shuffle[i] = i;
- GNUNET_assert (n > 0);
- ret = GNUNET_malloc (n * sizeof (unsigned int));
- for (i = 0; i < n; i++)
- ret[i] = i;
- for (i = n - 1; i > 0; i--)
+ for (i = session->num_peers - 1; i > 0; i--)
{
- x = GNUNET_CRYPTO_random_u32 (mode, i + 1);
- tmp = ret[x];
- ret[x] = ret[i];
- ret[i] = tmp;
+ uint32_t x;
+ uint32_t tmp;
+ x = randomness[i-1];
+ tmp = session->shuffle[x];
+ session->shuffle[x] = session->shuffle[i];
+ session->shuffle[i] = tmp;
}
- */
}
/**
* Find and set the partner_incoming and partner_outgoing of our peer,
- * one of them may not exist in most cases.
+ * one of them may not exist (and thus set to NULL) if the number of peers
+ * in the session is not a power of two.
*
* @param session the consensus session
*/
static void
find_partners (struct ConsensusSession *session)
{
- int mark[session->num_peers];
- int i;
-
- memset (mark, 0, session->num_peers * sizeof (int));
- session->partner_incoming = session->partner_outgoing = NULL;
- for (i = 0; i < session->num_peers; i++)
+ int arc;
+ int partner_idx;
+ int largest_arc;
+ int num_ghosts;
+
+ /* distance to neighboring peer in current subround */
+ arc = 1 << session->exp_subround;
+ partner_idx = (session->local_peer_idx + arc) % session->num_peers;
+ largest_arc = 1;
+ while (largest_arc < session->num_peers)
+ largest_arc <<= 1;
+ num_ghosts = largest_arc - session->num_peers;
+
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO, "num ghosts: %d\n", num_ghosts);
+
+ if (0 == (session->local_peer_idx & arc))
{
- int arc;
- if (0 != mark[i])
- continue;
- arc = (i + (1 << session->exp_subround)) % session->num_peers;
- mark[i] = mark[arc] = 1;
- GNUNET_assert (i != arc);
- if (i == session->local_peer_idx)
+ /* we are outgoing */
+ session->partner_outgoing = &session->info[session->shuffle[partner_idx]];
+ /* are we a 'ghost' of a peer that would exist if
+ * the number of peers was a power of two, and thus have to partner
+ * with an additional peer?
+ */
+ if (session->local_peer_idx < num_ghosts)
{
- GNUNET_assert (NULL == session->partner_outgoing);
- session->partner_outgoing = &session->info[session->shuffle[arc]];
- session->partner_outgoing->exp_subround_finished = GNUNET_NO;
+ int ghost_partner_idx;
+ ghost_partner_idx = (session->local_peer_idx - arc) % session->num_peers;
+ /* platform dependent; modulo sometimes returns negative values */
+ if (ghost_partner_idx < 0)
+ ghost_partner_idx += arc;
+ session->partner_incoming = &session->info[session->shuffle[ghost_partner_idx]];
}
- if (arc == session->local_peer_idx)
+ else
{
- GNUNET_assert (NULL == session->partner_incoming);
- session->partner_incoming = &session->info[session->shuffle[i]];
- session->partner_incoming->exp_subround_finished = GNUNET_NO;
+ session->partner_incoming = NULL;
}
}
+ else
+ {
+ session->partner_outgoing = NULL;
+ session->partner_incoming = &session->info[session->shuffle[partner_idx]];
+ }
}
@@ -508,11 +443,42 @@ find_partners (struct ConsensusSession *session)
* @param element a result element, only valid if status is GNUNET_SET_STATUS_OK
* @param status see enum GNUNET_SET_Status
*/
-static void set_result_cb (void *cls,
- const struct GNUNET_SET_Element *element,
- enum GNUNET_SET_Status status)
+static void
+set_result_cb (void *cls,
+ const struct GNUNET_SET_Element *element,
+ enum GNUNET_SET_Status status)
{
- /* FIXME */
+ struct ConsensusPeerInformation *cpi = cls;
+
+ switch (status)
+ {
+ case GNUNET_SET_STATUS_OK:
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO, "set result: element\n");
+ break;
+ case GNUNET_SET_STATUS_FAILURE:
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO, "set result: failure\n");
+ break;
+ case GNUNET_SET_STATUS_HALF_DONE:
+ case GNUNET_SET_STATUS_DONE:
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO, "set result: done\n");
+ cpi->exp_subround_finished = GNUNET_YES;
+ if (have_exp_subround_finished (cpi->session) == GNUNET_YES)
+ subround_over (cpi->session, NULL);
+ return;
+ default:
+ GNUNET_break (0);
+ return;
+ }
+
+ switch (cpi->session->current_round)
+ {
+ case CONSENSUS_ROUND_EXCHANGE:
+ GNUNET_SET_add_element (cpi->session->element_set, element, NULL, NULL);
+ break;
+ default:
+ GNUNET_break (0);
+ return;
+ }
}
@@ -540,14 +506,6 @@ subround_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
GNUNET_SCHEDULER_cancel (session->round_timeout_tid);
session->round_timeout_tid = GNUNET_SCHEDULER_NO_TASK;
}
- /* check if we are done with the log phase, 2-peer consensus only does one log round */
- if ( (session->exp_round == NUM_EXP_ROUNDS) ||
- ((session->num_peers == 2) && (session->exp_round == 1)))
- {
- GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: exp-round over\n", session->local_peer_idx);
- round_over (session, NULL);
- return;
- }
if (session->exp_round == 0)
{
/* initialize everything for the log-rounds */
@@ -575,18 +533,27 @@ subround_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
if (NULL != session->partner_outgoing)
{
+ struct GNUNET_CONSENSUS_RoundContextMessage *msg;
+ msg = GNUNET_new (struct GNUNET_CONSENSUS_RoundContextMessage);
+ msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ROUND_CONTEXT);
+ msg->header.size = htons (sizeof *msg);
+ msg->round = htonl (session->current_round);
+ msg->exp_round = htonl (session->exp_round);
+ msg->exp_subround = htonl (session->exp_subround);
+
if (NULL != session->partner_outgoing->set_op)
+ {
GNUNET_SET_operation_cancel (session->partner_outgoing->set_op);
+ }
session->partner_outgoing->set_op =
- GNUNET_SET_evaluate (session->element_set,
- &session->partner_outgoing->peer_id,
+ GNUNET_SET_evaluate (&session->partner_outgoing->peer_id,
&session->global_id,
- NULL, /* FIXME */
+ (struct GNUNET_MessageHeader *) msg,
0, /* FIXME */
GNUNET_SET_RESULT_ADDED,
- set_result_cb, session);
-
-
+ set_result_cb, session->partner_outgoing);
+ GNUNET_SET_conclude (session->partner_outgoing->set_op, session->element_set);
+ session->partner_outgoing->set_op_concluded = GNUNET_YES;
}
#ifdef GNUNET_EXTRA_LOGGING
@@ -642,6 +609,8 @@ compute_global_id (struct ConsensusSession *session, const struct GNUNET_HashCod
int i;
struct GNUNET_HashCode tmp;
+ /* FIXME: use kdf? */
+
session->global_id = *session_id;
for (i = 0; i < session->num_peers; ++i)
{
@@ -727,9 +696,6 @@ initialize_session_peer_list (struct ConsensusSession *session,
}
-
-
-
/**
* Called when another peer wants to do a set operation with the
* local peer.
@@ -750,7 +716,67 @@ set_listen_cb (void *cls,
const struct GNUNET_MessageHeader *context_msg,
struct GNUNET_SET_Request *request)
{
- /* FIXME */
+ struct ConsensusSession *session = cls;
+ struct GNUNET_CONSENSUS_RoundContextMessage *msg = (struct GNUNET_CONSENSUS_RoundContextMessage *) context_msg;
+ struct ConsensusPeerInformation *cpi;
+ int index;
+
+ if (NULL == context_msg)
+ {
+ GNUNET_break_op (0);
+ return;
+ }
+
+ index = get_peer_idx (other_peer, session);
+
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO, "result from %s\n", GNUNET_h2s (&other_peer->hashPubKey));
+
+ if (index < 0)
+ {
+ GNUNET_break_op (0);
+ return;
+ }
+
+ cpi = &session->info[index];
+
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d got result from P%d\n", session->local_peer_idx, index);
+
+ switch (session->current_round)
+ {
+ case CONSENSUS_ROUND_EXCHANGE:
+ if (ntohl (msg->round) != CONSENSUS_ROUND_EXCHANGE)
+ {
+ GNUNET_break_op (0);
+ return;
+ }
+ if (ntohl (msg->exp_round) < session->exp_round)
+ {
+ GNUNET_break_op (0);
+ return;
+ }
+ if (ntohl (msg->exp_subround) < session->exp_subround)
+ {
+ GNUNET_break_op (0);
+ return;
+ }
+ if (NULL != cpi->set_op)
+ GNUNET_SET_operation_cancel (cpi->set_op);
+ cpi->set_op = GNUNET_SET_accept (request, GNUNET_SET_RESULT_ADDED,
+ set_result_cb, &session->info[index]);
+ if (ntohl (msg->exp_subround) == session->exp_subround)
+ {
+ cpi->set_op_concluded = GNUNET_YES;
+ GNUNET_SET_conclude (cpi->set_op, session->element_set);
+ }
+ else
+ {
+ cpi->set_op_concluded = GNUNET_NO;
+ }
+ break;
+ default:
+ GNUNET_break_op (0);
+ return;
+ }
}
@@ -769,7 +795,9 @@ initialize_session (struct ConsensusSession *session,
GNUNET_log (GNUNET_ERROR_TYPE_INFO, "session with %u peers\n", session->num_peers);
compute_global_id (session, &join_msg->session_id);
- /* check if some local client already owns the session. */
+ /* check if some local client already owns the session.
+ * it is only legal to have a session with an existing global id
+ * if all other sessions with this global id are finished.*/
other_session = sessions_head;
while (NULL != other_session)
{
@@ -789,6 +817,8 @@ initialize_session (struct ConsensusSession *session,
session->local_peer_idx = get_peer_idx (&my_peer, session);
GNUNET_assert (-1 != session->local_peer_idx);
+ session->element_set = GNUNET_SET_create (cfg, GNUNET_SET_OPERATION_UNION);
+ GNUNET_assert (NULL != session->element_set);
session->set_listener = GNUNET_SET_listen (cfg, GNUNET_SET_OPERATION_UNION,
&session->global_id,
set_listen_cb, session);
@@ -827,6 +857,8 @@ client_join (void *cls,
{
struct ConsensusSession *session;
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO, "join message sent by client\n");
+
session = get_session_by_client (client);
if (NULL != session)
{
@@ -835,9 +867,13 @@ client_join (void *cls,
return;
}
session = GNUNET_new (struct ConsensusSession);
+ session->client = client;
GNUNET_SERVER_client_keep (client);
GNUNET_CONTAINER_DLL_insert (sessions_head, sessions_tail, session);
initialize_session (session, (struct GNUNET_CONSENSUS_JoinMessage *) m);
+ GNUNET_SERVER_receive_done (client, GNUNET_OK);
+
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO, "join done\n");
}
@@ -858,12 +894,7 @@ client_insert (void *cls,
struct GNUNET_SET_Element *element;
ssize_t element_size;
- session = sessions_head;
- while (NULL != session)
- {
- if (session->client == client)
- break;
- }
+ session = get_session_by_client (client);
if (NULL == session)
{
@@ -886,6 +917,7 @@ client_insert (void *cls,
GNUNET_break (0);
return;
}
+
element = GNUNET_malloc (sizeof (struct GNUNET_SET_Element) + element_size);
element->type = msg->element_type;
element->size = element_size;
@@ -893,6 +925,8 @@ client_insert (void *cls,
element->data = &element[1];
GNUNET_SET_add_element (session->element_set, element, NULL, NULL);
GNUNET_SERVER_receive_done (client, GNUNET_OK);
+
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%u: element added\n", session->local_peer_idx);
}
@@ -911,10 +945,10 @@ client_conclude (void *cls,
struct ConsensusSession *session;
struct GNUNET_CONSENSUS_ConcludeMessage *cmsg;
- cmsg = (struct GNUNET_CONSENSUS_ConcludeMessage *) message;
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO, "conclude requested\n");
+ cmsg = (struct GNUNET_CONSENSUS_ConcludeMessage *) message;
session = get_session_by_client (client);
-
if (NULL == session)
{
/* client not found */
@@ -922,14 +956,12 @@ client_conclude (void *cls,
GNUNET_SERVER_client_disconnect (client);
return;
}
-
if (CONSENSUS_ROUND_BEGIN != session->current_round)
{
/* client requested conclude twice */
GNUNET_break (0);
return;
}
-
if (session->num_peers <= 1)
{
//send_client_conclude_done (session);
@@ -937,7 +969,7 @@ client_conclude (void *cls,
else
{
session->conclude_timeout = GNUNET_TIME_relative_ntoh (cmsg->timeout);
- /* the 'begin' round is over, start with the next, real round */
+ /* the 'begin' round is over, start with the next, actual round */
round_over (session, NULL);
}
@@ -964,6 +996,29 @@ shutdown_task (void *cls,
/**
+ * Clean up after a client after it is
+ * disconnected (either by us or by itself)
+ *
+ * @param cls closure, unused
+ * @param client the client to clean up after
+ */
+void
+handle_client_disconnect (void *cls, struct GNUNET_SERVER_Client *client)
+{
+ struct ConsensusSession *session;
+
+ session = get_session_by_client (client);
+ if (NULL == session)
+ return;
+ if ((CONSENSUS_ROUND_BEGIN == session->current_round) ||
+ (CONSENSUS_ROUND_FINISH == session->current_round))
+ destroy_session (session);
+ else
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "client disconnected, but waiting for consensus to finish\n");
+}
+
+
+/**
* Start processing consensus requests.
*
* @param cls closure
@@ -971,13 +1026,14 @@ shutdown_task (void *cls,
* @param c configuration to use
*/
static void
-run (void *cls, struct GNUNET_SERVER_Handle *server, const struct GNUNET_CONFIGURATION_Handle *c)
+run (void *cls, struct GNUNET_SERVER_Handle *server,
+ const struct GNUNET_CONFIGURATION_Handle *c)
{
static const struct GNUNET_SERVER_MessageHandler server_handlers[] = {
- {&client_join, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_JOIN, 0},
- {&client_insert, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_INSERT, 0},
{&client_conclude, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE,
sizeof (struct GNUNET_CONSENSUS_ConcludeMessage)},
+ {&client_insert, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_INSERT, 0},
+ {&client_join, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_JOIN, 0},
{NULL, NULL, 0, 0}
};
@@ -992,6 +1048,7 @@ run (void *cls, struct GNUNET_SERVER_Handle *server, const struct GNUNET_CONFIGU
}
GNUNET_SERVER_add_handlers (server, server_handlers);
GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL, &shutdown_task, NULL);
+ GNUNET_SERVER_disconnect_notify (server, handle_client_disconnect, NULL);
GNUNET_log (GNUNET_ERROR_TYPE_INFO, "consensus running\n");
}
diff --git a/src/consensus/test_consensus.conf b/src/consensus/test_consensus.conf
index 437636c995..37facf84e2 100644
--- a/src/consensus/test_consensus.conf
+++ b/src/consensus/test_consensus.conf
@@ -5,7 +5,7 @@ HOSTNAME = localhost
HOME = $SERVICEHOME
BINARY = gnunet-service-consensus
#PREFIX = gdbserver :12345
-PREFIX = valgrind --leak-check=full
+PREFIX = valgrind
ACCEPT_FROM = 127.0.0.1;
ACCEPT_FROM6 = ::1;
UNIXPATH = /tmp/gnunet-service-consensus.sock
@@ -19,7 +19,11 @@ OPTIONS = -LERROR
[arm]
-DEFAULTSERVICES = core consensus
+DEFAULTSERVICES = core consensus set
+
+[set]
+OPTIONS = -L INFO
+PREFIX = valgrind
[testbed]
diff --git a/src/dv/gnunet-service-dv.c b/src/dv/gnunet-service-dv.c
index 3b0f0783ad..c6ba8eec3c 100644
--- a/src/dv/gnunet-service-dv.c
+++ b/src/dv/gnunet-service-dv.c
@@ -1404,7 +1404,6 @@ listen_set_union (void *cls,
neighbor->my_set = GNUNET_SET_create (cfg,
GNUNET_SET_OPERATION_UNION);
neighbor->set_op = GNUNET_SET_accept (request,
- neighbor->my_set /* FIXME: pass later! */,
GNUNET_SET_RESULT_ADDED,
&handle_set_union_result,
neighbor);
@@ -1428,8 +1427,7 @@ initiate_set_union (void *cls,
neighbor->initiate_task = GNUNET_SCHEDULER_NO_TASK;
neighbor->my_set = GNUNET_SET_create (cfg,
GNUNET_SET_OPERATION_UNION);
- neighbor->set_op = GNUNET_SET_evaluate (neighbor->my_set /* FIXME: pass later! */,
- &neighbor->peer,
+ neighbor->set_op = GNUNET_SET_evaluate (&neighbor->peer,
&neighbor->real_session_id,
NULL,
0 /* FIXME: salt */,
diff --git a/src/include/gnunet_consensus_service.h b/src/include/gnunet_consensus_service.h
index 66d48e0e20..db7509976e 100644
--- a/src/include/gnunet_consensus_service.h
+++ b/src/include/gnunet_consensus_service.h
@@ -39,28 +39,7 @@ extern "C"
#include "gnunet_common.h"
#include "gnunet_time_lib.h"
#include "gnunet_configuration_lib.h"
-
-
-/**
- * An element of the consensus set.
- */
-struct GNUNET_CONSENSUS_Element
-{
- /**
- * The actual data of the element.
- */
- const void *data;
-
- /**
- * Size of the element's data.
- */
- uint16_t size;
-
- /**
- * Application specific element type
- */
- uint16_t type;
-};
+#include "gnunet_set_service.h"
/**
@@ -73,7 +52,7 @@ struct GNUNET_CONSENSUS_Element
* @param element new element, NULL on error
*/
typedef void (*GNUNET_CONSENSUS_ElementCallback) (void *cls,
- const struct GNUNET_CONSENSUS_Element *element);
+ const struct GNUNET_SET_Element *element);
@@ -138,7 +117,7 @@ typedef void (*GNUNET_CONSENSUS_InsertDoneCallback) (void *cls,
*/
void
GNUNET_CONSENSUS_insert (struct GNUNET_CONSENSUS_Handle *consensus,
- const struct GNUNET_CONSENSUS_Element *element,
+ const struct GNUNET_SET_Element *element,
GNUNET_CONSENSUS_InsertDoneCallback idc,
void *idc_cls);
diff --git a/src/include/gnunet_mq_lib.h b/src/include/gnunet_mq_lib.h
index 59b692cf02..54ea806a5a 100644
--- a/src/include/gnunet_mq_lib.h
+++ b/src/include/gnunet_mq_lib.h
@@ -53,36 +53,6 @@
*/
#define GNUNET_MQ_msg(mvar, type) GNUNET_MQ_msg_extra(mvar, 0, type)
-/**
- * Append data to the end of an existing MQ message.
- * If the operation is successful, mqm is changed to point to the new MQ message,
- * and GNUNET_OK is returned.
- * On failure, GNUNET_SYSERR is returned, and the pointer mqm is not changed,
- * the user of this API must take care of disposing the already allocated message
- * (either by sending it, or by using GNUNET_MQ_discard)
- *
- * @param mqm MQ message to augment with additional data
- * @param src source buffer for the additional data
- * @param len length of the additional data
- * @return GNUNET_SYSERR if nesting the message failed,
- * GNUNET_OK on success
- */
-#define GNUNET_MQ_nest(mqm, src, len) GNUNET_MQ_nest_ (&mqm, src, len)
-
-
-/**
- * Append a message to the end of an existing MQ message.
- * If the operation is successful, mqm is changed to point to the new MQ message,
- * and GNUNET_OK is returned.
- * On failure, GNUNET_SYSERR is returned, and the pointer mqm is not changed,
- * the user of this API must take care of disposing the already allocated message
- * (either by sending it, or by using GNUNET_MQ_discard)
- *
- * @param mqm MQ message to augment with additional data
- * @param mh the message to append, must be of type 'struct GNUNET_MessageHeader *'
- */
-#define GNUNET_MQ_nest_mh(mqm, mh) ((NULL == mh) ? (GNUNET_OK) : GNUNET_MQ_nest((mqm), (mh), ntohs ((mh)->size)))
-
/**
* Allocate a GNUNET_MQ_Message, where the message only consists of a header.
@@ -105,6 +75,40 @@
/**
+ * Allocate a GNUNET_MQ_Message, and append a payload message after the given
+ * message struct.
+ *
+ * @param mvar pointer to a message struct, will be changed to point at the newly allocated message,
+ * whose size is 'sizeof(*mvar) + ntohs (mh->size)'
+ * @param type message type of the allocated message, has no effect on the nested message
+ * @param mh message to nest
+ * @return a newly allocated 'struct GNUNET_MQ_Message *'
+ */
+#define GNUNET_MQ_msg_nested_mh(mvar, type, mh) GNUNET_MQ_msg_nested_mh_((((void)(mvar)->header), (struct GNUNET_MessageHeader**) &(mvar)), sizeof (*(mvar)), (type), mh)
+
+
+/**
+ * Return a pointer to the message at the end of the given message.
+ *
+ * @param var pointer to a message struct, the type of the expression determines the base size,
+ * the space after the base size is the nested message
+ * @return a 'struct GNUNET_MessageHeader *' that points at the nested message of the given message,
+ * or NULL if the given message in 'var' does not have any space after the message struct
+ */
+#define GNUNET_MQ_extract_nested_mh(var) GNUNET_MQ_extract_nested_mh_ ((struct GNUNET_MessageHeader *) (var), sizeof (*(var)))
+
+
+struct GNUNET_MessageHeader *
+GNUNET_MQ_extract_nested_mh_ (const struct GNUNET_MessageHeader *mh, uint16_t base_size);
+
+
+struct GNUNET_MQ_Message *
+GNUNET_MQ_msg_nested_mh_ (struct GNUNET_MessageHeader **mhp, uint16_t base_size, uint16_t type,
+ const struct GNUNET_MessageHeader *nested_mh);
+
+
+
+/**
* End-marker for the handlers array
*/
#define GNUNET_MQ_HANDLERS_END {NULL, 0, 0}
@@ -128,7 +132,8 @@ enum GNUNET_MQ_Error
* @param cls closure
* @param msg the received message
*/
-typedef void (*GNUNET_MQ_MessageCallback) (void *cls, const struct GNUNET_MessageHeader *msg);
+typedef void
+(*GNUNET_MQ_MessageCallback) (void *cls, const struct GNUNET_MessageHeader *msg);
/**
@@ -151,10 +156,12 @@ typedef void
*
* @param cls closure
*/
-typedef void (*GNUNET_MQ_NotifyCallback) (void *cls);
+typedef void
+(*GNUNET_MQ_NotifyCallback) (void *cls);
-typedef void (*GNUNET_MQ_ErrorHandler) (void *cls, enum GNUNET_MQ_Error error);
+typedef void
+(*GNUNET_MQ_ErrorHandler) (void *cls, enum GNUNET_MQ_Error error);
struct GNUNET_MQ_Message
@@ -287,6 +294,7 @@ struct GNUNET_MQ_Handler
};
+
/**
* Create a new message for MQ.
*
@@ -300,21 +308,6 @@ GNUNET_MQ_msg_ (struct GNUNET_MessageHeader **mhp, uint16_t size, uint16_t type)
/**
- * Resize the the mq message pointed to by mqmp,
- * and append the given data to it.
- *
- * @param mqmp pointer to a mq message pointer
- * @param src source of the data to append
- * @param len length of the data to append
- * @return GNUNET_OK on success,
- * GNUNET_SYSERR on error (e.g. if len is too large)
- */
-int
-GNUNET_MQ_nest_ (struct GNUNET_MQ_Message **mqmp,
- const void *src, uint16_t len);
-
-
-/**
* Discard the message queue message, free all
* allocated resources. Must be called in the event
* that a message is created but should not actually be sent.
diff --git a/src/include/gnunet_protocols.h b/src/include/gnunet_protocols.h
index 15bbae2e8c..85c643f7de 100644
--- a/src/include/gnunet_protocols.h
+++ b/src/include/gnunet_protocols.h
@@ -1754,11 +1754,18 @@ extern "C"
*/
#define GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ABORT 548
+/**
+ * Abort a round, don't send requested elements anymore
+ */
+#define GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ROUND_CONTEXT 547
+
/*******************************************************************************
* SET message types
******************************************************************************/
+#define GNUNET_MESSAGE_TYPE_SET_REJECT 569
+
/**
* Cancel a set operation
*/
@@ -1800,44 +1807,49 @@ extern "C"
#define GNUNET_MESSAGE_TYPE_SET_EVALUATE 577
/**
- * Evaluate a set operation
+ * Start a set operation with the given set
+ */
+#define GNUNET_MESSAGE_TYPE_SET_CONCLUDE 578
+
+/**
+ * Notify the client of a request from a remote peer
*/
-#define GNUNET_MESSAGE_TYPE_SET_REQUEST 578
+#define GNUNET_MESSAGE_TYPE_SET_REQUEST 579
/**
- * Evaluate a set operation.
+ * Create a new local set
*/
-#define GNUNET_MESSAGE_TYPE_SET_CREATE 579
+#define GNUNET_MESSAGE_TYPE_SET_CREATE 580
/**
- * Evaluate a set operation.
+ * Request a set operation from a remote peer.
*/
-#define GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST 580
+#define GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST 581
/**
* Strata estimator.
*/
-#define GNUNET_MESSAGE_TYPE_SET_P2P_SE 581
+#define GNUNET_MESSAGE_TYPE_SET_P2P_SE 582
/**
* Invertible bloom filter.
*/
-#define GNUNET_MESSAGE_TYPE_SET_P2P_IBF 582
+#define GNUNET_MESSAGE_TYPE_SET_P2P_IBF 583
/**
* Actual set elements.
*/
-#define GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS 583
+#define GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS 584
/**
* Requests for the elements with the given hashes.
*/
-#define GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS 584
+#define GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS 585
/**
* Operation is done.
*/
-#define GNUNET_MESSAGE_TYPE_SET_P2P_DONE 585
+#define GNUNET_MESSAGE_TYPE_SET_P2P_DONE 586
diff --git a/src/include/gnunet_set_service.h b/src/include/gnunet_set_service.h
index ce413c0de4..2684df00a6 100644
--- a/src/include/gnunet_set_service.h
+++ b/src/include/gnunet_set_service.h
@@ -257,11 +257,10 @@ GNUNET_SET_destroy (struct GNUNET_SET_Handle *set);
/**
- * Evaluate a set operation with our set and the set of another peer.
+ * Create a set operation for evaluation with another peer.
+ * The evaluation will not start until the client provides
+ * a local set with GNUNET_SET_conclude.
*
- * @param set set to use -- FIXME: remove
- * this argument, use GNUNET_SET_conclude instead!
- * @param salt salt for HKDF (explain more here)
* @param other_peer peer with the other set
* @param app_id hash for the application using the set
* @param context_msg additional information for the request
@@ -275,8 +274,7 @@ GNUNET_SET_destroy (struct GNUNET_SET_Handle *set);
* @return a handle to cancel the operation
*/
struct GNUNET_SET_OperationHandle *
-GNUNET_SET_evaluate (struct GNUNET_SET_Handle *set,
- const struct GNUNET_PeerIdentity *other_peer,
+GNUNET_SET_evaluate (const struct GNUNET_PeerIdentity *other_peer,
const struct GNUNET_HashCode *app_id,
const struct GNUNET_MessageHeader *context_msg,
uint16_t salt,
@@ -315,13 +313,13 @@ GNUNET_SET_listen_cancel (struct GNUNET_SET_ListenHandle *lh);
/**
- * Accept a request we got via GNUNET_SET_listen. Must be called
- * during GNUNET_SET_listen, as the 'struct GNUNET_SET_Request'
- * becomes invalid afterwards.
+ * Accept a request we got via GNUNET_SET_listen. Must be called during
+ * GNUNET_SET_listen, as the 'struct GNUNET_SET_Request' becomes invalid
+ * afterwards.
+ * Call GNUNET_SET_conclude to provide the local set to use for the operation,
+ * and to begin the exchange with the remote peer.
*
* @param request request to accept
- * @param set set used for the requested operation -- FIXME: remove
- * this argument, use GNUNET_SET_conclude instead!
* @param result_mode specified how results will be returned,
* see 'GNUNET_SET_ResultMode'.
* @param result_cb callback for the results
@@ -330,7 +328,6 @@ GNUNET_SET_listen_cancel (struct GNUNET_SET_ListenHandle *lh);
*/
struct GNUNET_SET_OperationHandle *
GNUNET_SET_accept (struct GNUNET_SET_Request *request,
- struct GNUNET_SET_Handle *set,
enum GNUNET_SET_ResultMode result_mode,
GNUNET_SET_ResultIterator result_cb,
void *cls);
@@ -353,9 +350,9 @@ GNUNET_SET_conclude (struct GNUNET_SET_OperationHandle *oh,
/**
- * Cancel the given set operation. FIXME: do clients have
- * to cancel the operatino if the GNUNET_SET_ResultIterator
- * has been called with timeout/error/done?
+ * Cancel the given set operation.
+ * May not be called after the operation's GNUNET_SET_ResultIterator has been
+ * called with a status that indicates error, timeout or done.
*
* @param oh set operation to cancel
*/
diff --git a/src/set/gnunet-service-set.c b/src/set/gnunet-service-set.c
index 2aea503653..4da7188791 100644
--- a/src/set/gnunet-service-set.c
+++ b/src/set/gnunet-service-set.c
@@ -226,6 +226,23 @@ destroy_incoming (struct Incoming *incoming)
GNUNET_free (incoming);
}
+static struct Listener *
+get_listener_by_target (enum GNUNET_SET_OperationType op,
+ const struct GNUNET_HashCode *app_id)
+{
+ struct Listener *l;
+
+ for (l = listeners_head; NULL != l; l = l->next)
+ {
+ if (l->operation != op)
+ continue;
+ if (0 != GNUNET_CRYPTO_hash_cmp (app_id, &l->app_id))
+ continue;
+ return l;
+ }
+ return NULL;
+}
+
/**
* Handle a request for a set operation from
@@ -240,62 +257,33 @@ handle_p2p_operation_request (void *cls, const struct GNUNET_MessageHeader *mh)
struct Incoming *incoming = cls;
const struct OperationRequestMessage *msg = (const struct OperationRequestMessage *) mh;
struct GNUNET_MQ_Message *mqm;
- struct RequestMessage *cmsg;
+ struct GNUNET_SET_RequestMessage *cmsg;
struct Listener *listener;
const struct GNUNET_MessageHeader *context_msg;
- if (ntohs (mh->size) < sizeof *msg)
+ context_msg = GNUNET_MQ_extract_nested_mh (msg);
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO, "received P2P operation request (op %u, app %s)\n",
+ ntohs (msg->operation), GNUNET_h2s (&msg->app_id));
+ listener = get_listener_by_target (ntohs (msg->operation), &msg->app_id);
+ if (NULL == listener)
{
- /* message is to small for its type */
- GNUNET_break (0);
- destroy_incoming (incoming);
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ "set operation request from peer failed: "
+ "no set with matching application ID and operation type\n");
return;
}
- else if (ntohs (mh->size) == sizeof *msg)
- {
- /* there is no context message */
- context_msg = NULL;
- }
- else
+ mqm = GNUNET_MQ_msg_nested_mh (cmsg, GNUNET_MESSAGE_TYPE_SET_REQUEST, context_msg);
+ if (NULL == mqm)
{
- context_msg = &msg[1].header;
- if ((ntohs (context_msg->size) + sizeof *msg) != ntohs (msg->header.size))
- {
- /* size of context message is invalid */
- GNUNET_break (0);
- destroy_incoming (incoming);
- return;
- }
- }
-
- GNUNET_log (GNUNET_ERROR_TYPE_INFO, "received P2P operation request (op %u, app %s)\n",
- ntohs (msg->operation), GNUNET_h2s (&msg->app_id));
-
- /* find the appropriate listener */
- for (listener = listeners_head;
- listener != NULL;
- listener = listener->next)
- {
- if ( (0 != GNUNET_CRYPTO_hash_cmp (&msg->app_id, &listener->app_id)) ||
- (ntohs (msg->operation) != listener->operation) )
- continue;
- mqm = GNUNET_MQ_msg (cmsg, GNUNET_MESSAGE_TYPE_SET_REQUEST);
- if (GNUNET_OK != GNUNET_MQ_nest_mh (mqm, context_msg))
- {
- /* FIXME: disconnect the peer */
- GNUNET_MQ_discard (mqm);
- GNUNET_break (0);
- return;
- }
- incoming->accept_id = accept_id++;
- cmsg->accept_id = htonl (incoming->accept_id);
- GNUNET_MQ_send (listener->client_mq, mqm);
+ /* FIXME: disconnect the peer */
+ GNUNET_break_op (0);
return;
}
-
- GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
- "set operation request from peer failed: "
- "no set with matching application ID and operation type\n");
+ incoming->accept_id = accept_id++;
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO, "sending request with accept id %u\n", incoming->accept_id);
+ cmsg->accept_id = htonl (incoming->accept_id);
+ cmsg->peer_id = incoming->peer;
+ GNUNET_MQ_send (listener->client_mq, mqm);
}
@@ -311,7 +299,7 @@ handle_client_create (void *cls,
struct GNUNET_SERVER_Client *client,
const struct GNUNET_MessageHeader *m)
{
- struct SetCreateMessage *msg = (struct SetCreateMessage *) m;
+ struct GNUNET_SET_CreateMessage *msg = (struct GNUNET_SET_CreateMessage *) m;
struct Set *set;
GNUNET_log (GNUNET_ERROR_TYPE_INFO, "client created new set (operation %u)\n",
@@ -363,7 +351,7 @@ handle_client_listen (void *cls,
struct GNUNET_SERVER_Client *client,
const struct GNUNET_MessageHeader *m)
{
- struct ListenMessage *msg = (struct ListenMessage *) m;
+ struct GNUNET_SET_ListenMessage *msg = (struct GNUNET_SET_ListenMessage *) m;
struct Listener *listener;
if (NULL != get_listener (client))
@@ -410,7 +398,7 @@ handle_client_remove (void *cls,
switch (set->operation)
{
case GNUNET_SET_OPERATION_UNION:
- _GSS_union_remove ((struct ElementMessage *) m, set);
+ _GSS_union_remove ((struct GNUNET_SET_ElementMessage *) m, set);
case GNUNET_SET_OPERATION_INTERSECTION:
/* FIXME: cfuchs */
break;
@@ -423,6 +411,38 @@ handle_client_remove (void *cls,
}
+
+/**
+ * Called when the client wants to reject an operation
+ * request from another peer.
+ *
+ * @param cls unused
+ * @param client client that sent the message
+ * @param m message sent by the client
+ */
+static void
+handle_client_reject (void *cls,
+ struct GNUNET_SERVER_Client *client,
+ const struct GNUNET_MessageHeader *m)
+{
+ struct Incoming *incoming;
+ struct GNUNET_SET_AcceptRejectMessage *msg = (struct GNUNET_SET_AcceptRejectMessage *) m;
+
+ GNUNET_break (0 == ntohl (msg->request_id));
+
+ incoming = get_incoming (ntohl (msg->accept_reject_id));
+ if (NULL == incoming)
+ {
+ GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
+ return;
+ }
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO, "peer request rejected by client\n");
+ destroy_incoming (incoming);
+ GNUNET_SERVER_receive_done (client, GNUNET_OK);
+}
+
+
+
/**
* Called when a client wants to add an element to a
* set it inhabits.
@@ -448,7 +468,7 @@ handle_client_add (void *cls,
switch (set->operation)
{
case GNUNET_SET_OPERATION_UNION:
- _GSS_union_add ((struct ElementMessage *) m, set);
+ _GSS_union_add ((struct GNUNET_SET_ElementMessage *) m, set);
case GNUNET_SET_OPERATION_INTERSECTION:
/* FIXME: cfuchs */
break;
@@ -490,7 +510,7 @@ handle_client_evaluate (void *cls,
/* FIXME: cfuchs */
break;
case GNUNET_SET_OPERATION_UNION:
- _GSS_union_evaluate ((struct EvaluateMessage *) m, set);
+ _GSS_union_evaluate ((struct GNUNET_SET_EvaluateMessage *) m, set);
break;
default:
GNUNET_assert (0);
@@ -502,23 +522,6 @@ handle_client_evaluate (void *cls,
/**
- * Handle a cancel request from a client.
- *
- * @param cls unused
- * @param client the client
- * @param m the cancel message
- */
-static void
-handle_client_cancel (void *cls,
- struct GNUNET_SERVER_Client *client,
- const struct GNUNET_MessageHeader *m)
-{
- /* FIXME: implement */
- GNUNET_SERVER_receive_done (client, GNUNET_OK);
-}
-
-
-/**
* Handle an ack from a client.
*
* @param cls unused
@@ -550,25 +553,20 @@ handle_client_accept (void *cls,
{
struct Set *set;
struct Incoming *incoming;
- struct AcceptMessage *msg = (struct AcceptMessage *) mh;
+ struct GNUNET_SET_AcceptRejectMessage *msg = (struct GNUNET_SET_AcceptRejectMessage *) mh;
+ incoming = get_incoming (ntohl (msg->accept_reject_id));
- incoming = get_incoming (ntohl (msg->accept_id));
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO, "client accepting %u\n", ntohl (msg->accept_reject_id));
if (NULL == incoming)
{
+
GNUNET_break (0);
GNUNET_SERVER_client_disconnect (client);
return;
}
- if (0 == ntohl (msg->request_id))
- {
- GNUNET_log (GNUNET_ERROR_TYPE_INFO, "peer request rejected by client\n");
- destroy_incoming (incoming);
- GNUNET_SERVER_receive_done (client, GNUNET_OK);
- return;
- }
set = get_set (client);
@@ -687,14 +685,14 @@ run (void *cls, struct GNUNET_SERVER_Handle *server,
const struct GNUNET_CONFIGURATION_Handle *cfg)
{
static const struct GNUNET_SERVER_MessageHandler server_handlers[] = {
+ {handle_client_accept, NULL, GNUNET_MESSAGE_TYPE_SET_ACCEPT, 0},
+ {handle_client_ack, NULL, GNUNET_MESSAGE_TYPE_SET_ACK, 0},
+ {handle_client_add, NULL, GNUNET_MESSAGE_TYPE_SET_ADD, 0},
{handle_client_create, NULL, GNUNET_MESSAGE_TYPE_SET_CREATE, 0},
+ {handle_client_evaluate, NULL, GNUNET_MESSAGE_TYPE_SET_EVALUATE, 0},
{handle_client_listen, NULL, GNUNET_MESSAGE_TYPE_SET_LISTEN, 0},
- {handle_client_add, NULL, GNUNET_MESSAGE_TYPE_SET_ADD, 0},
+ {handle_client_reject, NULL, GNUNET_MESSAGE_TYPE_SET_REJECT, 0},
{handle_client_remove, NULL, GNUNET_MESSAGE_TYPE_SET_REMOVE, 0},
- {handle_client_cancel, NULL, GNUNET_MESSAGE_TYPE_SET_CANCEL, 0},
- {handle_client_evaluate, NULL, GNUNET_MESSAGE_TYPE_SET_EVALUATE, 0},
- {handle_client_ack, NULL, GNUNET_MESSAGE_TYPE_SET_ACK, 0},
- {handle_client_accept, NULL, GNUNET_MESSAGE_TYPE_SET_ACCEPT, 0},
{NULL, NULL, 0, 0}
};
@@ -705,6 +703,8 @@ run (void *cls, struct GNUNET_SERVER_Handle *server,
stream_listen_socket = GNUNET_STREAM_listen (cfg, GNUNET_APPLICATION_TYPE_SET,
&stream_listen_cb, NULL,
GNUNET_STREAM_OPTION_END);
+
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO, "set service running\n");
}
diff --git a/src/set/gnunet-service-set.h b/src/set/gnunet-service-set.h
index bea77416ed..15199eba49 100644
--- a/src/set/gnunet-service-set.h
+++ b/src/set/gnunet-service-set.h
@@ -217,7 +217,7 @@ _GSS_union_set_create (void);
* @parem set the set to evaluate the operation with
*/
void
-_GSS_union_evaluate (struct EvaluateMessage *m, struct Set *set);
+_GSS_union_evaluate (struct GNUNET_SET_EvaluateMessage *m, struct Set *set);
/**
@@ -227,7 +227,7 @@ _GSS_union_evaluate (struct EvaluateMessage *m, struct Set *set);
* @param set set to add the element to
*/
void
-_GSS_union_add (struct ElementMessage *m, struct Set *set);
+_GSS_union_add (struct GNUNET_SET_ElementMessage *m, struct Set *set);
/**
@@ -238,7 +238,7 @@ _GSS_union_add (struct ElementMessage *m, struct Set *set);
* @param set set to remove the element from
*/
void
-_GSS_union_remove (struct ElementMessage *m, struct Set *set);
+_GSS_union_remove (struct GNUNET_SET_ElementMessage *m, struct Set *set);
/**
@@ -258,7 +258,7 @@ _GSS_union_set_destroy (struct Set *set);
* @param incoming information about the requesting remote peer
*/
void
-_GSS_union_accept (struct AcceptMessage *m, struct Set *set,
+_GSS_union_accept (struct GNUNET_SET_AcceptRejectMessage *m, struct Set *set,
struct Incoming *incoming);
diff --git a/src/set/gnunet-service-set_union.c b/src/set/gnunet-service-set_union.c
index c651a03818..6d9658ee58 100644
--- a/src/set/gnunet-service-set_union.c
+++ b/src/set/gnunet-service-set_union.c
@@ -245,8 +245,7 @@ struct ElementEntry
/**
- * Information about the element used for
- * a specific union operation.
+ * Entries in the key-to-element map of the union set.
*/
struct KeyEntry
{
@@ -401,11 +400,14 @@ destroy_key_to_element_iter (void *cls,
static void
destroy_union_operation (struct UnionEvaluateOperation *eo)
{
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO, "destroying union op\n");
+
if (NULL != eo->mq)
{
GNUNET_MQ_destroy (eo->mq);
eo->mq = NULL;
}
+
if (NULL != eo->socket)
{
GNUNET_STREAM_close (eo->socket);
@@ -433,12 +435,16 @@ destroy_union_operation (struct UnionEvaluateOperation *eo)
eo->key_to_element = NULL;
}
-
GNUNET_CONTAINER_DLL_remove (eo->set->state.u->ops_head,
eo->set->state.u->ops_tail,
eo);
GNUNET_free (eo);
- /* FIXME: free and destroy everything else */
+
+
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO, "destroying union op done\n");
+
+
+ /* FIXME: do a garbage collection of the set generations */
}
@@ -452,7 +458,7 @@ static void
fail_union_operation (struct UnionEvaluateOperation *eo)
{
struct GNUNET_MQ_Message *mqm;
- struct ResultMessage *msg;
+ struct GNUNET_SET_ResultMessage *msg;
mqm = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT);
msg->result_status = htons (GNUNET_SET_STATUS_FAILURE);
@@ -495,20 +501,25 @@ send_operation_request (struct UnionEvaluateOperation *eo)
struct GNUNET_MQ_Message *mqm;
struct OperationRequestMessage *msg;
- mqm = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST);
- if (NULL != eo->context_msg)
- if (GNUNET_OK != GNUNET_MQ_nest (mqm, eo->context_msg, ntohs (eo->context_msg->size)))
- {
- /* the context message is too large */
- GNUNET_break (0);
- GNUNET_SERVER_client_disconnect (eo->set->client);
- GNUNET_MQ_discard (mqm);
- return;
- }
+ mqm = GNUNET_MQ_msg_nested_mh (msg, GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST, eo->context_msg);
+
+ if (NULL == mqm)
+ {
+ /* the context message is too large */
+ GNUNET_break (0);
+ GNUNET_SERVER_client_disconnect (eo->set->client);
+ return;
+ }
msg->operation = htons (GNUNET_SET_OPERATION_UNION);
msg->app_id = eo->app_id;
GNUNET_MQ_send (eo->mq, mqm);
+ if (NULL != eo->context_msg)
+ {
+ GNUNET_free (eo->context_msg);
+ eo->context_msg = NULL;
+ }
+
GNUNET_log (GNUNET_ERROR_TYPE_INFO, "sent op request\n");
}
@@ -537,7 +548,7 @@ insert_element_iterator (void *cls,
{
if (old_k->ibf_key.key_val == new_k->ibf_key.key_val)
{
- new_k->next_colliding = old_k;
+ new_k->next_colliding = old_k->next_colliding;
old_k->next_colliding = new_k;
return GNUNET_NO;
}
@@ -568,12 +579,11 @@ insert_element (struct UnionEvaluateOperation *eo, struct ElementEntry *ee)
ret = GNUNET_CONTAINER_multihashmap32_get_multiple (eo->key_to_element,
(uint32_t) ibf_key.key_val,
insert_element_iterator, k);
+
/* was the element inserted into a colliding bucket? */
if (GNUNET_SYSERR == ret)
- {
- GNUNET_assert (NULL != k->next_colliding);
return;
- }
+
GNUNET_CONTAINER_multihashmap32_put (eo->key_to_element, (uint32_t) ibf_key.key_val, k,
GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
}
@@ -781,8 +791,8 @@ handle_p2p_strata_estimator (void *cls, const struct GNUNET_MessageHeader *mh)
*/
static int
send_element_iterator (void *cls,
- uint32_t key,
- void *value)
+ uint32_t key,
+ void *value)
{
struct SendElementClosure *sec = cls;
struct IBF_Key ibf_key = sec->ibf_key;
@@ -795,15 +805,18 @@ send_element_iterator (void *cls,
{
const struct GNUNET_SET_Element *const element = &ke->element->element;
struct GNUNET_MQ_Message *mqm;
+ struct GNUNET_MessageHeader *mh;
GNUNET_assert (ke->ibf_key.key_val == ibf_key.key_val);
- mqm = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS);
- if (GNUNET_OK != GNUNET_MQ_nest (mqm, element->data, element->size))
+ mqm = GNUNET_MQ_msg_header_extra (mh, element->size, GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS);
+ if (NULL == mqm)
{
+ /* element too large */
GNUNET_break (0);
- GNUNET_MQ_discard (mqm);
continue;
}
+ memcpy (&mh[1], element->data, element->size);
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO, "sending element to client\n");
GNUNET_MQ_send (eo->mq, mqm);
ke = ke->next_colliding;
}
@@ -975,34 +988,42 @@ send_client_element (struct UnionEvaluateOperation *eo,
struct GNUNET_SET_Element *element)
{
struct GNUNET_MQ_Message *mqm;
- struct ResultMessage *rm;
+ struct GNUNET_SET_ResultMessage *rm;
GNUNET_assert (0 != eo->request_id);
- mqm = GNUNET_MQ_msg (rm, GNUNET_MESSAGE_TYPE_SET_RESULT);
- rm->result_status = htons (GNUNET_SET_STATUS_OK);
- rm->request_id = htonl (eo->request_id);
- if (GNUNET_OK != GNUNET_MQ_nest (mqm, element->data, element->size))
+ mqm = GNUNET_MQ_msg_extra (rm, element->size, GNUNET_MESSAGE_TYPE_SET_RESULT);
+ if (NULL == mqm)
{
GNUNET_MQ_discard (mqm);
GNUNET_break (0);
return;
}
-
+ rm->result_status = htons (GNUNET_SET_STATUS_OK);
+ rm->request_id = htonl (eo->request_id);
+ memcpy (&rm[1], element->data, element->size);
GNUNET_MQ_send (eo->set->client_mq, mqm);
}
/**
- * Callback used for notifications
+ * Completion callback for shutdown
*
- * @param cls closure
+ * @param cls the closure from GNUNET_STREAM_shutdown call
+ * @param operation the operation that was shutdown (SHUT_RD, SHUT_WR,
+ * SHUT_RDWR)
*/
-static void
-client_done_sent_cb (void *cls)
+/*
+static void
+stream_shutdown_cb (void *cls,
+ int operation)
{
//struct UnionEvaluateOperation *eo = cls;
- /* FIXME: destroy eo */
+
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO, "stream shutdown\n");
+
+ // destroy_union_operation (eo);
}
+*/
/**
@@ -1018,16 +1039,15 @@ static void
send_client_done_and_destroy (struct UnionEvaluateOperation *eo)
{
struct GNUNET_MQ_Message *mqm;
- struct ResultMessage *rm;
+ struct GNUNET_SET_ResultMessage *rm;
GNUNET_assert (0 != eo->request_id);
mqm = GNUNET_MQ_msg (rm, GNUNET_MESSAGE_TYPE_SET_RESULT);
rm->request_id = htonl (eo->request_id);
rm->result_status = htons (GNUNET_SET_STATUS_DONE);
- GNUNET_MQ_notify_sent (mqm, client_done_sent_cb, eo);
GNUNET_MQ_send (eo->set->client_mq, mqm);
- /* FIXME: destroy the eo */
+ // GNUNET_STREAM_shutdown (eo->socket, SHUT_RDWR, stream_shutdown_cb, eo);
}
@@ -1199,18 +1219,25 @@ stream_open_cb (void *cls,
* @parem set the set to evaluate the operation with
*/
void
-_GSS_union_evaluate (struct EvaluateMessage *m, struct Set *set)
+_GSS_union_evaluate (struct GNUNET_SET_EvaluateMessage *m, struct Set *set)
{
struct UnionEvaluateOperation *eo;
+ struct GNUNET_MessageHeader *context_msg;
eo = GNUNET_new (struct UnionEvaluateOperation);
- eo->peer = m->peer;
+ eo->peer = m->target_peer;
eo->set = set;
eo->request_id = htonl (m->request_id);
GNUNET_assert (0 != eo->request_id);
eo->se = strata_estimator_dup (set->state.u->se);
eo->salt = ntohs (m->salt);
eo->app_id = m->app_id;
+
+ context_msg = GNUNET_MQ_extract_nested_mh (m);
+ if (NULL != context_msg)
+ {
+ eo->context_msg = GNUNET_copy_message (context_msg);
+ }
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"evaluating union operation, (app %s)\n",
@@ -1235,7 +1262,7 @@ _GSS_union_evaluate (struct EvaluateMessage *m, struct Set *set)
* @param incoming information about the requesting remote peer
*/
void
-_GSS_union_accept (struct AcceptMessage *m, struct Set *set,
+_GSS_union_accept (struct GNUNET_SET_AcceptRejectMessage *m, struct Set *set,
struct Incoming *incoming)
{
struct UnionEvaluateOperation *eo;
@@ -1250,7 +1277,6 @@ _GSS_union_accept (struct AcceptMessage *m, struct Set *set,
GNUNET_assert (0 != ntohl (m->request_id));
eo->request_id = ntohl (m->request_id);
eo->se = strata_estimator_dup (set->state.u->se);
- eo->set = set; // FIXME: redundant!?
eo->mq = incoming->mq;
/* transfer ownership of mq and socket from incoming to eo */
incoming->mq = NULL;
@@ -1299,7 +1325,7 @@ _GSS_union_set_create (void)
* @param set set to add the element to
*/
void
-_GSS_union_add (struct ElementMessage *m, struct Set *set)
+_GSS_union_add (struct GNUNET_SET_ElementMessage *m, struct Set *set)
{
struct ElementEntry *ee;
struct ElementEntry *ee_dup;
@@ -1357,7 +1383,9 @@ _GSS_union_set_destroy (struct Set *set)
destroy_elements (set->state.u);
while (NULL != set->state.u->ops_head)
+ {
destroy_union_operation (set->state.u->ops_head);
+ }
}
/**
@@ -1368,7 +1396,7 @@ _GSS_union_set_destroy (struct Set *set)
* @param set set to remove the element from
*/
void
-_GSS_union_remove (struct ElementMessage *m, struct Set *set)
+_GSS_union_remove (struct GNUNET_SET_ElementMessage *m, struct Set *set)
{
struct GNUNET_HashCode hash;
struct ElementEntry *ee;
diff --git a/src/set/gnunet-set.c b/src/set/gnunet-set.c
index 5f2d1c9761..ae84610fc5 100644
--- a/src/set/gnunet-set.c
+++ b/src/set/gnunet-set.c
@@ -91,11 +91,12 @@ listen_cb (void *cls,
const struct GNUNET_MessageHeader *context_msg,
struct GNUNET_SET_Request *request)
{
+ struct GNUNET_SET_OperationHandle *oh;
GNUNET_log (GNUNET_ERROR_TYPE_INFO, "listen cb called\n");
GNUNET_SET_listen_cancel (listen_handle);
- GNUNET_SET_accept (request, set2,
- GNUNET_SET_RESULT_ADDED, result_cb_set2, NULL);
+ oh = GNUNET_SET_accept (request, GNUNET_SET_RESULT_ADDED, result_cb_set2, NULL);
+ GNUNET_SET_conclude (oh, set2);
}
@@ -107,11 +108,14 @@ listen_cb (void *cls,
static void
start (void *cls)
{
+ struct GNUNET_SET_OperationHandle *oh;
+
listen_handle = GNUNET_SET_listen (config, GNUNET_SET_OPERATION_UNION,
&app_id, listen_cb, NULL);
- GNUNET_SET_evaluate (set1, &local_id, &app_id, NULL, 42,
- GNUNET_SET_RESULT_ADDED,
- result_cb_set1, NULL);
+ oh = GNUNET_SET_evaluate (&local_id, &app_id, NULL, 42,
+ GNUNET_SET_RESULT_ADDED,
+ result_cb_set1, NULL);
+ GNUNET_SET_conclude (oh, set1);
}
diff --git a/src/set/set.h b/src/set/set.h
index ad2200de93..7ec3e6cb25 100644
--- a/src/set/set.h
+++ b/src/set/set.h
@@ -29,17 +29,12 @@
#include "platform.h"
#include "gnunet_common.h"
-
-/**
- * The service sends up to GNUNET_SET_ACK_WINDOW messages per client handle,
- * the client should send an ack every GNUNET_SET_ACK_WINDOW/2 messages.
- */
-#define GNUNET_SET_ACK_WINDOW 8
+#define GNUNET_SET_ACK_WINDOW 10
GNUNET_NETWORK_STRUCT_BEGIN
-struct SetCreateMessage
+struct GNUNET_SET_CreateMessage
{
/**
* Type: GNUNET_MESSAGE_TYPE_SET_CREATE
@@ -54,7 +49,7 @@ struct SetCreateMessage
};
-struct ListenMessage
+struct GNUNET_SET_ListenMessage
{
/**
* Type: GNUNET_MESSAGE_TYPE_SET_LISTEN
@@ -74,32 +69,31 @@ struct ListenMessage
};
-struct AcceptMessage
+struct GNUNET_SET_AcceptRejectMessage
{
/**
- * Type: GNUNET_MESSAGE_TYPE_SET_ACCEPT
+ * Type: GNUNET_MESSAGE_TYPE_SET_ACCEPT or
+ * GNUNET_MESSAGE_TYPE_SET_REJECT
*/
struct GNUNET_MessageHeader header;
/**
- * Request id that will be sent along with
- * results for the accepted operation.
- * Chosen by the client.
- * Must be 0 if the request has been rejected.
+ * ID of the incoming request we want to accept / reject.
*/
- uint32_t request_id GNUNET_PACKED;
+ uint32_t accept_reject_id GNUNET_PACKED;
/**
- * ID of the incoming request we want to accept / reject.
+ * Request ID to identify responses,
+ * must be 0 if we don't accept the request.
*/
- uint32_t accept_id GNUNET_PACKED;
+ uint32_t request_id GNUNET_PACKED;
};
/**
* A request for an operation with another client.
*/
-struct RequestMessage
+struct GNUNET_SET_RequestMessage
{
/**
* Type: GNUNET_MESSAGE_TYPE_SET_Request.
@@ -107,21 +101,21 @@ struct RequestMessage
struct GNUNET_MessageHeader header;
/**
- * ID of the request we want to accept,
- * chosen by the service.
+ * Identity of the requesting peer.
*/
- uint32_t accept_id GNUNET_PACKED;
+ struct GNUNET_PeerIdentity peer_id;
/**
- * Identity of the requesting peer.
+ * ID of the to identify the request when accepting or
+ * rejecting it.
*/
- struct GNUNET_PeerIdentity peer_id;
+ uint32_t accept_id GNUNET_PACKED;
/* rest: nested context message */
};
-struct EvaluateMessage
+struct GNUNET_SET_EvaluateMessage
{
/**
* Type: GNUNET_MESSAGE_TYPE_SET_EVALUATE
@@ -136,7 +130,7 @@ struct EvaluateMessage
/**
* Peer to evaluate the operation with
*/
- struct GNUNET_PeerIdentity peer;
+ struct GNUNET_PeerIdentity target_peer;
/**
* Application id
@@ -157,7 +151,7 @@ struct EvaluateMessage
};
-struct ResultMessage
+struct GNUNET_SET_ResultMessage
{
/**
* Type: GNUNET_MESSAGE_TYPE_SET_RESULT
@@ -184,7 +178,7 @@ struct ResultMessage
};
-struct ElementMessage
+struct GNUNET_SET_ElementMessage
{
/**
* Type: GNUNET_MESSAGE_TYPE_SET_ADD or
@@ -200,20 +194,6 @@ struct ElementMessage
};
-struct CancelMessage
-{
- /**
- * Type: GNUNET_MESSAGE_TYPE_SET_CANCEL
- */
- struct GNUNET_MessageHeader header;
-
- /**
- * id we want to cancel result belongs to
- */
- uint32_t request_id GNUNET_PACKED;
-};
-
-
GNUNET_NETWORK_STRUCT_END
#endif
diff --git a/src/set/set_api.c b/src/set/set_api.c
index 5838680b9e..c74933aa0b 100644
--- a/src/set/set_api.c
+++ b/src/set/set_api.c
@@ -33,6 +33,7 @@
#define LOG(kind,...) GNUNET_log_from (kind, "set-api",__VA_ARGS__)
+
/**
* Opaque handle to a set.
*/
@@ -52,13 +53,33 @@ struct GNUNET_SET_Request
int accepted;
};
-
struct GNUNET_SET_OperationHandle
{
GNUNET_SET_ResultIterator result_cb;
void *result_cls;
+
+ /**
+ * Local set used for the operation,
+ * NULL if no set has been provided by conclude yet.
+ */
struct GNUNET_SET_Handle *set;
+
+ /**
+ * Request ID to identify the operation within the set.
+ */
uint32_t request_id;
+
+ /**
+ * Message sent to the server on calling conclude,
+ * NULL if conclude has been called.
+ */
+ struct GNUNET_MQ_Message *conclude_mqm;
+
+ /**
+ * Address of the request if in the conclude message,
+ * used to patch the request id into the message when the set is known.
+ */
+ uint32_t *request_id_addr;
};
@@ -83,18 +104,21 @@ struct GNUNET_SET_ListenHandle
static void
handle_result (void *cls, const struct GNUNET_MessageHeader *mh)
{
- struct ResultMessage *msg = (struct ResultMessage *) mh;
+ struct GNUNET_SET_ResultMessage *msg = (struct GNUNET_SET_ResultMessage *) mh;
struct GNUNET_SET_Handle *set = cls;
struct GNUNET_SET_OperationHandle *oh;
struct GNUNET_SET_Element e;
+
+ GNUNET_assert (NULL != set);
+ GNUNET_assert (NULL != set->mq);
+
if (set->messages_since_ack >= GNUNET_SET_ACK_WINDOW/2)
{
struct GNUNET_MQ_Message *mqm;
mqm = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_ACK);
GNUNET_MQ_send (set->mq, mqm);
}
-
oh = GNUNET_MQ_assoc_get (set->mq, ntohl (msg->request_id));
GNUNET_assert (NULL != oh);
/* status is not STATUS_OK => there's no attached element,
@@ -109,7 +133,7 @@ handle_result (void *cls, const struct GNUNET_MessageHeader *mh)
}
e.data = &msg[1];
- e.size = ntohs (mh->size) - sizeof (struct ResultMessage);
+ e.size = ntohs (mh->size) - sizeof (struct GNUNET_SET_ResultMessage);
e.type = msg->element_type;
if (NULL != oh->result_cb)
oh->result_cb (oh->result_cls, &e, htons (msg->result_status));
@@ -124,28 +148,34 @@ handle_result (void *cls, const struct GNUNET_MessageHeader *mh)
static void
handle_request (void *cls, const struct GNUNET_MessageHeader *mh)
{
- struct RequestMessage *msg = (struct RequestMessage *) mh;
+ struct GNUNET_SET_RequestMessage *msg = (struct GNUNET_SET_RequestMessage *) mh;
struct GNUNET_SET_ListenHandle *lh = cls;
struct GNUNET_SET_Request *req;
+ struct GNUNET_MessageHeader *context_msg;
+ LOG (GNUNET_ERROR_TYPE_INFO, "processing request\n");
req = GNUNET_new (struct GNUNET_SET_Request);
req->accept_id = ntohl (msg->accept_id);
+ context_msg = GNUNET_MQ_extract_nested_mh (msg);
/* calling GNUNET_SET_accept in the listen cb will set req->accepted */
- lh->listen_cb (lh->listen_cls, &msg->peer_id, &mh[1], req);
+ lh->listen_cb (lh->listen_cls, &msg->peer_id, context_msg, req);
if (GNUNET_NO == req->accepted)
{
struct GNUNET_MQ_Message *mqm;
- struct AcceptMessage *amsg;
-
- mqm = GNUNET_MQ_msg (amsg, GNUNET_MESSAGE_TYPE_SET_ACCEPT);
+ struct GNUNET_SET_AcceptRejectMessage *amsg;
+
+ mqm = GNUNET_MQ_msg (amsg, GNUNET_MESSAGE_TYPE_SET_REJECT);
/* no request id, as we refused */
amsg->request_id = htonl (0);
- amsg->accept_id = msg->accept_id;
+ amsg->accept_reject_id = msg->accept_id;
GNUNET_MQ_send (lh->mq, mqm);
GNUNET_free (req);
+ LOG (GNUNET_ERROR_TYPE_INFO, "rejecting request\n");
}
+ LOG (GNUNET_ERROR_TYPE_INFO, "processed op request from service\n");
+
/* the accept-case is handled in GNUNET_SET_accept,
* as we have the accept message available there */
}
@@ -168,7 +198,7 @@ GNUNET_SET_create (const struct GNUNET_CONFIGURATION_Handle *cfg,
{
struct GNUNET_SET_Handle *set;
struct GNUNET_MQ_Message *mqm;
- struct SetCreateMessage *msg;
+ struct GNUNET_SET_CreateMessage *msg;
static const struct GNUNET_MQ_Handler mq_handlers[] = {
{handle_result, GNUNET_MESSAGE_TYPE_SET_RESULT},
GNUNET_MQ_HANDLERS_END
@@ -179,6 +209,7 @@ GNUNET_SET_create (const struct GNUNET_CONFIGURATION_Handle *cfg,
LOG (GNUNET_ERROR_TYPE_INFO, "set client created\n");
GNUNET_assert (NULL != set->client);
set->mq = GNUNET_MQ_queue_for_connection_client (set->client, mq_handlers, set);
+ GNUNET_assert (NULL != set->mq);
mqm = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_CREATE);
msg->operation = htons (op);
GNUNET_MQ_send (set->mq, mqm);
@@ -204,7 +235,7 @@ GNUNET_SET_add_element (struct GNUNET_SET_Handle *set,
void *cont_cls)
{
struct GNUNET_MQ_Message *mqm;
- struct ElementMessage *msg;
+ struct GNUNET_SET_ElementMessage *msg;
mqm = GNUNET_MQ_msg_extra (msg, element->size, GNUNET_MESSAGE_TYPE_SET_ADD);
msg->element_type = element->type;
@@ -232,7 +263,7 @@ GNUNET_SET_remove_element (struct GNUNET_SET_Handle *set,
void *cont_cls)
{
struct GNUNET_MQ_Message *mqm;
- struct ElementMessage *msg;
+ struct GNUNET_SET_ElementMessage *msg;
mqm = GNUNET_MQ_msg_extra (msg, element->size, GNUNET_MESSAGE_TYPE_SET_REMOVE);
msg->element_type = element->type;
@@ -256,10 +287,10 @@ GNUNET_SET_destroy (struct GNUNET_SET_Handle *set)
/**
- * Evaluate a set operation with our set and the set of another peer.
+ * Create a set operation for evaluation with another peer.
+ * The evaluation will not start until the client provides
+ * a local set with GNUNET_SET_conclude.
*
- * @param set set to use
- * @param salt salt for HKDF (explain more here)
* @param other_peer peer with the other set
* @param app_id hash for the application using the set
* @param context_msg additional information for the request
@@ -273,8 +304,7 @@ GNUNET_SET_destroy (struct GNUNET_SET_Handle *set)
* @return a handle to cancel the operation
*/
struct GNUNET_SET_OperationHandle *
-GNUNET_SET_evaluate (struct GNUNET_SET_Handle *set,
- const struct GNUNET_PeerIdentity *other_peer,
+GNUNET_SET_evaluate (const struct GNUNET_PeerIdentity *other_peer,
const struct GNUNET_HashCode *app_id,
const struct GNUNET_MessageHeader *context_msg,
uint16_t salt,
@@ -283,24 +313,24 @@ GNUNET_SET_evaluate (struct GNUNET_SET_Handle *set,
void *result_cls)
{
struct GNUNET_MQ_Message *mqm;
- struct EvaluateMessage *msg;
struct GNUNET_SET_OperationHandle *oh;
+ struct GNUNET_SET_EvaluateMessage *msg;
oh = GNUNET_new (struct GNUNET_SET_OperationHandle);
oh->result_cb = result_cb;
oh->result_cls = result_cls;
- oh->set = set;
- mqm = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_EVALUATE);
- msg->request_id = htonl (GNUNET_MQ_assoc_add (set->mq, mqm, oh));
- msg->peer = *other_peer;
- msg->app_id = *app_id;
-
+ mqm = GNUNET_MQ_msg_nested_mh (msg, GNUNET_MESSAGE_TYPE_SET_EVALUATE, context_msg);
+
if (NULL != context_msg)
- if (GNUNET_OK != GNUNET_MQ_nest (mqm, context_msg, ntohs (context_msg->size)))
- GNUNET_assert (0);
-
- GNUNET_MQ_send (set->mq, mqm);
+ LOG (GNUNET_ERROR_TYPE_INFO, "passed context msg\n");
+
+ msg->app_id = *app_id;
+ msg->target_peer = *other_peer;
+ msg->salt = salt;
+ msg->reserved = 0;
+ oh->conclude_mqm = mqm;
+ oh->request_id_addr = &msg->request_id;
return oh;
}
@@ -327,7 +357,7 @@ GNUNET_SET_listen (const struct GNUNET_CONFIGURATION_Handle *cfg,
{
struct GNUNET_SET_ListenHandle *lh;
struct GNUNET_MQ_Message *mqm;
- struct ListenMessage *msg;
+ struct GNUNET_SET_ListenMessage *msg;
static const struct GNUNET_MQ_Handler mq_handlers[] = {
{handle_request, GNUNET_MESSAGE_TYPE_SET_REQUEST},
GNUNET_MQ_HANDLERS_END
@@ -363,10 +393,13 @@ GNUNET_SET_listen_cancel (struct GNUNET_SET_ListenHandle *lh)
/**
- * Accept a request we got via GNUNET_SET_listen.
+ * Accept a request we got via GNUNET_SET_listen. Must be called during
+ * GNUNET_SET_listen, as the 'struct GNUNET_SET_Request' becomes invalid
+ * afterwards.
+ * Call GNUNET_SET_conclude to provide the local set to use for the operation,
+ * and to begin the exchange with the remote peer.
*
* @param request request to accept
- * @param set set used for the requested operation
* @param result_mode specified how results will be returned,
* see 'GNUNET_SET_ResultMode'.
* @param result_cb callback for the results
@@ -375,28 +408,26 @@ GNUNET_SET_listen_cancel (struct GNUNET_SET_ListenHandle *lh)
*/
struct GNUNET_SET_OperationHandle *
GNUNET_SET_accept (struct GNUNET_SET_Request *request,
- struct GNUNET_SET_Handle *set,
enum GNUNET_SET_ResultMode result_mode,
GNUNET_SET_ResultIterator result_cb,
- void *result_cls)
+ void *cls)
{
struct GNUNET_MQ_Message *mqm;
- struct AcceptMessage *msg;
struct GNUNET_SET_OperationHandle *oh;
+ struct GNUNET_SET_AcceptRejectMessage *msg;
- /* don't accept a request twice! */
GNUNET_assert (GNUNET_NO == request->accepted);
request->accepted = GNUNET_YES;
oh = GNUNET_new (struct GNUNET_SET_OperationHandle);
oh->result_cb = result_cb;
- oh->result_cls = result_cls;
- oh->set = set;
+ oh->result_cls = cls;
- mqm = GNUNET_MQ_msg (msg , GNUNET_MESSAGE_TYPE_SET_ACCEPT);
- msg->request_id = htonl (GNUNET_MQ_assoc_add (set->mq, NULL, oh));
- msg->accept_id = htonl (request->accept_id);
- GNUNET_MQ_send (set->mq, mqm);
+ mqm = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_ACCEPT);
+ msg->accept_reject_id = htonl (request->accept_id);
+
+ oh->conclude_mqm = mqm;
+ oh->request_id_addr = &msg->request_id;
return oh;
}
@@ -413,10 +444,43 @@ GNUNET_SET_operation_cancel (struct GNUNET_SET_OperationHandle *oh)
struct GNUNET_MQ_Message *mqm;
struct GNUNET_SET_OperationHandle *h_assoc;
- h_assoc = GNUNET_MQ_assoc_remove (oh->set->mq, oh->request_id);
- GNUNET_assert (h_assoc == oh);
- mqm = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_CANCEL);
- GNUNET_MQ_send (oh->set->mq, mqm);
+ if (NULL != oh->set)
+ {
+ h_assoc = GNUNET_MQ_assoc_remove (oh->set->mq, oh->request_id);
+ GNUNET_assert (h_assoc == oh);
+ mqm = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_CANCEL);
+ GNUNET_MQ_send (oh->set->mq, mqm);
+ }
+
+ if (NULL != oh->conclude_mqm)
+ GNUNET_MQ_discard (oh->conclude_mqm);
+
GNUNET_free (oh);
}
+
+/**
+ * Conclude the given set operation using the given set.
+ * This function is called once we have fully constructed
+ * the set that we want to use for the operation. At this
+ * time, the P2P protocol can then begin to exchange the
+ * set information and call the result callback with the
+ * result information.
+ *
+ * @param oh handle to the set operation
+ * @param set the set to use for the operation
+ */
+void
+GNUNET_SET_conclude (struct GNUNET_SET_OperationHandle *oh,
+ struct GNUNET_SET_Handle *set)
+{
+ GNUNET_assert (NULL == oh->set);
+ GNUNET_assert (NULL != oh->conclude_mqm);
+ oh->set = set;
+ oh->request_id = GNUNET_MQ_assoc_add (oh->set->mq, NULL, oh);
+ *oh->request_id_addr = htonl (oh->request_id);
+ GNUNET_MQ_send (oh->set->mq, oh->conclude_mqm);
+ oh->conclude_mqm = NULL;
+ oh->request_id_addr = NULL;
+}
+
diff --git a/src/set/test_set.conf b/src/set/test_set.conf
index 34b7a8d2fd..7bc26ed7ed 100644
--- a/src/set/test_set.conf
+++ b/src/set/test_set.conf
@@ -8,8 +8,8 @@ PORT = 2106
HOSTNAME = localhost
HOME = $SERVICEHOME
BINARY = gnunet-service-set
-#PREFIX = gdbserver :12345
#PREFIX = valgrind --leak-check=full
+#PREFIX = gdbserver :1234
ACCEPT_FROM = 127.0.0.1;
ACCEPT_FROM6 = ::1;
UNIXPATH = /tmp/gnunet-service-set.sock
diff --git a/src/set/test_set_api.c b/src/set/test_set_api.c
index bf0d656972..f773cebdf9 100644
--- a/src/set/test_set_api.c
+++ b/src/set/test_set_api.c
@@ -20,7 +20,7 @@
/**
* @file set/test_set_api.c
- * @brief testcase for consensus_api.c
+ * @brief testcase for set_api.c
*/
#include "platform.h"
#include "gnunet_util_lib.h"
@@ -89,11 +89,13 @@ listen_cb (void *cls,
const struct GNUNET_MessageHeader *context_msg,
struct GNUNET_SET_Request *request)
{
+ struct GNUNET_SET_OperationHandle *oh;
+
GNUNET_log (GNUNET_ERROR_TYPE_INFO, "listen cb called\n");
GNUNET_SET_listen_cancel (listen_handle);
- GNUNET_SET_accept (request, set2,
- GNUNET_SET_RESULT_ADDED, result_cb_set2, NULL);
+ oh = GNUNET_SET_accept (request, GNUNET_SET_RESULT_ADDED, result_cb_set2, NULL);
+ GNUNET_SET_conclude (oh, set2);
}
@@ -105,11 +107,14 @@ listen_cb (void *cls,
static void
start (void *cls)
{
+ struct GNUNET_SET_OperationHandle *oh;
+
listen_handle = GNUNET_SET_listen (config, GNUNET_SET_OPERATION_UNION,
&app_id, listen_cb, NULL);
- GNUNET_SET_evaluate (set1, &local_id, &app_id, NULL, 42,
- GNUNET_SET_RESULT_ADDED,
- result_cb_set1, NULL);
+ oh = GNUNET_SET_evaluate (&local_id, &app_id, NULL, 42,
+ GNUNET_SET_RESULT_ADDED,
+ result_cb_set1, NULL);
+ GNUNET_SET_conclude (oh, set1);
}
@@ -168,12 +173,14 @@ run (void *cls,
struct GNUNET_TESTING_Peer *peer)
{
- static const char* app_str = "gnunet-set";
-
config = cfg;
+ GNUNET_CRYPTO_get_host_identity (cfg, &local_id);
+ printf ("my id (from CRYPTO): %s\n", GNUNET_h2s (&local_id.hashPubKey));
GNUNET_TESTING_peer_get_identity (peer, &local_id);
+ printf ("my id (from TESTING): %s\n", GNUNET_h2s (&local_id.hashPubKey));
set1 = GNUNET_SET_create (cfg, GNUNET_SET_OPERATION_UNION);
set2 = GNUNET_SET_create (cfg, GNUNET_SET_OPERATION_UNION);
+ GNUNET_CRYPTO_hash_create_random (GNUNET_CRYPTO_QUALITY_WEAK, &app_id);
init_set1 ();
}
diff --git a/src/stream/stream_api.c b/src/stream/stream_api.c
index b4a47b53d4..34f1ea0fae 100644
--- a/src/stream/stream_api.c
+++ b/src/stream/stream_api.c
@@ -3785,7 +3785,29 @@ mq_stream_write_queued (void *cls, enum GNUNET_STREAM_Status status, size_t size
struct MQStreamState *mss = (struct MQStreamState *) mq->impl_state;
struct GNUNET_MQ_Message *mqm;
- GNUNET_assert (GNUNET_STREAM_OK == status);
+ switch (status)
+ {
+ case GNUNET_STREAM_OK:
+ break;
+ case GNUNET_STREAM_SHUTDOWN:
+ /* FIXME: call shutdown handler */
+ return;
+ case GNUNET_STREAM_TIMEOUT:
+ if (NULL == mq->error_handler)
+ LOG (GNUNET_ERROR_TYPE_WARNING, "write timeout, but no error handler installed for message queue\n");
+ else
+ mq->error_handler (mq->handlers_cls, GNUNET_MQ_ERROR_TIMEOUT);
+ return;
+ case GNUNET_STREAM_SYSERR:
+ if (NULL == mq->error_handler)
+ LOG (GNUNET_ERROR_TYPE_WARNING, "write error, but no error handler installed for message queue\n");
+ else
+ mq->error_handler (mq->handlers_cls, GNUNET_MQ_ERROR_WRITE);
+ return;
+ default:
+ GNUNET_assert (0);
+ return;
+ }
/* call cb for message we finished sending */
mqm = mq->current_msg;
@@ -3863,21 +3885,53 @@ mq_stream_mst_callback (void *cls, void *client,
*/
static size_t
mq_stream_data_processor (void *cls,
- enum GNUNET_STREAM_Status status,
- const void *data,
- size_t size)
+ enum GNUNET_STREAM_Status status,
+ const void *data,
+ size_t size)
{
struct GNUNET_MQ_MessageQueue *mq = cls;
struct MQStreamState *mss;
int ret;
+
+ switch (status)
+ {
+ case GNUNET_STREAM_OK:
+ break;
+ case GNUNET_STREAM_SHUTDOWN:
+ /* FIXME: call shutdown handler */
+ return 0;
+ case GNUNET_STREAM_TIMEOUT:
+ if (NULL == mq->error_handler)
+ LOG (GNUNET_ERROR_TYPE_WARNING, "read timeout, but no error handler installed for message queue\n");
+ else
+ mq->error_handler (mq->handlers_cls, GNUNET_MQ_ERROR_TIMEOUT);
+ return 0;
+ case GNUNET_STREAM_SYSERR:
+ if (NULL == mq->error_handler)
+ LOG (GNUNET_ERROR_TYPE_WARNING, "read error, but no error handler installed for message queue\n");
+ else
+ mq->error_handler (mq->handlers_cls, GNUNET_MQ_ERROR_READ);
+ return 0;
+ default:
+ GNUNET_assert (0);
+ return 0;
+ }
mss = (struct MQStreamState *) mq->impl_state;
GNUNET_assert (GNUNET_STREAM_OK == status);
ret = GNUNET_SERVER_mst_receive (mss->mst, NULL, data, size, GNUNET_NO, GNUNET_NO);
- GNUNET_assert (GNUNET_OK == ret);
+ if (GNUNET_OK != ret)
+ {
+ if (NULL == mq->error_handler)
+ LOG (GNUNET_ERROR_TYPE_WARNING,
+ "read error (message stream malformed), but no error handler installed for message queue\n");
+ else
+ mq->error_handler (mq->handlers_cls, GNUNET_MQ_ERROR_READ);
+ return 0;
+ }
/* we always read all data */
- mss->rh = GNUNET_STREAM_read (mss->socket, GNUNET_TIME_UNIT_FOREVER_REL,
- mq_stream_data_processor, mq);
+ mss->rh = GNUNET_STREAM_read (mss->socket, GNUNET_TIME_UNIT_FOREVER_REL,
+ mq_stream_data_processor, mq);
return size;
}
@@ -3935,6 +3989,7 @@ GNUNET_STREAM_mq_create (struct GNUNET_STREAM_Socket *socket,
mq->destroy_impl = mq_stream_destroy_impl;
mq->handlers = msg_handlers;
mq->handlers_cls = cls;
+ mq->error_handler = error_handler;
if (NULL != msg_handlers)
{
mss->mst = GNUNET_SERVER_mst_create (mq_stream_mst_callback, mq);
diff --git a/src/util/crypto_hash.c b/src/util/crypto_hash.c
index fca66aed4e..2e436c4549 100644
--- a/src/util/crypto_hash.c
+++ b/src/util/crypto_hash.c
@@ -339,7 +339,7 @@ GNUNET_CRYPTO_hash_distance_u32 (const struct GNUNET_HashCode * a,
*/
void
GNUNET_CRYPTO_hash_create_random (enum GNUNET_CRYPTO_Quality mode,
- struct GNUNET_HashCode * result)
+ struct GNUNET_HashCode *result)
{
int i;
diff --git a/src/util/mq.c b/src/util/mq.c
index 36cacd30b0..dc87b97114 100644
--- a/src/util/mq.c
+++ b/src/util/mq.c
@@ -119,33 +119,31 @@ GNUNET_MQ_msg_ (struct GNUNET_MessageHeader **mhp, uint16_t size, uint16_t type)
}
-int
-GNUNET_MQ_nest_ (struct GNUNET_MQ_Message **mqmp,
- const void *data, uint16_t len)
+struct GNUNET_MQ_Message *
+GNUNET_MQ_msg_nested_mh_ (struct GNUNET_MessageHeader **mhp, uint16_t base_size, uint16_t type,
+ const struct GNUNET_MessageHeader *nested_mh)
{
- size_t new_size;
- size_t old_size;
-
- GNUNET_assert (NULL != mqmp);
- /* there's no data to append => do nothing */
- if (NULL == data)
- return GNUNET_OK;
- old_size = ntohs ((*mqmp)->mh->size);
- /* message too large to concatenate? */
- if (((uint16_t) (old_size + len)) < len)
- return GNUNET_SYSERR;
- new_size = old_size + len;
- *mqmp = GNUNET_realloc (*mqmp, sizeof (struct GNUNET_MQ_Message) + new_size);
- (*mqmp)->mh = (struct GNUNET_MessageHeader *) &(*mqmp)[1];
- memcpy (((void *) (*mqmp)->mh) + old_size, data, new_size - old_size);
- (*mqmp)->mh->size = htons (new_size);
- return GNUNET_OK;
-}
+ struct GNUNET_MQ_Message *mqm;
+ uint16_t size;
+
+ if (NULL == nested_mh)
+ return GNUNET_MQ_msg_ (mhp, base_size, type);
+ size = base_size + ntohs (nested_mh->size);
+ /* check for uint16_t overflow */
+ if (size < base_size)
+ return NULL;
+
+ mqm = GNUNET_MQ_msg_ (mhp, size, type);
+ memcpy ((char *) mqm->mh + base_size, nested_mh, ntohs (nested_mh->size));
+
+ return mqm;
+}
-/*** Transmit a queued message to the session's client.
+/**
+ * Transmit a queued message to the session's client.
*
* @param cls consensus session
* @param size number of bytes available in buf
@@ -265,7 +263,8 @@ handle_client_message (void *cls,
{
if (NULL == mq->error_handler)
LOG (GNUNET_ERROR_TYPE_WARNING, "ignoring read error (no handler installed)\n");
- mq->error_handler (mq->handlers_cls, GNUNET_MQ_ERROR_READ);
+ else
+ mq->error_handler (mq->handlers_cls, GNUNET_MQ_ERROR_READ);
return;
}
@@ -479,3 +478,39 @@ GNUNET_MQ_destroy (struct GNUNET_MQ_MessageQueue *mq)
GNUNET_free (mq);
}
+
+
+
+struct GNUNET_MessageHeader *
+GNUNET_MQ_extract_nested_mh_ (const struct GNUNET_MessageHeader *mh, uint16_t base_size)
+{
+ uint16_t whole_size;
+ uint16_t nested_size;
+ struct GNUNET_MessageHeader *nested_msg;
+
+ whole_size = ntohs (mh->size);
+ GNUNET_assert (whole_size >= base_size);
+
+ nested_size = whole_size - base_size;
+
+ if (0 == nested_size)
+ return NULL;
+
+ if (nested_size < sizeof (struct GNUNET_MessageHeader))
+ {
+ GNUNET_break_op (0);
+ return NULL;
+ }
+
+ nested_msg = (struct GNUNET_MessageHeader *) ((char *) mh + base_size);
+
+ if (ntohs (nested_msg->size) != nested_size)
+ {
+ GNUNET_break_op (0);
+ nested_msg->size = htons (nested_size);
+ }
+
+ return nested_msg;
+}
+
+
diff --git a/src/util/test_mq.c b/src/util/test_mq.c
index 161b40a207..55cd80ef14 100644
--- a/src/util/test_mq.c
+++ b/src/util/test_mq.c
@@ -58,35 +58,6 @@ void
test2 (void)
{
struct GNUNET_MQ_Message *mqm;
- struct MyMessage *mm;
- int res;
- char *s = "foo";
-
- mqm = GNUNET_MQ_msg (mm, 42);
- res = GNUNET_MQ_nest (mqm, s, strlen(s));
- GNUNET_assert (GNUNET_OK == res);
- res = GNUNET_MQ_nest (mqm, s, strlen(s));
- GNUNET_assert (GNUNET_OK == res);
- res = GNUNET_MQ_nest (mqm, NULL, 0);
- GNUNET_assert (GNUNET_OK == res);
-
- GNUNET_assert (strlen (s) * 2 + sizeof (struct MyMessage) == ntohs (mm->header.size));
-
- res = GNUNET_MQ_nest_mh (mqm, &mm->header);
- GNUNET_assert (GNUNET_OK == res);
- GNUNET_assert (2 * (strlen (s) * 2 + sizeof (struct MyMessage)) == ntohs (mm->header.size));
-
- res = GNUNET_MQ_nest (mqm, (void *) 0xF00BA, 0xFFF0);
- GNUNET_assert (GNUNET_OK != res);
-
- GNUNET_MQ_discard (mqm);
-}
-
-
-void
-test3 (void)
-{
- struct GNUNET_MQ_Message *mqm;
struct GNUNET_MessageHeader *mh;
mqm = GNUNET_MQ_msg_header (42);
@@ -107,7 +78,6 @@ main (int argc, char **argv)
GNUNET_log_setup ("test-mq", "INFO", NULL);
test1 ();
test2 ();
- test3 ();
return 0;
}