diff options
author | Florian Dold <florian.dold@gmail.com> | 2013-04-11 10:08:52 +0000 |
---|---|---|
committer | Florian Dold <florian.dold@gmail.com> | 2013-04-11 10:08:52 +0000 |
commit | 210be82b7cdc6058401e7d5042aa50dd0b750c92 (patch) | |
tree | e2bfa5a87038ef0a7f906d5ede8d6e7ea7f2638b /src/consensus | |
parent | 2b406c1533a919057cda8850315af1fca5b48a45 (diff) |
added consensus log-round simulation, work on consensus service, still problems with dv test case
Diffstat (limited to 'src/consensus')
-rw-r--r-- | src/consensus/Makefile.am | 6 | ||||
-rw-r--r-- | src/consensus/consensus-simulation.py | 103 | ||||
-rw-r--r-- | src/consensus/consensus_api.c | 20 | ||||
-rw-r--r-- | src/consensus/gnunet-consensus-ibf.c | 4 | ||||
-rw-r--r-- | src/consensus/gnunet-consensus.c | 3 | ||||
-rw-r--r-- | src/consensus/gnunet-service-consensus.c | 2779 | ||||
-rw-r--r-- | src/consensus/ibf.c | 87 | ||||
-rw-r--r-- | src/consensus/ibf.h | 53 | ||||
-rw-r--r-- | src/consensus/strata_estimator.c | 145 | ||||
-rw-r--r-- | src/consensus/strata_estimator.h (renamed from src/consensus/consensus_flout.h) | 46 |
10 files changed, 1620 insertions, 1626 deletions
diff --git a/src/consensus/Makefile.am b/src/consensus/Makefile.am index e469de0575..82af29c87a 100644 --- a/src/consensus/Makefile.am +++ b/src/consensus/Makefile.am @@ -61,7 +61,8 @@ gnunet_consensus_ibf_LDADD = \ gnunet_service_consensus_SOURCES = \ gnunet-service-consensus.c \ - ibf.c + ibf.c \ + strata_estimator.c gnunet_service_consensus_LDADD = \ $(top_builddir)/src/util/libgnunetutil.la \ $(top_builddir)/src/core/libgnunetcore.la \ @@ -71,7 +72,8 @@ gnunet_service_consensus_LDADD = \ gnunet_service_evil_consensus_SOURCES = \ gnunet-service-consensus.c \ - ibf.c + ibf.c \ + strata_estimator.c gnunet_service_evil_consensus_LDADD = \ $(top_builddir)/src/util/libgnunetutil.la \ $(top_builddir)/src/core/libgnunetcore.la \ diff --git a/src/consensus/consensus-simulation.py b/src/consensus/consensus-simulation.py new file mode 100644 index 0000000000..930dfee62b --- /dev/null +++ b/src/consensus/consensus-simulation.py @@ -0,0 +1,103 @@ +#!/usr/bin/python
+# This file is part of GNUnet
+# (C) 2013 Christian Grothoff (and other contributing authors)
+#
+# GNUnet is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published
+# by the Free Software Foundation; either version 2, or (at your
+# option) any later version.
+#
+# GNUnet is distributed in the hope that it will be useful, but
+# WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with GNUnet; see the file COPYING. If not, write to the
+# Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+# Boston, MA 02111-1307, USA.
+
+import argparse
+import random
+from math import ceil,log,floor
+
+def bsc(n):
+ """ count the bits set in n"""
+ l = n.bit_length()
+ c = 0
+ x = 1
+ for _ in range(0, l):
+ if n & x:
+ c = c + 1
+ x = x << 1
+ return c
+
+def simulate(k, n, verbose):
+ assert k < n
+ largest_arc = int(2**ceil(log(n, 2))) / 2
+ num_ghosts = (2 * largest_arc) - n
+ if verbose:
+ print "we have", num_ghosts, "ghost peers"
+ # n.b. all peers with idx<k are evil
+ peers = range(n)
+ info = [1 << x for x in xrange(n)]
+ def done_p():
+ for x in xrange(k, n):
+ if bsc(info[x]) < n-k:
+ return False
+ return True
+ rounds = 0
+ while not done_p():
+ if verbose:
+ print "-- round --"
+ arc = 1
+ while arc <= largest_arc:
+ if verbose:
+ print "-- subround --"
+ new_info = [x for x in info]
+ for peer_physical in xrange(n):
+ peer_logical = peers[peer_physical]
+ peer_type = None
+ partner_logical = (peer_logical + arc) % n
+ partner_physical = peers.index(partner_logical)
+ if peer_physical < k or partner_physical < k:
+ if verbose:
+ print "bad peer in connection", peer_physical, "--", partner_physical
+ continue
+ if peer_logical & arc == 0:
+ # we are outgoing
+ if verbose:
+ print peer_physical, "connects to", partner_physical
+ peer_type = "outgoing"
+ if peer_logical < num_ghosts:
+ # we have a ghost, check if the peer who connects
+ # to our ghost is actually outgoing
+ ghost_partner_logical = (peer_logical - arc) % n
+ if ghost_partner_logical & arc == 0:
+ peer_type = peer_type + ", ghost incoming"
+ new_info[peer_physical] = new_info[peer_physical] | info[peer_physical] | info[partner_physical]
+ new_info[partner_physical] = new_info[partner_physical] | info[peer_physical] | info[partner_physical]
+ else:
+ peer_type = "incoming"
+ if verbose > 1:
+ print "type of", str(peer_physical) + ":", peer_type
+ info = new_info
+ arc = arc << 1;
+ rounds = rounds + 1
+ random.shuffle(peers)
+ return rounds
+
+if __name__ == "__main__":
+ parser = argparse.ArgumentParser()
+ parser.add_argument("k", metavar="k", type=int, help="#(bad peers)")
+ parser.add_argument("n", metavar="n", type=int, help="#(all peers)")
+ parser.add_argument("r", metavar="r", type=int, help="#(rounds)")
+ parser.add_argument('--verbose', '-v', action='count')
+
+ args = parser.parse_args()
+ sum = 0.0;
+ for n in xrange (0, args.r):
+ sum += simulate(args.k, args.n, args.verbose)
+ print sum / args.r;
+
+
diff --git a/src/consensus/consensus_api.c b/src/consensus/consensus_api.c index 25ace3a4dc..fd61d37122 100644 --- a/src/consensus/consensus_api.c +++ b/src/consensus/consensus_api.c @@ -231,15 +231,6 @@ send_next (struct GNUNET_CONSENSUS_Handle *consensus) } } -static void -queue_message (struct GNUNET_CONSENSUS_Handle *consensus, struct GNUNET_MessageHeader *msg) -{ - struct QueuedMessage *qm; - qm = GNUNET_malloc (sizeof *qm); - qm->msg = msg; - GNUNET_CONTAINER_DLL_insert_tail (consensus->messages_head, consensus->messages_tail, qm); -} - /** * Called when the server has sent is a new element @@ -252,8 +243,6 @@ handle_new_element (struct GNUNET_CONSENSUS_Handle *consensus, struct GNUNET_CONSENSUS_ElementMessage *msg) { struct GNUNET_CONSENSUS_Element element; - struct GNUNET_CONSENSUS_AckMessage *ack_msg; - int ret; LOG (GNUNET_ERROR_TYPE_DEBUG, "received new element\n"); @@ -261,14 +250,7 @@ handle_new_element (struct GNUNET_CONSENSUS_Handle *consensus, element.size = ntohs (msg->header.size) - sizeof (struct GNUNET_CONSENSUS_ElementMessage); element.data = &msg[1]; - ret = consensus->new_element_cb (consensus->new_element_cls, &element); - - ack_msg = GNUNET_new (struct GNUNET_CONSENSUS_AckMessage); - ack_msg->header.size = htons (sizeof *ack_msg); - ack_msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_ACK); - ack_msg->keep = ret; - - queue_message (consensus, &ack_msg->header); + consensus->new_element_cb (consensus->new_element_cls, &element); send_next (consensus); } diff --git a/src/consensus/gnunet-consensus-ibf.c b/src/consensus/gnunet-consensus-ibf.c index 73dc31b56a..d431795f17 100644 --- a/src/consensus/gnunet-consensus-ibf.c +++ b/src/consensus/gnunet-consensus-ibf.c @@ -160,8 +160,8 @@ run (void *cls, char *const *args, const char *cfgfile, i++; } - ibf_a = ibf_create (ibf_size, hash_num, 0); - ibf_b = ibf_create (ibf_size, hash_num, 0); + ibf_a = ibf_create (ibf_size, hash_num); + ibf_b = ibf_create (ibf_size, hash_num); printf ("generated sets\n"); diff --git a/src/consensus/gnunet-consensus.c b/src/consensus/gnunet-consensus.c index 9e9b89446f..d8c1b14eee 100644 --- a/src/consensus/gnunet-consensus.c +++ b/src/consensus/gnunet-consensus.c @@ -192,12 +192,11 @@ connect_complete (void *cls, } -static int +static void new_element_cb (void *cls, const struct GNUNET_CONSENSUS_Element *element) { GNUNET_log (GNUNET_ERROR_TYPE_INFO, "received new element\n"); - return GNUNET_YES; } diff --git a/src/consensus/gnunet-service-consensus.c b/src/consensus/gnunet-service-consensus.c index ebd2d238bc..179df0fb0d 100644 --- a/src/consensus/gnunet-service-consensus.c +++ b/src/consensus/gnunet-service-consensus.c @@ -32,23 +32,32 @@ #include "gnunet_consensus_service.h" #include "gnunet_core_service.h" #include "gnunet_stream_lib.h" + #include "consensus_protocol.h" -#include "ibf.h" #include "consensus.h" +#include "ibf.h" +#include "strata_estimator.h" + + +/* + * Log macro that prefixes the local peer and the peer we are in contact with. + */ +#define LOG_PP(kind, cpi, m,...) GNUNET_log (kind, "P%d for P%d: " m, \ + cpi->session->local_peer_idx, (int) (cpi - cpi->session->info),##__VA_ARGS__) /** * Number of IBFs in a strata estimator. */ -#define STRATA_COUNT 32 +#define SE_STRATA_COUNT 32 /** - * Number of buckets per IBF. + * Size of the IBFs in the strata estimator. */ -#define STRATA_IBF_BUCKETS 80 +#define SE_IBF_SIZE 80 /** * hash num parameter for the difference digests and strata estimators */ -#define STRATA_HASH_NUM 3 +#define SE_IBF_HASH_NUM 3 /** * Number of buckets that can be transmitted in one message. @@ -63,72 +72,55 @@ #define MAX_IBF_ORDER (16) /** - * Number exp-rounds. + * Number of exponential rounds, used in the inventory and completion round. */ #define NUM_EXP_ROUNDS (4) /* forward declarations */ -struct ConsensusSession; -struct IncomingSocket; +/* mutual recursion with struct ConsensusSession */ struct ConsensusPeerInformation; -static void -client_send_next (struct ConsensusSession *session); - -static int -get_peer_idx (const struct GNUNET_PeerIdentity *peer, const struct ConsensusSession *session); - -static void -round_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc); +struct MessageQueue; +/* mutual recursion with round_over */ static void -send_ibf (struct ConsensusPeerInformation *cpi); +subround_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc); +/* mutial recursion with transmit_queued */ static void -send_strata_estimator (struct ConsensusPeerInformation *cpi); +client_send_next (struct MessageQueue *mq); +/* mutual recursion with mst_session_callback */ static void -decode (struct ConsensusPeerInformation *cpi); - -static void -write_queued (void *cls, enum GNUNET_STREAM_Status status, size_t size); +open_cb (void *cls, struct GNUNET_STREAM_Socket *socket); -static void -subround_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc); +static int +mst_session_callback (void *cls, void *client, const struct GNUNET_MessageHeader *message); /** - * An element that is waiting to be transmitted to the client. + * Additional information about a consensus element. */ -struct PendingElement +struct ElementInfo { /** - * Pending elements are kept in a DLL. + * The element itself. */ - struct PendingElement *next; - + struct GNUNET_CONSENSUS_Element *element; /** - * Pending elements are kept in a DLL. + * Hash of the element */ - struct PendingElement *prev; - + struct GNUNET_HashCode *element_hash; /** - * The actual element + * Number of other peers that have the element in the inventory. */ - struct GNUNET_CONSENSUS_Element *element; - - /* peer this element is coming from */ - struct ConsensusPeerInformation *cpi; -}; - - -struct ElementList -{ - struct ElementList *next; - struct GNUNET_CONSENSUS_Element *element; - struct GNUNET_HashCode *element_hash; + unsigned int inventory_count; + /** + * Bitmap of peers that have this element in their inventory + */ + uint8_t *inventory_bitmap; }; @@ -147,178 +139,93 @@ enum ConsensusRound CONSENSUS_ROUND_EXCHANGE, /** * Exchange which elements each peer has, but not the elements. + * This round uses the all-to-all scheme. */ CONSENSUS_ROUND_INVENTORY, /** - * Collect and distribute missing values. + * Collect and distribute missing values with the exponential scheme. */ - CONSENSUS_ROUND_STOCK, + CONSENSUS_ROUND_COMPLETION, /** - * Consensus concluded. + * Consensus concluded. After timeout and finished communication with client, + * consensus session will be destroyed. */ CONSENSUS_ROUND_FINISH }; +/* FIXME: review states, ANTICIPATE_DIFF and DECODING in particular */ /** - * Information about a peer that is in a consensus session. + * State of another peer with respect to the + * current ibf. */ -struct ConsensusPeerInformation -{ - struct GNUNET_PeerIdentity peer_id; - - /** - * Socket for communicating with the peer, either created by the local peer, - * or the remote peer. - */ - struct GNUNET_STREAM_Socket *socket; - - /** - * Message tokenizer, for the data received from this peer via the stream socket. - */ - struct GNUNET_SERVER_MessageStreamTokenizer *mst; - - /** - * Do we connect to the peer, or does the peer connect to us? - * Only valid for all-to-all phases - */ - int is_outgoing; - - /** - * Did we receive/send a consensus hello? - */ - int hello; - +enum ConsensusIBFState { /** - * Handle for currently active read + * There is nothing going on with the IBF. */ - struct GNUNET_STREAM_ReadHandle *rh; - + IBF_STATE_NONE=0, /** - * Handle for currently active read + * We currently receive an ibf. */ - struct GNUNET_STREAM_WriteHandle *wh; - - enum { - /* beginning of round */ - IBF_STATE_NONE=0, - /* we currently receive an ibf */ - IBF_STATE_RECEIVING, - /* we currently transmit an ibf */ - IBF_STATE_TRANSMITTING, - /* we decode a received ibf */ - IBF_STATE_DECODING, - /* wait for elements and element requests */ - IBF_STATE_ANTICIPATE_DIFF - } ibf_state ; - - /** - * What is the order (=log2 size) of the ibf - * we're currently dealing with? - * Interpretation depends on ibf_state. - */ - int ibf_order; - - /** - * The current IBF for this peer, - * purpose dependent on ibf_state - */ - struct InvertibleBloomFilter *ibf; - - /** - * How many buckets have we transmitted/received? - * Interpretatin depends on ibf_state - */ - int ibf_bucket_counter; - - /** - * Strata estimator of the peer, NULL if our peer - * initiated the reconciliation. - */ - struct StrataEstimator *se; - - /** - * Element keys that this peer misses, but we have them. - */ - struct GNUNET_CONTAINER_MultiHashMap *requested_keys; - - /** - * Element keys that this peer has, but we miss. - */ - struct GNUNET_CONTAINER_MultiHashMap *reported_keys; - - /** - * Back-reference to the consensus session, - * to that ConsensusPeerInformation can be used as a closure - */ - struct ConsensusSession *session; - + IBF_STATE_RECEIVING, + /* + * we decode a received ibf + */ + IBF_STATE_DECODING, /** - * Messages queued for the current round. + * wait for elements and element requests */ - struct QueuedMessage *messages_head; + IBF_STATE_ANTICIPATE_DIFF +}; - /** - * Messages queued for the current round. - */ - struct QueuedMessage *messages_tail; - /** - * True if we are actually replaying the strata message, - * e.g. currently handling the premature_strata_message. - */ - int replaying_strata_message; +typedef void (*AddCallback) (struct MessageQueue *mq); +typedef void (*MessageSentCallback) (void *cls); - /** - * A strata message that is not actually for the current round, - * used in the exp-scheme. - */ - struct StrataMessage *premature_strata_message; - /** - * We have finishes the exp-subround with the peer. - */ - int exp_subround_finished; +/** + * Collection of the state necessary to read and write gnunet messages + * to a stream socket. Should be used as closure for stream_data_processor. + */ +struct MessageStreamState +{ + struct GNUNET_SERVER_MessageStreamTokenizer *mst; + struct MessageQueue *mq; + void *mst_cls; + struct GNUNET_STREAM_Socket *socket; + struct GNUNET_STREAM_ReadHandle *rh; + struct GNUNET_STREAM_WriteHandle *wh; +}; - int inventory_synced; - - /** - * Round this peer seems to be in, according to the last SE we got. - * Necessary to store this, as we sometimes need to respond to a request from an - * older round, while we are already in the next round. - */ - enum ConsensusRound apparent_round; +struct ServerClientSocketState +{ + struct GNUNET_SERVER_Client *client; + struct GNUNET_SERVER_TransmitHandle* th; }; -typedef void (*QueuedMessageCallback) (void *msg); /** - * A doubly linked list of messages. + * Generic message queue, for queueing outgoing messages. */ -struct QueuedMessage +struct MessageQueue { - struct GNUNET_MessageHeader *msg; - - /** - * Queued messages are stored in a doubly linked list. - */ - struct QueuedMessage *next; - - /** - * Queued messages are stored in a doubly linked list. - */ - struct QueuedMessage *prev; - - QueuedMessageCallback cb; - - void *cls; + void *state; + AddCallback add_cb; + struct PendingMessage *pending_head; + struct PendingMessage *pending_tail; + struct PendingMessage *current_pm; }; -struct StrataEstimator +struct PendingMessage { - struct InvertibleBloomFilter **strata; + struct GNUNET_MessageHeader *msg; + struct MessageQueue *parent_queue; + struct PendingMessage *next; + struct PendingMessage *prev; + MessageSentCallback sent_cb; + void *sent_cb_cls; }; @@ -351,38 +258,26 @@ struct ConsensusSession struct GNUNET_HashCode global_id; /** - * Local client in this consensus session. - * There is only one client per consensus session. - */ - struct GNUNET_SERVER_Client *client; - - /** - * Elements in the consensus set of this session, - * all of them either have been sent by or approved by the client. - * Contains ElementList. - * Used as a unique-key hashmap. + * The server's client and associated local state */ - struct GNUNET_CONTAINER_MultiHashMap *values; - - /** - * Elements that have not been approved (or rejected) by the client yet. - */ - struct PendingElement *client_approval_head; + struct ServerClientSocketState scss; /** - * Elements that have not been approved (or rejected) by the client yet. + * Queued messages to the client. */ - struct PendingElement *client_approval_tail; + struct MessageQueue *client_mq; /** - * Messages to be sent to the local client that owns this session + * IBF_Key -> 2^(HashCode*) + * FIXME: + * should be array of hash maps, mapping replicated struct IBF_Keys to struct HashCode *. */ - struct QueuedMessage *client_messages_head; + struct GNUNET_CONTAINER_MultiHashMap *ibf_key_map; /** - * Messages to be sent to the local client that owns this session + * Maps HashCodes to ElementInfos */ - struct QueuedMessage *client_messages_tail; + struct GNUNET_CONTAINER_MultiHashMap *values; /** * Currently active transmit handle for sending to the client @@ -412,9 +307,14 @@ struct ConsensusSession struct ConsensusPeerInformation *info; /** + * GNUNET_YES if the client has called conclude. + * */ + int conclude; + + /** * Index of the local peer in the peers array */ - int local_peer_idx; + unsigned int local_peer_idx; /** * Strata estimator, computed online @@ -431,16 +331,16 @@ struct ConsensusSession */ enum ConsensusRound current_round; - int exp_round; - - int exp_subround; - /** * Permutation of peers for the current round, * maps logical index (for current round) to physical index (location in info array) */ int *shuffle; + int exp_round; + + int exp_subround; + /** * The partner for the current exp-round */ @@ -454,41 +354,121 @@ struct ConsensusSession /** - * Sockets from other peers who want to communicate with us. - * It may not be known yet which consensus session they belong to. - * Also, the session might not exist yet locally. + * Information about a peer that is in a consensus session. */ -struct IncomingSocket +struct ConsensusPeerInformation { /** - * Incoming sockets are kept in a double linked list. + * Peer identitty of the peer in the consensus session */ - struct IncomingSocket *next; + struct GNUNET_PeerIdentity peer_id; /** - * Incoming sockets are kept in a double linked list. + * Do we connect to the peer, or does the peer connect to us? + * Only valid for all-to-all phases */ - struct IncomingSocket *prev; + int is_outgoing; /** - * The actual socket. + * Did we receive/send a consensus hello? */ - struct GNUNET_STREAM_Socket *socket; + int hello; + + /* + * FIXME + */ + struct MessageStreamState mss; /** - * Handle for currently active read + * Current state */ - struct GNUNET_STREAM_ReadHandle *rh; + enum ConsensusIBFState ibf_state; /** - * Peer that connected to us with the socket. + * What is the order (=log2 size) of the ibf + * we're currently dealing with? + * Interpretation depends on ibf_state. */ - struct GNUNET_PeerIdentity peer_id; + int ibf_order; + + /** + * The current IBF for this peer, + * purpose dependent on ibf_state + */ + struct InvertibleBloomFilter *ibf; /** - * Message stream tokenizer for this socket. + * How many buckets have we transmitted/received? + * Interpretatin depends on ibf_state */ - struct GNUNET_SERVER_MessageStreamTokenizer *mst; + int ibf_bucket_counter; + + /** + * Strata estimator of the peer, NULL if our peer + * initiated the reconciliation. + */ + struct StrataEstimator *se; + + /** + * Back-reference to the consensus session, + * to that ConsensusPeerInformation can be used as a closure + */ + struct ConsensusSession *session; + + /** + * True if we are actually replaying the strata message, + * e.g. currently handling the premature_strata_message. + */ + int replaying_strata_message; + + /** + * A strata message that is not actually for the current round, + * used in the exp-scheme. + */ + struct StrataMessage *premature_strata_message; + + /** + * We have finishes the exp-subround with the peer. + */ + int exp_subround_finished; + + /** + * GNUNET_YES if we synced inventory with this peer; + * GNUNET_NO otherwise. + */ + int inventory_synced; + + /** + * Round this peer seems to be in, according to the last SE we got. + * Necessary to store this, as we sometimes need to respond to a request from an + * older round, while we are already in the next round. + */ + enum ConsensusRound apparent_round; +}; + + +/** + * Sockets from other peers who want to communicate with us. + * It may not be known yet which consensus session they belong to, we have to wait for the + * peer's hello. + * Also, the session might not exist yet locally, we have to wait for a local client to connect. + */ +struct IncomingSocket +{ + /** + * Incoming sockets are kept in a double linked list. + */ + struct IncomingSocket *next; + + /** + * Incoming sockets are kept in a double linked list. + */ + struct IncomingSocket *prev; + + /** + * Peer that connected to us with the socket. + */ + struct GNUNET_PeerIdentity peer_id; /** * Peer-in-session this socket belongs to, once known, otherwise NULL. @@ -500,19 +480,35 @@ struct IncomingSocket * but the session does not exist yet. */ struct GNUNET_HashCode *requested_gid; + + /* + * Timeout, will disconnect the socket if not yet in a session. + * FIXME: implement + */ + GNUNET_SCHEDULER_TaskIdentifier timeout; + + /* FIXME */ + struct MessageStreamState mss; }; +/** + * Linked list of incoming sockets. + */ static struct IncomingSocket *incoming_sockets_head; + +/** + * Linked list of incoming sockets. + */ static struct IncomingSocket *incoming_sockets_tail; /** - * Linked list of sesstions this peer participates in. + * Linked list of sessions this peer participates in. */ static struct ConsensusSession *sessions_head; /** - * Linked list of sesstions this peer participates in. + * Linked list of sessions this peer participates in. */ static struct ConsensusSession *sessions_tail; @@ -543,151 +539,159 @@ static struct GNUNET_STREAM_ListenSocket *listener; /** - * Queue a message to be sent to the inhabiting client of a session. + * Transmit a queued message to the session's client. * - * @param session session - * @param msg message we want to queue + * @param cls consensus session + * @param size number of bytes available in buf + * @param buf where the callee should write the message + * @return number of bytes written to buf */ -static void -queue_client_message (struct ConsensusSession *session, struct GNUNET_MessageHeader *msg) +static size_t +transmit_queued (void *cls, size_t size, + void *buf) { - struct QueuedMessage *qm; - qm = GNUNET_malloc (sizeof *qm); - qm->msg = msg; - GNUNET_CONTAINER_DLL_insert_tail (session->client_messages_head, session->client_messages_tail, qm); + struct MessageQueue *mq = cls; + struct PendingMessage *pm = mq->pending_head; + struct ServerClientSocketState *state = mq->state; + size_t msg_size; + + GNUNET_assert (NULL != pm); + GNUNET_assert (NULL != buf); + msg_size = ntohs (pm->msg->size); + GNUNET_assert (size >= msg_size); + memcpy (buf, pm->msg, msg_size); + GNUNET_CONTAINER_DLL_remove (mq->pending_head, mq->pending_tail, pm); + state->th = NULL; + client_send_next (cls); + GNUNET_free (pm); + return msg_size; } -/** - * Queue a message to be sent to another peer - * - * @param cpi peer - * @param msg message we want to queue - * @param cb callback, called when the message is given to strem - * @param cls closure for cb - */ + static void -queue_peer_message_with_cls (struct ConsensusPeerInformation *cpi, struct GNUNET_MessageHeader *msg, QueuedMessageCallback cb, void *cls) -{ - struct QueuedMessage *qm; - qm = GNUNET_malloc (sizeof *qm); - qm->msg = msg; - qm->cls = cls; - qm->cb = cb; - GNUNET_CONTAINER_DLL_insert_tail (cpi->messages_head, cpi->messages_tail, qm); - if (cpi->wh == NULL) - write_queued (cpi, GNUNET_STREAM_OK, 0); +client_send_next (struct MessageQueue *mq) +{ + struct ServerClientSocketState *state = mq->state; + int msize; + + GNUNET_assert (NULL != state); + + if ( (NULL != state->th) || + (NULL == mq->pending_head) ) + return; + msize = ntohs (mq->pending_head->msg->size); + state->th = + GNUNET_SERVER_notify_transmit_ready (state->client, msize, + GNUNET_TIME_UNIT_FOREVER_REL, + &transmit_queued, mq); +} + + +struct MessageQueue * +create_message_queue_for_server_client (struct ServerClientSocketState *scss) +{ + struct MessageQueue *mq; + mq = GNUNET_new (struct MessageQueue); + mq->add_cb = client_send_next; + mq->state = scss; + return mq; } /** - * Queue a message to be sent to another peer + * Functions of this signature are called whenever writing operations + * on a stream are executed * - * @param cpi peer - * @param msg message we want to queue + * @param cls the closure from GNUNET_STREAM_write + * @param status the status of the stream at the time this function is called; + * GNUNET_STREAM_OK if writing to stream was completed successfully; + * GNUNET_STREAM_TIMEOUT if the given data is not sent successfully + * (this doesn't mean that the data is never sent, the receiver may + * have read the data but its ACKs may have been lost); + * GNUNET_STREAM_SHUTDOWN if the stream is shutdown for writing in the + * mean time; GNUNET_STREAM_SYSERR if the stream is broken and cannot + * be processed. + * @param size the number of bytes written */ -static void -queue_peer_message (struct ConsensusPeerInformation *cpi, struct GNUNET_MessageHeader *msg) +static void +write_queued (void *cls, enum GNUNET_STREAM_Status status, size_t size) { - queue_peer_message_with_cls (cpi, msg, NULL, NULL); + struct MessageQueue *mq = cls; + struct MessageStreamState *mss = mq->state; + struct PendingMessage *pm; + + GNUNET_assert (GNUNET_STREAM_OK == status); + + /* call cb for message we finished sending */ + pm = mq->current_pm; + if (NULL != pm) + { + if (NULL != pm->sent_cb) + pm->sent_cb (pm->sent_cb_cls); + GNUNET_free (pm); + } + + mss->wh = NULL; + + pm = mq->pending_head; + mq->current_pm = pm; + if (NULL == pm) + return; + GNUNET_CONTAINER_DLL_remove (mq->pending_head, mq->pending_tail, pm); + mss->wh = GNUNET_STREAM_write (mss->socket, pm->msg, ntohs (pm->msg->size), + GNUNET_TIME_UNIT_FOREVER_REL, write_queued, cls); + GNUNET_assert (NULL != mss->wh); } -/* static void -clear_peer_messages (struct ConsensusPeerInformation *cpi) +stream_socket_add_cb (struct MessageQueue *mq) { - cpi->messages_head = NULL; - cpi->messages_tail = NULL; + if (NULL != mq->current_pm) + return; + write_queued (mq, GNUNET_STREAM_OK, 0); } -*/ -/** - * Estimate set difference with two strata estimators, - * i.e. arrays of IBFs. - * Does not not modify its arguments. - * - * @param se1 first strata estimator - * @param se2 second strata estimator - * @return the estimated difference - */ -static int -estimate_difference (const struct StrataEstimator *se1, - const struct StrataEstimator *se2) +struct MessageQueue * +create_message_queue_for_stream_socket (struct MessageStreamState *mss) { - int i; - int count; - count = 0; - for (i = STRATA_COUNT - 1; i >= 0; i--) - { - struct InvertibleBloomFilter *diff; - /* number of keys decoded from the ibf */ - int ibf_count; - int more; - ibf_count = 0; - /* FIXME: implement this without always allocating new IBFs */ - diff = ibf_dup (se1->strata[i]); - ibf_subtract (diff, se2->strata[i]); - for (;;) - { - more = ibf_decode (diff, NULL, NULL); - if (GNUNET_NO == more) - { - count += ibf_count; - break; - } - if (GNUNET_SYSERR == more) - { - ibf_destroy (diff); - return count * (1 << (i + 1)); - } - ibf_count++; - } - ibf_destroy (diff); - } - return count; + struct MessageQueue *mq; + mq = GNUNET_new (struct MessageQueue); + mq->state = mss; + mq->add_cb = stream_socket_add_cb; + return mq; +} + + +struct PendingMessage * +new_pending_message (uint16_t size, uint16_t type) +{ + struct PendingMessage *pm; + pm = GNUNET_malloc (sizeof *pm + size); + pm->msg = (void *) &pm[1]; + pm->msg->size = htons (size); + pm->msg->type = htons (type); + return pm; } /** - * Called when receiving data from a peer that is member of - * an inhabited consensus session. + * Queue a message in a message queue. * - * @param cls the closure from GNUNET_STREAM_read - * @param status the status of the stream at the time this function is called |