aboutsummaryrefslogtreecommitdiff
path: root/src/cadet/cadet_api.c
diff options
context:
space:
mode:
authorBart Polot <bart.polot+voyager@gmail.com>2017-01-31 02:58:54 +0100
committerBart Polot <bart.polot+voyager@gmail.com>2017-01-31 02:58:54 +0100
commitbc38effeecf8300b99de2f367e01c9d02fcafb78 (patch)
tree6366d9f50b482b7636479eaab8103ee82250d607 /src/cadet/cadet_api.c
parentb2c3389e8de9d24794d4b4bb499e14408101d433 (diff)
Implement the connect and create_channel call for mq api
Diffstat (limited to 'src/cadet/cadet_api.c')
-rw-r--r--src/cadet/cadet_api.c308
1 files changed, 298 insertions, 10 deletions
diff --git a/src/cadet/cadet_api.c b/src/cadet/cadet_api.c
index 2b50f781c4..3491bd75f3 100644
--- a/src/cadet/cadet_api.c
+++ b/src/cadet/cadet_api.c
@@ -38,6 +38,8 @@
/**
* Transmission queue to the service
+ *
+ * @deprecated
*/
struct GNUNET_CADET_TransmitHandle
{
@@ -117,17 +119,26 @@ union CadetInfoCB
struct GNUNET_CADET_Handle
{
/**
+ * Flag to indicate old or MQ API.
+ */
+ int mq_api;
+
+ /**
* Message queue (if available).
*/
struct GNUNET_MQ_Handle *mq;
/**
* Set of handlers used for processing incoming messages in the channels
+ *
+ * @deprecated
*/
const struct GNUNET_CADET_MessageHandler *message_handlers;
/**
* Number of handlers in the handlers array.
+ *
+ * @deprecated
*/
unsigned int n_handlers;
@@ -153,16 +164,22 @@ struct GNUNET_CADET_Handle
/**
* Closure for all the handlers given by the client
+ *
+ * @deprecated
*/
void *cls;
/**
* Messages to send to the service, head.
+ *
+ * @deprecated
*/
struct GNUNET_CADET_TransmitHandle *th_head;
/**
* Messages to send to the service, tail.
+ *
+ * @deprecated
*/
struct GNUNET_CADET_TransmitHandle *th_tail;
@@ -241,9 +258,9 @@ struct GNUNET_CADET_Channel
struct GNUNET_CADET_ClientChannelNumber ccn;
/**
- * Channel's port, if any.
+ * Channel's port, if incoming.
*/
- struct GNUNET_CADET_Port *port;
+ struct GNUNET_CADET_Port *incoming_port;
/**
* Other end of the channel.
@@ -262,9 +279,30 @@ struct GNUNET_CADET_Channel
/**
* Are we allowed to send to the service?
+ *
+ * @deprecated
*/
unsigned int allow_send;
+ /****************************************************************************/
+ /***************************** MQ ************************************/
+ /****************************************************************************/
+
+ /**
+ * Message Queue for the channel.
+ */
+ struct GNUNET_MQ_Handle *mq;
+
+ /**
+ * Window change handler.
+ */
+ GNUNET_CADET_WindowSizeEventHandler window_changes;
+
+ /**
+ * Disconnect handler.
+ */
+ GNUNET_CADET_DisconnectEventHandler disconnects;
+
};
@@ -611,7 +649,7 @@ handle_channel_created (void *cls,
ch->peer = GNUNET_PEER_intern (&msg->peer);
ch->cadet = h;
ch->ccn = ccn;
- ch->port = port;
+ ch->incoming_port = port;
ch->options = ntohl (msg->opt);
LOG (GNUNET_ERROR_TYPE_DEBUG,
@@ -2047,9 +2085,9 @@ cadet_mq_ntr (void *cls, size_t size,
* @param impl_state state of the implementation
*/
static void
-cadet_mq_send_impl (struct GNUNET_MQ_Handle *mq,
- const struct GNUNET_MessageHeader *msg,
- void *impl_state)
+cadet_mq_send_impl_old (struct GNUNET_MQ_Handle *mq,
+ const struct GNUNET_MessageHeader *msg,
+ void *impl_state)
{
struct CadetMQState *state = impl_state;
@@ -2075,8 +2113,8 @@ cadet_mq_send_impl (struct GNUNET_MQ_Handle *mq,
* @param impl_state state of the implementation
*/
static void
-cadet_mq_destroy_impl (struct GNUNET_MQ_Handle *mq,
- void *impl_state)
+cadet_mq_destroy_impl_old (struct GNUNET_MQ_Handle *mq,
+ void *impl_state)
{
struct CadetMQState *state = impl_state;
@@ -2104,8 +2142,8 @@ GNUNET_CADET_mq_create (struct GNUNET_CADET_Channel *channel)
state = GNUNET_new (struct CadetMQState);
state->channel = channel;
- mq = GNUNET_MQ_queue_for_callbacks (&cadet_mq_send_impl,
- &cadet_mq_destroy_impl,
+ mq = GNUNET_MQ_queue_for_callbacks (&cadet_mq_send_impl_old,
+ &cadet_mq_destroy_impl_old,
NULL, /* FIXME: cancel impl. */
state,
NULL, /* no msg handlers */
@@ -2136,3 +2174,253 @@ GC_u2h (uint32_t port)
return &hash;
}
+
+
+
+/******************************************************************************/
+/******************************* MQ-BASED API *********************************/
+/******************************************************************************/
+
+/**
+ * Connect to the MQ-based cadet service.
+ *
+ * @param cfg Configuration to use.
+ *
+ * @return Handle to the cadet service NULL on error.
+ */
+struct GNUNET_CADET_Handle *
+GNUNET_CADET_connecT (const struct GNUNET_CONFIGURATION_Handle *cfg)
+{
+ struct GNUNET_CADET_Handle *h;
+
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "GNUNET_CADET_connecT()\n");
+ h = GNUNET_new (struct GNUNET_CADET_Handle);
+ h->cfg = cfg;
+ h->mq_api = GNUNET_YES;
+ h->ports = GNUNET_CONTAINER_multihashmap_create (4, GNUNET_YES);
+ do_reconnect (h);
+ if (h->mq == NULL)
+ {
+ GNUNET_break (0);
+ GNUNET_CADET_disconnect (h);
+ return NULL;
+ }
+ h->next_ccn.channel_of_client = htonl (GNUNET_CADET_LOCAL_CHANNEL_ID_CLI);
+ h->reconnect_time = GNUNET_TIME_UNIT_MILLISECONDS;
+ h->reconnect_task = NULL;
+
+ return h;
+}
+
+
+/**
+ * Open a port to receive incomming MQ-based channels.
+ *
+ * @param h CADET handle.
+ * @param port Hash identifying the port.
+ * @param connects Function called when an incoming channel is connected.
+ * @param connects_cls Closure for the @a connects handler.
+ * @param window_changes Function called when the transmit window size changes.
+ * @param disconnects Function called when a channel is disconnected.
+ * @param handlers Callbacks for messages we care about, NULL-terminated.
+ *
+ * @return Port handle.
+ */
+struct GNUNET_CADET_Port *
+GNUNET_CADET_open_porT (struct GNUNET_CADET_Handle *h,
+ const struct GNUNET_HashCode *port,
+ GNUNET_CADET_ConnectEventHandler connects,
+ void * connects_cls,
+ GNUNET_CADET_WindowSizeEventHandler window_changes,
+ GNUNET_CADET_DisconnectEventHandler disconnects,
+ const struct GNUNET_MQ_MessageHandler *handlers)
+{
+ return NULL;
+}
+
+
+/**
+ * Implement sending functionality of a message queue for
+ * us sending messages to a peer.
+ *
+ * Encapsulates the payload message in a #GNUNET_CADET_LocalData message
+ * in order to label the message with the channel ID and send the
+ * encapsulated message to the service.
+ *
+ * @param mq the message queue
+ * @param msg the message to send
+ * @param impl_state state of the implementation
+ */
+static void
+cadet_mq_send_impl (struct GNUNET_MQ_Handle *mq,
+ const struct GNUNET_MessageHeader *msg,
+ void *impl_state)
+{
+ struct GNUNET_CADET_Channel *ch = impl_state;
+ struct GNUNET_CADET_Handle *h = ch->cadet;
+ uint16_t msize;
+ struct GNUNET_MQ_Envelope *env;
+ struct GNUNET_CADET_LocalData *cadet_msg;
+
+
+ if (NULL == h->mq)
+ {
+ /* We're currently reconnecting, pretend this worked */
+ GNUNET_MQ_impl_send_continue (mq);
+ return;
+ }
+
+ /* check message size for sanity */
+ msize = ntohs (msg->size);
+ if (msize > GNUNET_CONSTANTS_MAX_CADET_MESSAGE_SIZE)
+ {
+ GNUNET_break (0);
+ GNUNET_MQ_impl_send_continue (mq);
+ return;
+ }
+
+ env = GNUNET_MQ_msg_nested_mh (cadet_msg,
+ GNUNET_MESSAGE_TYPE_CADET_LOCAL_DATA,
+ msg);
+ cadet_msg->ccn = ch->ccn;
+ GNUNET_MQ_send (h->mq, env);
+ GNUNET_MQ_impl_send_continue (mq);
+}
+
+
+/**
+ * Handle destruction of a message queue. Implementations must not
+ * free @a mq, but should take care of @a impl_state.
+ *
+ * @param mq the message queue to destroy
+ * @param impl_state state of the implementation
+ */
+static void
+cadet_mq_destroy_impl (struct GNUNET_MQ_Handle *mq,
+ void *impl_state)
+{
+ struct GNUNET_CADET_Channel *ch = impl_state;
+
+ GNUNET_assert (mq == ch->mq);
+ ch->mq = NULL;
+}
+
+
+/**
+ * We had an error processing a message we forwarded from a peer to
+ * the CADET service. We should just complain about it but otherwise
+ * continue processing.
+ *
+ * @param cls closure
+ * @param error error code
+ */
+static void
+cadet_mq_error_handler (void *cls,
+ enum GNUNET_MQ_Error error)
+{
+ GNUNET_break_op (0);
+}
+
+
+/**
+ * Implementation function that cancels the currently sent message.
+ * Should basically undo whatever #mq_send_impl() did.
+ *
+ * @param mq message queue
+ * @param impl_state state specific to the implementation
+ */
+static void
+cadet_mq_cancel_impl (struct GNUNET_MQ_Handle *mq,
+ void *impl_state)
+{
+ struct GNUNET_CADET_Channel *ch = impl_state;
+
+ LOG (GNUNET_ERROR_TYPE_WARNING,
+ "Cannot cancel mq message on channel %X of %p\n",
+ ch->ccn.channel_of_client, ch->cadet);
+
+ GNUNET_break (0);
+}
+
+
+/**
+ * Create a new channel towards a remote peer.
+ *
+ * If the destination port is not open by any peer or the destination peer
+ * does not accept the channel, #GNUNET_CADET_ChannelEndHandler will be called
+ * for this channel.
+ *
+ * @param h CADET handle.
+ * @param channel_cls Closure for the channel. It's given to:
+ * - The disconnect handler @a disconnects
+ * - Each message type callback in @a handlers
+ * @param destination Peer identity the channel should go to.
+ * @param port Identification of the destination port.
+ * @param options CadetOption flag field, with all desired option bits set to 1.
+ * @param window_changes Function called when the transmit window size changes.
+ * @param disconnects Function called when the channel is disconnected.
+ * @param handlers Callbacks for messages we care about, NULL-terminated.
+ *
+ * @return Handle to the channel.
+ */
+struct GNUNET_CADET_Channel *
+GNUNET_CADET_channel_creatE (struct GNUNET_CADET_Handle *h,
+ void *channel_cls,
+ const struct GNUNET_PeerIdentity *destination,
+ const struct GNUNET_HashCode *port,
+ enum GNUNET_CADET_ChannelOption options,
+ GNUNET_CADET_WindowSizeEventHandler window_changes,
+ GNUNET_CADET_DisconnectEventHandler disconnects,
+ const struct GNUNET_MQ_MessageHandler *handlers)
+{
+ struct GNUNET_CADET_Channel *ch;
+ struct GNUNET_CADET_ClientChannelNumber ccn;
+ struct GNUNET_CADET_LocalChannelCreateMessage *msg;
+ struct GNUNET_MQ_Envelope *env;
+
+ /* Save parameters */
+ ccn.channel_of_client = htonl (0);
+ ch = create_channel (h, ccn);
+ ch->ctx = channel_cls;
+ ch->peer = GNUNET_PEER_intern (destination);
+ ch->options = options;
+ ch->window_changes = window_changes;
+ ch->disconnects = disconnects;
+
+ /* Create MQ for channel */
+ ch->mq = GNUNET_MQ_queue_for_callbacks (&cadet_mq_send_impl,
+ &cadet_mq_destroy_impl,
+ &cadet_mq_cancel_impl,
+ ch,
+ handlers,
+ &cadet_mq_error_handler,
+ ch);
+ GNUNET_MQ_set_handlers_closure (ch->mq, channel_cls);
+
+ /* Request channel creation to service */
+ env = GNUNET_MQ_msg (msg,
+ GNUNET_MESSAGE_TYPE_CADET_LOCAL_CHANNEL_CREATE);
+ msg->ccn = ch->ccn;
+ msg->port = *port;
+ msg->peer = *destination;
+ msg->opt = htonl (options);
+ GNUNET_MQ_send (h->mq,
+ env);
+
+ return ch;
+}
+
+
+/**
+ * Obtain the message queue for a connected peer.
+ *
+ * @param channel The channel handle from which to get the MQ.
+ *
+ * @return NULL if @a channel is not yet connected.
+ */
+struct GNUNET_MQ_Handle *
+GNUNET_CADET_get_mq (const struct GNUNET_CADET_Channel *channel)
+{
+ return channel->mq;
+}