diff options
Diffstat (limited to 'src/consensus/consensus_api.c')
-rw-r--r-- | src/consensus/consensus_api.c | 535 |
1 files changed, 535 insertions, 0 deletions
diff --git a/src/consensus/consensus_api.c b/src/consensus/consensus_api.c new file mode 100644 index 0000000..7ebb0a9 --- /dev/null +++ b/src/consensus/consensus_api.c @@ -0,0 +1,535 @@ +/* + This file is part of GNUnet. + (C) 2012 Christian Grothoff (and other contributing authors) + + GNUnet is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published + by the Free Software Foundation; either version 3, or (at your + option) any later version. + + GNUnet is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + General Public License for more details. + + You should have received a copy of the GNU General Public License + along with GNUnet; see the file COPYING. If not, write to the + Free Software Foundation, Inc., 59 Temple Place - Suite 330, + Boston, MA 02111-1307, USA. +*/ + +/** + * @file consensus/consensus_api.c + * @brief + * @author Florian Dold + */ +#include "platform.h" +#include "gnunet_util_lib.h" +#include "gnunet_protocols.h" +#include "gnunet_client_lib.h" +#include "gnunet_consensus_service.h" +#include "consensus.h" + + +#define LOG(kind,...) GNUNET_log_from (kind, "consensus-api",__VA_ARGS__) + +/** + * Actions that can be queued. + */ +struct QueuedMessage +{ + /** + * Queued messages are stored in a doubly linked list. + */ + struct QueuedMessage *next; + + /** + * Queued messages are stored in a doubly linked list. + */ + struct QueuedMessage *prev; + + /** + * The actual queued message. + */ + struct GNUNET_MessageHeader *msg; + + /** + * Will be called after transmit, if not NULL + */ + GNUNET_CONSENSUS_InsertDoneCallback idc; + + /** + * Closure for idc + */ + void *idc_cls; +}; + + +/** + * Handle for the service. + */ +struct GNUNET_CONSENSUS_Handle +{ + /** + * Configuration to use. + */ + const struct GNUNET_CONFIGURATION_Handle *cfg; + + /** + * Client connected to the consensus service, may be NULL if not connected. + */ + struct GNUNET_CLIENT_Connection *client; + + /** + * Callback for new elements. Not called for elements added locally. + */ + GNUNET_CONSENSUS_ElementCallback new_element_cb; + + /** + * Closure for new_element_cb + */ + void *new_element_cls; + + /** + * The (local) session identifier for the consensus session. + */ + struct GNUNET_HashCode session_id; + + /** + * Number of peers in the consensus. Optionally includes the local peer. + */ + int num_peers; + + /** + * Peer identities of peers participating in the consensus, includes the local peer. + */ + struct GNUNET_PeerIdentity **peers; + + /** + * Currently active transmit request. + */ + struct GNUNET_CLIENT_TransmitHandle *th; + + /** + * GNUNES_YES iff the join message has been sent to the service. + */ + int joined; + + /** + * Closure for the insert done callback. + */ + void *idc_cls; + + /** + * Called when the conclude operation finishes or fails. + */ + GNUNET_CONSENSUS_ConcludeCallback conclude_cb; + + /** + * Closure for the conclude callback. + */ + void *conclude_cls; + + /** + * Deadline for the conclude operation. + */ + struct GNUNET_TIME_Absolute conclude_deadline; + + unsigned int conclude_min_size; + + struct QueuedMessage *messages_head; + struct QueuedMessage *messages_tail; +}; + + + +/** + * Schedule transmitting the next message. + * + * @param consensus consensus handle + */ +static void +send_next (struct GNUNET_CONSENSUS_Handle *consensus); + + +/** + * Function called to notify a client about the connection + * begin ready to queue more data. "buf" will be + * NULL and "size" zero if the connection was closed for + * writing in the meantime. + * + * @param cls closure + * @param size number of bytes available in buf + * @param buf where the callee should write the message + * @return number of bytes written to buf + */ +static size_t +transmit_queued (void *cls, size_t size, + void *buf) +{ + struct GNUNET_CONSENSUS_Handle *consensus; + struct QueuedMessage *qmsg; + size_t msg_size; + + consensus = (struct GNUNET_CONSENSUS_Handle *) cls; + consensus->th = NULL; + + qmsg = consensus->messages_head; + GNUNET_CONTAINER_DLL_remove (consensus->messages_head, consensus->messages_tail, qmsg); + + if (NULL == buf) + { + if (NULL != qmsg->idc) + { + qmsg->idc (qmsg->idc_cls, GNUNET_YES); + } + return 0; + } + + msg_size = ntohs (qmsg->msg->size); + + GNUNET_assert (size >= msg_size); + + memcpy (buf, qmsg->msg, msg_size); + if (NULL != qmsg->idc) + { + qmsg->idc (qmsg->idc_cls, GNUNET_YES); + } + + /* FIXME: free the messages */ + + send_next (consensus); + + return msg_size; +} + + +/** + * Schedule transmitting the next message. + * + * @param consensus consensus handle + */ +static void +send_next (struct GNUNET_CONSENSUS_Handle *consensus) +{ + if (NULL != consensus->th) + return; + + if (NULL != consensus->messages_head) + { + consensus->th = + GNUNET_CLIENT_notify_transmit_ready (consensus->client, ntohs (consensus->messages_head->msg->size), + GNUNET_TIME_UNIT_FOREVER_REL, + GNUNET_NO, &transmit_queued, consensus); + } +} + +static void +queue_message (struct GNUNET_CONSENSUS_Handle *consensus, struct GNUNET_MessageHeader *msg) +{ + struct QueuedMessage *qm; + qm = GNUNET_malloc (sizeof *qm); + qm->msg = msg; + GNUNET_CONTAINER_DLL_insert_tail (consensus->messages_head, consensus->messages_tail, qm); +} + + +/** + * Called when the server has sent is a new element + * + * @param consensus consensus handle + * @param msg element message + */ +static void +handle_new_element (struct GNUNET_CONSENSUS_Handle *consensus, + struct GNUNET_CONSENSUS_ElementMessage *msg) +{ + struct GNUNET_CONSENSUS_Element element; + struct GNUNET_CONSENSUS_AckMessage *ack_msg; + int ret; + + LOG (GNUNET_ERROR_TYPE_DEBUG, "received new element\n"); + + element.type = msg->element_type; + element.size = ntohs (msg->header.size) - sizeof (struct GNUNET_CONSENSUS_ElementMessage); + element.data = &msg[1]; + + ret = consensus->new_element_cb (consensus->new_element_cls, &element); + + ack_msg = GNUNET_malloc (sizeof *ack_msg); + ack_msg->header.size = htons (sizeof *ack_msg); + ack_msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_ACK); + ack_msg->keep = ret; + + queue_message (consensus, (struct GNUNET_MessageHeader *) ack_msg); + + send_next (consensus); +} + + +/** + * Called when the server has announced + * that the conclusion is over. + * + * @param consensus consensus handle + * @param msg conclude done message + */ +static void +handle_conclude_done (struct GNUNET_CONSENSUS_Handle *consensus, + struct GNUNET_CONSENSUS_ConcludeDoneMessage *msg) +{ + GNUNET_assert (NULL != consensus->conclude_cb); + consensus->conclude_cb (consensus->conclude_cls, NULL); + consensus->conclude_cb = NULL; +} + + + +/** + * Type of a function to call when we receive a message + * from the service. + * + * @param cls closure + * @param msg message received, NULL on timeout or fatal error + */ +static void +message_handler (void *cls, const struct GNUNET_MessageHeader *msg) +{ + struct GNUNET_CONSENSUS_Handle *consensus = cls; + + LOG (GNUNET_ERROR_TYPE_DEBUG, "received message from consensus service\n"); + + if (msg == NULL) + { + /* Error, timeout, death */ + LOG (GNUNET_ERROR_TYPE_ERROR, "error receiving\n"); + GNUNET_CLIENT_disconnect (consensus->client); + consensus->client = NULL; + consensus->new_element_cb (NULL, NULL); + return; + } + + switch (ntohs (msg->type)) + { + case GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_RECEIVED_ELEMENT: + handle_new_element (consensus, (struct GNUNET_CONSENSUS_ElementMessage *) msg); + break; + case GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE_DONE: + handle_conclude_done (consensus, (struct GNUNET_CONSENSUS_ConcludeDoneMessage *) msg); + break; + default: + GNUNET_break (0); + } + GNUNET_CLIENT_receive (consensus->client, &message_handler, consensus, + GNUNET_TIME_UNIT_FOREVER_REL); +} + +/** + * Function called to notify a client about the connection + * begin ready to queue more data. "buf" will be + * NULL and "size" zero if the connection was closed for + * writing in the meantime. + * + * @param cls closure + * @param size number of bytes available in buf + * @param buf where the callee should write the message + * @return number of bytes written to buf + */ +static size_t +transmit_join (void *cls, size_t size, void *buf) +{ + struct GNUNET_CONSENSUS_JoinMessage *msg; + struct GNUNET_CONSENSUS_Handle *consensus; + int msize; + + GNUNET_assert (NULL != buf); + + LOG (GNUNET_ERROR_TYPE_DEBUG, "transmitting join message\n"); + + consensus = cls; + consensus->th = NULL; + consensus->joined = 1; + + msg = buf; + + msize = sizeof (struct GNUNET_CONSENSUS_JoinMessage) + + consensus->num_peers * sizeof (struct GNUNET_PeerIdentity); + + msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_JOIN); + msg->header.size = htons (msize); + msg->session_id = consensus->session_id; + msg->num_peers = htons (consensus->num_peers); + if (0 != msg->num_peers) + memcpy(&msg[1], + consensus->peers, + consensus->num_peers * sizeof (struct GNUNET_PeerIdentity)); + + send_next (consensus); + + GNUNET_CLIENT_receive (consensus->client, &message_handler, consensus, + GNUNET_TIME_UNIT_FOREVER_REL); + + return msize; +} + +/** + * Create a consensus session. + * + * @param cfg configuration to use for connecting to the consensus service + * @param num_peers number of peers in the peers array + * @param peers array of peers participating in this consensus session + * Inclusion of the local peer is optional. + * @param session_id session identifier + * Allows a group of peers to have more than consensus session. + * @param new_element_cb callback, called when a new element is added to the set by + * another peer + * @param new_element_cls closure for new_element + * @return handle to use, NULL on error + */ +struct GNUNET_CONSENSUS_Handle * +GNUNET_CONSENSUS_create (const struct GNUNET_CONFIGURATION_Handle *cfg, + unsigned int num_peers, + const struct GNUNET_PeerIdentity *peers, + const struct GNUNET_HashCode *session_id, + GNUNET_CONSENSUS_ElementCallback new_element_cb, + void *new_element_cls) +{ + struct GNUNET_CONSENSUS_Handle *consensus; + size_t join_message_size; + + consensus = GNUNET_malloc (sizeof (struct GNUNET_CONSENSUS_Handle)); + consensus->cfg = cfg; + consensus->new_element_cb = new_element_cb; + consensus->new_element_cls = new_element_cls; + consensus->num_peers = num_peers; + consensus->session_id = *session_id; + + if (0 == num_peers) + consensus->peers = NULL; + else if (num_peers > 0) + consensus->peers = + GNUNET_memdup (peers, num_peers * sizeof (struct GNUNET_PeerIdentity)); + + consensus->client = GNUNET_CLIENT_connect ("consensus", cfg); + + GNUNET_assert (consensus->client != NULL); + + join_message_size = (sizeof (struct GNUNET_CONSENSUS_JoinMessage)) + + (num_peers * sizeof (struct GNUNET_PeerIdentity)); + + consensus->th = + GNUNET_CLIENT_notify_transmit_ready (consensus->client, + join_message_size, + GNUNET_TIME_UNIT_FOREVER_REL, + GNUNET_NO, &transmit_join, consensus); + + + GNUNET_assert (consensus->th != NULL); + return consensus; +} + + + +/** + * Insert an element in the set being reconsiled. Must not be called after + * "GNUNET_CONSENSUS_conclude". + * + * @param consensus handle for the consensus session + * @param element the element to be inserted + * @param idc function called when we are done with this element and it + * is thus allowed to call GNUNET_CONSENSUS_insert again + * @param idc_cls closure for 'idc' + */ +void +GNUNET_CONSENSUS_insert (struct GNUNET_CONSENSUS_Handle *consensus, + const struct GNUNET_CONSENSUS_Element *element, + GNUNET_CONSENSUS_InsertDoneCallback idc, + void *idc_cls) +{ + struct QueuedMessage *qmsg; + struct GNUNET_CONSENSUS_ElementMessage *element_msg; + size_t element_msg_size; + + LOG (GNUNET_ERROR_TYPE_DEBUG, "inserting, size=%llu\n", element->size); + + element_msg_size = (sizeof (struct GNUNET_CONSENSUS_ElementMessage) + + element->size); + + element_msg = GNUNET_malloc (element_msg_size); + element_msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_INSERT); + element_msg->header.size = htons (element_msg_size); + memcpy (&element_msg[1], element->data, element->size); + + qmsg = GNUNET_malloc (sizeof (struct QueuedMessage)); + qmsg->msg = (struct GNUNET_MessageHeader *) element_msg; + qmsg->idc = idc; + qmsg->idc_cls = idc_cls; + + GNUNET_CONTAINER_DLL_insert_tail (consensus->messages_head, consensus->messages_tail, qmsg); + + send_next (consensus); +} + + +/** + * We are done with inserting new elements into the consensus; + * try to conclude the consensus within a given time window. + * After conclude has been called, no further elements may be + * inserted by the client. + * + * @param consensus consensus session + * @param timeout timeout after which the conculde callback + * must be called + * @param conclude called when the conclusion was successful + * @param conclude_cls closure for the conclude callback + */ +void +GNUNET_CONSENSUS_conclude (struct GNUNET_CONSENSUS_Handle *consensus, + struct GNUNET_TIME_Relative timeout, + unsigned int min_group_size_in_consensus, + GNUNET_CONSENSUS_ConcludeCallback conclude, + void *conclude_cls) +{ + struct QueuedMessage *qmsg; + struct GNUNET_CONSENSUS_ConcludeMessage *conclude_msg; + + GNUNET_assert (NULL != conclude); + GNUNET_assert (NULL == consensus->conclude_cb); + + consensus->conclude_cls = conclude_cls; + consensus->conclude_cb = conclude; + + conclude_msg = GNUNET_malloc (sizeof (struct GNUNET_CONSENSUS_ConcludeMessage)); + conclude_msg->header.type = htons (GNUNET_MESSAGE_TYPE_CONSENSUS_CLIENT_CONCLUDE); + conclude_msg->header.size = htons (sizeof (struct GNUNET_CONSENSUS_ConcludeMessage)); + conclude_msg->timeout = GNUNET_TIME_relative_hton (timeout); + conclude_msg->min_group_size = min_group_size_in_consensus; + + qmsg = GNUNET_malloc (sizeof (struct QueuedMessage)); + qmsg->msg = (struct GNUNET_MessageHeader *) conclude_msg; + + GNUNET_CONTAINER_DLL_insert_tail (consensus->messages_head, consensus->messages_tail, qmsg); + + send_next (consensus); +} + + +/** + * Destroy a consensus handle (free all state associated with + * it, no longer call any of the callbacks). + * + * @param consensus handle to destroy + */ +void +GNUNET_CONSENSUS_destroy (struct GNUNET_CONSENSUS_Handle *consensus) +{ + if (consensus->client != NULL) + { + GNUNET_CLIENT_disconnect (consensus->client); + consensus->client = NULL; + } + if (NULL != consensus->peers) + GNUNET_free (consensus->peers); + GNUNET_free (consensus); +} + |