diff options
author | Florian Dold <florian.dold@gmail.com> | 2013-06-03 10:53:49 +0000 |
---|---|---|
committer | Florian Dold <florian.dold@gmail.com> | 2013-06-03 10:53:49 +0000 |
commit | 68403fa780bf94ace2ebc13c2c09463cbbc0b57c (patch) | |
tree | 3442e4f25de90eab67c4f9813cb6e433c50b7482 /src | |
parent | fae7f583f2e11cac15fefcbefef64287ab6915d3 (diff) |
- conclude for SET
- consensus with SET
Diffstat (limited to 'src')
-rw-r--r-- | src/Makefile.am | 2 | ||||
-rw-r--r-- | src/consensus/Makefile.am | 4 | ||||
-rw-r--r-- | src/consensus/consensus_api.c | 6 | ||||
-rw-r--r-- | src/consensus/consensus_protocol.h | 11 | ||||
-rw-r--r-- | src/consensus/gnunet-consensus.c | 6 | ||||
-rw-r--r-- | src/consensus/gnunet-service-consensus.c | 407 | ||||
-rw-r--r-- | src/consensus/test_consensus.conf | 8 | ||||
-rw-r--r-- | src/dv/gnunet-service-dv.c | 4 | ||||
-rw-r--r-- | src/include/gnunet_consensus_service.h | 27 | ||||
-rw-r--r-- | src/include/gnunet_mq_lib.h | 89 | ||||
-rw-r--r-- | src/include/gnunet_protocols.h | 34 | ||||
-rw-r--r-- | src/include/gnunet_set_service.h | 27 | ||||
-rw-r--r-- | src/set/gnunet-service-set.c | 166 | ||||
-rw-r--r-- | src/set/gnunet-service-set.h | 8 | ||||
-rw-r--r-- | src/set/gnunet-service-set_union.c | 116 | ||||
-rw-r--r-- | src/set/gnunet-set.c | 14 | ||||
-rw-r--r-- | src/set/set.h | 62 | ||||
-rw-r--r-- | src/set/set_api.c | 156 | ||||
-rw-r--r-- | src/set/test_set.conf | 2 | ||||
-rw-r--r-- | src/set/test_set_api.c | 23 | ||||
-rw-r--r-- | src/stream/stream_api.c | 69 | ||||
-rw-r--r-- | src/util/crypto_hash.c | 2 | ||||
-rw-r--r-- | src/util/mq.c | 81 | ||||
-rw-r--r-- | src/util/test_mq.c | 30 |
24 files changed, 773 insertions, 581 deletions
diff --git a/src/Makefile.am b/src/Makefile.am index 6819ec90b8..2f4397dbef 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -3,7 +3,7 @@ #endif if HAVE_EXPERIMENTAL - EXP_DIR = gns consensus dv set experimentation + EXP_DIR = gns set dv consensus experimentation endif if LINUX diff --git a/src/consensus/Makefile.am b/src/consensus/Makefile.am index a0edb1d656..914fbdef81 100644 --- a/src/consensus/Makefile.am +++ b/src/consensus/Makefile.am @@ -61,6 +61,8 @@ gnunet_service_consensus_LDADD = \ $(top_builddir)/src/mesh/libgnunetmesh.la \ $(top_builddir)/src/set/libgnunetset.la \ $(GN_LIBINTL) +gnunet_service_consensus_DEPENDENCIES = \ + $(top_builddir)/src/set/libgnunetset.la gnunet_service_evil_consensus_SOURCES = \ gnunet-service-consensus.c @@ -71,6 +73,8 @@ gnunet_service_evil_consensus_LDADD = \ $(top_builddir)/src/mesh/libgnunetmesh.la \ $(top_builddir)/src/set/libgnunetset.la \ $(GN_LIBINTL) +gnunet_service_evil_consensus_DEPENDENCIES = \ + $(top_builddir)/src/set/libgnunetset.la gnunet_service_evil_consensus_CFLAGS = -DEVIL libgnunetconsensus_la_SOURCES = \ diff --git a/src/consensus/consensus_api.c b/src/consensus/consensus_api.c index 635a610ca5..e3ddb4913d 100644 --- a/src/consensus/consensus_api.c +++ b/src/consensus/consensus_api.c @@ -236,9 +236,9 @@ send_next (struct GNUNET_CONSENSUS_Handle *consensus) */ static void handle_new_element (struct GNUNET_CONSENSUS_Handle *consensus, - struct GNUNET_CONSENSUS_ElementMessage *msg) + struct GNUNET_CONSENSUS_ElementMessage *msg) { - struct GNUNET_CONSENSUS_Element element; + struct GNUNET_SET_Element element; LOG (GNUNET_ERROR_TYPE_DEBUG, "received new element\n"); @@ -424,7 +424,7 @@ GNUNET_CONSENSUS_create (const struct GNUNET_CONFIGURATION_Handle *cfg, */ void GNUNET_CONSENSUS_insert (struct GNUNET_CONSENSUS_Handle *consensus, - const struct GNUNET_CONSENSUS_Element *element, + const struct GNUNET_SET_Element *element, GNUNET_CONSENSUS_InsertDoneCallback idc, void *idc_cls) { diff --git a/src/consensus/consensus_protocol.h b/src/consensus/consensus_protocol.h index af4d74100e..4e0673c7d1 100644 --- a/src/consensus/consensus_protocol.h +++ b/src/consensus/consensus_protocol.h @@ -38,12 +38,15 @@ GNUNET_NETWORK_STRUCT_BEGIN /** * Sent as context message for set reconciliation. */ -struct ConsensusRoundMessage +struct GNUNET_CONSENSUS_RoundContextMessage { + /** + * Type: GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ROUND_CONTEXT + */ struct GNUNET_MessageHeader header; - uint8_t round; - uint8_t exp_round; - uint8_t exp_subround; + uint32_t round; + uint32_t exp_round; + uint32_t exp_subround; }; GNUNET_NETWORK_STRUCT_END diff --git a/src/consensus/gnunet-consensus.c b/src/consensus/gnunet-consensus.c index d8c1b14eee..60db8c61af 100644 --- a/src/consensus/gnunet-consensus.c +++ b/src/consensus/gnunet-consensus.c @@ -133,7 +133,7 @@ do_consensus () { int j; struct GNUNET_HashCode *val; - struct GNUNET_CONSENSUS_Element *element; + struct GNUNET_SET_Element *element; generate_indices(unique_indices); val = GNUNET_malloc (sizeof *val); @@ -151,6 +151,8 @@ do_consensus () } } + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "all elements inserted, calling conclude\n"); + for (i = 0; i < num_peers; i++) GNUNET_CONSENSUS_conclude (consensus_handles[i], conclude_timeout, conclude_cb, consensus_handles[i]); } @@ -194,7 +196,7 @@ connect_complete (void *cls, static void new_element_cb (void *cls, - const struct GNUNET_CONSENSUS_Element *element) + const struct GNUNET_SET_Element *element) { GNUNET_log (GNUNET_ERROR_TYPE_INFO, "received new element\n"); } diff --git a/src/consensus/gnunet-service-consensus.c b/src/consensus/gnunet-service-consensus.c index 44edeb2159..21123b24fb 100644 --- a/src/consensus/gnunet-service-consensus.c +++ b/src/consensus/gnunet-service-consensus.c @@ -1,6 +1,6 @@ /* This file is part of GNUnet - (C) 2012 Christian Grothoff (and other contributing authors) + (C) 2012, 2013 Christian Grothoff (and other contributing authors) GNUnet is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published @@ -43,9 +43,9 @@ /** - * Number of exponential rounds, used in the inventory and completion round. + * Number of exponential rounds, used in the exp and completion round. */ -#define NUM_EXP_ROUNDS (4) +#define NUM_EXP_ROUNDS 4 /* forward declarations */ @@ -155,17 +155,17 @@ struct ConsensusSession * Permutation of peers for the current round, * maps logical index (for current round) to physical index (location in info array) */ - int *shuffle; + uint32_t *shuffle; /** * Current round of the exponential scheme. */ - int exp_round; + uint32_t exp_round; /** * Current sub-round of the exponential scheme. */ - int exp_subround; + uint32_t exp_subround; /** * The partner for the current exp-round @@ -201,17 +201,6 @@ struct ConsensusPeerInformation struct GNUNET_PeerIdentity peer_id; /** - * Do we connect to the peer, or does the peer connect to us? - * Only valid for all-to-all phases - */ - int is_outgoing; - - /** - * Did we receive/send a consensus hello? - */ - int hello; - - /** * Back-reference to the consensus session, * to that ConsensusPeerInformation can be used as a closure */ @@ -223,22 +212,14 @@ struct ConsensusPeerInformation int exp_subround_finished; /** - * GNUNET_YES if we synced inventory with this peer; - * GNUNET_NO otherwise. - */ - int inventory_synced; - - /** - * Round this peer seems to be in, according to the last SE we got. - * Necessary to store this, as we sometimes need to respond to a request from an - * older round, while we are already in the next round. + * Set operation we are currently executing with this peer. */ - enum ConsensusRound apparent_round; + struct GNUNET_SET_OperationHandle *set_op; /** - * Set operation we are currently executing with this peer. + * Has conclude been called on the set_op? */ - struct GNUNET_SET_OperationHandle *set_op; + int set_op_concluded; }; @@ -268,9 +249,8 @@ static struct GNUNET_SERVER_Handle *srv; static struct GNUNET_PeerIdentity my_peer; -/* static int -exp_subround_finished (const struct ConsensusSession *session) +have_exp_subround_finished (const struct ConsensusSession *session) { int not_finished; not_finished = 0; @@ -284,25 +264,9 @@ exp_subround_finished (const struct ConsensusSession *session) return GNUNET_YES; return GNUNET_NO; } -*/ -/* -static int -inventory_round_finished (struct ConsensusSession *session) -{ - int i; - int finished; - finished = 0; - for (i = 0; i < session->num_peers; i++) - if (GNUNET_YES == session->info[i].inventory_synced) - finished++; - if (finished >= (session->num_peers / 2)) - return GNUNET_YES; - return GNUNET_NO; -} -*/ /** * Destroy a session, free all resources associated with it. @@ -341,39 +305,6 @@ destroy_session (struct ConsensusSession *session) /** - * Start the inventory round, contact all peers we are supposed to contact. - * - * @param session the current session - */ -static void -start_inventory (struct ConsensusSession *session) -{ - int i; - int last; - - for (i = 0; i < session->num_peers; i++) - session->info[i].is_outgoing = GNUNET_NO; - - last = (session->local_peer_idx + ((session->num_peers - 1) / 2) + 1) % session->num_peers; - i = (session->local_peer_idx + 1) % session->num_peers; - while (i != last) - { - GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d contacting P%d in all-to-all\n", session->local_peer_idx, i); - session->info[i].is_outgoing = GNUNET_YES; - // embrace_peer (&session->info[i]); - i = (i + 1) % session->num_peers; - } - // tie-breaker for even number of peers - if (((session->num_peers % 2) == 0) && (session->local_peer_idx < last)) - { - GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d contacting P%d in all-to-all (tie-breaker)\n", session->local_peer_idx, i); - session->info[last].is_outgoing = GNUNET_YES; - // embrace_peer (&session->info[last]); - } -} - - -/** * Start the next round. * This function can be invoked as a timeout task, or called manually (tc will be NULL then). * @@ -407,27 +338,8 @@ round_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) subround_over (session, NULL); break; case CONSENSUS_ROUND_EXCHANGE: - /* handle two peers specially */ - if (session->num_peers <= 2) - { - GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: 2-peer consensus done\n", session->local_peer_idx); - //GNUNET_CONTAINER_multihashmap_iterate (session->values, send_client_elements_iter, session); - //send_client_conclude_done (session); - session->current_round = CONSENSUS_ROUND_FINISH; - return; - } - session->current_round = CONSENSUS_ROUND_INVENTORY; - start_inventory (session); - break; - case CONSENSUS_ROUND_INVENTORY: - session->current_round = CONSENSUS_ROUND_COMPLETION; - session->exp_round = 0; - subround_over (session, NULL); - break; - case CONSENSUS_ROUND_COMPLETION: + /* FIXME: send all elements to client */ session->current_round = CONSENSUS_ROUND_FINISH; - //send_client_conclude_done (session); - break; default: GNUNET_assert (0); } @@ -435,68 +347,91 @@ round_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) /** - * Adapt the shuffle of the session for the current round. + * Create a new permutation for the session's peers in session->shuffle. + * Uses a Fisher-Yates shuffle with pseudo-randomness coming from + * both the global session id and the current round index. + * + * @param session the session to create the new permutation for */ static void shuffle (struct ConsensusSession *session) { - /* adapted from random_permute in util/crypto_random.c */ - /* FIXME - unsigned int *ret; - unsigned int i; - unsigned int tmp; - uint32_t x; + uint32_t i; + uint32_t randomness[session->num_peers-1]; + + if (NULL == session->shuffle) + session->shuffle = GNUNET_malloc (session->num_peers * sizeof (*session->shuffle)); + + GNUNET_CRYPTO_kdf (randomness, sizeof (randomness), &session->exp_round, sizeof (uint32_t), + &session->global_id, sizeof (struct GNUNET_HashCode)); + + for (i = 0; i < session->num_peers; i++) + session->shuffle[i] = i; - GNUNET_assert (n > 0); - ret = GNUNET_malloc (n * sizeof (unsigned int)); - for (i = 0; i < n; i++) - ret[i] = i; - for (i = n - 1; i > 0; i--) + for (i = session->num_peers - 1; i > 0; i--) { - x = GNUNET_CRYPTO_random_u32 (mode, i + 1); - tmp = ret[x]; - ret[x] = ret[i]; - ret[i] = tmp; + uint32_t x; + uint32_t tmp; + x = randomness[i-1]; + tmp = session->shuffle[x]; + session->shuffle[x] = session->shuffle[i]; + session->shuffle[i] = tmp; } - */ } /** * Find and set the partner_incoming and partner_outgoing of our peer, - * one of them may not exist in most cases. + * one of them may not exist (and thus set to NULL) if the number of peers + * in the session is not a power of two. * * @param session the consensus session */ static void find_partners (struct ConsensusSession *session) { - int mark[session->num_peers]; - int i; - - memset (mark, 0, session->num_peers * sizeof (int)); - session->partner_incoming = session->partner_outgoing = NULL; - for (i = 0; i < session->num_peers; i++) + int arc; + int partner_idx; + int largest_arc; + int num_ghosts; + + /* distance to neighboring peer in current subround */ + arc = 1 << session->exp_subround; + partner_idx = (session->local_peer_idx + arc) % session->num_peers; + largest_arc = 1; + while (largest_arc < session->num_peers) + largest_arc <<= 1; + num_ghosts = largest_arc - session->num_peers; + + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "num ghosts: %d\n", num_ghosts); + + if (0 == (session->local_peer_idx & arc)) { - int arc; - if (0 != mark[i]) - continue; - arc = (i + (1 << session->exp_subround)) % session->num_peers; - mark[i] = mark[arc] = 1; - GNUNET_assert (i != arc); - if (i == session->local_peer_idx) + /* we are outgoing */ + session->partner_outgoing = &session->info[session->shuffle[partner_idx]]; + /* are we a 'ghost' of a peer that would exist if + * the number of peers was a power of two, and thus have to partner + * with an additional peer? + */ + if (session->local_peer_idx < num_ghosts) { - GNUNET_assert (NULL == session->partner_outgoing); - session->partner_outgoing = &session->info[session->shuffle[arc]]; - session->partner_outgoing->exp_subround_finished = GNUNET_NO; + int ghost_partner_idx; + ghost_partner_idx = (session->local_peer_idx - arc) % session->num_peers; + /* platform dependent; modulo sometimes returns negative values */ + if (ghost_partner_idx < 0) + ghost_partner_idx += arc; + session->partner_incoming = &session->info[session->shuffle[ghost_partner_idx]]; } - if (arc == session->local_peer_idx) + else { - GNUNET_assert (NULL == session->partner_incoming); - session->partner_incoming = &session->info[session->shuffle[i]]; - session->partner_incoming->exp_subround_finished = GNUNET_NO; + session->partner_incoming = NULL; } } + else + { + session->partner_outgoing = NULL; + session->partner_incoming = &session->info[session->shuffle[partner_idx]]; + } } @@ -508,11 +443,42 @@ find_partners (struct ConsensusSession *session) * @param element a result element, only valid if status is GNUNET_SET_STATUS_OK * @param status see enum GNUNET_SET_Status */ -static void set_result_cb (void *cls, - const struct GNUNET_SET_Element *element, - enum GNUNET_SET_Status status) +static void +set_result_cb (void *cls, + const struct GNUNET_SET_Element *element, + enum GNUNET_SET_Status status) { - /* FIXME */ + struct ConsensusPeerInformation *cpi = cls; + + switch (status) + { + case GNUNET_SET_STATUS_OK: + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "set result: element\n"); + break; + case GNUNET_SET_STATUS_FAILURE: + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "set result: failure\n"); + break; + case GNUNET_SET_STATUS_HALF_DONE: + case GNUNET_SET_STATUS_DONE: + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "set result: done\n"); + cpi->exp_subround_finished = GNUNET_YES; + if (have_exp_subround_finished (cpi->session) == GNUNET_YES) + subround_over (cpi->session, NULL); + return; + default: + GNUNET_break (0); + return; + } + + switch (cpi->session->current_round) + { + case CONSENSUS_ROUND_EXCHANGE: + GNUNET_SET_add_element (cpi->session->element_set, element, NULL, NULL); + break; + default: + GNUNET_break (0); + return; + } } @@ -540,14 +506,6 @@ subround_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) GNUNET_SCHEDULER_cancel (session->round_timeout_tid); session->round_timeout_tid = GNUNET_SCHEDULER_NO_TASK; } - /* check if we are done with the log phase, 2-peer consensus only does one log round */ - if ( (session->exp_round == NUM_EXP_ROUNDS) || - ((session->num_peers == 2) && (session->exp_round == 1))) - { - GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d: exp-round over\n", session->local_peer_idx); - round_over (session, NULL); - return; - } if (session->exp_round == 0) { /* initialize everything for the log-rounds */ @@ -575,18 +533,27 @@ subround_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) if (NULL != session->partner_outgoing) { + struct GNUNET_CONSENSUS_RoundContextMessage *msg; + msg = GNUNET_new (struct GNUNET_CONSENSUS_RoundContextMessage); + msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ROUND_CONTEXT); + msg->header.size = htons (sizeof *msg); + msg->round = htonl (session->current_round); + msg->exp_round = htonl (session->exp_round); + msg->exp_subround = htonl (session->exp_subround); + if (NULL != session->partner_outgoing->set_op) + { GNUNET_SET_operation_cancel (session->partner_outgoing->set_op); + } session->partner_outgoing->set_op = - GNUNET_SET_evaluate (session->element_set, - &session->partner_outgoing->peer_id, + GNUNET_SET_evaluate (&session->partner_outgoing->peer_id, &session->global_id, - NULL, /* FIXME */ + (struct GNUNET_MessageHeader *) msg, 0, /* FIXME */ GNUNET_SET_RESULT_ADDED, - set_result_cb, session); - - + set_result_cb, session->partner_outgoing); + GNUNET_SET_conclude (session->partner_outgoing->set_op, session->element_set); + session->partner_outgoing->set_op_concluded = GNUNET_YES; } #ifdef GNUNET_EXTRA_LOGGING @@ -642,6 +609,8 @@ compute_global_id (struct ConsensusSession *session, const struct GNUNET_HashCod int i; struct GNUNET_HashCode tmp; + /* FIXME: use kdf? */ + session->global_id = *session_id; for (i = 0; i < session->num_peers; ++i) { @@ -727,9 +696,6 @@ initialize_session_peer_list (struct ConsensusSession *session, } - - - /** * Called when another peer wants to do a set operation with the * local peer. @@ -750,7 +716,67 @@ set_listen_cb (void *cls, const struct GNUNET_MessageHeader *context_msg, struct GNUNET_SET_Request *request) { - /* FIXME */ + struct ConsensusSession *session = cls; + struct GNUNET_CONSENSUS_RoundContextMessage *msg = (struct GNUNET_CONSENSUS_RoundContextMessage *) context_msg; + struct ConsensusPeerInformation *cpi; + int index; + + if (NULL == context_msg) + { + GNUNET_break_op (0); + return; + } + + index = get_peer_idx (other_peer, session); + + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "result from %s\n", GNUNET_h2s (&other_peer->hashPubKey)); + + if (index < 0) + { + GNUNET_break_op (0); + return; + } + + cpi = &session->info[index]; + + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%d got result from P%d\n", session->local_peer_idx, index); + + switch (session->current_round) + { + case CONSENSUS_ROUND_EXCHANGE: + if (ntohl (msg->round) != CONSENSUS_ROUND_EXCHANGE) + { + GNUNET_break_op (0); + return; + } + if (ntohl (msg->exp_round) < session->exp_round) + { + GNUNET_break_op (0); + return; + } + if (ntohl (msg->exp_subround) < session->exp_subround) + { + GNUNET_break_op (0); + return; + } + if (NULL != cpi->set_op) + GNUNET_SET_operation_cancel (cpi->set_op); + cpi->set_op = GNUNET_SET_accept (request, GNUNET_SET_RESULT_ADDED, + set_result_cb, &session->info[index]); + if (ntohl (msg->exp_subround) == session->exp_subround) + { + cpi->set_op_concluded = GNUNET_YES; + GNUNET_SET_conclude (cpi->set_op, session->element_set); + } + else + { + cpi->set_op_concluded = GNUNET_NO; + } + break; + default: + GNUNET_break_op (0); + return; + } } @@ -769,7 +795,9 @@ initialize_session (struct ConsensusSession *session, GNUNET_log (GNUNET_ERROR_TYPE_INFO, "session with %u peers\n", session->num_peers); compute_global_id (session, &join_msg->session_id); - /* check if some local client already owns the session. */ + /* check if some local client already owns the session. + * it is only legal to have a session with an existing global id + * if all other sessions with this global id are finished.*/ other_session = sessions_head; while (NULL != other_session) { @@ -789,6 +817,8 @@ initialize_session (struct ConsensusSession *session, session->local_peer_idx = get_peer_idx (&my_peer, session); GNUNET_assert (-1 != session->local_peer_idx); + session->element_set = GNUNET_SET_create (cfg, GNUNET_SET_OPERATION_UNION); + GNUNET_assert (NULL != session->element_set); session->set_listener = GNUNET_SET_listen (cfg, GNUNET_SET_OPERATION_UNION, &session->global_id, set_listen_cb, session); @@ -827,6 +857,8 @@ client_join (void *cls, { struct ConsensusSession *session; + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "join message sent by client\n"); + session = get_session_by_client (client); if (NULL != session) { @@ -835,9 +867,13 @@ client_join (void *cls, return; } session = GNUNET_new (struct ConsensusSession); + session->client = client; GNUNET_SERVER_client_keep (client); GNUNET_CONTAINER_DLL_insert (sessions_head, sessions_tail, session); initialize_session (session, (struct GNUNET_CONSENSUS_JoinMessage *) m); + GNUNET_SERVER_receive_done (client, GNUNET_OK); + + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "join done\n"); } @@ -858,12 +894,7 @@ client_insert (void *cls, struct GNUNET_SET_Element *element; ssize_t element_size; - session = sessions_head; - while (NULL != session) - { - if (session->client == client) - break; - } + session = get_session_by_client (client); if (NULL == session) { @@ -886,6 +917,7 @@ client_insert (void *cls, GNUNET_break (0); return; } + element = GNUNET_malloc (sizeof (struct GNUNET_SET_Element) + element_size); element->type = msg->element_type; element->size = element_size; @@ -893,6 +925,8 @@ client_insert (void *cls, element->data = &element[1]; GNUNET_SET_add_element (session->element_set, element, NULL, NULL); GNUNET_SERVER_receive_done (client, GNUNET_OK); + + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "P%u: element added\n", session->local_peer_idx); } @@ -911,10 +945,10 @@ client_conclude (void *cls, struct ConsensusSession *session; struct GNUNET_CONSENSUS_ConcludeMessage *cmsg; - cmsg = (struct GNUNET_CONSENSUS_ConcludeMessage *) message; + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "conclude requested\n"); + cmsg = (struct GNUNET_CONSENSUS_ConcludeMessage *) message; session = get_session_by_client (client); - if (NULL == session) { /* client not found */ @@ -922,14 +956,12 @@ client_conclude (void *cls, GNUNET_SERVER_client_disconnect (client); return; } - if (CONSENSUS_ROUND_BEGIN != session->current_round) { /* client requested conclude twice */ GNUNET_break (0); return; } - if (session->num_peers <= 1) { //send_client_conclude_done (session); @@ -937,7 +969,7 @@ client_conclude (void *cls, else { session->conclude_timeout = GNUNET_TIME_relative_ntoh (cmsg->timeout); - /* the 'begin' round is over, start with the next, real round */ + /* the 'begin' round is over, start with the next, actual round */ round_over (session, NULL); } @@ -964,6 +996,29 @@ shutdown_task (void *cls, /** + * Clean up after a client after it is + * disconnected (either by us or by itself) + * + * @param cls closure, unused + * @param client the client to clean up after + */ +void +handle_client_disconnect (void *cls, struct GNUNET_SERVER_Client *client) +{ + struct ConsensusSession *session; + + session = get_session_by_client (client); + if (NULL == session) + return; + if ((CONSENSUS_ROUND_BEGIN == session->current_round) || + (CONSENSUS_ROUND_FINISH == session->current_round)) + destroy_session (session); + else + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "client disconnected, but waiting for consensus to finish\n"); +} + + +/** * Start processing consensus requests. * * @param cls closure @@ -971,13 +1026,14 @@ shutdown_task (void *cls, * @param c configuration to use */ static void -run (void *cls, struct GNUNET_SERVER_Handle *server, const struct GNUNET_CONFIGURATION_Handle *c) +run (void *cls, struct GNUNET_SERVER_Handle *server, + const struct GNUNET_CONFIGURATION_Handle *c) { static const struct GNUNET_SERVER_MessageHandler server_handlers[] = { - {&client_join, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_JOIN, 0}, - {&client_insert, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_INSERT, 0}, {&client_conclude, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE, sizeof (struct GNUNET_CONSENSUS_ConcludeMessage)}, + {&client_insert, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_INSERT, 0}, + {&client_join, NULL, GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_JOIN, 0}, {NULL, NULL, 0, 0} }; @@ -992,6 +1048,7 @@ run (void *cls, struct GNUNET_SERVER_Handle *server, const struct GNUNET_CONFIGU } GNUNET_SERVER_add_handlers (server, server_handlers); GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL, &shutdown_task, NULL); + GNUNET_SERVER_disconnect_notify (server, handle_client_disconnect, NULL); GNUNET_log (GNUNET_ERROR_TYPE_INFO, "consensus running\n"); } diff --git a/src/consensus/test_consensus.conf b/src/consensus/test_consensus.conf index 437636c995..37facf84e2 100644 --- a/src/consensus/test_consensus.conf +++ b/src/consensus/test_consensus.conf @@ -5,7 +5,7 @@ HOSTNAME = localhost HOME = $SERVICEHOME BINARY = gnunet-service-consensus #PREFIX = gdbserver :12345 -PREFIX = valgrind --leak-check=full +PREFIX = valgrind ACCEPT_FROM = 127.0.0.1; ACCEPT_FROM6 = ::1; UNIXPATH = /tmp/gnunet-service-consensus.sock @@ -19,7 +19,11 @@ OPTIONS = -LERROR [arm] -DEFAULTSERVICES = core consensus +DEFAULTSERVICES = core consensus set + +[set] +OPTIONS = -L INFO +PREFIX = valgrind [testbed] diff --git a/src/dv/gnunet-service-dv.c b/src/dv/gnunet-service-dv.c index 3b0f0783ad..c6ba8eec3c 100644 --- a/src/dv/gnunet-service-dv.c +++ b/src/dv/gnunet-service-dv.c @@ -1404,7 +1404,6 @@ listen_set_union (void *cls, neighbor->my_set = GNUNET_SET_create (cfg, GNUNET_SET_OPERATION_UNION); neighbor->set_op = GNUNET_SET_accept (request, - neighbor->my_set /* FIXME: pass later! */, GNUNET_SET_RESULT_ADDED, &handle_set_union_result, neighbor); @@ -1428,8 +1427,7 @@ initiate_set_union (void *cls, neighbor->initiate_task = GNUNET_SCHEDULER_NO_TASK; neighbor->my_set = GNUNET_SET_create (cfg, GNUNET_SET_OPERATION_UNION); - neighbor->set_op = GNUNET_SET_evaluate (neighbor->my_set /* FIXME: pass later! */, - &neighbor->peer, + neighbor->set_op = GNUNET_SET_evaluate (&neighbor->peer, &neighbor->real_session_id, NULL, 0 /* FIXME: salt */, diff --git a/src/include/gnunet_consensus_service.h b/src/include/gnunet_consensus_service.h index 66d48e0e20..db7509976e 100644 --- a/src/include/gnunet_consensus_service.h +++ b/src/include/gnunet_consensus_service.h @@ -39,28 +39,7 @@ extern "C" #include "gnunet_common.h" #include "gnunet_time_lib.h" #include "gnunet_configuration_lib.h" - - -/** - * An element of the consensus set. - */ -struct GNUNET_CONSENSUS_Element -{ - /** - * The actual data of the element. - */ - const void *data; - - /** - * Size of the element's data. - */ - uint16_t size; - - /** - * Application specific element type - */ - uint16_t type; -}; +#include "gnunet_set_service.h" /** @@ -73,7 +52,7 @@ struct GNUNET_CONSENSUS_Element * @param element new element, NULL on error */ typedef void (*GNUNET_CONSENSUS_ElementCallback) (void *cls, - const struct GNUNET_CONSENSUS_Element *element); + const struct GNUNET_SET_Element *element); @@ -138,7 +117,7 @@ typedef void (*GNUNET_CONSENSUS_InsertDoneCallback) (void *cls, */ void GNUNET_CONSENSUS_insert (struct GNUNET_CONSENSUS_Handle *consensus, - const struct GNUNET_CONSENSUS_Element *element, + const struct GNUNET_SET_Element *element, GNUNET_CONSENSUS_InsertDoneCallback idc, void *idc_cls); diff --git a/src/include/gnunet_mq_lib.h b/src/include/gnunet_mq_lib.h index 59b692cf02..54ea806a5a 100644 --- a/src/include/gnunet_mq_lib.h +++ b/src/include/gnunet_mq_lib.h @@ -53,36 +53,6 @@ */ #define GNUNET_MQ_msg(mvar, type) GNUNET_MQ_msg_extra(mvar, 0, type) -/** - * Append data to the end of an existing MQ message. - * If the operation is successful, mqm is changed to point to the new MQ message, - * and GNUNET_OK is returned. - * On failure, GNUNET_SYSERR is returned, and the pointer mqm is not changed, - * the user of this API must take care of disposing the already allocated message - * (either by sending it, or by using GNUNET_MQ_discard) - * - * @param mqm MQ message to augment with additional data - * @param src source buffer for the additional data - * @param len length of the additional data - * @return GNUNET_SYSERR if nesting the message failed, - * GNUNET_OK on success - */ -#define GNUNET_MQ_nest(mqm, src, len) GNUNET_MQ_nest_ (&mqm, src, len) - - -/** - * Append a message to the end of an existing MQ message. - * If the operation is successful, mqm is changed to point to the new MQ message, - * and GNUNET_OK is returned. - * On failure, GNUNET_SYSERR is returned, and the pointer mqm is not changed, - * the user of this API must take care of disposing the already allocated message - * (either by sending it, or by using GNUNET_MQ_discard) - * - * @param mqm MQ message to augment with additional data - * @param mh the message to append, must be of type 'struct GNUNET_MessageHeader *' - */ -#define GNUNET_MQ_nest_mh(mqm, mh) ((NULL == mh) ? (GNUNET_OK) : GNUNET_MQ_nest((mqm), (mh), ntohs ((mh)->size))) - /** * Allocate a GNUNET_MQ_Message, where the message only consists of a header. @@ -105,6 +75,40 @@ /** + * Allocate a GNUNET_MQ_Message, and append a payload message after the given + * message struct. + * + * @param mvar pointer to a message struct, will be changed to point at the newly allocated message, + * whose size is 'sizeof(*mvar) + ntohs (mh->size)' + * @param type message type of the allocated message, has no effect on the nested message + * @param mh message to nest + * @return a newly allocated 'struct GNUNET_MQ_Message *' + */ +#define GNUNET_MQ_msg_nested_mh(mvar, type, mh) GNUNET_MQ_msg_nested_mh_((((void)(mvar)->header), (struct GNUNET_MessageHeader**) &(mvar)), sizeof (*(mvar)), (type), mh) + + +/** + * Return a pointer to the message at the end of the given message. + * + * @param var pointer to a message struct, the type of the expression determines the base size, + * the space after the base size is the nested message + * @return a 'struct GNUNET_MessageHeader *' that points at the nested message of the given message, + * or NULL if the given message in 'var' does not have any space after the message struct + */ +#define GNUNET_MQ_extract_nested_mh(var) GNUNET_MQ_extract_nested_mh_ ((struct GNUNET_MessageHeader *) (var), sizeof (*(var))) + + +struct GNUNET_MessageHeader * +GNUNET_MQ_extract_nested_mh_ (const struct GNUNET_MessageHeader *mh, uint16_t base_size); + + +struct GNUNET_MQ_Message * +GNUNET_MQ_msg_nested_mh_ (struct GNUNET_MessageHeader **mhp, uint16_t base_size, uint16_t type, + const struct GNUNET_MessageHeader *nested_mh); + + + +/** * End-marker for the handlers array */ #define GNUNET_MQ_HANDLERS_END {NULL, 0, 0} @@ -128,7 +132,8 @@ enum GNUNET_MQ_Error * @param cls closure * @param msg the received message */ -typedef void (*GNUNET_MQ_MessageCallback) (void *cls, const struct GNUNET_MessageHeader *msg); +typedef void +(*GNUNET_MQ_MessageCallback) (void *cls, const struct GNUNET_MessageHeader *msg); /** @@ -151,10 +156,12 @@ typedef void * * @param cls closure */ -typedef void (*GNUNET_MQ_NotifyCallback) (void *cls); +typedef void +(*GNUNET_MQ_NotifyCallback) (void *cls); -typedef void (*GNUNET_MQ_ErrorHandler) (void *cls, enum GNUNET_MQ_Error error); +typedef void +(*GNUNET_MQ_ErrorHandler) (void *cls, enum GNUNET_MQ_Error error); struct GNUNET_MQ_Message @@ -287,6 +294,7 @@ struct GNUNET_MQ_Handler }; + /** * Create a new message for MQ. * @@ -300,21 +308,6 @@ GNUNET_MQ_msg_ (struct GNUNET_MessageHeader **mhp, uint16_t size, uint16_t type) /** - * Resize the the mq message pointed to by mqmp, - * and append the given data to it. - * - * @param mqmp pointer to a mq message pointer - * @param src source of the data to append - * @param len length of the data to append - * @return GNUNET_OK on success, - * GNUNET_SYSERR on error (e.g. if len is too large) - */ -int -GNUNET_MQ_nest_ (struct GNUNET_MQ_Message **mqmp, - const void *src, uint16_t len); - - -/** * Discard the message queue message, free all * allocated resources. Must be called in the event * that a message is created but should not actually be sent. diff --git a/src/include/gnunet_protocols.h b/src/include/gnunet_protocols.h index 15bbae2e8c..85c643f7de 100644 --- a/src/include/gnunet_protocols.h +++ b/src/include/gnunet_protocols.h @@ -1754,11 +1754,18 @@ extern "C" */ #define GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ABORT 548 +/** + * Abort a round, don't send requested elements anymore + */ +#define GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ROUND_CONTEXT 547 + /******************************************************************************* * SET message types ******************************************************************************/ +#define GNUNET_MESSAGE_TYPE_SET_REJECT 569 + /** * Cancel a set operation */ @@ -1800,44 +1807,49 @@ extern "C" #define GNUNET_MESSAGE_TYPE_SET_EVALUATE 577 /** - * Evaluate a set operation + * Start a set operation with the given set + */ +#define GNUNET_MESSAGE_TYPE_SET_CONCLUDE 578 + +/** + * Notify the client of a request from a remote peer */ -#define GNUNET_MESSAGE_TYPE_SET_REQUEST 578 +#define GNUNET_MESSAGE_TYPE_SET_REQUEST 579 /** - * Evaluate a set operation. + * Create a new local set */ -#define GNUNET_MESSAGE_TYPE_SET_CREATE 579 +#define GNUNET_MESSAGE_TYPE_SET_CREATE 580 /** - * Evaluate a set operation. + * Request a set operation from a remote peer. */ -#define GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST 580 +#define GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST 581 /** * Strata estimator. */ -#define GNUNET_MESSAGE_TYPE_SET_P2P_SE 581 +#define GNUNET_MESSAGE_TYPE_SET_P2P_SE 582 /** * Invertible bloom filter. */ -#define GNUNET_MESSAGE_TYPE_SET_P2P_IBF 582 +#define GNUNET_MESSAGE_TYPE_SET_P2P_IBF 583 /** * Actual set elements. */ -#define GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS 583 +#define GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS 584 /** * Requests for the elements with the given hashes. */ -#define GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS 584 +#define GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENT_REQUESTS 585 /** * Operation is done. */ -#define GNUNET_MESSAGE_TYPE_SET_P2P_DONE 585 +#define GNUNET_MESSAGE_TYPE_SET_P2P_DONE 586 diff --git a/src/include/gnunet_set_service.h b/src/include/gnunet_set_service.h index ce413c0de4..2684df00a6 100644 --- a/src/include/gnunet_set_service.h +++ b/src/include/gnunet_set_service.h @@ -257,11 +257,10 @@ GNUNET_SET_destroy (struct GNUNET_SET_Handle *set); /** - * Evaluate a set operation with our set and the set of another peer. + * Create a set operation for evaluation with another peer. + * The evaluation will not start until the client provides + * a local set with GNUNET_SET_conclude. * - * @param set set to use -- FIXME: remove - * this argument, use GNUNET_SET_conclude instead! - * @param salt salt for HKDF (explain more here) * @param other_peer peer with the other set * @param app_id hash for the application using the set * @param context_msg additional information for the request @@ -275,8 +274,7 @@ GNUNET_SET_destroy (struct GNUNET_SET_Handle *set); * @return a handle to cancel the operation */ struct GNUNET_SET_OperationHandle * -GNUNET_SET_evaluate (struct GNUNET_SET_Handle *set, - const struct GNUNET_PeerIdentity *other_peer, +GNUNET_SET_evaluate (const struct GNUNET_PeerIdentity *other_peer, const struct GNUNET_HashCode *app_id, const struct GNUNET_MessageHeader *context_msg, uint16_t salt, @@ -315,13 +313,13 @@ GNUNET_SET_listen_cancel (struct GNUNET_SET_ListenHandle *lh); /** - * Accept a request we got via GNUNET_SET_listen. Must be called - * during GNUNET_SET_listen, as the 'struct GNUNET_SET_Request' - * becomes invalid afterwards. + * Accept a request we got via GNUNET_SET_listen. Must be called during + * GNUNET_SET_listen, as the 'struct GNUNET_SET_Request' becomes invalid + * afterwards. + * Call GNUNET_SET_conclude to provide the local set to use for the operation, + * and to begin the exchange with the remote peer. * * @param request request to accept - * @param set set used for the requested operation -- FIXME: remove - * this argument, use GNUNET_SET_conclude instead! * @param result_mode specified how results will be returned, * see 'GNUNET_SET_ResultMode'. * @param result_cb callback for the results @@ -330,7 +328,6 @@ GNUNET_SET_listen_cancel (struct GNUNET_SET_ListenHandle *lh); */ struct GNUNET_SET_OperationHandle * GNUNET_SET_accept (struct GNUNET_SET_Request *request, - struct GNUNET_SET_Handle *set, enum GNUNET_SET_ResultMode result_mode, GNUNET_SET_ResultIterator result_cb, void *cls); @@ -353,9 +350,9 @@ GNUNET_SET_conclude (struct GNUNET_SET_OperationHandle *oh, /** - * Cancel the given set operation. FIXME: do clients have - * to cancel the operatino if the GNUNET_SET_ResultIterator - * has been called with timeout/error/done? + * Cancel the given set operation. + * May not be called after the operation's GNUNET_SET_ResultIterator has been + * called with a status that indicates error, timeout or done. * * @param oh set operation to cancel */ diff --git a/src/set/gnunet-service-set.c b/src/set/gnunet-service-set.c index 2aea503653..4da7188791 100644 --- a/src/set/gnunet-service-set.c +++ b/src/set/gnunet-service-set.c @@ -226,6 +226,23 @@ destroy_incoming (struct Incoming *incoming) GNUNET_free (incoming); } +static struct Listener * +get_listener_by_target (enum GNUNET_SET_OperationType op, + const struct GNUNET_HashCode *app_id) +{ + struct Listener *l; + + for (l = listeners_head; NULL != l; l = l->next) + { + if (l->operation != op) + continue; + if (0 != GNUNET_CRYPTO_hash_cmp (app_id, &l->app_id)) + continue; + return l; + } + return NULL; +} + /** * Handle a request for a set operation from @@ -240,62 +257,33 @@ handle_p2p_operation_request (void *cls, const struct GNUNET_MessageHeader *mh) struct Incoming *incoming = cls; const struct OperationRequestMessage *msg = (const struct OperationRequestMessage *) mh; struct GNUNET_MQ_Message *mqm; - struct RequestMessage *cmsg; + struct GNUNET_SET_RequestMessage *cmsg; struct Listener *listener; const struct GNUNET_MessageHeader *context_msg; - if (ntohs (mh->size) < sizeof *msg) + context_msg = GNUNET_MQ_extract_nested_mh (msg); + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "received P2P operation request (op %u, app %s)\n", + ntohs (msg->operation), GNUNET_h2s (&msg->app_id)); + listener = get_listener_by_target (ntohs (msg->operation), &msg->app_id); + if (NULL == listener) { - /* message is to small for its type */ - GNUNET_break (0); - destroy_incoming (incoming); + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "set operation request from peer failed: " + "no set with matching application ID and operation type\n"); return; } - else if (ntohs (mh->size) == sizeof *msg) - { - /* there is no context message */ - context_msg = NULL; - } - else + mqm = GNUNET_MQ_msg_nested_mh (cmsg, GNUNET_MESSAGE_TYPE_SET_REQUEST, context_msg); + if (NULL == mqm) { - context_msg = &msg[1].header; - if ((ntohs (context_msg->size) + sizeof *msg) != ntohs (msg->header.size)) - { - /* size of context message is invalid */ - GNUNET_break (0); - destroy_incoming (incoming); - return; - } - } - - GNUNET_log (GNUNET_ERROR_TYPE_INFO, "received P2P operation request (op %u, app %s)\n", - ntohs (msg->operation), GNUNET_h2s (&msg->app_id)); - - /* find the appropriate listener */ - for (listener = listeners_head; - listener != NULL; - listener = listener->next) - { - if ( (0 != GNUNET_CRYPTO_hash_cmp (&msg->app_id, &listener->app_id)) || - (ntohs (msg->operation) != listener->operation) ) - continue; - mqm = GNUNET_MQ_msg (cmsg, GNUNET_MESSAGE_TYPE_SET_REQUEST); - if (GNUNET_OK != GNUNET_MQ_nest_mh (mqm, context_msg)) - { - /* FIXME: disconnect the peer */ - GNUNET_MQ_discard (mqm); - GNUNET_break (0); - return; - } - incoming->accept_id = accept_id++; - cmsg->accept_id = htonl (incoming->accept_id); - GNUNET_MQ_send (listener->client_mq, mqm); + /* FIXME: disconnect the peer */ + GNUNET_break_op (0); return; } - - GNUNET_log (GNUNET_ERROR_TYPE_ERROR, - "set operation request from peer failed: " - "no set with matching application ID and operation type\n"); + incoming->accept_id = accept_id++; + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "sending request with accept id %u\n", incoming->accept_id); + cmsg->accept_id = htonl (incoming->accept_id); + cmsg->peer_id = incoming->peer; + GNUNET_MQ_send (listener->client_mq, mqm); } @@ -311,7 +299,7 @@ handle_client_create (void *cls, struct GNUNET_SERVER_Client *client, const struct GNUNET_MessageHeader *m) { - struct SetCreateMessage *msg = (struct SetCreateMessage *) m; + struct GNUNET_SET_CreateMessage *msg = (struct GNUNET_SET_CreateMessage *) m; struct Set *set; GNUNET_log (GNUNET_ERROR_TYPE_INFO, "client created new set (operation %u)\n", @@ -363,7 +351,7 @@ handle_client_listen (void *cls, struct GNUNET_SERVER_Client *client, const struct GNUNET_MessageHeader *m) { - struct ListenMessage *msg = (struct ListenMessage *) m; + struct GNUNET_SET_ListenMessage *msg = (struct GNUNET_SET_ListenMessage *) m; struct Listener *listener; if (NULL != get_listener (client)) @@ -410,7 +398,7 @@ handle_client_remove (void *cls, switch (set->operation) { case GNUNET_SET_OPERATION_UNION: - _GSS_union_remove ((struct ElementMessage *) m, set); + _GSS_union_remove ((struct GNUNET_SET_ElementMessage *) m, set); case GNUNET_SET_OPERATION_INTERSECTION: /* FIXME: cfuchs */ break; @@ -423,6 +411,38 @@ handle_client_remove (void *cls, } + +/** + * Called when the client wants to reject an operation + * request from another peer. + * + * @param cls unused + * @param client client that sent the message + * @param m message sent by the client + */ +static void +handle_client_reject (void *cls, + struct GNUNET_SERVER_Client *client, + const struct GNUNET_MessageHeader *m) +{ + struct Incoming *incoming; + struct GNUNET_SET_AcceptRejectMessage *msg = (struct GNUNET_SET_AcceptRejectMessage *) m; + + GNUNET_break (0 == ntohl (msg->request_id)); + + incoming = get_incoming (ntohl (msg->accept_reject_id)); + if (NULL == incoming) + { + GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); + return; + } + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "peer request rejected by client\n"); + destroy_incoming (incoming); + GNUNET_SERVER_receive_done (client, GNUNET_OK); +} + + + /** * Called when a client wants to add an element to a * set it inhabits. @@ -448,7 +468,7 @@ handle_client_add (void *cls, switch (set->operation) { case GNUNET_SET_OPERATION_UNION: - _GSS_union_add ((struct ElementMessage *) m, set); + _GSS_union_add ((struct GNUNET_SET_ElementMessage *) m, set); case GNUNET_SET_OPERATION_INTERSECTION: /* FIXME: cfuchs */ break; @@ -490,7 +510,7 @@ handle_client_evaluate (void *cls, /* FIXME: cfuchs */ break; case GNUNET_SET_OPERATION_UNION: - _GSS_union_evaluate ((struct EvaluateMessage *) m, set); + _GSS_union_evaluate ((struct GNUNET_SET_EvaluateMessage *) m, set); break; default: GNUNET_assert (0); @@ -502,23 +522,6 @@ handle_client_evaluate (void *cls, /** - * Handle a cancel request from a client. - * - * @param cls unused - * @param client the client - * @param m the cancel message - */ -static void -handle_client_cancel (void *cls, - struct GNUNET_SERVER_Client *client, - const struct GNUNET_MessageHeader *m) -{ - /* FIXME: implement */ - GNUNET_SERVER_receive_done (client, GNUNET_OK); -} - - -/** * Handle an ack from a client. * * @param cls unused @@ -550,25 +553,20 @@ handle_client_accept (void *cls, { struct Set *set; struct Incoming *incoming; - struct AcceptMessage *msg = (struct AcceptMessage *) mh; + struct GNUNET_SET_AcceptRejectMessage *msg = (struct GNUNET_SET_AcceptRejectMessage *) mh; + incoming = get_incoming (ntohl (msg->accept_reject_id)); - incoming = get_incoming (ntohl (msg->accept_id)); + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "client accepting %u\n", ntohl (msg->accept_reject_id)); if (NULL == incoming) { + GNUNET_break (0); GNUNET_SERVER_client_disconnect (client); return; } - if (0 == ntohl (msg->request_id)) - { - GNUNET_log (GNUNET_ERROR_TYPE_INFO, "peer request rejected by client\n"); - destroy_incoming (incoming); - GNUNET_SERVER_receive_done (client, GNUNET_OK); - return; - } set = get_set (client); @@ -687,14 +685,14 @@ run (void *cls, struct GNUNET_SERVER_Handle *server, const struct GNUNET_CONFIGURATION_Handle *cfg) { static const struct GNUNET_SERVER_MessageHandler server_handlers[] = { + {handle_client_accept, NULL, GNUNET_MESSAGE_TYPE_SET_ACCEPT, 0}, + {handle_client_ack, NULL, GNUNET_MESSAGE_TYPE_SET_ACK, 0}, + {handle_client_add, NULL, GNUNET_MESSAGE_TYPE_SET_ADD, 0}, {handle_client_create, NULL, GNUNET_MESSAGE_TYPE_SET_CREATE, 0}, + {handle_client_evaluate, NULL, GNUNET_MESSAGE_TYPE_SET_EVALUATE, 0}, {handle_client_listen, NULL, GNUNET_MESSAGE_TYPE_SET_LISTEN, 0}, - {handle_client_add, NULL, GNUNET_MESSAGE_TYPE_SET_ADD, 0}, + {handle_client_reject, NULL, GNUNET_MESSAGE_TYPE_SET_REJECT, 0}, {handle_client_remove, NULL, GNUNET_MESSAGE_TYPE_SET_REMOVE, 0}, - {handle_client_cancel, NULL, GNUNET_MESSAGE_TYPE_SET_CANCEL, 0}, - {handle_client_evaluate, NULL, GNUNET_MESSAGE_TYPE_SET_EVALUATE, 0}, - {handle_client_ack, NULL, GNUNET_MESSAGE_TYPE_SET_ACK, 0}, - {handle_client_accept, NULL, GNUNET_MESSAGE_TYPE_SET_ACCEPT, 0}, {NULL, NULL, 0, 0} }; @@ -705,6 +703,8 @@ run (void *cls, struct GNUNET_SERVER_Handle *server, stream_listen_socket = GNUNET_STREAM_listen (cfg, GNUNET_APPLICATION_TYPE_SET, &stream_listen_cb, NULL, GNUNET_STREAM_OPTION_END); + + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "set service running\n"); } diff --git a/src/set/gnunet-service-set.h b/src/set/gnunet-service-set.h index bea77416ed..15199eba49 100644 --- a/src/set/gnunet-service-set.h +++ b/src/set/gnunet-service-set.h @@ -217,7 +217,7 @@ _GSS_union_set_create (void); * @parem set the set to evaluate the operation with */ void -_GSS_union_evaluate (struct EvaluateMessage *m, struct Set *set); +_GSS_union_evaluate (struct GNUNET_SET_EvaluateMessage *m, struct Set *set); /** @@ -227,7 +227,7 @@ _GSS_union_evaluate (struct EvaluateMessage *m, struct Set *set); * @param set set to add the element to */ void -_GSS_union_add (struct ElementMessage *m, struct Set *set); +_GSS_union_add (struct GNUNET_SET_ElementMessage *m, struct Set *set); /** @@ -238,7 +238,7 @@ _GSS_union_add (struct ElementMessage *m, struct Set *set); * @param set set to remove the element from */ void -_GSS_union_remove (struct ElementMessage *m, struct Set *set); +_GSS_union_remove (struct GNUNET_SET_ElementMessage *m, struct Set *set); /** @@ -258,7 +258,7 @@ _GSS_union_set_destroy (struct Set *set); * @param incoming information about the requesting remote peer */ void -_GSS_union_accept (struct AcceptMessage *m, struct Set *set, +_GSS_union_accept (struct GNUNET_SET_AcceptRejectMessage *m, struct Set *set, struct Incoming *incoming); diff --git a/src/set/gnunet-service-set_union.c b/src/set/gnunet-service-set_union.c index c651a03818..6d9658ee58 100644 --- a/src/set/gnunet-service-set_union.c +++ b/src/set/gnunet-service-set_union.c @@ -245,8 +245,7 @@ struct ElementEntry /** - * Information about the element used for - * a specific union operation. + * Entries in the key-to-element map of the union set. */ struct KeyEntry { @@ -401,11 +400,14 @@ destroy_key_to_element_iter (void *cls, static void destroy_union_operation (struct UnionEvaluateOperation *eo) { + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "destroying union op\n"); + if (NULL != eo->mq) { GNUNET_MQ_destroy (eo->mq); eo->mq = NULL; } + if (NULL != eo->socket) { GNUNET_STREAM_close (eo->socket); @@ -433,12 +435,16 @@ destroy_union_operation (struct UnionEvaluateOperation *eo) eo->key_to_element = NULL; } - GNUNET_CONTAINER_DLL_remove (eo->set->state.u->ops_head, eo->set->state.u->ops_tail, eo); GNUNET_free (eo); - /* FIXME: free and destroy everything else */ + + + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "destroying union op done\n"); + + + /* FIXME: do a garbage collection of the set generations */ } @@ -452,7 +458,7 @@ static void fail_union_operation (struct UnionEvaluateOperation *eo) { struct GNUNET_MQ_Message *mqm; - struct ResultMessage *msg; + struct GNUNET_SET_ResultMessage *msg; mqm = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_RESULT); msg->result_status = htons (GNUNET_SET_STATUS_FAILURE); @@ -495,20 +501,25 @@ send_operation_request (struct UnionEvaluateOperation *eo) struct GNUNET_MQ_Message *mqm; struct OperationRequestMessage *msg; - mqm = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST); - if (NULL != eo->context_msg) - if (GNUNET_OK != GNUNET_MQ_nest (mqm, eo->context_msg, ntohs (eo->context_msg->size))) - { - /* the context message is too large */ - GNUNET_break (0); - GNUNET_SERVER_client_disconnect (eo->set->client); - GNUNET_MQ_discard (mqm); - return; - } + mqm = GNUNET_MQ_msg_nested_mh (msg, GNUNET_MESSAGE_TYPE_SET_P2P_OPERATION_REQUEST, eo->context_msg); + + if (NULL == mqm) + { + /* the context message is too large */ + GNUNET_break (0); + GNUNET_SERVER_client_disconnect (eo->set->client); + return; + } msg->operation = htons (GNUNET_SET_OPERATION_UNION); msg->app_id = eo->app_id; GNUNET_MQ_send (eo->mq, mqm); + if (NULL != eo->context_msg) + { + GNUNET_free (eo->context_msg); + eo->context_msg = NULL; + } + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "sent op request\n"); } @@ -537,7 +548,7 @@ insert_element_iterator (void *cls, { if (old_k->ibf_key.key_val == new_k->ibf_key.key_val) { - new_k->next_colliding = old_k; + new_k->next_colliding = old_k->next_colliding; old_k->next_colliding = new_k; return GNUNET_NO; } @@ -568,12 +579,11 @@ insert_element (struct UnionEvaluateOperation *eo, struct ElementEntry *ee) ret = GNUNET_CONTAINER_multihashmap32_get_multiple (eo->key_to_element, (uint32_t) ibf_key.key_val, insert_element_iterator, k); + /* was the element inserted into a colliding bucket? */ if (GNUNET_SYSERR == ret) - { - GNUNET_assert (NULL != k->next_colliding); return; - } + GNUNET_CONTAINER_multihashmap32_put (eo->key_to_element, (uint32_t) ibf_key.key_val, k, GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY); } @@ -781,8 +791,8 @@ handle_p2p_strata_estimator (void *cls, const struct GNUNET_MessageHeader *mh) */ static int send_element_iterator (void *cls, - uint32_t key, - void *value) + uint32_t key, + void *value) { struct SendElementClosure *sec = cls; struct IBF_Key ibf_key = sec->ibf_key; @@ -795,15 +805,18 @@ send_element_iterator (void *cls, { const struct GNUNET_SET_Element *const element = &ke->element->element; struct GNUNET_MQ_Message *mqm; + struct GNUNET_MessageHeader *mh; GNUNET_assert (ke->ibf_key.key_val == ibf_key.key_val); - mqm = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS); - if (GNUNET_OK != GNUNET_MQ_nest (mqm, element->data, element->size)) + mqm = GNUNET_MQ_msg_header_extra (mh, element->size, GNUNET_MESSAGE_TYPE_SET_P2P_ELEMENTS); + if (NULL == mqm) { + /* element too large */ GNUNET_break (0); - GNUNET_MQ_discard (mqm); continue; } + memcpy (&mh[1], element->data, element->size); + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "sending element to client\n"); GNUNET_MQ_send (eo->mq, mqm); ke = ke->next_colliding; } @@ -975,34 +988,42 @@ send_client_element (struct UnionEvaluateOperation *eo, struct GNUNET_SET_Element *element) { struct GNUNET_MQ_Message *mqm; - struct ResultMessage *rm; + struct GNUNET_SET_ResultMessage *rm; GNUNET_assert (0 != eo->request_id); - mqm = GNUNET_MQ_msg (rm, GNUNET_MESSAGE_TYPE_SET_RESULT); - rm->result_status = htons (GNUNET_SET_STATUS_OK); - rm->request_id = htonl (eo->request_id); - if (GNUNET_OK != GNUNET_MQ_nest (mqm, element->data, element->size)) + mqm = GNUNET_MQ_msg_extra (rm, element->size, GNUNET_MESSAGE_TYPE_SET_RESULT); + if (NULL == mqm) { GNUNET_MQ_discard (mqm); GNUNET_break (0); return; } - + rm->result_status = htons (GNUNET_SET_STATUS_OK); + rm->request_id = htonl (eo->request_id); + memcpy (&rm[1], element->data, element->size); GNUNET_MQ_send (eo->set->client_mq, mqm); } /** - * Callback used for notifications + * Completion callback for shutdown * - * @param cls closure + * @param cls the closure from GNUNET_STREAM_shutdown call + * @param operation the operation that was shutdown (SHUT_RD, SHUT_WR, + * SHUT_RDWR) */ -static void -client_done_sent_cb (void *cls) +/* +static void +stream_shutdown_cb (void *cls, + int operation) { //struct UnionEvaluateOperation *eo = cls; - /* FIXME: destroy eo */ + + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "stream shutdown\n"); + + // destroy_union_operation (eo); } +*/ /** @@ -1018,16 +1039,15 @@ static void send_client_done_and_destroy (struct UnionEvaluateOperation *eo) { struct GNUNET_MQ_Message *mqm; - struct ResultMessage *rm; + struct GNUNET_SET_ResultMessage *rm; GNUNET_assert (0 != eo->request_id); mqm = GNUNET_MQ_msg (rm, GNUNET_MESSAGE_TYPE_SET_RESULT); rm->request_id = htonl (eo->request_id); rm->result_status = htons (GNUNET_SET_STATUS_DONE); - GNUNET_MQ_notify_sent (mqm, client_done_sent_cb, eo); GNUNET_MQ_send (eo->set->client_mq, mqm); - /* FIXME: destroy the eo */ + // GNUNET_STREAM_shutdown (eo->socket, SHUT_RDWR, stream_shutdown_cb, eo); } @@ -1199,18 +1219,25 @@ stream_open_cb (void *cls, * @parem set the set to evaluate the operation with */ void -_GSS_union_evaluate (struct EvaluateMessage *m, struct Set *set) +_GSS_union_evaluate (struct GNUNET_SET_EvaluateMessage *m, struct Set *set) { struct UnionEvaluateOperation *eo; + struct GNUNET_MessageHeader *context_msg; eo = GNUNET_new (struct UnionEvaluateOperation); - eo->peer = m->peer; + eo->peer = m->target_peer; eo->set = set; eo->request_id = htonl (m->request_id); GNUNET_assert (0 != eo->request_id); eo->se = strata_estimator_dup (set->state.u->se); eo->salt = ntohs (m->salt); eo->app_id = m->app_id; + + context_msg = GNUNET_MQ_extract_nested_mh (m); + if (NULL != context_msg) + { + eo->context_msg = GNUNET_copy_message (context_msg); + } GNUNET_log (GNUNET_ERROR_TYPE_INFO, "evaluating union operation, (app %s)\n", @@ -1235,7 +1262,7 @@ _GSS_union_evaluate (struct EvaluateMessage *m, struct Set *set) * @param incoming information about the requesting remote peer */ void -_GSS_union_accept (struct AcceptMessage *m, struct Set *set, +_GSS_union_accept (struct GNUNET_SET_AcceptRejectMessage *m, struct Set *set, struct Incoming *incoming) { struct UnionEvaluateOperation *eo; @@ -1250,7 +1277,6 @@ _GSS_union_accept (struct AcceptMessage *m, struct Set *set, GNUNET_assert (0 != ntohl (m->request_id)); eo->request_id = ntohl (m->request_id); eo->se = strata_estimator_dup (set->state.u->se); - eo->set = set; // FIXME: redundant!? eo->mq = incoming->mq; /* transfer ownership of mq and socket from incoming to eo */ incoming->mq = NULL; @@ -1299,7 +1325,7 @@ _GSS_union_set_create (void) * @param set set to add the element to */ void -_GSS_union_add (struct ElementMessage *m, struct Set *set) +_GSS_union_add (struct GNUNET_SET_ElementMessage *m, struct Set *set) { struct ElementEntry *ee; struct ElementEntry *ee_dup; @@ -1357,7 +1383,9 @@ _GSS_union_set_destroy (struct Set *set) destroy_elements (set->state.u); while (NULL != set->state.u->ops_head) + { destroy_union_operation (set->state.u->ops_head); + } } /** @@ -1368,7 +1396,7 @@ _GSS_union_set_destroy (struct Set *set) * @param set set to remove the element from */ void -_GSS_union_remove (struct ElementMessage *m, struct Set *set) +_GSS_union_remove (struct GNUNET_SET_ElementMessage *m, struct Set *set) { struct GNUNET_HashCode hash; struct ElementEntry *ee; diff --git a/src/set/gnunet-set.c b/src/set/gnunet-set.c index 5f2d1c9761..ae84610fc5 100644 --- a/src/set/gnunet-set.c +++ b/src/set/gnunet-set.c @@ -91,11 +91,12 @@ listen_cb (void *cls, const struct GNUNET_MessageHeader *context_msg, struct GNUNET_SET_Request *request) { + struct GNUNET_SET_OperationHandle *oh; GNUNET_log (GNUNET_ERROR_TYPE_INFO, "listen cb called\n"); GNUNET_SET_listen_cancel (listen_handle); - GNUNET_SET_accept (request, set2, - GNUNET_SET_RESULT_ADDED, result_cb_set2, NULL); + oh = GNUNET_SET_accept (request, GNUNET_SET_RESULT_ADDED, result_cb_set2, NULL); + GNUNET_SET_conclude (oh, set2); } @@ -107,11 +108,14 @@ listen_cb (void *cls, static void start (void *cls) { + struct GNUNET_SET_OperationHandle *oh; + listen_handle = GNUNET_SET_listen (config, GNUNET_SET_OPERATION_UNION, &app_id, listen_cb, NULL); - GNUNET_SET_evaluate (set1, &local_id, &app_id, NULL, 42, - GNUNET_SET_RESULT_ADDED, - result_cb_set1, NULL); + oh = GNUNET_SET_evaluate (&local_id, &app_id, NULL, 42, + GNUNET_SET_RESULT_ADDED, + result_cb_set1, NULL); + GNUNET_SET_conclude (oh, set1); } diff --git a/src/set/set.h b/src/set/set.h index ad2200de93..7ec3e6cb25 100644 --- a/src/set/set.h +++ b/src/set/set.h @@ -29,17 +29,12 @@ #include "platform.h" #include "gnunet_common.h" - -/** - * The service sends up to GNUNET_SET_ACK_WINDOW messages per client handle, - * the client should send an ack every GNUNET_SET_ACK_WINDOW/2 messages. - */ -#define GNUNET_SET_ACK_WINDOW 8 +#define GNUNET_SET_ACK_WINDOW 10 GNUNET_NETWORK_STRUCT_BEGIN -struct SetCreateMessage +struct GNUNET_SET_CreateMessage { /** * Type: GNUNET_MESSAGE_TYPE_SET_CREATE @@ -54,7 +49,7 @@ struct SetCreateMessage }; -struct ListenMessage +struct GNUNET_SET_ListenMessage { /** * Type: GNUNET_MESSAGE_TYPE_SET_LISTEN @@ -74,32 +69,31 @@ struct ListenMessage }; -struct AcceptMessage +struct GNUNET_SET_AcceptRejectMessage { /** - * Type: GNUNET_MESSAGE_TYPE_SET_ACCEPT + * Type: GNUNET_MESSAGE_TYPE_SET_ACCEPT or + * GNUNET_MESSAGE_TYPE_SET_REJECT */ struct GNUNET_MessageHeader header; /** - * Request id that will be sent along with - * results for the accepted operation. - * Chosen by the client. - * Must be 0 if the request has been rejected. + * ID of the incoming request we want to accept / reject. */ - uint32_t request_id GNUNET_PACKED; + uint32_t accept_reject_id GNUNET_PACKED; /** - * ID of the incoming request we want to accept / reject. + * Request ID to identify responses, + * must be 0 if we don't accept the request. */ - uint32_t accept_id GNUNET_PACKED; + uint32_t request_id GNUNET_PACKED; }; /** * A request for an operation with another client. */ -struct RequestMessage +struct GNUNET_SET_RequestMessage { /** * Type: GNUNET_MESSAGE_TYPE_SET_Request. @@ -107,21 +101,21 @@ struct RequestMessage struct GNUNET_MessageHeader header; /** - * ID of the request we want to accept, - * chosen by the service. + * Identity of the requesting peer. */ - uint32_t accept_id GNUNET_PACKED; + struct GNUNET_PeerIdentity peer_id; /** - * Identity of the requesting peer. + * ID of the to identify the request when accepting or + * rejecting it. */ - struct GNUNET_PeerIdentity peer_id; + uint32_t accept_id GNUNET_PACKED; /* rest: nested context message */ }; -struct EvaluateMessage +struct GNUNET_SET_EvaluateMessage { /** * Type: GNUNET_MESSAGE_TYPE_SET_EVALUATE @@ -136,7 +130,7 @@ struct EvaluateMessage /** * Peer to evaluate the operation with */ - struct GNUNET_PeerIdentity peer; + struct GNUNET_PeerIdentity target_peer; /** * Application id @@ -157,7 +151,7 @@ struct EvaluateMessage }; -struct ResultMessage +struct GNUNET_SET_ResultMessage { /** * Type: GNUNET_MESSAGE_TYPE_SET_RESULT @@ -184,7 +178,7 @@ struct ResultMessage }; -struct ElementMessage +struct GNUNET_SET_ElementMessage { /** * Type: GNUNET_MESSAGE_TYPE_SET_ADD or @@ -200,20 +194,6 @@ struct ElementMessage }; -struct CancelMessage -{ - /** - * Type: GNUNET_MESSAGE_TYPE_SET_CANCEL - */ - struct GNUNET_MessageHeader header; - - /** - * id we want to cancel result belongs to - */ - uint32_t request_id GNUNET_PACKED; -}; - - GNUNET_NETWORK_STRUCT_END #endif diff --git a/src/set/set_api.c b/src/set/set_api.c index 5838680b9e..c74933aa0b 100644 --- a/src/set/set_api.c +++ b/src/set/set_api.c @@ -33,6 +33,7 @@ #define LOG(kind,...) GNUNET_log_from (kind, "set-api",__VA_ARGS__) + /** * Opaque handle to a set. */ @@ -52,13 +53,33 @@ struct GNUNET_SET_Request int accepted; }; - struct GNUNET_SET_OperationHandle { GNUNET_SET_ResultIterator result_cb; void *result_cls; + + /** + * Local set used for the operation, + * NULL if no set has been provided by conclude yet. + */ struct GNUNET_SET_Handle *set; + + /** + * Request ID to identify the operation within the set. + */ uint32_t request_id; + + /** + * Message sent to the server on calling conclude, + * NULL if conclude has been called. + */ + struct GNUNET_MQ_Message *conclude_mqm; + + /** + * Address of the request if in the conclude message, + * used to patch the request id into the message when the set is known. + */ + uint32_t *request_id_addr; }; @@ -83,18 +104,21 @@ struct GNUNET_SET_ListenHandle static void handle_result (void *cls, const struct GNUNET_MessageHeader *mh) { - struct ResultMessage *msg = (struct ResultMessage *) mh; + struct GNUNET_SET_ResultMessage *msg = (struct GNUNET_SET_ResultMessage *) mh; struct GNUNET_SET_Handle *set = cls; struct GNUNET_SET_OperationHandle *oh; struct GNUNET_SET_Element e; + + GNUNET_assert (NULL != set); + GNUNET_assert (NULL != set->mq); + if (set->messages_since_ack >= GNUNET_SET_ACK_WINDOW/2) { struct GNUNET_MQ_Message *mqm; mqm = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_ACK); GNUNET_MQ_send (set->mq, mqm); } - oh = GNUNET_MQ_assoc_get (set->mq, ntohl (msg->request_id)); GNUNET_assert (NULL != oh); /* status is not STATUS_OK => there's no attached element, @@ -109,7 +133,7 @@ handle_result (void *cls, const struct GNUNET_MessageHeader *mh) } e.data = &msg[1]; - e.size = ntohs (mh->size) - sizeof (struct ResultMessage); + e.size = ntohs (mh->size) - sizeof (struct GNUNET_SET_ResultMessage); e.type = msg->element_type; if (NULL != oh->result_cb) oh->result_cb (oh->result_cls, &e, htons (msg->result_status)); @@ -124,28 +148,34 @@ handle_result (void *cls, const struct GNUNET_MessageHeader *mh) static void handle_request (void *cls, const struct GNUNET_MessageHeader *mh) { - struct RequestMessage *msg = (struct RequestMessage *) mh; + struct GNUNET_SET_RequestMessage *msg = (struct GNUNET_SET_RequestMessage *) mh; struct GNUNET_SET_ListenHandle *lh = cls; struct GNUNET_SET_Request *req; + struct GNUNET_MessageHeader *context_msg; + LOG (GNUNET_ERROR_TYPE_INFO, "processing request\n"); req = GNUNET_new (struct GNUNET_SET_Request); req->accept_id = ntohl (msg->accept_id); + context_msg = GNUNET_MQ_extract_nested_mh (msg); /* calling GNUNET_SET_accept in the listen cb will set req->accepted */ - lh->listen_cb (lh->listen_cls, &msg->peer_id, &mh[1], req); + lh->listen_cb (lh->listen_cls, &msg->peer_id, context_msg, req); if (GNUNET_NO == req->accepted) { struct GNUNET_MQ_Message *mqm; - struct AcceptMessage *amsg; - - mqm = GNUNET_MQ_msg (amsg, GNUNET_MESSAGE_TYPE_SET_ACCEPT); + struct GNUNET_SET_AcceptRejectMessage *amsg; + + mqm = GNUNET_MQ_msg (amsg, GNUNET_MESSAGE_TYPE_SET_REJECT); /* no request id, as we refused */ amsg->request_id = htonl (0); - amsg->accept_id = msg->accept_id; + amsg->accept_reject_id = msg->accept_id; GNUNET_MQ_send (lh->mq, mqm); GNUNET_free (req); + LOG (GNUNET_ERROR_TYPE_INFO, "rejecting request\n"); } + LOG (GNUNET_ERROR_TYPE_INFO, "processed op request from service\n"); + /* the accept-case is handled in GNUNET_SET_accept, * as we have the accept message available there */ } @@ -168,7 +198,7 @@ GNUNET_SET_create (const struct GNUNET_CONFIGURATION_Handle *cfg, { struct GNUNET_SET_Handle *set; struct GNUNET_MQ_Message *mqm; - struct SetCreateMessage *msg; + struct GNUNET_SET_CreateMessage *msg; static const struct GNUNET_MQ_Handler mq_handlers[] = { {handle_result, GNUNET_MESSAGE_TYPE_SET_RESULT}, GNUNET_MQ_HANDLERS_END @@ -179,6 +209,7 @@ GNUNET_SET_create (const struct GNUNET_CONFIGURATION_Handle *cfg, LOG (GNUNET_ERROR_TYPE_INFO, "set client created\n"); GNUNET_assert (NULL != set->client); set->mq = GNUNET_MQ_queue_for_connection_client (set->client, mq_handlers, set); + GNUNET_assert (NULL != set->mq); mqm = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_CREATE); msg->operation = htons (op); GNUNET_MQ_send (set->mq, mqm); @@ -204,7 +235,7 @@ GNUNET_SET_add_element (struct GNUNET_SET_Handle *set, void *cont_cls) { struct GNUNET_MQ_Message *mqm; - struct ElementMessage *msg; + struct GNUNET_SET_ElementMessage *msg; mqm = GNUNET_MQ_msg_extra (msg, element->size, GNUNET_MESSAGE_TYPE_SET_ADD); msg->element_type = element->type; @@ -232,7 +263,7 @@ GNUNET_SET_remove_element (struct GNUNET_SET_Handle *set, void *cont_cls) { struct GNUNET_MQ_Message *mqm; - struct ElementMessage *msg; + struct GNUNET_SET_ElementMessage *msg; mqm = GNUNET_MQ_msg_extra (msg, element->size, GNUNET_MESSAGE_TYPE_SET_REMOVE); msg->element_type = element->type; @@ -256,10 +287,10 @@ GNUNET_SET_destroy (struct GNUNET_SET_Handle *set) /** - * Evaluate a set operation with our set and the set of another peer. + * Create a set operation for evaluation with another peer. + * The evaluation will not start until the client provides + * a local set with GNUNET_SET_conclude. * - * @param set set to use - * @param salt salt for HKDF (explain more here) * @param other_peer peer with the other set * @param app_id hash for the application using the set * @param context_msg additional information for the request @@ -273,8 +304,7 @@ GNUNET_SET_destroy (struct GNUNET_SET_Handle *set) * @return a handle to cancel the operation */ struct GNUNET_SET_OperationHandle * -GNUNET_SET_evaluate (struct GNUNET_SET_Handle *set, - const struct GNUNET_PeerIdentity *other_peer, +GNUNET_SET_evaluate (const struct GNUNET_PeerIdentity *other_peer, const struct GNUNET_HashCode *app_id, const struct GNUNET_MessageHeader *context_msg, uint16_t salt, @@ -283,24 +313,24 @@ GNUNET_SET_evaluate (struct GNUNET_SET_Handle *set, void *result_cls) { struct GNUNET_MQ_Message *mqm; - struct EvaluateMessage *msg; struct GNUNET_SET_OperationHandle *oh; + struct GNUNET_SET_EvaluateMessage *msg; oh = GNUNET_new (struct GNUNET_SET_OperationHandle); oh->result_cb = result_cb; oh->result_cls = result_cls; - oh->set = set; - mqm = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_EVALUATE); - msg->request_id = htonl (GNUNET_MQ_assoc_add (set->mq, mqm, oh)); - msg->peer = *other_peer; - msg->app_id = *app_id; - + mqm = GNUNET_MQ_msg_nested_mh (msg, GNUNET_MESSAGE_TYPE_SET_EVALUATE, context_msg); + if (NULL != context_msg) - if (GNUNET_OK != GNUNET_MQ_nest (mqm, context_msg, ntohs (context_msg->size))) - GNUNET_assert (0); - - GNUNET_MQ_send (set->mq, mqm); + LOG (GNUNET_ERROR_TYPE_INFO, "passed context msg\n"); + + msg->app_id = *app_id; + msg->target_peer = *other_peer; + msg->salt = salt; + msg->reserved = 0; + oh->conclude_mqm = mqm; + oh->request_id_addr = &msg->request_id; return oh; } @@ -327,7 +357,7 @@ GNUNET_SET_listen (const struct GNUNET_CONFIGURATION_Handle *cfg, { struct GNUNET_SET_ListenHandle *lh; struct GNUNET_MQ_Message *mqm; - struct ListenMessage *msg; + struct GNUNET_SET_ListenMessage *msg; static const struct GNUNET_MQ_Handler mq_handlers[] = { {handle_request, GNUNET_MESSAGE_TYPE_SET_REQUEST}, GNUNET_MQ_HANDLERS_END @@ -363,10 +393,13 @@ GNUNET_SET_listen_cancel (struct GNUNET_SET_ListenHandle *lh) /** - * Accept a request we got via GNUNET_SET_listen. + * Accept a request we got via GNUNET_SET_listen. Must be called during + * GNUNET_SET_listen, as the 'struct GNUNET_SET_Request' becomes invalid + * afterwards. + * Call GNUNET_SET_conclude to provide the local set to use for the operation, + * and to begin the exchange with the remote peer. * * @param request request to accept - * @param set set used for the requested operation * @param result_mode specified how results will be returned, * see 'GNUNET_SET_ResultMode'. * @param result_cb callback for the results @@ -375,28 +408,26 @@ GNUNET_SET_listen_cancel (struct GNUNET_SET_ListenHandle *lh) */ struct GNUNET_SET_OperationHandle * GNUNET_SET_accept (struct GNUNET_SET_Request *request, - struct GNUNET_SET_Handle *set, enum GNUNET_SET_ResultMode result_mode, GNUNET_SET_ResultIterator result_cb, - void *result_cls) + void *cls) { struct GNUNET_MQ_Message *mqm; - struct AcceptMessage *msg; struct GNUNET_SET_OperationHandle *oh; + struct GNUNET_SET_AcceptRejectMessage *msg; - /* don't accept a request twice! */ GNUNET_assert (GNUNET_NO == request->accepted); request->accepted = GNUNET_YES; oh = GNUNET_new (struct GNUNET_SET_OperationHandle); oh->result_cb = result_cb; - oh->result_cls = result_cls; - oh->set = set; + oh->result_cls = cls; - mqm = GNUNET_MQ_msg (msg , GNUNET_MESSAGE_TYPE_SET_ACCEPT); - msg->request_id = htonl (GNUNET_MQ_assoc_add (set->mq, NULL, oh)); - msg->accept_id = htonl (request->accept_id); - GNUNET_MQ_send (set->mq, mqm); + mqm = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_SET_ACCEPT); + msg->accept_reject_id = htonl (request->accept_id); + + oh->conclude_mqm = mqm; + oh->request_id_addr = &msg->request_id; return oh; } @@ -413,10 +444,43 @@ GNUNET_SET_operation_cancel (struct GNUNET_SET_OperationHandle *oh) struct GNUNET_MQ_Message *mqm; struct GNUNET_SET_OperationHandle *h_assoc; - h_assoc = GNUNET_MQ_assoc_remove (oh->set->mq, oh->request_id); - GNUNET_assert (h_assoc == oh); - mqm = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_CANCEL); - GNUNET_MQ_send (oh->set->mq, mqm); + if (NULL != oh->set) + { + h_assoc = GNUNET_MQ_assoc_remove (oh->set->mq, oh->request_id); + GNUNET_assert (h_assoc == oh); + mqm = GNUNET_MQ_msg_header (GNUNET_MESSAGE_TYPE_SET_CANCEL); + GNUNET_MQ_send (oh->set->mq, mqm); + } + + if (NULL != oh->conclude_mqm) + GNUNET_MQ_discard (oh->conclude_mqm); + GNUNET_free (oh); } + +/** + * Conclude the given set operation using the given set. + * This function is called once we have fully constructed + * the set that we want to use for the operation. At this + * time, the P2P protocol can then begin to exchange the + * set information and call the result callback with the + * result information. + * + * @param oh handle to the set operation + * @param set the set to use for the operation + */ +void +GNUNET_SET_conclude (struct GNUNET_SET_OperationHandle *oh, + struct GNUNET_SET_Handle *set) +{ + GNUNET_assert (NULL == oh->set); + GNUNET_assert (NULL != oh->conclude_mqm); + oh->set = set; + oh->request_id = GNUNET_MQ_assoc_add (oh->set->mq, NULL, oh); + *oh->request_id_addr = htonl (oh->request_id); + GNUNET_MQ_send (oh->set->mq, oh->conclude_mqm); + oh->conclude_mqm = NULL; + oh->request_id_addr = NULL; +} + diff --git a/src/set/test_set.conf b/src/set/test_set.conf index 34b7a8d2fd..7bc26ed7ed 100644 --- a/src/set/test_set.conf +++ b/src/set/test_set.conf @@ -8,8 +8,8 @@ PORT = 2106 HOSTNAME = localhost HOME = $SERVICEHOME BINARY = gnunet-service-set -#PREFIX = gdbserver :12345 #PREFIX = valgrind --leak-check=full +#PREFIX = gdbserver :1234 ACCEPT_FROM = 127.0.0.1; ACCEPT_FROM6 = ::1; UNIXPATH = /tmp/gnunet-service-set.sock diff --git a/src/set/test_set_api.c b/src/set/test_set_api.c index bf0d656972..f773cebdf9 100644 --- a/src/set/test_set_api.c +++ b/src/set/test_set_api.c @@ -20,7 +20,7 @@ /** * @file set/test_set_api.c - * @brief testcase for consensus_api.c + * @brief testcase for set_api.c */ #include "platform.h" #include "gnunet_util_lib.h" @@ -89,11 +89,13 @@ listen_cb (void *cls, const struct GNUNET_MessageHeader *context_msg, struct GNUNET_SET_Request *request) { + struct GNUNET_SET_OperationHandle *oh; + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "listen cb called\n"); GNUNET_SET_listen_cancel (listen_handle); - GNUNET_SET_accept (request, set2, - GNUNET_SET_RESULT_ADDED, result_cb_set2, NULL); + oh = GNUNET_SET_accept (request, GNUNET_SET_RESULT_ADDED, result_cb_set2, NULL); + GNUNET_SET_conclude (oh, set2); } @@ -105,11 +107,14 @@ listen_cb (void *cls, static void start (void *cls) { + struct GNUNET_SET_OperationHandle *oh; + listen_handle = GNUNET_SET_listen (config, GNUNET_SET_OPERATION_UNION, &app_id, listen_cb, NULL); - GNUNET_SET_evaluate (set1, &local_id, &app_id, NULL, 42, - GNUNET_SET_RESULT_ADDED, - result_cb_set1, NULL); + oh = GNUNET_SET_evaluate (&local_id, &app_id, NULL, 42, + GNUNET_SET_RESULT_ADDED, + result_cb_set1, NULL); + GNUNET_SET_conclude (oh, set1); } @@ -168,12 +173,14 @@ run (void *cls, struct GNUNET_TESTING_Peer *peer) { - static const char* app_str = "gnunet-set"; - config = cfg; + GNUNET_CRYPTO_get_host_identity (cfg, &local_id); + printf ("my id (from CRYPTO): %s\n", GNUNET_h2s (&local_id.hashPubKey)); GNUNET_TESTING_peer_get_identity (peer, &local_id); + printf ("my id (from TESTING): %s\n", GNUNET_h2s (&local_id.hashPubKey)); set1 = GNUNET_SET_create (cfg, GNUNET_SET_OPERATION_UNION); set2 = GNUNET_SET_create (cfg, GNUNET_SET_OPERATION_UNION); + GNUNET_CRYPTO_hash_create_random (GNUNET_CRYPTO_QUALITY_WEAK, &app_id); init_set1 (); } diff --git a/src/stream/stream_api.c b/src/stream/stream_api.c index b4a47b53d4..34f1ea0fae 100644 --- a/src/stream/stream_api.c +++ b/src/stream/stream_api.c @@ -3785,7 +3785,29 @@ mq_stream_write_queued (void *cls, enum GNUNET_STREAM_Status status, size_t size struct MQStreamState *mss = (struct MQStreamState *) mq->impl_state; struct GNUNET_MQ_Message *mqm; - GNUNET_assert (GNUNET_STREAM_OK == status); + switch (status) + { + case GNUNET_STREAM_OK: + break; + case GNUNET_STREAM_SHUTDOWN: + /* FIXME: call shutdown handler */ + return; + case GNUNET_STREAM_TIMEOUT: + if (NULL == mq->error_handler) + LOG (GNUNET_ERROR_TYPE_WARNING, "write timeout, but no error handler installed for message queue\n"); + else + mq->error_handler (mq->handlers_cls, GNUNET_MQ_ERROR_TIMEOUT); + return; + case GNUNET_STREAM_SYSERR: + if (NULL == mq->error_handler) + LOG (GNUNET_ERROR_TYPE_WARNING, "write error, but no error handler installed for message queue\n"); + else + mq->error_handler (mq->handlers_cls, GNUNET_MQ_ERROR_WRITE); + return; + default: + GNUNET_assert (0); + return; + } /* call cb for message we finished sending */ mqm = mq->current_msg; @@ -3863,21 +3885,53 @@ mq_stream_mst_callback (void *cls, void *client, */ static size_t mq_stream_data_processor (void *cls, - enum GNUNET_STREAM_Status status, - const void *data, - size_t size) + enum GNUNET_STREAM_Status status, + const void *data, + size_t size) { struct GNUNET_MQ_MessageQueue *mq = cls; struct MQStreamState *mss; int ret; + + switch (status) + { + case GNUNET_STREAM_OK: + break; + case GNUNET_STREAM_SHUTDOWN: + /* FIXME: call shutdown handler */ + return 0; + case GNUNET_STREAM_TIMEOUT: + if (NULL == mq->error_handler) + LOG (GNUNET_ERROR_TYPE_WARNING, "read timeout, but no error handler installed for message queue\n"); + else + mq->error_handler (mq->handlers_cls, GNUNET_MQ_ERROR_TIMEOUT); + return 0; + case GNUNET_STREAM_SYSERR: + if (NULL == mq->error_handler) + LOG (GNUNET_ERROR_TYPE_WARNING, "read error, but no error handler installed for message queue\n"); + else + mq->error_handler (mq->handlers_cls, GNUNET_MQ_ERROR_READ); + return 0; + default: + GNUNET_assert (0); + return 0; + } mss = (struct MQStreamState *) mq->impl_state; GNUNET_assert (GNUNET_STREAM_OK == status); ret = GNUNET_SERVER_mst_receive (mss->mst, NULL, data, size, GNUNET_NO, GNUNET_NO); - GNUNET_assert (GNUNET_OK == ret); + if (GNUNET_OK != ret) + { + if (NULL == mq->error_handler) + LOG (GNUNET_ERROR_TYPE_WARNING, + "read error (message stream malformed), but no error handler installed for message queue\n"); + else + mq->error_handler (mq->handlers_cls, GNUNET_MQ_ERROR_READ); + return 0; + } /* we always read all data */ - mss->rh = GNUNET_STREAM_read (mss->socket, GNUNET_TIME_UNIT_FOREVER_REL, - mq_stream_data_processor, mq); + mss->rh = GNUNET_STREAM_read (mss->socket, GNUNET_TIME_UNIT_FOREVER_REL, + mq_stream_data_processor, mq); return size; } @@ -3935,6 +3989,7 @@ GNUNET_STREAM_mq_create (struct GNUNET_STREAM_Socket *socket, mq->destroy_impl = mq_stream_destroy_impl; mq->handlers = msg_handlers; mq->handlers_cls = cls; + mq->error_handler = error_handler; if (NULL != msg_handlers) { mss->mst = GNUNET_SERVER_mst_create (mq_stream_mst_callback, mq); diff --git a/src/util/crypto_hash.c b/src/util/crypto_hash.c index fca66aed4e..2e436c4549 100644 --- a/src/util/crypto_hash.c +++ b/src/util/crypto_hash.c @@ -339,7 +339,7 @@ GNUNET_CRYPTO_hash_distance_u32 (const struct GNUNET_HashCode * a, */ void GNUNET_CRYPTO_hash_create_random (enum GNUNET_CRYPTO_Quality mode, - struct GNUNET_HashCode * result) + struct GNUNET_HashCode *result) { int i; diff --git a/src/util/mq.c b/src/util/mq.c index 36cacd30b0..dc87b97114 100644 --- a/src/util/mq.c +++ b/src/util/mq.c @@ -119,33 +119,31 @@ GNUNET_MQ_msg_ (struct GNUNET_MessageHeader **mhp, uint16_t size, uint16_t type) } -int -GNUNET_MQ_nest_ (struct GNUNET_MQ_Message **mqmp, - const void *data, uint16_t len) +struct GNUNET_MQ_Message * +GNUNET_MQ_msg_nested_mh_ (struct GNUNET_MessageHeader **mhp, uint16_t base_size, uint16_t type, + const struct GNUNET_MessageHeader *nested_mh) { - size_t new_size; - size_t old_size; - - GNUNET_assert (NULL != mqmp); - /* there's no data to append => do nothing */ - if (NULL == data) - return GNUNET_OK; - old_size = ntohs ((*mqmp)->mh->size); - /* message too large to concatenate? */ - if (((uint16_t) (old_size + len)) < len) - return GNUNET_SYSERR; - new_size = old_size + len; - *mqmp = GNUNET_realloc (*mqmp, sizeof (struct GNUNET_MQ_Message) + new_size); - (*mqmp)->mh = (struct GNUNET_MessageHeader *) &(*mqmp)[1]; - memcpy (((void *) (*mqmp)->mh) + old_size, data, new_size - old_size); - (*mqmp)->mh->size = htons (new_size); - return GNUNET_OK; -} + struct GNUNET_MQ_Message *mqm; + uint16_t size; + + if (NULL == nested_mh) + return GNUNET_MQ_msg_ (mhp, base_size, type); + size = base_size + ntohs (nested_mh->size); + /* check for uint16_t overflow */ + if (size < base_size) + return NULL; + + mqm = GNUNET_MQ_msg_ (mhp, size, type); + memcpy ((char *) mqm->mh + base_size, nested_mh, ntohs (nested_mh->size)); + + return mqm; +} -/*** Transmit a queued message to the session's client. +/** + * Transmit a queued message to the session's client. * * @param cls consensus session * @param size number of bytes available in buf @@ -265,7 +263,8 @@ handle_client_message (void *cls, { if (NULL == mq->error_handler) LOG (GNUNET_ERROR_TYPE_WARNING, "ignoring read error (no handler installed)\n"); - mq->error_handler (mq->handlers_cls, GNUNET_MQ_ERROR_READ); + else + mq->error_handler (mq->handlers_cls, GNUNET_MQ_ERROR_READ); return; } @@ -479,3 +478,39 @@ GNUNET_MQ_destroy (struct GNUNET_MQ_MessageQueue *mq) GNUNET_free (mq); } + + + +struct GNUNET_MessageHeader * +GNUNET_MQ_extract_nested_mh_ (const struct GNUNET_MessageHeader *mh, uint16_t base_size) +{ + uint16_t whole_size; + uint16_t nested_size; + struct GNUNET_MessageHeader *nested_msg; + + whole_size = ntohs (mh->size); + GNUNET_assert (whole_size >= base_size); + + nested_size = whole_size - base_size; + + if (0 == nested_size) + return NULL; + + if (nested_size < sizeof (struct GNUNET_MessageHeader)) + { + GNUNET_break_op (0); + return NULL; + } + + nested_msg = (struct GNUNET_MessageHeader *) ((char *) mh + base_size); + + if (ntohs (nested_msg->size) != nested_size) + { + GNUNET_break_op (0); + nested_msg->size = htons (nested_size); + } + + return nested_msg; +} + + diff --git a/src/util/test_mq.c b/src/util/test_mq.c index 161b40a207..55cd80ef14 100644 --- a/src/util/test_mq.c +++ b/src/util/test_mq.c @@ -58,35 +58,6 @@ void test2 (void) { struct GNUNET_MQ_Message *mqm; - struct MyMessage *mm; - int res; - char *s = "foo"; - - mqm = GNUNET_MQ_msg (mm, 42); - res = GNUNET_MQ_nest (mqm, s, strlen(s)); - GNUNET_assert (GNUNET_OK == res); - res = GNUNET_MQ_nest (mqm, s, strlen(s)); - GNUNET_assert (GNUNET_OK == res); - res = GNUNET_MQ_nest (mqm, NULL, 0); - GNUNET_assert (GNUNET_OK == res); - - GNUNET_assert (strlen (s) * 2 + sizeof (struct MyMessage) == ntohs (mm->header.size)); - - res = GNUNET_MQ_nest_mh (mqm, &mm->header); - GNUNET_assert (GNUNET_OK == res); - GNUNET_assert (2 * (strlen (s) * 2 + sizeof (struct MyMessage)) == ntohs (mm->header.size)); - - res = GNUNET_MQ_nest (mqm, (void *) 0xF00BA, 0xFFF0); - GNUNET_assert (GNUNET_OK != res); - - GNUNET_MQ_discard (mqm); -} - - -void -test3 (void) -{ - struct GNUNET_MQ_Message *mqm; struct GNUNET_MessageHeader *mh; mqm = GNUNET_MQ_msg_header (42); @@ -107,7 +78,6 @@ main (int argc, char **argv) GNUNET_log_setup ("test-mq", "INFO", NULL); test1 (); test2 (); - test3 (); return 0; } |