aboutsummaryrefslogtreecommitdiff
path: root/src/consensus
diff options
context:
space:
mode:
authorFlorian Dold <florian.dold@gmail.com>2013-06-03 10:53:49 +0000
committerFlorian Dold <florian.dold@gmail.com>2013-06-03 10:53:49 +0000
commit68403fa780bf94ace2ebc13c2c09463cbbc0b57c (patch)
tree3442e4f25de90eab67c4f9813cb6e433c50b7482 /src/consensus
parentfae7f583f2e11cac15fefcbefef64287ab6915d3 (diff)
- conclude for SET
- consensus with SET
Diffstat (limited to 'src/consensus')
-rw-r--r--src/consensus/Makefile.am4
-rw-r--r--src/consensus/consensus_api.c6
-rw-r--r--src/consensus/consensus_protocol.h11
-rw-r--r--src/consensus/gnunet-consensus.c6
-rw-r--r--src/consensus/gnunet-service-consensus.c407
-rw-r--r--src/consensus/test_consensus.conf8
6 files changed, 256 insertions, 186 deletions
diff --git a/src/consensus/Makefile.am b/src/consensus/Makefile.am
index a0edb1d656..914fbdef81 100644
--- a/src/consensus/Makefile.am
+++ b/src/consensus/Makefile.am
@@ -61,6 +61,8 @@ gnunet_service_consensus_LDADD = \
$(top_builddir)/src/mesh/libgnunetmesh.la \
$(top_builddir)/src/set/libgnunetset.la \
$(GN_LIBINTL)
+gnunet_service_consensus_DEPENDENCIES = \
+ $(top_builddir)/src/set/libgnunetset.la
gnunet_service_evil_consensus_SOURCES = \
gnunet-service-consensus.c
@@ -71,6 +73,8 @@ gnunet_service_evil_consensus_LDADD = \
$(top_builddir)/src/mesh/libgnunetmesh.la \
$(top_builddir)/src/set/libgnunetset.la \
$(GN_LIBINTL)
+gnunet_service_evil_consensus_DEPENDENCIES = \
+ $(top_builddir)/src/set/libgnunetset.la
gnunet_service_evil_consensus_CFLAGS = -DEVIL
libgnunetconsensus_la_SOURCES = \
diff --git a/src/consensus/consensus_api.c b/src/consensus/consensus_api.c
index 635a610ca5..e3ddb4913d 100644
--- a/src/consensus/consensus_api.c
+++ b/src/consensus/consensus_api.c
@@ -236,9 +236,9 @@ send_next (struct GNUNET_CONSENSUS_Handle *consensus)
*/
static void
handle_new_element (struct GNUNET_CONSENSUS_Handle *consensus,
- struct GNUNET_CONSENSUS_ElementMessage *msg)
+ struct GNUNET_CONSENSUS_ElementMessage *msg)
{
- struct GNUNET_CONSENSUS_Element element;
+ struct GNUNET_SET_Element element;
LOG (GNUNET_ERROR_TYPE_DEBUG, "received new element\n");
@@ -424,7 +424,7 @@ GNUNET_CONSENSUS_create (const struct GNUNET_CONFIGURATION_Handle *cfg,
*/
void
GNUNET_CONSENSUS_insert (struct GNUNET_CONSENSUS_Handle *consensus,
- const struct GNUNET_CONSENSUS_Element *element,
+ const struct GNUNET_SET_Element *element,
GNUNET_CONSENSUS_InsertDoneCallback idc,
void *idc_cls)
{
diff --git a/src/consensus/consensus_protocol.h b/src/consensus/consensus_protocol.h
index af4d74100e..4e0673c7d1 100644
--- a/src/consensus/consensus_protocol.h
+++ b/src/consensus/consensus_protocol.h
@@ -38,12 +38,15 @@ GNUNET_NETWORK_STRUCT_BEGIN
/**
* Sent as context message for set reconciliation.
*/
-struct ConsensusRoundMessage
+struct GNUNET_CONSENSUS_RoundContextMessage
{
+ /**
+ * Type: GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ROUND_CONTEXT
+ */
struct GNUNET_MessageHeader header;
- uint8_t round;
- uint8_t exp_round;
- uint8_t exp_subround;
+ uint32_t round;
+ uint32_t exp_round;
+ uint32_t exp_subround;
};
GNUNET_NETWORK_STRUCT_END
diff --git a/src/consensus/gnunet-consensus.c b/src/consensus/gnunet-consensus.c
index d8c1b14eee..60db8c61af 100644
--- a/src/consensus/gnunet-consensus.c
+++ b/src/consensus/gnunet-consensus.c
@@ -133,7 +133,7 @@ do_consensus ()
{
int j;
struct GNUNET_HashCode *val;
- struct GNUNET_CONSENSUS_Element *element;
+ struct GNUNET_SET_Element *element;
generate_indices(unique_indices);
val = GNUNET_malloc (sizeof *val);
@@ -151,6 +151,8 @@ do_consensus ()
}
}
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO, "all elements inserted, calling conclude\n");
+
for (i = 0; i < num_peers; i++)
GNUNET_CONSENSUS_conclude (consensus_handles[i], conclude_timeout, conclude_cb, consensus_handles[i]);
}
@@ -194,7 +196,7 @@ connect_complete (void *cls,
static void
new_element_cb (void *cls,
- const struct GNUNET_CONSENSUS_Element *element)
+ const struct GNUNET_SET_Element *element)
{
GNUNET_log (GNUNET_ERROR_TYPE_INFO, "received new element\n");
}
diff --git a/src/consensus/gnunet-service-consensus.c b/src/consensus/gnunet-service-consensus.c
index 44edeb2159..21123b24fb 100644
--- a/src/consensus/gnunet-service-consensus.c
+++ b/src/consensus/gnunet-service-consensus.c
@@ -1,6 +1,6 @@
/*
This file is part of GNUnet
- (C) 2012 Christian Grothoff (and other contributing authors)
+ (C) 2012, 2013 Christian Grothoff (and other contributing authors)
GNUnet is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published
@@ -43,9 +43,9 @@
/**
- * Number of exponential rounds, used in the inventory and completion round.
+ * Number of exponential rounds, used in the exp and completion round.
*/
-#define NUM_EXP_ROUNDS (4)
+#define NUM_EXP_ROUNDS 4
/* forward declarations */
@@ -155,17 +155,17 @@ struct ConsensusSession
* Permutation of peers for the current round,
* maps logical index (for current round) to physical index (location in info array)
*/
- int *shuffle;
+ uint32_t *shuffle;
/**
* Current round of the exponential scheme.
*/
- int exp_round;
+ uint32_t exp_round;
/**
* Current sub-round of the exponential scheme.
*/
- int exp_subround;
+ uint32_t exp_subround;
/**
* The partner for the current exp-round
@@ -201,17 +201,6 @@ struct ConsensusPeerInformation
struct GNUNET_PeerIdentity peer_id;
/**
- * Do we connect to the peer, or does the peer connect to us?
- * Only valid for all-to-all phases
- */
- int is_outgoing;
-
- /**
- * Did we receive/send a consensus hello?
- */
- int hello;
-
- /**
* Back-reference to the consensus session,
* to that ConsensusPeerInformation can be used as a closure
*/
@@ -223,22 +212,14 @@ struct ConsensusPeerInformation
int exp_subround_finished;
/**
- * GNUNET_YES if we synced inventory with this peer;
- * GNUNET_NO otherwise.
- */
- int inventory_synced;
-
- /**
- * Round this peer seems to be in, according to the last SE we got.
- * Necessary to store this, as we sometimes need to respond to a request from an
- * older round, while we are already in the next round.
+ * Set operation we are currently executing with this peer.
*/
- enum ConsensusRound apparent_round;
+ struct GNUNET_SET_OperationHandle *set_op;
/**
- * Set operation we are currently executing with this peer.
+ * Has conclude been called on the set_op?
*/
- struct GNUNET_SET_OperationHandle *set_op;
+ int set_op_concluded;
};
@@ -268,9 +249,8 @@ static struct GNUNET_SERVER_Handle *srv;
static struct GNUNET_PeerIdentity my_peer;
-/*
static int
-exp_subround_finished (const struct ConsensusSession *session)
+have_exp_subround_finished (const struct ConsensusSession *session)
{
int not_finished;
not_finished = 0;
@@ -284,25 +264,9 @@ exp_subround_finished (const struct ConsensusSession *session)
return GNUNET_YES;
return GNUNET_NO;
}
-*/
-/*
-static int
-inventory_round_finished (struct ConsensusSession *session)
-{
- int i;
- int finished;
- finished = 0;
- for (i = 0; i < session->num_peers; i++)
- if (GNUNET_YES == session->info[i].inventory_synced)
- finished++;
- if (finished >= (session->num_peers / 2))
- return GNUNET_YES;
- return GNUNET_NO;
-}
-*/
/**
* Destroy a session, free all resources associated with it.
@@ -341,39 +305,6 @@ destroy_session (struct ConsensusSession *session)
/**
- * Start the inventory round, contact all peers we are supposed to contact.
- *
- * @param session the current session
- */
-static void
-start_inventory (struct ConsensusSession *session)
-{
- int i;
- int last;
-
- for (i = 0; i < session->num_peers; i++)
- session->info[i].is_outgoing = GNUNET_NO;
-
- last = (session->local_peer_idx + ((session->num_peers - 1) / 2) + 1) % session->num_peers;
- i = (session->local_peer_idx + 1) % session->num_peers;
- while (i != last)
- {
- GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d contacting P%d in all-to-all\n", session->local_peer_idx, i);
- session->info[i].is_outgoing = GNUNET_YES;
- // embrace_peer (&session->info[i]);
- i = (i + 1) % session->num_peers;
- }
- // tie-breaker for even number of peers
- if (((session->num_peers % 2) == 0) && (session->local_peer_idx < last))
- {
- GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d contacting P%d in all-to-all (tie-breaker)\n", session->local_peer_idx, i);
- session->info[last].is_outgoing = GNUNET_YES;
- // embrace_peer (&session->info[last]);
- }
-}
-
-
-/**
* Start the next round.
* This function can be invoked as a timeout task, or called manually (tc will be NULL then).
*
@@ -407,27 +338,8 @@ round_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
subround_over (session, NULL);
break;
case CONSENSUS_ROUND_EXCHANGE:
- /* handle two peers specially */
- if (session->num_peers <= 2)
- {
- GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: 2-peer consensus done\n", session->local_peer_idx);
- //GNUNET_CONTAINER_multihashmap_iterate (session->values, send_client_elements_iter, session);
- //send_client_conclude_done (session);
- session->current_round = CONSENSUS_ROUND_FINISH;
- return;
- }
- session->current_round = CONSENSUS_ROUND_INVENTORY;
- start_inventory (session);
- break;
- case CONSENSUS_ROUND_INVENTORY:
- session->current_round = CONSENSUS_ROUND_COMPLETION;
- session->exp_round = 0;
- subround_over (session, NULL);
- break;
- case CONSENSUS_ROUND_COMPLETION:
+ /* FIXME: send all elements to client */
session->current_round = CONSENSUS_ROUND_FINISH;
- //send_client_conclude_done (session);
- break;
default:
GNUNET_assert (0);
}
@@ -435,68 +347,91 @@ round_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
/**
- * Adapt the shuffle of the session for the current round.
+ * Create a new permutation for the session's peers in session->shuffle.
+ * Uses a Fisher-Yates shuffle with pseudo-randomness coming from
+ * both the global session id and the current round index.
+ *
+ * @param session the session to create the new permutation for
*/
static void
shuffle (struct ConsensusSession *session)
{
- /* adapted from random_permute in util/crypto_random.c */
- /* FIXME
- unsigned int *ret;
- unsigned int i;
- unsigned int tmp;
- uint32_t x;
+ uint32_t i;
+ uint32_t randomness[session->num_peers-1];
+
+ if (NULL == session->shuffle)
+ session->shuffle = GNUNET_malloc (session->num_peers * sizeof (*session->shuffle));
+
+ GNUNET_CRYPTO_kdf (randomness, sizeof (randomness), &session->exp_round, sizeof (uint32_t),
+ &session->global_id, sizeof (struct GNUNET_HashCode));
+
+ for (i = 0; i < session->num_peers; i++)
+ session->shuffle[i] = i;
- GNUNET_assert (n > 0);
- ret = GNUNET_malloc (n * sizeof (unsigned int));
- for (i = 0; i < n; i++)
- ret[i] = i;
- for (i = n - 1; i > 0; i--)
+ for (i = session->num_peers - 1; i > 0; i--)
{
- x = GNUNET_CRYPTO_random_u32 (mode, i + 1);
- tmp = ret[x];
- ret[x] = ret[i];
- ret[i] = tmp;
+ uint32_t x;
+ uint32_t tmp;
+ x = randomness[i-1];
+ tmp = session->shuffle[x];
+ session->shuffle[x] = session->shuffle[i];
+ session->shuffle[i] = tmp;
}
- */
}
/**
* Find and set the partner_incoming and partner_outgoing of our peer,
- * one of them may not exist in most cases.
+ * one of them may not exist (and thus set to NULL) if the number of peers
+ * in the session is not a power of two.
*
* @param session the consensus session
*/
static void
find_partners (struct ConsensusSession *session)
{
- int mark[session->num_peers];
- int i;
-
- memset (mark, 0, session->num_peers * sizeof (int));
- session->partner_incoming = session->partner_outgoing = NULL;
- for (i = 0; i < session->num_peers; i++)
+ int arc;
+ int partner_idx;
+ int largest_arc;
+ int num_ghosts;
+
+ /* distance to neighboring peer in current subround */
+ arc = 1 << session->exp_subround;
+ partner_idx = (session->local_peer_idx + arc) % session->num_peers;
+ largest_arc = 1;
+ while (largest_arc < session->num_peers)
+ largest_arc <<= 1;
+ num_ghosts = largest_arc - session->num_peers;
+
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO, "num ghosts: %d\n", num_ghosts);
+
+ if (0 == (session->local_peer_idx & arc))
{
- int arc;
- if (0 != mark[i])
- continue;
- arc = (i + (1 << session->exp_subround)) % session->num_peers;
- mark[i] = mark[arc] = 1;
- GNUNET_assert (i != arc);
- if (i == session->local_peer_idx)
+ /* we are outgoing */
+ session->partner_outgoing = &session->info[session->shuffle[partner_idx]];
+ /* are we a 'ghost' of a peer that would exist if
+ * the number of peers was a power of two, and thus have to partner
+ * with an additional peer?
+ */
+ if (session->local_peer_idx < num_ghosts)
{
- GNUNET_assert (NULL == session->partner_outgoing);
- session->partner_outgoing = &session->info[session->shuffle[arc]];
- session->partner_outgoing->exp_subround_finished = GNUNET_NO;
+ int ghost_partner_idx;
+ ghost_partner_idx = (session->local_peer_idx - arc) % session->num_peers;
+ /* platform dependent; modulo sometimes returns negative values */
+ if (ghost_partner_idx < 0)
+ ghost_partner_idx += arc;
+ session->partner_incoming = &session->info[session->shuffle[ghost_partner_idx]];
}
- if (arc == session->local_peer_idx)
+ else
{
- GNUNET_assert (NULL == session->partner_incoming);
- session->partner_incoming = &session->info[session->shuffle[i]];
- session->partner_incoming->exp_subround_finished = GNUNET_NO;
+ session->partner_incoming = NULL;
}
}
+ else
+ {
+ session->partner_outgoing = NULL;
+ session->partner_incoming = &session->info[session->shuffle[partner_idx]];
+ }
}
@@ -508,11 +443,42 @@ find_partners (struct ConsensusSession *session)
* @param element a result element, only valid if status is GNUNET_SET_STATUS_OK
* @param status see enum GNUNET_SET_Status
*/
-static void set_result_cb (void *cls,
- const struct GNUNET_SET_Element *element,
- enum GNUNET_SET_Status status)
+static void
+set_result_cb (void *cls,
+ const struct GNUNET_SET_Element *element,
+ enum GNUNET_SET_Status status)
{
- /* FIXME */
+ struct ConsensusPeerInformation *cpi = cls;
+
+ switch (status)
+ {
+ case GNUNET_SET_STATUS_OK:
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO, "set result: element\n");
+ break;
+ case GNUNET_SET_STATUS_FAILURE:
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO, "set result: failure\n");
+ break;
+ case GNUNET_SET_STATUS_HALF_DONE:
+ case GNUNET_SET_STATUS_DONE:
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO, "set result: done\n");
+ cpi->exp_subround_finished = GNUNET_YES;
+ if (have_exp_subround_finished (cpi->session) == GNUNET_YES)
+ subround_over (cpi->session, NULL);
+ return;
+ default:
+ GNUNET_break (0);
+ return;
+ }
+
+ switch (cpi->session->current_round)
+ {
+ case CONSENSUS_ROUND_EXCHANGE:
+ GNUNET_SET_add_element (cpi->session->element_set, element, NULL, NULL);
+ break;
+ default:
+ GNUNET_break (0);
+ return;
+ }
}
@@ -540,14 +506,6 @@ subround_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
GNUNET_SCHEDULER_cancel (session->round_timeout_tid);
session->round_timeout_tid = GNUNET_SCHEDULER_NO_TASK;
}
- /* check if we are done with the log phase, 2-peer consensus only does one log round */
- if ( (session->exp_round == NUM_EXP_ROUNDS) ||
- ((session->num_peers == 2) && (session->exp_round == 1)))
- {
- GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: exp-round over\n", session->local_peer_idx);
- round_over (session, NULL);
- return;
- }
if (session->exp_round == 0)
{
/* initialize everything for the log-rounds */
@@ -575,18 +533,27 @@ subround_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
if (NULL != session->partner_outgoing)
{
+ struct GNUNET_CONSENSUS_RoundContextMessage *msg;
+ msg = GNUNET_new (struct GNUNET_CONSENSUS_RoundContextMessage);
+ msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ROUND_CONTEXT);
+ msg->header.size = htons (sizeof *msg);
+ msg->round = htonl (session->current_round);
+ msg->exp_round = htonl (session->exp_round);
+ msg->exp_subround = htonl (session->exp_subround);
+
if (NULL != session->partner_outgoing->set_op)
+ {
GNUNET_SET_operation_cancel (session->partner_outgoing->set_op);
+ }
session->partner_outgoing->set_op =
- GNUNET_SET_evaluate (session->element_set,
- &session->partner_outgoing->peer_id,
+ GNUNET_SET_evaluate (&session->partner_outgoing->peer_id,
&session->global_id,
- NULL, /* FIXME */
+ (struct GNUNET_MessageHeader *) msg,
0, /* FIXME */
GNUNET_SET_RESULT_ADDED,
- set_result_cb, session);
-
-
+ set_result_cb, session->partner_outgoing);
+ GNUNET_SET_conclude (session->partner_outgoing->set_op, session->element_set);
+ session->partner_outgoing->set_op_concluded = GNUNET_YES;
}
#ifdef GNUNET_EXTRA_LOGGING
@@ -642,6 +609,8 @@ compute_global_id (struct ConsensusSession *session, const struct GNUNET_HashCod
int i;
struct GNUNET_HashCode tmp;
+ /* FIXME: use kdf? */
+
session->global_id = *session_id;
for (i = 0; i < session->num_peers; ++i)
{
@@ -727,9 +696,6 @@ initialize_session_peer_list (struct ConsensusSession *session,
}
-
-
-
/**
* Called when another peer wants to do a set operation with the
* local peer.
@@ -750,7 +716,67 @@ set_listen_cb (void *cls,
const struct GNUNET_MessageHeader *context_msg,
struct GNUNET_SET_Request *request)
{
- /* FIXME */
+ struct ConsensusSession *session = cls;
+ struct GNUNET_CONSENSUS_RoundContextMessage *msg = (struct GNUNET_CONSENSUS_RoundContextMessage *) context_msg;
+ struct ConsensusPeerInformation *cpi;
+ int index;
+
+ if (NULL == context_msg)
+ {
+ GNUNET_break_op (0);
+ return;
+ }
+
+ index = get_peer_idx (other_peer, session);
+
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO, "result from %s\n", GNUNET_h2s (&other_peer->hashPubKey));
+
+ if (index < 0)
+ {
+ GNUNET_break_op (0);
+ return;
+ }
+
+ cpi = &session->info[index];
+
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d got result from P%d\n", session->local_peer_idx, index);
+
+ switch (session->current_round)
+ {
+ case CONSENSUS_ROUND_EXCHANGE:
+ if (ntohl (msg->round) != CONSENSUS_ROUND_EXCHANGE)
+ {
+ GNUNET_break_op (0);
+ return;
+ }
+ if (ntohl (msg->exp_round) < session->exp_round)
+ {
+ GNUNET_break_op (0);
+ return;
+ }
+ if (ntohl (msg->exp_subround) < session->exp_subround)
+ {
+ GNUNET_break_op (0);
+ return;
+ }
+ if (NULL != cpi->set_op)
+ GNUNET_SET_operation_cancel (cpi->set_op);
+ cpi->set_op = GNUNET_SET_accept (request, GNUNET_SET_RESULT_ADDED,
+ set_result_cb, &session->info[index]);
+ if (ntohl (msg->exp_subround) == session->exp_subround)
+ {
+ cpi->set_op_concluded = GNUNET_YES;
+ GNUNET_SET_conclude (cpi->set_op, session->element_set);
+ }
+ else
+ {
+ cpi->set_op_concluded = GNUNET_NO;
+ }
+ break;
+ default:
+ GNUNET_break_op (0);
+ return;
+ }
}
@@ -769,7 +795,9 @@ initialize_session (struct ConsensusSession *session,
GNUNET_log (GNUNET_ERROR_TYPE_INFO, "session with %u peers\n", session->num_peers);
compute_global_id (session, &join_msg->session_id);
- /* check if some local client already owns the session. */
+ /* check if some local client already owns the session.
+ * it is only legal to have a session with an existing global id
+ * if all other sessions with this global id are finished.*/
other_session = sessions_head;
while (NULL != other_session)
{
@@ -789,6 +817,8 @@ initialize_session (struct ConsensusSession *session,
session->local_peer_idx = get_peer_idx (&my_peer, session);
GNUNET_assert (-1 != session->local_peer_idx);
+ session->element_set = GNUNET_SET_create (cfg, GNUNET_SET_OPERATION_UNION);
+ GNUNET_assert (NULL != session->element_set);
session->set_listener = GNUNET_SET_listen (cfg, GNUNET_SET_OPERATION_UNION,
&session->global_id,
set_listen_cb, session);
@@ -827,6 +857,8 @@ client_join (void *cls,
{
struct ConsensusSession *session;
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO, "join message sent by client\n");
+
session = get_session_by_client (client);
if (NULL != session)
{
@@ -835,9 +867,13 @@ client_join (void *cls,
return;
}
session = GNUNET_new (struct ConsensusSession);
+ session->client = client;
GNUNET_SERVER_client_keep (client);
GNUNET_CONTAINER_DLL_insert (sessions_head, sessions_tail, session);
initialize_session (session, (struct GNUNET_CONSENSUS_JoinMessage *) m);
+ GNUNET_SERVER_receive_done (client, GNUNET_OK);
+
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO, "join done\n");
}
@@ -858,12 +894,7 @@ client_insert (void *cls,
struct GNUNET_SET_Element *element;
ssize_t element_size;
- session = sessions_head;
- while (NULL != session)
- {
- if (session->client == client)
- break;
- }
+ session = get_session_by_client (client);
if (NULL == session)
{
@@ -886,6 +917,7 @@ client_insert (void *cls,
GNUNET_break (0);
return;
}
+
element = GNUNET_malloc (sizeof (struct GNUNET_SET_Element) + element_size);
element->type = msg->element_type;
element->size = element_size;
@@ -893,6 +925,8 @@ client_insert (void *cls,
element->data = &element[1];
GNUNET_SET_add_element (session->element_set, element, NULL, NULL);
GNUNET_SERVER_receive_done (client, GNUNET_OK);
+
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%u: element added\n", session->local_peer_idx);
}
@@ -911,10 +945,10 @@ client_conclude (void *cls,
struct ConsensusSession *session;
struct GNUNET_CONSENSUS_ConcludeMessage *cmsg;
- cmsg = (struct GNUNET_CONSENSUS_ConcludeMessage *) message;
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO, "conclude requested\n");
+ cmsg = (struct GNUNET_CONSENSUS_ConcludeMessage *) message;
session = get_session_by_client (client);
-
if (NULL == session)
{
/* client not found */
@@ -922,14 +956,12 @@ client_conclude (void *cls,
GNUNET_SERVER_client_disconnect (client);
return;
}
-
if (CONSENSUS_ROUND_BEGIN != session->current_round)
{
/* client requested conclude twice */
GNUNET_break (0);
return;
}
-
if (session->num_peers <= 1)
{
//send_client_conclude_done (session);
@@ -937,7 +969,7 @@ client_conclude (void *cls,
else
{
session->conclude_timeout = GNUNET_TIME_relative_ntoh (cmsg->timeout);
- /* the 'begin' round is over, start with the next, real round */
+ /* the 'begin' round is over, start with the next, actual round */
round_over (session, NULL);
}
@@ -964,6 +996,29 @@ shutdown_task (void *cls,
/**
+ * Clean up after a client after it is
+ * disconnected (either by us or by itself)
+ *
+ * @param cls closure, unused
+ * @param client the client to clean up after
+ */
+void
+handle_client_disconnect (void *cls, struct GNUNET_SERVER_Client *client)
+{
+ struct ConsensusSession *session;
+
+ session = get_session_by_client (client);
+ if (NULL == session)
+ return;
+ if ((CONSENSUS_ROUND_BEGIN == session->current_round) ||
+ (CONSENSUS_ROUND_FINISH == session->current_round))
+ destroy_session (session);
+ else
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "client disconnected, but waiting for consensus to finish\n");
+}
+
+
+/**
* Start processing consensus requests.
*
* @param cls closure
@@ -971,13 +1026,14 @@ shutdown_task (void *cls,
* @param c configuration to use
*/
static void
-run (void *cls, struct GNUNET_SERVER_Handle *server, const struct GNUNET_CONFIGURATION_Handle *c)
+run (void *cls, struct GNUNET_SERVER_Handle *server,
+ const struct GNUNET_CONFIGURATION_Handle *c)
{
static const struct GNUNET_SERVER_MessageHandler server_handlers[] = {
- {&client_join, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_JOIN, 0},
- {&client_insert, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_INSERT, 0},
{&client_conclude, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE,
sizeof (struct GNUNET_CONSENSUS_ConcludeMessage)},
+ {&client_insert, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_INSERT, 0},
+ {&client_join, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_JOIN, 0},
{NULL, NULL, 0, 0}
};
@@ -992,6 +1048,7 @@ run (void *cls, struct GNUNET_SERVER_Handle *server, const struct GNUNET_CONFIGU
}
GNUNET_SERVER_add_handlers (server, server_handlers);
GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL, &shutdown_task, NULL);
+ GNUNET_SERVER_disconnect_notify (server, handle_client_disconnect, NULL);
GNUNET_log (GNUNET_ERROR_TYPE_INFO, "consensus running\n");
}
diff --git a/src/consensus/test_consensus.conf b/src/consensus/test_consensus.conf
index 437636c995..37facf84e2 100644
--- a/src/consensus/test_consensus.conf
+++ b/src/consensus/test_consensus.conf
@@ -5,7 +5,7 @@ HOSTNAME = localhost
HOME = $SERVICEHOME
BINARY = gnunet-service-consensus
#PREFIX = gdbserver :12345
-PREFIX = valgrind --leak-check=full
+PREFIX = valgrind
ACCEPT_FROM = 127.0.0.1;
ACCEPT_FROM6 = ::1;
UNIXPATH = /tmp/gnunet-service-consensus.sock
@@ -19,7 +19,11 @@ OPTIONS = -LERROR
[arm]
-DEFAULTSERVICES = core consensus
+DEFAULTSERVICES = core consensus set
+
+[set]
+OPTIONS = -L INFO
+PREFIX = valgrind
[testbed]