diff options
-rw-r--r-- | src/consensus/consensus_api.c | 32 | ||||
-rw-r--r-- | src/consensus/consensus_protocol.h | 4 | ||||
-rw-r--r-- | src/consensus/gnunet-consensus.c | 4 | ||||
-rw-r--r-- | src/consensus/gnunet-service-consensus.c | 477 | ||||
-rw-r--r-- | src/consensus/ibf.c | 2 | ||||
-rw-r--r-- | src/consensus/test_consensus.conf | 1 |
6 files changed, 453 insertions, 67 deletions
diff --git a/src/consensus/consensus_api.c b/src/consensus/consensus_api.c index 5c0494254c..ba0e69e48d 100644 --- a/src/consensus/consensus_api.c +++ b/src/consensus/consensus_api.c @@ -176,7 +176,6 @@ transmit_queued (void *cls, size_t size, qmsg = consensus->messages_head; GNUNET_CONTAINER_DLL_remove (consensus->messages_head, consensus->messages_tail, qmsg); - GNUNET_assert (qmsg); if (NULL == buf) { @@ -196,8 +195,8 @@ transmit_queued (void *cls, size_t size, { qmsg->idc (qmsg->idc_cls, GNUNET_YES); } - GNUNET_free (qmsg->msg); - GNUNET_free (qmsg); + + /* FIXME: free the messages */ send_next (consensus); @@ -218,7 +217,6 @@ send_next (struct GNUNET_CONSENSUS_Handle *consensus) if (NULL != consensus->messages_head) { - LOG (GNUNET_ERROR_TYPE_INFO, "scheduling queued\n"); consensus->th = GNUNET_CLIENT_notify_transmit_ready (consensus->client, ntohs (consensus->messages_head->msg->size), GNUNET_TIME_UNIT_FOREVER_REL, @@ -226,6 +224,15 @@ send_next (struct GNUNET_CONSENSUS_Handle *consensus) } } +static void +queue_message (struct GNUNET_CONSENSUS_Handle *consensus, struct GNUNET_MessageHeader *msg) +{ + struct QueuedMessage *qm; + qm = GNUNET_malloc (sizeof *qm); + qm->msg = msg; + GNUNET_CONTAINER_DLL_insert_tail (consensus->messages_head, consensus->messages_tail, qm); +} + /** * Called when the server has sent is a new element @@ -239,23 +246,24 @@ handle_new_element (struct GNUNET_CONSENSUS_Handle *consensus, { struct GNUNET_CONSENSUS_Element element; struct GNUNET_CONSENSUS_AckMessage *ack_msg; - struct QueuedMessage *queued_msg; int ret; + LOG (GNUNET_ERROR_TYPE_INFO, "received new element\n"); + element.type = msg->element_type; - element.size = msg->header.size - sizeof (struct GNUNET_CONSENSUS_ElementMessage); + element.size = ntohs (msg->header.size) - sizeof (struct GNUNET_CONSENSUS_ElementMessage); element.data = &msg[1]; ret = consensus->new_element_cb (consensus->new_element_cls, &element); - queued_msg = GNUNET_malloc (sizeof (struct QueuedMessage) + sizeof (struct GNUNET_CONSENSUS_AckMessage)); - queued_msg->msg = (struct GNUNET_MessageHeader *) &queued_msg[1]; - - ack_msg = (struct GNUNET_CONSENSUS_AckMessage *) queued_msg->msg; + ack_msg = GNUNET_malloc (sizeof *ack_msg); + ack_msg->header.size = htons (sizeof *ack_msg); + ack_msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_ACK); ack_msg->keep = ret; - GNUNET_CONTAINER_DLL_insert_tail (consensus->messages_head, consensus->messages_tail, - queued_msg); + queue_message (consensus, (struct GNUNET_MessageHeader *) ack_msg); + + send_next (consensus); } diff --git a/src/consensus/consensus_protocol.h b/src/consensus/consensus_protocol.h index 105708ee9e..c84aad2639 100644 --- a/src/consensus/consensus_protocol.h +++ b/src/consensus/consensus_protocol.h @@ -49,13 +49,15 @@ struct StrataMessage struct DifferenceDigest { - struct GNUNET_MessageHeader header; + uint8_t order; + uint8_t round; }; struct Element { struct GNUNET_MessageHeader header; + struct GNUNET_HashCode hash; }; struct ConsensusHello diff --git a/src/consensus/gnunet-consensus.c b/src/consensus/gnunet-consensus.c index c8a5593f17..222ec3e9df 100644 --- a/src/consensus/gnunet-consensus.c +++ b/src/consensus/gnunet-consensus.c @@ -177,6 +177,7 @@ static int new_element_cb (void *cls, struct GNUNET_CONSENSUS_Element *element) { + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "received new element\n"); return GNUNET_YES; } @@ -263,8 +264,11 @@ test_master (void *cls, int i; + GNUNET_log_setup ("gnunet-consensus", "INFO", NULL); + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "test master\n"); + peers = started_peers; peer_ids = GNUNET_malloc (num_peers * sizeof (struct GNUNET_PeerIdentity)); diff --git a/src/consensus/gnunet-service-consensus.c b/src/consensus/gnunet-service-consensus.c index ad02669549..e80bee331c 100644 --- a/src/consensus/gnunet-service-consensus.c +++ b/src/consensus/gnunet-service-consensus.c @@ -55,12 +55,16 @@ */ #define STRATA_PER_MESSAGE ((1<<15) / (IBF_BUCKET_SIZE * STRATA_IBF_BUCKETS)) +#define BUCKETS_PER_MESSAGE ((1<<15) / IBF_BUCKET_SIZE) + +#define MAX_IBF_ORDER (64) /* forward declarations */ struct ConsensusSession; struct IncomingSocket; +struct ConsensusPeerInformation; static void send_next (struct ConsensusSession *session); @@ -68,6 +72,12 @@ send_next (struct ConsensusSession *session); static void write_strata (void *cls, enum GNUNET_STREAM_Status status, size_t size); +static void +write_ibf (void *cls, enum GNUNET_STREAM_Status status, size_t size); + +static void +write_values (void *cls, enum GNUNET_STREAM_Status status, size_t size); + static int get_peer_idx (const struct GNUNET_PeerIdentity *peer, const struct ConsensusSession *session); @@ -91,6 +101,9 @@ struct PendingElement * The actual element */ struct GNUNET_CONSENSUS_Element *element; + + /* peer this element is coming from */ + struct ConsensusPeerInformation *cpi; }; struct ConsensusPeerInformation @@ -130,13 +143,21 @@ struct ConsensusPeerInformation */ int strata_counter; - struct InvertibleBloomFilter *my_ibf; + int ibf_order; + + struct InvertibleBloomFilter *outgoing_ibf; - int my_ibf_bucket_counter; + int outgoing_bucket_counter; - struct InvertibleBloomFilter *peer_ibf; + struct InvertibleBloomFilter *incoming_ibf; - int peer_ibf_bucket_counter; + int incoming_bucket_counter; + + /** + * NULL or incoming_ibf - outgoing_ibf. + * Decoded values of side '1' are to be requested from the the peer. + */ + struct InvertibleBloomFilter *diff_ibf; /** * Strata estimator of the peer, NULL if our peer @@ -144,6 +165,8 @@ struct ConsensusPeerInformation */ struct InvertibleBloomFilter **strata; + unsigned int diff; + struct GNUNET_SERVER_MessageStreamTokenizer *mst; struct ConsensusSession *session; @@ -206,16 +229,6 @@ struct ConsensusSession struct GNUNET_CONTAINER_MultiHashMap *values; /** - * Elements that have not been sent to the client yet. - */ - struct PendingElement *transmit_pending_head; - - /** - * Elements that have not been sent to the client yet. - */ - struct PendingElement *transmit_pending_tail; - - /** * Elements that have not been approved (or rejected) by the client yet. */ struct PendingElement *approval_pending_head; @@ -281,6 +294,8 @@ struct ConsensusSession GNUNET_SCHEDULER_TaskIdentifier round_timeout_tid; struct InvertibleBloomFilter **strata; + + struct InvertibleBloomFilter **ibfs; }; @@ -365,6 +380,16 @@ static struct GNUNET_CORE_Handle *core; static struct GNUNET_STREAM_ListenSocket *listener; +static void +queue_client_message (struct ConsensusSession *session, struct GNUNET_MessageHeader *msg) +{ + struct QueuedMessage *qm; + qm = GNUNET_malloc (sizeof *qm); + qm->msg = msg; + GNUNET_CONTAINER_DLL_insert_tail (session->client_messages_head, session->client_messages_tail, qm); +} + + static int estimate_difference (struct InvertibleBloomFilter** strata1, struct InvertibleBloomFilter** strata2) @@ -400,6 +425,7 @@ estimate_difference (struct InvertibleBloomFilter** strata1, } + /** * Functions of this signature are called whenever data is available from the * stream. @@ -412,7 +438,48 @@ estimate_difference (struct InvertibleBloomFilter** strata1, * given to the next time the read processor is called). */ static size_t -stream_data_processor (void *cls, +session_stream_data_processor (void *cls, + enum GNUNET_STREAM_Status status, + const void *data, + size_t size) +{ + struct ConsensusPeerInformation *cpi; + int ret; + + GNUNET_assert (GNUNET_STREAM_OK == status); + + cpi = cls; + + GNUNET_assert (NULL != cpi->mst); + + ret = GNUNET_SERVER_mst_receive (cpi->mst, cpi, data, size, GNUNET_NO, GNUNET_YES); + if (GNUNET_SYSERR == ret) + { + /* FIXME: handle this correctly */ + GNUNET_assert (0); + } + + /* read again */ + cpi->rh = GNUNET_STREAM_read (cpi->socket, GNUNET_TIME_UNIT_FOREVER_REL, + &session_stream_data_processor, cpi); + + /* we always read all data */ + return size; +} + +/** + * Functions of this signature are called whenever data is available from the + * stream. + * + * @param cls the closure from GNUNET_STREAM_read + * @param status the status of the stream at the time this function is called + * @param data traffic from the other side + * @param size the number of bytes available in data read; will be 0 on timeout + * @return number of bytes of processed from 'data' (any data remaining should be + * given to the next time the read processor is called). + */ +static size_t +incoming_stream_data_processor (void *cls, enum GNUNET_STREAM_Status status, const void *data, size_t size) @@ -422,9 +489,9 @@ stream_data_processor (void *cls, GNUNET_assert (GNUNET_STREAM_OK == status); - incoming = (struct IncomingSocket *) cls; + incoming = cls; - ret = GNUNET_SERVER_mst_receive (incoming->mst, incoming, data, size, GNUNET_NO, GNUNET_NO); + ret = GNUNET_SERVER_mst_receive (incoming->mst, incoming, data, size, GNUNET_NO, GNUNET_YES); if (GNUNET_SYSERR == ret) { /* FIXME: handle this correctly */ @@ -433,12 +500,46 @@ stream_data_processor (void *cls, /* read again */ incoming->rh = GNUNET_STREAM_read (incoming->socket, GNUNET_TIME_UNIT_FOREVER_REL, - &stream_data_processor, incoming); + &incoming_stream_data_processor, incoming); /* we always read all data */ return size; } + +/** + * Iterator over hash map entries. + * + * @param cls closure + * @param key current key code + * @param value value in the hash map + * @return GNUNET_YES if we should continue to + * iterate, + * GNUNET_NO if not. + */ +static int +ibf_values_iterator (void *cls, + const struct GNUNET_HashCode *key, + void *value) +{ + struct ConsensusPeerInformation *cpi; + cpi = cls; + ibf_insert (cpi->session->ibfs[cpi->ibf_order], key); + return GNUNET_YES; +} + + +static void +create_outgoing_ibf (struct ConsensusPeerInformation *cpi) +{ + if (NULL == cpi->session->ibfs[cpi->ibf_order]) + { + cpi->session->ibfs[cpi->ibf_order] = ibf_create (1 << cpi->ibf_order, STRATA_HASH_NUM, 0); + GNUNET_CONTAINER_multihashmap_iterate (cpi->session->values, ibf_values_iterator, cpi); + } + cpi->outgoing_ibf = ibf_dup (cpi->session->ibfs[cpi->ibf_order]); +} + static int handle_p2p_strata (struct ConsensusPeerInformation *cpi, const struct StrataMessage *strata_msg) { @@ -477,8 +578,6 @@ handle_p2p_strata (struct ConsensusPeerInformation *cpi, const struct StrataMess for (i = 0; i < num_strata; i++) { - uint8_t zero[STRATA_IBF_BUCKETS]; - memset (zero, 0, STRATA_IBF_BUCKETS); memcpy (cpi->strata[cpi->strata_counter+i]->count, count_src, STRATA_IBF_BUCKETS); count_src += STRATA_IBF_BUCKETS; } @@ -489,9 +588,17 @@ handle_p2p_strata (struct ConsensusPeerInformation *cpi, const struct StrataMess if (STRATA_COUNT == cpi->strata_counter) { - int diff; - diff = estimate_difference (cpi->session->strata, cpi->strata); - GNUNET_log (GNUNET_ERROR_TYPE_INFO, "diff=%d\n", diff); + + cpi->diff = estimate_difference (cpi->session->strata, cpi->strata); + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "received strata, diff=%d\n", cpi->diff); + cpi->ibf_order = 0; + while ((1 << cpi->ibf_order) < cpi->diff) + cpi->ibf_order++; + if (cpi->ibf_order > MAX_IBF_ORDER) + cpi->ibf_order = MAX_IBF_ORDER; + cpi->ibf_order += 2; + create_outgoing_ibf (cpi); + write_ibf (cpi, GNUNET_STREAM_OK, 0); } return GNUNET_YES; @@ -499,15 +606,97 @@ handle_p2p_strata (struct ConsensusPeerInformation *cpi, const struct StrataMess static int -handle_p2p_ibf (struct ConsensusPeerInformation *cpi, const struct DifferenceDigest *strata) +handle_p2p_ibf (struct ConsensusPeerInformation *cpi, const struct DifferenceDigest *digest) { + struct GNUNET_HashCode *hash_src; + int num_buckets; + uint8_t *count_src; + + num_buckets = (ntohs (digest->header.size) - (sizeof *digest)) / IBF_BUCKET_SIZE; + + if (cpi->is_outgoing == GNUNET_YES) + { + /* we receive the ibf as an initiator, thus we're interested in the order */ + cpi->ibf_order = digest->order; + if ((0 == cpi->outgoing_bucket_counter) && (NULL == cpi->wh)) + { + create_outgoing_ibf (cpi); + write_ibf (cpi, GNUNET_STREAM_OK, 0); + } + /* FIXME: ensure that orders do not differ each time */ + } + else + { + /* FIXME: handle correctly */ + GNUNET_assert (cpi->ibf_order == digest->order); + } + + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "receiving %d buckets at %d of %d\n", num_buckets, cpi->incoming_bucket_counter, (1 << cpi->ibf_order)); + + if (cpi->incoming_bucket_counter + num_buckets > (1 << cpi->ibf_order)) + { + /* TODO: handle this */ + GNUNET_assert (0); + } + + if (NULL == cpi->incoming_ibf) + cpi->incoming_ibf = ibf_create (1 << cpi->ibf_order, STRATA_HASH_NUM, 0); + + hash_src = (struct GNUNET_HashCode *) &digest[1]; + + memcpy (cpi->incoming_ibf->hash_sum, hash_src, num_buckets * sizeof *hash_src); + hash_src += num_buckets; + + memcpy (cpi->incoming_ibf->id_sum, hash_src, num_buckets * sizeof *hash_src); + hash_src += num_buckets; + + count_src = (uint8_t *) hash_src; + + memcpy (cpi->incoming_ibf->count, count_src, num_buckets * sizeof *count_src); + + cpi->incoming_bucket_counter += num_buckets; + + if (cpi->incoming_bucket_counter == (1 << cpi->ibf_order)) + { + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "received full ibf\n"); + if ((NULL == cpi->wh) && (cpi->outgoing_bucket_counter == (1 << cpi->ibf_order))) + write_values (cpi, GNUNET_STREAM_OK, 0); + } return GNUNET_YES; } static int -handle_p2p_element (struct ConsensusPeerInformation *cpi, const struct Element *strata) +handle_p2p_element (struct ConsensusPeerInformation *cpi, const struct GNUNET_MessageHeader *element_msg) { + struct PendingElement *pending_element; + struct GNUNET_CONSENSUS_Element *element; + struct GNUNET_CONSENSUS_ElementMessage *client_element_msg; + size_t size; + + size = ntohs (element_msg->size) - sizeof *element_msg; + + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "receiving element, size=%d\n", size); + + element = GNUNET_malloc (size + sizeof *element); + element->size = size; + memcpy (&element[1], &element_msg[1], size); + + pending_element = GNUNET_malloc (sizeof *pending_element); + pending_element->element = element; + GNUNET_CONTAINER_DLL_insert_tail (cpi->session->approval_pending_head, cpi->session->approval_pending_tail, pending_element); + + client_element_msg = GNUNET_malloc (size + sizeof *client_element_msg); + client_element_msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_RECEIVED_ELEMENT); + client_element_msg->header.size = htons (size + sizeof *client_element_msg); + memcpy (&client_element_msg[1], &element[1], size); + + queue_client_message (cpi->session, (struct GNUNET_MessageHeader *) client_element_msg); + + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "received element\n"); + + send_next (cpi->session); + return GNUNET_YES; } @@ -556,16 +745,17 @@ static int mst_session_callback (void *cls, void *client, const struct GNUNET_MessageHeader *message) { struct ConsensusPeerInformation *cpi; - cpi = (struct ConsensusPeerInformation *) cls; - switch (ntohs( message->type)) + cpi = cls; + switch (ntohs (message->type)) { case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_DELTA_ESTIMATE: return handle_p2p_strata (cpi, (struct StrataMessage *) message); case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_DIFFERENCE_DIGEST: return handle_p2p_ibf (cpi, (struct DifferenceDigest *) message); case GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS: - return handle_p2p_element (cpi, (struct Element *) message); + return handle_p2p_element (cpi, message); default: + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "unexpected message type from peer: %u\n", ntohs (message->type)); /* FIXME: handle correctly */ GNUNET_assert (0); } @@ -632,7 +822,7 @@ listen_cb (void *cls, incoming->peer = GNUNET_memdup (initiator, sizeof *initiator); incoming->rh = GNUNET_STREAM_read (socket, GNUNET_TIME_UNIT_FOREVER_REL, - &stream_data_processor, incoming); + &incoming_stream_data_processor, incoming); incoming->mst = GNUNET_SERVER_mst_create (mst_incoming_callback, incoming); @@ -727,7 +917,7 @@ transmit_queued (void *cls, size_t size, struct QueuedMessage *qmsg; size_t msg_size; - session = (struct ConsensusSession *) cls; + session = cls; session->th = NULL; @@ -773,7 +963,6 @@ send_next (struct ConsensusSession *session) { int msize; msize = ntohs (session->client_messages_head->msg->size); - GNUNET_log (GNUNET_ERROR_TYPE_INFO, "scheduling queued\n"); session->th = GNUNET_SERVER_notify_transmit_ready (session->client, msize, GNUNET_TIME_UNIT_FOREVER_REL, &transmit_queued, session); @@ -821,13 +1010,11 @@ hello_cont (void *cls, enum GNUNET_STREAM_Status status, size_t size) { struct ConsensusPeerInformation *cpi; - cpi = (struct ConsensusPeerInformation *) cls; + cpi = cls; cpi->hello = GNUNET_YES; GNUNET_assert (GNUNET_STREAM_OK == status); - cpi = (struct ConsensusPeerInformation *) cls; - if (cpi->session->conclude_requested) { write_strata (cpi, GNUNET_STREAM_OK, 0); @@ -848,7 +1035,7 @@ open_cb (void *cls, struct GNUNET_STREAM_Socket *socket) struct ConsensusHello *hello; - cpi = (struct ConsensusPeerInformation *) cls; + cpi = cls; cpi->is_connected = GNUNET_YES; hello = GNUNET_malloc (sizeof *hello); @@ -856,10 +1043,12 @@ open_cb (void *cls, struct GNUNET_STREAM_Socket *socket) hello->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_HELLO); memcpy (&hello->global_id, &cpi->session->global_id, sizeof (struct GNUNET_HashCode)); - cpi->wh = GNUNET_STREAM_write (socket, hello, sizeof *hello, GNUNET_TIME_UNIT_FOREVER_REL, hello_cont, cpi); + cpi->rh = GNUNET_STREAM_read (socket, GNUNET_TIME_UNIT_FOREVER_REL, + &session_stream_data_processor, cpi); + } @@ -874,18 +1063,19 @@ initialize_session_info (struct ConsensusSession *session) /* initialize back-references, so consensus peer information can * be used as closure */ session->info[i].session = session; - } - last = (session->local_peer_idx + (session->num_peers / 2)) % session->num_peers; + last = (session->local_peer_idx + ((session->num_peers - 1) / 2) + 1) % session->num_peers; i = (session->local_peer_idx + 1) % session->num_peers; while (i != last) { session->info[i].is_outgoing = GNUNET_YES; session->info[i].socket = GNUNET_STREAM_open (cfg, &session->peers[i], GNUNET_APPLICATION_TYPE_CONSENSUS, open_cb, &session->info[i], GNUNET_STREAM_OPTION_END); - session->info[i].mst = GNUNET_SERVER_mst_create (mst_session_callback, session); + session->info[i].mst = GNUNET_SERVER_mst_create (mst_session_callback, &session->info[i]); i = (i + 1) % session->num_peers; + + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "peer %d contacts peer %d\n", session->local_peer_idx, i); } // tie-breaker for even number of peers if (((session->num_peers % 2) == 0) && (session->local_peer_idx < last)) @@ -893,6 +1083,9 @@ initialize_session_info (struct ConsensusSession *session) session->info[last].is_outgoing = GNUNET_YES; session->info[last].socket = GNUNET_STREAM_open (cfg, &session->peers[last], GNUNET_APPLICATION_TYPE_CONSENSUS, open_cb, &session->info[last], GNUNET_STREAM_OPTION_END); + session->info[last].mst = GNUNET_SERVER_mst_create (mst_session_callback, &session->info[last]); + + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "peer %d contacts peer %d (tiebreaker)\n", session->local_peer_idx, last); } } @@ -949,9 +1142,6 @@ strata_insert (struct InvertibleBloomFilter **strata, struct GNUNET_HashCode *ke v = key->bits[0]; /* count trailing '1'-bits of v */ for (i = 0; v & 1; v>>=1, i++); - - GNUNET_log (GNUNET_ERROR_TYPE_INFO, "strata insert at %d\n", i); - ibf_insert (strata[i], key); } @@ -1001,8 +1191,9 @@ initialize_session (struct ConsensusSession *session) for (i = 0; i < STRATA_COUNT; i++) session->strata[i] = ibf_create (STRATA_IBF_BUCKETS, STRATA_HASH_NUM, 0); - session->info = GNUNET_malloc (session->num_peers * sizeof (struct ConsensusPeerInformation)); + session->ibfs = GNUNET_malloc (MAX_IBF_ORDER * sizeof (struct InvertibleBloomFilter *)); + session->info = GNUNET_malloc (session->num_peers * sizeof (struct ConsensusPeerInformation)); initialize_session_info (session); GNUNET_free (session->join_msg); @@ -1053,11 +1244,9 @@ client_join (void *cls, if (NULL == my_peer) { GNUNET_SERVER_disable_receive_done_warning (client); - GNUNET_log (GNUNET_ERROR_TYPE_INFO, "session init delayed\n"); return; } - GNUNET_log (GNUNET_ERROR_TYPE_INFO, "session init now\n"); initialize_session (session); } @@ -1097,7 +1286,7 @@ client_insert (void *cls, } msg = (struct GNUNET_CONSENSUS_ElementMessage *) m; - element_size = msg->header.size - sizeof (struct GNUNET_CONSENSUS_ElementMessage); + element_size = ntohs (msg->header.size )- sizeof (struct GNUNET_CONSENSUS_ElementMessage); element = GNUNET_malloc (sizeof (struct GNUNET_CONSENSUS_Element) + element_size); @@ -1146,7 +1335,8 @@ write_strata (void *cls, enum GNUNET_STREAM_Status status, size_t size) uint8_t *count_dst; int num_strata; - cpi = (struct ConsensusPeerInformation *) cls; + cpi = cls; + cpi->wh = NULL; GNUNET_assert (GNUNET_YES == cpi->is_outgoing); @@ -1156,6 +1346,7 @@ write_strata (void *cls, enum GNUNET_STREAM_Status status, size_t size) if (STRATA_COUNT == cpi->strata_counter) { /* strata have been written, wait for other side's IBF */ + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "strata written\n"); return; } @@ -1223,8 +1414,57 @@ static void write_ibf (void *cls, enum GNUNET_STREAM_Status status, size_t size) { struct ConsensusPeerInformation *cpi; + struct DifferenceDigest *digest; + int msize; + struct GNUNET_HashCode *hash_dst; + uint8_t *count_dst; + int num_buckets; + + cpi = cls; + cpi->wh = NULL; + + if (cpi->outgoing_bucket_counter == (1 << cpi->ibf_order)) + { + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "ibf completely written\n"); + if (cpi->incoming_bucket_counter == (1 << cpi->ibf_order)) + write_values (cpi, GNUNET_STREAM_OK, 0); + return; + } + + /* remaining buckets */ + num_buckets = (1 << cpi->ibf_order) - cpi->outgoing_bucket_counter; + + /* limit to maximum */ + if (num_buckets > BUCKETS_PER_MESSAGE) + num_buckets = BUCKETS_PER_MESSAGE; + + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "writing ibf buckets at %d/%d\n", cpi->outgoing_bucket_counter, (1<<cpi->ibf_order)); + + msize = (sizeof *digest) + (num_buckets * IBF_BUCKET_SIZE); + + digest = GNUNET_malloc (msize); + digest->header.size = htons (msize); + digest->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_DIFFERENCE_DIGEST); + digest->order = cpi->ibf_order; + + hash_dst = (struct GNUNET_HashCode *) &digest[1]; + + memcpy (hash_dst, cpi->outgoing_ibf->hash_sum, num_buckets * sizeof *hash_dst); + hash_dst += num_buckets; - cpi = (struct ConsensusPeerInformation *) cls; + memcpy (hash_dst, cpi->outgoing_ibf->id_sum, num_buckets * sizeof *hash_dst); + hash_dst += num_buckets; + + count_dst = (uint8_t *) hash_dst; + + memcpy (count_dst, cpi->outgoing_ibf->count, num_buckets * sizeof *count_dst); + + cpi->outgoing_bucket_counter += num_buckets; + + cpi->wh = GNUNET_STREAM_write (cpi->socket, digest, msize, GNUNET_TIME_UNIT_FOREVER_REL, + write_ibf, cpi); + + GNUNET_assert (NULL != cpi->wh); } @@ -1247,8 +1487,71 @@ static void write_values (void *cls, enum GNUNET_STREAM_Status status, size_t size) { struct ConsensusPeerInformation *cpi; + struct GNUNET_HashCode key; + struct GNUNET_CONSENSUS_Element *element; + struct GNUNET_MessageHeader *element_msg; + int side; + int msize; + + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "transmitting value\n"); + + cpi = cls; + cpi->wh = NULL; + + if (NULL == cpi->diff_ibf) + { + GNUNET_assert (NULL != cpi->incoming_ibf); + GNUNET_assert (NULL != cpi->outgoing_ibf); + GNUNET_assert (cpi->outgoing_ibf->size == cpi->incoming_ibf->size); + cpi->diff_ibf = ibf_dup (cpi->incoming_ibf); + ibf_subtract (cpi->diff_ibf, cpi->outgoing_ibf); + } + + for (;;) + { + int res; + res = ibf_decode (cpi->diff_ibf, &side, &key); + if (GNUNET_SYSERR == res) + { + /* TODO: handle this correctly, request new ibf */ + GNUNET_break (0); + return; + } + if (GNUNET_NO == res) + { + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "transmitted all values\n"); + return; + } + if (-1 == side) + break; + } + + element = GNUNET_CONTAINER_multihashmap_get (cpi->session->values, &key); + + if (NULL == element) + { + /* FIXME: handle correctly */ + GNUNET_break (0); + return; + } + + msize = sizeof (struct GNUNET_MessageHeader) + element->size; + + element_msg = GNUNET_malloc (msize); + element_msg->size = htons (msize); + element_msg->type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_P2P_ELEMENTS); + + memcpy (&element_msg[1], element->data, element->size); + + cpi->wh = GNUNET_STREAM_write (cpi->socket, element_msg, msize, GNUNET_TIME_UNIT_FOREVER_REL, + write_values, cpi); - cpi = (struct ConsensusPeerInformation *) cls; + GNUNET_free (element_msg); + + + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "transmitted value\n"); + + GNUNET_assert (NULL != cpi->wh); } @@ -1301,7 +1604,6 @@ client_conclude (void *cls, write_strata (&session->info[i], GNUNET_STREAM_OK, 0); } } - GNUNET_SERVER_receive_done (client, GNUNET_OK); send_next (session); @@ -1320,7 +1622,48 @@ client_ack (void *cls, struct GNUNET_SERVER_Client *client, const struct GNUNET_MessageHeader *message) { - GNUNET_log (GNUNET_ERROR_TYPE_INFO, "client ack received\n"); + struct ConsensusSession *session; + struct GNUNET_CONSENSUS_AckMessage *msg; + struct PendingElement *pending; + struct GNUNET_CONSENSUS_Element *element; + struct GNUNET_HashCode key; + + session = sessions_head; + while (NULL != session) + { + if (session->client == client) + break; + } + + if (NULL == session) + { + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "client tried to ack, but client is not in any session\n"); + GNUNET_SERVER_client_disconnect (client); + return; + } + + pending = session->approval_pending_head; + + GNUNET_CONTAINER_DLL_remove (session->approval_pending_head, session->approval_pending_tail, pending); + + msg = (struct GNUNET_CONSENSUS_AckMessage *) message; + + if (msg->keep) + { + + element = pending->element; + + GNUNET_CRYPTO_hash (element, element->size, &key); + + GNUNET_CONTAINER_multihashmap_put (session->values, &key, element, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); + + strata_insert (session->strata, &key); + } + + /* FIXME: also remove element from strata */ + + GNUNET_SERVER_receive_done (client, GNUNET_OK); } /** @@ -1371,10 +1714,41 @@ static void shutdown_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) { + + /* FIXME: complete; write separate destructors for different data types */ + + while (NULL != incoming_sockets_head) + { + struct IncomingSocket *socket; + socket = incoming_sockets_head; + if (NULL == socket->cpi) + { + GNUNET_STREAM_close (socket->socket); + } + incoming_sockets_head = incoming_sockets_head->next; + GNUNET_free (socket); + } + while (NULL != sessions_head) { struct ConsensusSession *session; + int i; + session = sessions_head; + + for (i = 0; session->num_peers; i++) + { + struct ConsensusPeerInformation *cpi; + cpi = &session->info[i]; + if ((NULL != cpi) && (NULL != cpi->socket)) + { + GNUNET_STREAM_close (cpi->socket); + } + } + + if (NULL != session->client) + GNUNET_SERVER_client_disconnect (session->client); + sessions_head = sessions_head->next; GNUNET_free (session); } @@ -1436,7 +1810,6 @@ run (void *cls, struct GNUNET_SERVER_Handle *server, const struct GNUNET_CONFIGU GNUNET_assert (NULL != core); GNUNET_log(GNUNET_ERROR_TYPE_INFO, "consensus running\n"); - GNUNET_log (GNUNET_ERROR_TYPE_INFO, "strata per msg: %d\n", STRATA_PER_MESSAGE); } diff --git a/src/consensus/ibf.c b/src/consensus/ibf.c index 2d06fc29bb..629fde3fca 100644 --- a/src/consensus/ibf.c +++ b/src/consensus/ibf.c @@ -111,8 +111,6 @@ ibf_insert_on_side (struct InvertibleBloomFilter *ibf, ibf->count[bucket] += side; - GNUNET_log_from(GNUNET_ERROR_TYPE_INFO, "ibf", "inserting in bucket %d \n", bucket); - GNUNET_CRYPTO_hash_xor (&key_copy, &ibf->id_sum[bucket], &ibf->id_sum[bucket]); GNUNET_CRYPTO_hash_xor (&key_hash, &ibf->hash_sum[bucket], diff --git a/src/consensus/test_consensus.conf b/src/consensus/test_consensus.conf index 01266c2a9f..61c382f4cc 100644 --- a/src/consensus/test_consensus.conf +++ b/src/consensus/test_consensus.conf @@ -5,6 +5,7 @@ HOSTNAME = localhost HOME = $SERVICEHOME BINARY = gnunet-service-consensus #PREFIX = gdbserver :12345 +PREFIX = valgrind ACCEPT_FROM = 127.0.0.1; ACCEPT_FROM6 = ::1; UNIXPATH = /tmp/gnunet-service-consensus.sock |