aboutsummaryrefslogtreecommitdiff
path: root/src/consensus
diff options
context:
space:
mode:
authorFlorian Dold <florian.dold@gmail.com>2013-04-11 10:08:52 +0000
committerFlorian Dold <florian.dold@gmail.com>2013-04-11 10:08:52 +0000
commit210be82b7cdc6058401e7d5042aa50dd0b750c92 (patch)
treee2bfa5a87038ef0a7f906d5ede8d6e7ea7f2638b /src/consensus
parent2b406c1533a919057cda8850315af1fca5b48a45 (diff)
added consensus log-round simulation, work on consensus service, still problems with dv test case
Diffstat (limited to 'src/consensus')
-rw-r--r--src/consensus/Makefile.am6
-rw-r--r--src/consensus/consensus-simulation.py103
-rw-r--r--src/consensus/consensus_api.c20
-rw-r--r--src/consensus/gnunet-consensus-ibf.c4
-rw-r--r--src/consensus/gnunet-consensus.c3
-rw-r--r--src/consensus/gnunet-service-consensus.c2779
-rw-r--r--src/consensus/ibf.c87
-rw-r--r--src/consensus/ibf.h53
-rw-r--r--src/consensus/strata_estimator.c145
-rw-r--r--src/consensus/strata_estimator.h (renamed from src/consensus/consensus_flout.h)46
10 files changed, 1620 insertions, 1626 deletions
diff --git a/src/consensus/Makefile.am b/src/consensus/Makefile.am
index e469de0575..82af29c87a 100644
--- a/src/consensus/Makefile.am
+++ b/src/consensus/Makefile.am
@@ -61,7 +61,8 @@ gnunet_consensus_ibf_LDADD = \
gnunet_service_consensus_SOURCES = \
gnunet-service-consensus.c \
- ibf.c
+ ibf.c \
+ strata_estimator.c
gnunet_service_consensus_LDADD = \
$(top_builddir)/src/util/libgnunetutil.la \
$(top_builddir)/src/core/libgnunetcore.la \
@@ -71,7 +72,8 @@ gnunet_service_consensus_LDADD = \
gnunet_service_evil_consensus_SOURCES = \
gnunet-service-consensus.c \
- ibf.c
+ ibf.c \
+ strata_estimator.c
gnunet_service_evil_consensus_LDADD = \
$(top_builddir)/src/util/libgnunetutil.la \
$(top_builddir)/src/core/libgnunetcore.la \
diff --git a/src/consensus/consensus-simulation.py b/src/consensus/consensus-simulation.py
new file mode 100644
index 0000000000..930dfee62b
--- /dev/null
+++ b/src/consensus/consensus-simulation.py
@@ -0,0 +1,103 @@
+#!/usr/bin/python
+# This file is part of GNUnet
+# (C) 2013 Christian Grothoff (and other contributing authors)
+#
+# GNUnet is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published
+# by the Free Software Foundation; either version 2, or (at your
+# option) any later version.
+#
+# GNUnet is distributed in the hope that it will be useful, but
+# WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with GNUnet; see the file COPYING. If not, write to the
+# Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+# Boston, MA 02111-1307, USA.
+
+import argparse
+import random
+from math import ceil,log,floor
+
+def bsc(n):
+ """ count the bits set in n"""
+ l = n.bit_length()
+ c = 0
+ x = 1
+ for _ in range(0, l):
+ if n & x:
+ c = c + 1
+ x = x << 1
+ return c
+
+def simulate(k, n, verbose):
+ assert k < n
+ largest_arc = int(2**ceil(log(n, 2))) / 2
+ num_ghosts = (2 * largest_arc) - n
+ if verbose:
+ print "we have", num_ghosts, "ghost peers"
+ # n.b. all peers with idx<k are evil
+ peers = range(n)
+ info = [1 << x for x in xrange(n)]
+ def done_p():
+ for x in xrange(k, n):
+ if bsc(info[x]) < n-k:
+ return False
+ return True
+ rounds = 0
+ while not done_p():
+ if verbose:
+ print "-- round --"
+ arc = 1
+ while arc <= largest_arc:
+ if verbose:
+ print "-- subround --"
+ new_info = [x for x in info]
+ for peer_physical in xrange(n):
+ peer_logical = peers[peer_physical]
+ peer_type = None
+ partner_logical = (peer_logical + arc) % n
+ partner_physical = peers.index(partner_logical)
+ if peer_physical < k or partner_physical < k:
+ if verbose:
+ print "bad peer in connection", peer_physical, "--", partner_physical
+ continue
+ if peer_logical & arc == 0:
+ # we are outgoing
+ if verbose:
+ print peer_physical, "connects to", partner_physical
+ peer_type = "outgoing"
+ if peer_logical < num_ghosts:
+ # we have a ghost, check if the peer who connects
+ # to our ghost is actually outgoing
+ ghost_partner_logical = (peer_logical - arc) % n
+ if ghost_partner_logical & arc == 0:
+ peer_type = peer_type + ", ghost incoming"
+ new_info[peer_physical] = new_info[peer_physical] | info[peer_physical] | info[partner_physical]
+ new_info[partner_physical] = new_info[partner_physical] | info[peer_physical] | info[partner_physical]
+ else:
+ peer_type = "incoming"
+ if verbose > 1:
+ print "type of", str(peer_physical) + ":", peer_type
+ info = new_info
+ arc = arc << 1;
+ rounds = rounds + 1
+ random.shuffle(peers)
+ return rounds
+
+if __name__ == "__main__":
+ parser = argparse.ArgumentParser()
+ parser.add_argument("k", metavar="k", type=int, help="#(bad peers)")
+ parser.add_argument("n", metavar="n", type=int, help="#(all peers)")
+ parser.add_argument("r", metavar="r", type=int, help="#(rounds)")
+ parser.add_argument('--verbose', '-v', action='count')
+
+ args = parser.parse_args()
+ sum = 0.0;
+ for n in xrange (0, args.r):
+ sum += simulate(args.k, args.n, args.verbose)
+ print sum / args.r;
+
+
diff --git a/src/consensus/consensus_api.c b/src/consensus/consensus_api.c
index 25ace3a4dc..fd61d37122 100644
--- a/src/consensus/consensus_api.c
+++ b/src/consensus/consensus_api.c
@@ -231,15 +231,6 @@ send_next (struct GNUNET_CONSENSUS_Handle *consensus)
}
}
-static void
-queue_message (struct GNUNET_CONSENSUS_Handle *consensus, struct GNUNET_MessageHeader *msg)
-{
- struct QueuedMessage *qm;
- qm = GNUNET_malloc (sizeof *qm);
- qm->msg = msg;
- GNUNET_CONTAINER_DLL_insert_tail (consensus->messages_head, consensus->messages_tail, qm);
-}
-
/**
* Called when the server has sent is a new element
@@ -252,8 +243,6 @@ handle_new_element (struct GNUNET_CONSENSUS_Handle *consensus,
struct GNUNET_CONSENSUS_ElementMessage *msg)
{
struct GNUNET_CONSENSUS_Element element;
- struct GNUNET_CONSENSUS_AckMessage *ack_msg;
- int ret;
LOG (GNUNET_ERROR_TYPE_DEBUG, "received new element\n");
@@ -261,14 +250,7 @@ handle_new_element (struct GNUNET_CONSENSUS_Handle *consensus,
element.size = ntohs (msg->header.size) - sizeof (struct GNUNET_CONSENSUS_ElementMessage);
element.data = &msg[1];
- ret = consensus->new_element_cb (consensus->new_element_cls, &element);
-
- ack_msg = GNUNET_new (struct GNUNET_CONSENSUS_AckMessage);
- ack_msg->header.size = htons (sizeof *ack_msg);
- ack_msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_ACK);
- ack_msg->keep = ret;
-
- queue_message (consensus, &ack_msg->header);
+ consensus->new_element_cb (consensus->new_element_cls, &element);
send_next (consensus);
}
diff --git a/src/consensus/gnunet-consensus-ibf.c b/src/consensus/gnunet-consensus-ibf.c
index 73dc31b56a..d431795f17 100644
--- a/src/consensus/gnunet-consensus-ibf.c
+++ b/src/consensus/gnunet-consensus-ibf.c
@@ -160,8 +160,8 @@ run (void *cls, char *const *args, const char *cfgfile,
i++;
}
- ibf_a = ibf_create (ibf_size, hash_num, 0);
- ibf_b = ibf_create (ibf_size, hash_num, 0);
+ ibf_a = ibf_create (ibf_size, hash_num);
+ ibf_b = ibf_create (ibf_size, hash_num);
printf ("generated sets\n");
diff --git a/src/consensus/gnunet-consensus.c b/src/consensus/gnunet-consensus.c
index 9e9b89446f..d8c1b14eee 100644
--- a/src/consensus/gnunet-consensus.c
+++ b/src/consensus/gnunet-consensus.c
@@ -192,12 +192,11 @@ connect_complete (void *cls,
}
-static int
+static void
new_element_cb (void *cls,
const struct GNUNET_CONSENSUS_Element *element)
{
GNUNET_log (GNUNET_ERROR_TYPE_INFO, "received new element\n");
- return GNUNET_YES;
}
diff --git a/src/consensus/gnunet-service-consensus.c b/src/consensus/gnunet-service-consensus.c
index ebd2d238bc..179df0fb0d 100644
--- a/src/consensus/gnunet-service-consensus.c
+++ b/src/consensus/gnunet-service-consensus.c
@@ -32,23 +32,32 @@
#include "gnunet_consensus_service.h"
#include "gnunet_core_service.h"
#include "gnunet_stream_lib.h"
+
#include "consensus_protocol.h"
-#include "ibf.h"
#include "consensus.h"
+#include "ibf.h"
+#include "strata_estimator.h"
+
+
+/*
+ * Log macro that prefixes the local peer and the peer we are in contact with.
+ */
+#define LOG_PP(kind, cpi, m,...) GNUNET_log (kind, "P%d for P%d: " m, \
+ cpi->session->local_peer_idx, (int) (cpi - cpi->session->info),##__VA_ARGS__)
/**
* Number of IBFs in a strata estimator.
*/
-#define STRATA_COUNT 32
+#define SE_STRATA_COUNT 32
/**
- * Number of buckets per IBF.
+ * Size of the IBFs in the strata estimator.
*/
-#define STRATA_IBF_BUCKETS 80
+#define SE_IBF_SIZE 80
/**
* hash num parameter for the difference digests and strata estimators
*/
-#define STRATA_HASH_NUM 3
+#define SE_IBF_HASH_NUM 3
/**
* Number of buckets that can be transmitted in one message.
@@ -63,72 +72,55 @@
#define MAX_IBF_ORDER (16)
/**
- * Number exp-rounds.
+ * Number of exponential rounds, used in the inventory and completion round.
*/
#define NUM_EXP_ROUNDS (4)
/* forward declarations */
-struct ConsensusSession;
-struct IncomingSocket;
+/* mutual recursion with struct ConsensusSession */
struct ConsensusPeerInformation;
-static void
-client_send_next (struct ConsensusSession *session);
-
-static int
-get_peer_idx (const struct GNUNET_PeerIdentity *peer, const struct ConsensusSession *session);
-
-static void
-round_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
+struct MessageQueue;
+/* mutual recursion with round_over */
static void
-send_ibf (struct ConsensusPeerInformation *cpi);
+subround_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
+/* mutial recursion with transmit_queued */
static void
-send_strata_estimator (struct ConsensusPeerInformation *cpi);
+client_send_next (struct MessageQueue *mq);
+/* mutual recursion with mst_session_callback */
static void
-decode (struct ConsensusPeerInformation *cpi);
-
-static void
-write_queued (void *cls, enum GNUNET_STREAM_Status status, size_t size);
+open_cb (void *cls, struct GNUNET_STREAM_Socket *socket);
-static void
-subround_over (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
+static int
+mst_session_callback (void *cls, void *client, const struct GNUNET_MessageHeader *message);
/**
- * An element that is waiting to be transmitted to the client.
+ * Additional information about a consensus element.
*/
-struct PendingElement
+struct ElementInfo
{
/**
- * Pending elements are kept in a DLL.
+ * The element itself.
*/
- struct PendingElement *next;
-
+ struct GNUNET_CONSENSUS_Element *element;
/**
- * Pending elements are kept in a DLL.
+ * Hash of the element
*/
- struct PendingElement *prev;
-
+ struct GNUNET_HashCode *element_hash;
/**
- * The actual element
+ * Number of other peers that have the element in the inventory.
*/
- struct GNUNET_CONSENSUS_Element *element;
-
- /* peer this element is coming from */
- struct ConsensusPeerInformation *cpi;
-};
-
-
-struct ElementList
-{
- struct ElementList *next;
- struct GNUNET_CONSENSUS_Element *element;
- struct GNUNET_HashCode *element_hash;
+ unsigned int inventory_count;
+ /**
+ * Bitmap of peers that have this element in their inventory
+ */
+ uint8_t *inventory_bitmap;
};
@@ -147,178 +139,93 @@ enum ConsensusRound
CONSENSUS_ROUND_EXCHANGE,
/**
* Exchange which elements each peer has, but not the elements.
+ * This round uses the all-to-all scheme.
*/
CONSENSUS_ROUND_INVENTORY,
/**
- * Collect and distribute missing values.
+ * Collect and distribute missing values with the exponential scheme.
*/
- CONSENSUS_ROUND_STOCK,
+ CONSENSUS_ROUND_COMPLETION,
/**
- * Consensus concluded.
+ * Consensus concluded. After timeout and finished communication with client,
+ * consensus session will be destroyed.
*/
CONSENSUS_ROUND_FINISH
};
+/* FIXME: review states, ANTICIPATE_DIFF and DECODING in particular */
/**
- * Information about a peer that is in a consensus session.
+ * State of another peer with respect to the
+ * current ibf.
*/
-struct ConsensusPeerInformation
-{
- struct GNUNET_PeerIdentity peer_id;
-
- /**
- * Socket for communicating with the peer, either created by the local peer,
- * or the remote peer.
- */
- struct GNUNET_STREAM_Socket *socket;
-
- /**
- * Message tokenizer, for the data received from this peer via the stream socket.
- */
- struct GNUNET_SERVER_MessageStreamTokenizer *mst;
-
- /**
- * Do we connect to the peer, or does the peer connect to us?
- * Only valid for all-to-all phases
- */
- int is_outgoing;
-
- /**
- * Did we receive/send a consensus hello?
- */
- int hello;
-
+enum ConsensusIBFState {
/**
- * Handle for currently active read
+ * There is nothing going on with the IBF.
*/
- struct GNUNET_STREAM_ReadHandle *rh;
-
+ IBF_STATE_NONE=0,
/**
- * Handle for currently active read
+ * We currently receive an ibf.
*/
- struct GNUNET_STREAM_WriteHandle *wh;
-
- enum {
- /* beginning of round */
- IBF_STATE_NONE=0,
- /* we currently receive an ibf */
- IBF_STATE_RECEIVING,
- /* we currently transmit an ibf */
- IBF_STATE_TRANSMITTING,
- /* we decode a received ibf */
- IBF_STATE_DECODING,
- /* wait for elements and element requests */
- IBF_STATE_ANTICIPATE_DIFF
- } ibf_state ;
-
- /**
- * What is the order (=log2 size) of the ibf
- * we're currently dealing with?
- * Interpretation depends on ibf_state.
- */
- int ibf_order;
-
- /**
- * The current IBF for this peer,
- * purpose dependent on ibf_state
- */
- struct InvertibleBloomFilter *ibf;
-
- /**
- * How many buckets have we transmitted/received?
- * Interpretatin depends on ibf_state
- */
- int ibf_bucket_counter;
-
- /**
- * Strata estimator of the peer, NULL if our peer
- * initiated the reconciliation.
- */
- struct StrataEstimator *se;
-
- /**
- * Element keys that this peer misses, but we have them.
- */
- struct GNUNET_CONTAINER_MultiHashMap *requested_keys;
-
- /**
- * Element keys that this peer has, but we miss.
- */
- struct GNUNET_CONTAINER_MultiHashMap *reported_keys;
-
- /**
- * Back-reference to the consensus session,
- * to that ConsensusPeerInformation can be used as a closure
- */
- struct ConsensusSession *session;
-
+ IBF_STATE_RECEIVING,
+ /*
+ * we decode a received ibf
+ */
+ IBF_STATE_DECODING,
/**
- * Messages queued for the current round.
+ * wait for elements and element requests
*/
- struct QueuedMessage *messages_head;
+ IBF_STATE_ANTICIPATE_DIFF
+};
- /**
- * Messages queued for the current round.
- */
- struct QueuedMessage *messages_tail;
- /**
- * True if we are actually replaying the strata message,
- * e.g. currently handling the premature_strata_message.
- */
- int replaying_strata_message;
+typedef void (*AddCallback) (struct MessageQueue *mq);
+typedef void (*MessageSentCallback) (void *cls);
- /**
- * A strata message that is not actually for the current round,
- * used in the exp-scheme.
- */
- struct StrataMessage *premature_strata_message;
- /**
- * We have finishes the exp-subround with the peer.
- */
- int exp_subround_finished;
+/**
+ * Collection of the state necessary to read and write gnunet messages
+ * to a stream socket. Should be used as closure for stream_data_processor.
+ */
+struct MessageStreamState
+{
+ struct GNUNET_SERVER_MessageStreamTokenizer *mst;
+ struct MessageQueue *mq;
+ void *mst_cls;
+ struct GNUNET_STREAM_Socket *socket;
+ struct GNUNET_STREAM_ReadHandle *rh;
+ struct GNUNET_STREAM_WriteHandle *wh;
+};
- int inventory_synced;
-
- /**
- * Round this peer seems to be in, according to the last SE we got.
- * Necessary to store this, as we sometimes need to respond to a request from an
- * older round, while we are already in the next round.
- */
- enum ConsensusRound apparent_round;
+struct ServerClientSocketState
+{
+ struct GNUNET_SERVER_Client *client;
+ struct GNUNET_SERVER_TransmitHandle* th;
};
-typedef void (*QueuedMessageCallback) (void *msg);
/**
- * A doubly linked list of messages.
+ * Generic message queue, for queueing outgoing messages.
*/
-struct QueuedMessage
+struct MessageQueue
{
- struct GNUNET_MessageHeader *msg;
-
- /**
- * Queued messages are stored in a doubly linked list.
- */
- struct QueuedMessage *next;
-
- /**
- * Queued messages are stored in a doubly linked list.
- */
- struct QueuedMessage *prev;
-
- QueuedMessageCallback cb;
-
- void *cls;
+ void *state;
+ AddCallback add_cb;
+ struct PendingMessage *pending_head;
+ struct PendingMessage *pending_tail;
+ struct PendingMessage *current_pm;
};
-struct StrataEstimator
+struct PendingMessage
{
- struct InvertibleBloomFilter **strata;
+ struct GNUNET_MessageHeader *msg;
+ struct MessageQueue *parent_queue;
+ struct PendingMessage *next;
+ struct PendingMessage *prev;
+ MessageSentCallback sent_cb;
+ void *sent_cb_cls;
};
@@ -351,38 +258,26 @@ struct ConsensusSession
struct GNUNET_HashCode global_id;
/**
- * Local client in this consensus session.
- * There is only one client per consensus session.
- */
- struct GNUNET_SERVER_Client *client;
-
- /**
- * Elements in the consensus set of this session,
- * all of them either have been sent by or approved by the client.
- * Contains ElementList.
- * Used as a unique-key hashmap.
+ * The server's client and associated local state
*/
- struct GNUNET_CONTAINER_MultiHashMap *values;
-
- /**
- * Elements that have not been approved (or rejected) by the client yet.
- */
- struct PendingElement *client_approval_head;
+ struct ServerClientSocketState scss;
/**
- * Elements that have not been approved (or rejected) by the client yet.
+ * Queued messages to the client.
*/
- struct PendingElement *client_approval_tail;
+ struct MessageQueue *client_mq;
/**
- * Messages to be sent to the local client that owns this session
+ * IBF_Key -> 2^(HashCode*)
+ * FIXME:
+ * should be array of hash maps, mapping replicated struct IBF_Keys to struct HashCode *.
*/
- struct QueuedMessage *client_messages_head;
+ struct GNUNET_CONTAINER_MultiHashMap *ibf_key_map;
/**
- * Messages to be sent to the local client that owns this session
+ * Maps HashCodes to ElementInfos
*/
- struct QueuedMessage *client_messages_tail;
+ struct GNUNET_CONTAINER_MultiHashMap *values;
/**
* Currently active transmit handle for sending to the client
@@ -412,9 +307,14 @@ struct ConsensusSession
struct ConsensusPeerInformation *info;
/**
+ * GNUNET_YES if the client has called conclude.
+ * */
+ int conclude;
+
+ /**
* Index of the local peer in the peers array
*/
- int local_peer_idx;
+ unsigned int local_peer_idx;
/**
* Strata estimator, computed online
@@ -431,16 +331,16 @@ struct ConsensusSession
*/
enum ConsensusRound current_round;
- int exp_round;
-
- int exp_subround;
-
/**
* Permutation of peers for the current round,
* maps logical index (for current round) to physical index (location in info array)
*/
int *shuffle;
+ int exp_round;
+
+ int exp_subround;
+
/**
* The partner for the current exp-round
*/
@@ -454,41 +354,121 @@ struct ConsensusSession
/**
- * Sockets from other peers who want to communicate with us.
- * It may not be known yet which consensus session they belong to.
- * Also, the session might not exist yet locally.
+ * Information about a peer that is in a consensus session.
*/
-struct IncomingSocket
+struct ConsensusPeerInformation
{
/**
- * Incoming sockets are kept in a double linked list.
+ * Peer identitty of the peer in the consensus session
*/
- struct IncomingSocket *next;
+ struct GNUNET_PeerIdentity peer_id;
/**
- * Incoming sockets are kept in a double linked list.
+ * Do we connect to the peer, or does the peer connect to us?
+ * Only valid for all-to-all phases
*/
- struct IncomingSocket *prev;
+ int is_outgoing;
/**
- * The actual socket.
+ * Did we receive/send a consensus hello?
*/
- struct GNUNET_STREAM_Socket *socket;
+ int hello;
+
+ /*
+ * FIXME
+ */
+ struct MessageStreamState mss;
/**
- * Handle for currently active read
+ * Current state
*/
- struct GNUNET_STREAM_ReadHandle *rh;
+ enum ConsensusIBFState ibf_state;
/**
- * Peer that connected to us with the socket.
+ * What is the order (=log2 size) of the ibf
+ * we're currently dealing with?
+ * Interpretation depends on ibf_state.
*/
- struct GNUNET_PeerIdentity peer_id;
+ int ibf_order;
+
+ /**
+ * The current IBF for this peer,
+ * purpose dependent on ibf_state
+ */
+ struct InvertibleBloomFilter *ibf;
/**
- * Message stream tokenizer for this socket.
+ * How many buckets have we transmitted/received?
+ * Interpretatin depends on ibf_state
*/
- struct GNUNET_SERVER_MessageStreamTokenizer *mst;
+ int ibf_bucket_counter;
+
+ /**
+ * Strata estimator of the peer, NULL if our peer
+ * initiated the reconciliation.
+ */
+ struct StrataEstimator *se;
+
+ /**
+ * Back-reference to the consensus session,
+ * to that ConsensusPeerInformation can be used as a closure
+ */
+ struct ConsensusSession *session;
+
+ /**
+ * True if we are actually replaying the strata message,
+ * e.g. currently handling the premature_strata_message.
+ */
+ int replaying_strata_message;
+
+ /**
+ * A strata message that is not actually for the current round,
+ * used in the exp-scheme.
+ */
+ struct StrataMessage *premature_strata_message;
+
+ /**
+ * We have finishes the exp-subround with the peer.
+ */
+ int exp_subround_finished;
+
+ /**
+ * GNUNET_YES if we synced inventory with this peer;
+ * GNUNET_NO otherwise.
+ */
+ int inventory_synced;
+
+ /**
+ * Round this peer seems to be in, according to the last SE we got.
+ * Necessary to store this, as we sometimes need to respond to a request from an
+ * older round, while we are already in the next round.
+ */
+ enum ConsensusRound apparent_round;
+};
+
+
+/**
+ * Sockets from other peers who want to communicate with us.
+ * It may not be known yet which consensus session they belong to, we have to wait for the
+ * peer's hello.
+ * Also, the session might not exist yet locally, we have to wait for a local client to connect.
+ */
+struct IncomingSocket
+{
+ /**
+ * Incoming sockets are kept in a double linked list.
+ */
+ struct IncomingSocket *next;
+
+ /**
+ * Incoming sockets are kept in a double linked list.
+ */
+ struct IncomingSocket *prev;
+
+ /**
+ * Peer that connected to us with the socket.
+ */
+ struct GNUNET_PeerIdentity peer_id;
/**
* Peer-in-session this socket belongs to, once known, otherwise NULL.
@@ -500,19 +480,35 @@ struct IncomingSocket
* but the session does not exist yet.
*/
struct GNUNET_HashCode *requested_gid;
+
+ /*
+ * Timeout, will disconnect the socket if not yet in a session.
+ * FIXME: implement
+ */
+ GNUNET_SCHEDULER_TaskIdentifier timeout;
+
+ /* FIXME */
+ struct MessageStreamState mss;
};
+/**
+ * Linked list of incoming sockets.
+ */
static struct IncomingSocket *incoming_sockets_head;
+
+/**
+ * Linked list of incoming sockets.
+ */
static struct IncomingSocket *incoming_sockets_tail;
/**
- * Linked list of sesstions this peer participates in.
+ * Linked list of sessions this peer participates in.
*/
static struct ConsensusSession *sessions_head;
/**
- * Linked list of sesstions this peer participates in.
+ * Linked list of sessions this peer participates in.
*/
static struct ConsensusSession *sessions_tail;
@@ -543,151 +539,159 @@ static struct GNUNET_STREAM_ListenSocket *listener;
/**
- * Queue a message to be sent to the inhabiting client of a session.
+ * Transmit a queued message to the session's client.
*
- * @param session session
- * @param msg message we want to queue
+ * @param cls consensus session
+ * @param size number of bytes available in buf
+ * @param buf where the callee should write the message
+ * @return number of bytes written to buf
*/
-static void
-queue_client_message (struct ConsensusSession *session, struct GNUNET_MessageHeader *msg)
+static size_t
+transmit_queued (void *cls, size_t size,
+ void *buf)
{
- struct QueuedMessage *qm;
- qm = GNUNET_malloc (sizeof *qm);
- qm->msg = msg;
- GNUNET_CONTAINER_DLL_insert_tail (session->client_messages_head, session->client_messages_tail, qm);
+ struct MessageQueue *mq = cls;
+ struct PendingMessage *pm = mq->pending_head;
+ struct ServerClientSocketState *state = mq->state;
+ size_t msg_size;
+
+ GNUNET_assert (NULL != pm);
+ GNUNET_assert (NULL != buf);
+ msg_size = ntohs (pm->msg->size);
+ GNUNET_assert (size >= msg_size);
+ memcpy (buf, pm->msg, msg_size);
+ GNUNET_CONTAINER_DLL_remove (mq->pending_head, mq->pending_tail, pm);
+ state->th = NULL;
+ client_send_next (cls);
+ GNUNET_free (pm);
+ return msg_size;
}
-/**
- * Queue a message to be sent to another peer
- *
- * @param cpi peer
- * @param msg message we want to queue
- * @param cb callback, called when the message is given to strem
- * @param cls closure for cb
- */
+
static void
-queue_peer_message_with_cls (struct ConsensusPeerInformation *cpi, struct GNUNET_MessageHeader *msg, QueuedMessageCallback cb, void *cls)
-{
- struct QueuedMessage *qm;
- qm = GNUNET_malloc (sizeof *qm);
- qm->msg = msg;
- qm->cls = cls;
- qm->cb = cb;
- GNUNET_CONTAINER_DLL_insert_tail (cpi->messages_head, cpi->messages_tail, qm);
- if (cpi->wh == NULL)
- write_queued (cpi, GNUNET_STREAM_OK, 0);
+client_send_next (struct MessageQueue *mq)
+{
+ struct ServerClientSocketState *state = mq->state;
+ int msize;
+
+ GNUNET_assert (NULL != state);
+
+ if ( (NULL != state->th) ||
+ (NULL == mq->pending_head) )
+ return;
+ msize = ntohs (mq->pending_head->msg->size);
+ state->th =
+ GNUNET_SERVER_notify_transmit_ready (state->client, msize,
+ GNUNET_TIME_UNIT_FOREVER_REL,
+ &transmit_queued, mq);
+}
+
+
+struct MessageQueue *
+create_message_queue_for_server_client (struct ServerClientSocketState *scss)
+{
+ struct MessageQueue *mq;
+ mq = GNUNET_new (struct MessageQueue);
+ mq->add_cb = client_send_next;
+ mq->state = scss;
+ return mq;
}
/**
- * Queue a message to be sent to another peer
+ * Functions of this signature are called whenever writing operations
+ * on a stream are executed
*
- * @param cpi peer
- * @param msg message we want to queue
+ * @param cls the closure from GNUNET_STREAM_write
+ * @param status the status of the stream at the time this function is called;
+ * GNUNET_STREAM_OK if writing to stream was completed successfully;
+ * GNUNET_STREAM_TIMEOUT if the given data is not sent successfully
+ * (this doesn't mean that the data is never sent, the receiver may
+ * have read the data but its ACKs may have been lost);
+ * GNUNET_STREAM_SHUTDOWN if the stream is shutdown for writing in the
+ * mean time; GNUNET_STREAM_SYSERR if the stream is broken and cannot
+ * be processed.
+ * @param size the number of bytes written
*/
-static void
-queue_peer_message (struct ConsensusPeerInformation *cpi, struct GNUNET_MessageHeader *msg)
+static void
+write_queued (void *cls, enum GNUNET_STREAM_Status status, size_t size)
{
- queue_peer_message_with_cls (cpi, msg, NULL, NULL);
+ struct MessageQueue *mq = cls;
+ struct MessageStreamState *mss = mq->state;
+ struct PendingMessage *pm;
+
+ GNUNET_assert (GNUNET_STREAM_OK == status);
+
+ /* call cb for message we finished sending */
+ pm = mq->current_pm;
+ if (NULL != pm)
+ {
+ if (NULL != pm->sent_cb)
+ pm->sent_cb (pm->sent_cb_cls);
+ GNUNET_free (pm);
+ }
+
+ mss->wh = NULL;
+
+ pm = mq->pending_head;
+ mq->current_pm = pm;
+ if (NULL == pm)
+ return;
+ GNUNET_CONTAINER_DLL_remove (mq->pending_head, mq->pending_tail, pm);
+ mss->wh = GNUNET_STREAM_write (mss->socket, pm->msg, ntohs (pm->msg->size),
+ GNUNET_TIME_UNIT_FOREVER_REL, write_queued, cls);
+ GNUNET_assert (NULL != mss->wh);
}
-/*
static void
-clear_peer_messages (struct ConsensusPeerInformation *cpi)
+stream_socket_add_cb (struct MessageQueue *mq)
{
- cpi->messages_head = NULL;
- cpi->messages_tail = NULL;
+ if (NULL != mq->current_pm)
+ return;
+ write_queued (mq, GNUNET_STREAM_OK, 0);
}
-*/
-/**
- * Estimate set difference with two strata estimators,
- * i.e. arrays of IBFs.
- * Does not not modify its arguments.
- *
- * @param se1 first strata estimator
- * @param se2 second strata estimator
- * @return the estimated difference
- */
-static int
-estimate_difference (const struct StrataEstimator *se1,
- const struct StrataEstimator *se2)
+struct MessageQueue *
+create_message_queue_for_stream_socket (struct MessageStreamState *mss)
{
- int i;
- int count;
- count = 0;
- for (i = STRATA_COUNT - 1; i >= 0; i--)
- {
- struct InvertibleBloomFilter *diff;
- /* number of keys decoded from the ibf */
- int ibf_count;
- int more;
- ibf_count = 0;
- /* FIXME: implement this without always allocating new IBFs */
- diff = ibf_dup (se1->strata[i]);
- ibf_subtract (diff, se2->strata[i]);
- for (;;)
- {
- more = ibf_decode (diff, NULL, NULL);
- if (GNUNET_NO == more)
- {
- count += ibf_count;
- break;
- }
- if (GNUNET_SYSERR == more)
- {
- ibf_destroy (diff);
- return count * (1 << (i + 1));
- }
- ibf_count++;
- }
- ibf_destroy (diff);
- }
- return count;
+ struct MessageQueue *mq;
+ mq = GNUNET_new (struct MessageQueue);
+ mq->state = mss;
+ mq->add_cb = stream_socket_add_cb;
+ return mq;
+}
+
+
+struct PendingMessage *
+new_pending_message (uint16_t size, uint16_t type)
+{
+ struct PendingMessage *pm;
+ pm = GNUNET_malloc (sizeof *pm + size);
+ pm->msg = (void *) &pm[1];
+ pm->msg->size = htons (size);
+ pm->msg->type = htons (type);
+ return pm;
}
/**
- * Called when receiving data from a peer that is member of
- * an inhabited consensus session.
+ * Queue a message in a message queue.
*
- * @param cls the closure from GNUNET_STREAM_read
- * @param status the status of the stream at the time this function is called