aboutsummaryrefslogtreecommitdiff
path: root/src/fs
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2017-02-17 11:06:15 +0100
committerChristian Grothoff <christian@grothoff.org>2017-02-17 11:06:15 +0100
commit9727e5e53721dace7abbcc5bcd28c838af4291cc (patch)
treeca32ed19cf0d4129d3497261531aa40a19599280 /src/fs
parentc793bffc39fe1445616c9d0cb071d62575dea217 (diff)
convert to new CADET API, not working due to CADET-API internal bugs
Diffstat (limited to 'src/fs')
-rw-r--r--src/fs/gnunet-service-fs.c2
-rw-r--r--src/fs/gnunet-service-fs_cadet.h49
-rw-r--r--src/fs/gnunet-service-fs_cadet_client.c290
-rw-r--r--src/fs/gnunet-service-fs_cadet_server.c286
4 files changed, 290 insertions, 337 deletions
diff --git a/src/fs/gnunet-service-fs.c b/src/fs/gnunet-service-fs.c
index e38fdb0323..8c605c6a2c 100644
--- a/src/fs/gnunet-service-fs.c
+++ b/src/fs/gnunet-service-fs.c
@@ -1177,7 +1177,6 @@ handle_client_unindex (void *cls,
static void
shutdown_task (void *cls)
{
- GSF_cadet_stop_client ();
GSF_cadet_stop_server ();
if (NULL != GSF_core)
{
@@ -1320,7 +1319,6 @@ main_init (const struct GNUNET_CONFIGURATION_Handle *c)
NULL);
datastore_get_load = GNUNET_LOAD_value_init (DATASTORE_LOAD_AUTODECLINE);
GSF_cadet_start_server ();
- GSF_cadet_start_client ();
GNUNET_SCHEDULER_add_shutdown (&shutdown_task,
NULL);
return GNUNET_OK;
diff --git a/src/fs/gnunet-service-fs_cadet.h b/src/fs/gnunet-service-fs_cadet.h
index 060a3993c9..1fbd3a4060 100644
--- a/src/fs/gnunet-service-fs_cadet.h
+++ b/src/fs/gnunet-service-fs_cadet.h
@@ -1,6 +1,6 @@
/*
This file is part of GNUnet.
- Copyright (C) 2012 GNUnet e.V.
+ Copyright (C) 2012, 2017 GNUnet e.V.
GNUnet is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published
@@ -38,14 +38,15 @@ struct GSF_CadetRequest;
* @param cls closure
* @param type type of the block, ANY on error
* @param expiration expiration time for the block
- * @param data_size number of bytes in 'data', 0 on error
+ * @param data_size number of bytes in @a data, 0 on error
* @param data reply block data, NULL on error
*/
-typedef void (*GSF_CadetReplyProcessor)(void *cls,
- enum GNUNET_BLOCK_Type type,
- struct GNUNET_TIME_Absolute expiration,
- size_t data_size,
- const void *data);
+typedef void
+(*GSF_CadetReplyProcessor)(void *cls,
+ enum GNUNET_BLOCK_Type type,
+ struct GNUNET_TIME_Absolute expiration,
+ size_t data_size,
+ const void *data);
/**
@@ -55,14 +56,28 @@ typedef void (*GSF_CadetReplyProcessor)(void *cls,
* @param query hash to query for the block
* @param type desired type for the block
* @param proc function to call with result
- * @param proc_cls closure for 'proc'
+ * @param proc_cls closure for @a proc
* @return handle to cancel the operation
*/
struct GSF_CadetRequest *
GSF_cadet_query (const struct GNUNET_PeerIdentity *target,
- const struct GNUNET_HashCode *query,
- enum GNUNET_BLOCK_Type type,
- GSF_CadetReplyProcessor proc, void *proc_cls);
+ const struct GNUNET_HashCode *query,
+ enum GNUNET_BLOCK_Type type,
+ GSF_CadetReplyProcessor proc,
+ void *proc_cls);
+
+/**
+ * Function called on each active cadets to shut them down.
+ *
+ * @param cls NULL
+ * @param key target peer, unused
+ * @param value the `struct CadetHandle` to destroy
+ * @return #GNUNET_YES (continue to iterate)
+ */
+int
+GSF_cadet_release_clients (void *cls,
+ const struct GNUNET_PeerIdentity *key,
+ void *value);
/**
@@ -89,17 +104,15 @@ void
GSF_cadet_stop_server (void);
/**
- * Initialize subsystem for non-anonymous file-sharing.
+ * Cadet channel for creating outbound channels.
*/
-void
-GSF_cadet_start_client (void);
-
+extern struct GNUNET_CADET_Handle *cadet_handle;
/**
- * Shutdown subsystem for non-anonymous file-sharing.
+ * Map from peer identities to 'struct CadetHandles' with cadet
+ * channels to those peers.
*/
-void
-GSF_cadet_stop_client (void);
+extern struct GNUNET_CONTAINER_MultiPeerMap *cadet_map;
GNUNET_NETWORK_STRUCT_BEGIN
diff --git a/src/fs/gnunet-service-fs_cadet_client.c b/src/fs/gnunet-service-fs_cadet_client.c
index 4e268b93c2..193fe2263f 100644
--- a/src/fs/gnunet-service-fs_cadet_client.c
+++ b/src/fs/gnunet-service-fs_cadet_client.c
@@ -155,13 +155,13 @@ struct CadetHandle
/**
* Cadet channel for creating outbound channels.
*/
-static struct GNUNET_CADET_Handle *cadet_handle;
+struct GNUNET_CADET_Handle *cadet_handle;
/**
* Map from peer identities to 'struct CadetHandles' with cadet
* channels to those peers.
*/
-static struct GNUNET_CONTAINER_MultiPeerMap *cadet_map;
+struct GNUNET_CONTAINER_MultiPeerMap *cadet_map;
/* ********************* client-side code ************************* */
@@ -419,9 +419,9 @@ struct HandleReplyClosure
* @return #GNUNET_YES (continue to iterate)
*/
static int
-handle_reply (void *cls,
- const struct GNUNET_HashCode *key,
- void *value)
+process_reply (void *cls,
+ const struct GNUNET_HashCode *key,
+ void *value)
{
struct HandleReplyClosure *hrc = cls;
struct GSF_CadetRequest *sr = value;
@@ -443,38 +443,43 @@ handle_reply (void *cls,
* is received.
*
* @param cls closure with the `struct CadetHandle`
- * @param channel channel handle
- * @param channel_ctx channel context
- * @param message the actual message
+ * @param srm the actual message
* @return #GNUNET_OK on success, #GNUNET_SYSERR to stop further processing
*/
static int
-reply_cb (void *cls,
- struct GNUNET_CADET_Channel *channel,
- void **channel_ctx,
- const struct GNUNET_MessageHeader *message)
+check_reply (void *cls,
+ const struct CadetReplyMessage *srm)
{
- struct CadetHandle *mh = *channel_ctx;
- const struct CadetReplyMessage *srm;
+ /* We check later... */
+ return GNUNET_OK;
+}
+
+
+/**
+ * Functions with this signature are called whenever a complete reply
+ * is received.
+ *
+ * @param cls closure with the `struct CadetHandle`
+ * @param srm the actual message
+ */
+static void
+handle_reply (void *cls,
+ const struct CadetReplyMessage *srm)
+{
+ struct CadetHandle *mh = cls;
struct HandleReplyClosure hrc;
uint16_t msize;
enum GNUNET_BLOCK_Type type;
struct GNUNET_HashCode query;
- msize = ntohs (message->size);
- if (sizeof (struct CadetReplyMessage) > msize)
- {
- GNUNET_break_op (0);
- reset_cadet_async (mh);
- return GNUNET_SYSERR;
- }
- srm = (const struct CadetReplyMessage *) message;
- msize -= sizeof (struct CadetReplyMessage);
+ msize = ntohs (srm->header.size) - sizeof (struct CadetReplyMessage);
type = (enum GNUNET_BLOCK_Type) ntohl (srm->type);
if (GNUNET_YES !=
GNUNET_BLOCK_get_key (GSF_block_ctx,
type,
- &srm[1], msize, &query))
+ &srm[1],
+ msize,
+ &query))
{
GNUNET_break_op (0);
GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
@@ -483,13 +488,13 @@ reply_cb (void *cls,
msize,
GNUNET_i2s (&mh->target));
reset_cadet_async (mh);
- return GNUNET_SYSERR;
+ return;
}
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Received reply `%s' via cadet from peer %s\n",
GNUNET_h2s (&query),
GNUNET_i2s (&mh->target));
- GNUNET_CADET_receive_done (channel);
+ GNUNET_CADET_receive_done (mh->channel);
GNUNET_STATISTICS_update (GSF_stats,
gettext_noop ("# replies received via cadet"), 1,
GNUNET_NO);
@@ -500,16 +505,103 @@ reply_cb (void *cls,
hrc.found = GNUNET_NO;
GNUNET_CONTAINER_multihashmap_get_multiple (mh->waiting_map,
&query,
- &handle_reply,
+ &process_reply,
&hrc);
if (GNUNET_NO == hrc.found)
{
GNUNET_STATISTICS_update (GSF_stats,
gettext_noop ("# replies received via cadet dropped"), 1,
GNUNET_NO);
- return GNUNET_OK;
}
- return GNUNET_OK;
+}
+
+
+/**
+ * Iterator called on each entry in a waiting map to
+ * call the 'proc' continuation and release associated
+ * resources.
+ *
+ * @param cls the `struct CadetHandle`
+ * @param key the key of the entry in the map (the query)
+ * @param value the `struct GSF_CadetRequest` to clean up
+ * @return #GNUNET_YES (continue to iterate)
+ */
+static int
+free_waiting_entry (void *cls,
+ const struct GNUNET_HashCode *key,
+ void *value)
+{
+ struct GSF_CadetRequest *sr = value;
+
+ GSF_cadet_query_cancel (sr);
+ return GNUNET_YES;
+}
+
+
+/**
+ * Function called by cadet when a client disconnects.
+ * Cleans up our `struct CadetClient` of that channel.
+ *
+ * @param cls our `struct CadetClient`
+ * @param channel channel of the disconnecting client
+ */
+static void
+disconnect_cb (void *cls,
+ const struct GNUNET_CADET_Channel *channel)
+{
+ struct CadetHandle *mh = cls;
+ struct GSF_CadetRequest *sr;
+
+ if (NULL == mh->channel)
+ return; /* being destroyed elsewhere */
+ GNUNET_assert (channel == mh->channel);
+ mh->channel = NULL;
+ while (NULL != (sr = mh->pending_head))
+ GSF_cadet_query_cancel (sr);
+ /* first remove `mh` from the `cadet_map`, so that if the
+ callback from `free_waiting_entry()` happens to re-issue
+ the request, we don't immediately have it back in the
+ `waiting_map`. */
+ GNUNET_assert (GNUNET_OK ==
+ GNUNET_CONTAINER_multipeermap_remove (cadet_map,
+ &mh->target,
+ mh));
+ GNUNET_CONTAINER_multihashmap_iterate (mh->waiting_map,
+ &free_waiting_entry,
+ mh);
+ if (NULL != mh->wh)
+ GNUNET_CADET_notify_transmit_ready_cancel (mh->wh);
+ if (NULL != mh->timeout_task)
+ GNUNET_SCHEDULER_cancel (mh->timeout_task);
+ if (NULL != mh->reset_task)
+ GNUNET_SCHEDULER_cancel (mh->reset_task);
+ GNUNET_assert (0 ==
+ GNUNET_CONTAINER_multihashmap_size (mh->waiting_map));
+ GNUNET_CONTAINER_multihashmap_destroy (mh->waiting_map);
+ GNUNET_free (mh);
+}
+
+
+/**
+ * Function called whenever an MQ-channel's transmission window size changes.
+ *
+ * The first callback in an outgoing channel will be with a non-zero value
+ * and will mean the channel is connected to the destination.
+ *
+ * For an incoming channel it will be called immediately after the
+ * #GNUNET_CADET_ConnectEventHandler, also with a non-zero value.
+ *
+ * @param cls Channel closure.
+ * @param channel Connection to the other end (henceforth invalid).
+ * @param window_size New window size. If the is more messages than buffer size
+ * this value will be negative..
+ */
+static void
+window_change_cb (void *cls,
+ const struct GNUNET_CADET_Channel *channel,
+ int window_size)
+{
+ /* FIXME: for flow control, implement? */
}
@@ -552,14 +644,25 @@ get_cadet (const struct GNUNET_PeerIdentity *target)
GNUNET_CRYPTO_hash (GNUNET_APPLICATION_PORT_FS_BLOCK_TRANSFER,
strlen (GNUNET_APPLICATION_PORT_FS_BLOCK_TRANSFER),
&port);
- mh->channel = GNUNET_CADET_channel_create (cadet_handle,
- mh,
- &mh->target,
- &port,
- GNUNET_CADET_OPTION_RELIABLE);
- GNUNET_assert (mh ==
- GNUNET_CONTAINER_multipeermap_get (cadet_map,
- target));
+
+ {
+ struct GNUNET_MQ_MessageHandler handlers[] = {
+ GNUNET_MQ_hd_var_size (reply,
+ GNUNET_MESSAGE_TYPE_FS_CADET_REPLY,
+ struct CadetReplyMessage,
+ mh),
+ GNUNET_MQ_handler_end ()
+ };
+
+ mh->channel = GNUNET_CADET_channel_creatE (cadet_handle,
+ mh,
+ &mh->target,
+ &port,
+ GNUNET_CADET_OPTION_RELIABLE,
+ &window_change_cb,
+ &disconnect_cb,
+ handlers);
+ }
return mh;
}
@@ -646,93 +749,6 @@ GSF_cadet_query_cancel (struct GSF_CadetRequest *sr)
/**
- * Iterator called on each entry in a waiting map to
- * call the 'proc' continuation and release associated
- * resources.
- *
- * @param cls the `struct CadetHandle`
- * @param key the key of the entry in the map (the query)
- * @param value the `struct GSF_CadetRequest` to clean up
- * @return #GNUNET_YES (continue to iterate)
- */
-static int
-free_waiting_entry (void *cls,
- const struct GNUNET_HashCode *key,
- void *value)
-{
- struct GSF_CadetRequest *sr = value;
-
- GSF_cadet_query_cancel (sr);
- return GNUNET_YES;
-}
-
-
-/**
- * Function called by cadet when a client disconnects.
- * Cleans up our `struct CadetClient` of that channel.
- *
- * @param cls NULL
- * @param channel channel of the disconnecting client
- * @param channel_ctx our `struct CadetClient`
- */
-static void
-cleaner_cb (void *cls,
- const struct GNUNET_CADET_Channel *channel,
- void *channel_ctx)
-{
- struct CadetHandle *mh = channel_ctx;
- struct GSF_CadetRequest *sr;
-
- if (NULL == mh->channel)
- return; /* being destroyed elsewhere */
- GNUNET_assert (channel == mh->channel);
- mh->channel = NULL;
- while (NULL != (sr = mh->pending_head))
- GSF_cadet_query_cancel (sr);
- /* first remove `mh` from the `cadet_map`, so that if the
- callback from `free_waiting_entry()` happens to re-issue
- the request, we don't immediately have it back in the
- `waiting_map`. */
- GNUNET_assert (GNUNET_OK ==
- GNUNET_CONTAINER_multipeermap_remove (cadet_map,
- &mh->target,
- mh));
- GNUNET_CONTAINER_multihashmap_iterate (mh->waiting_map,
- &free_waiting_entry,
- mh);
- if (NULL != mh->wh)
- GNUNET_CADET_notify_transmit_ready_cancel (mh->wh);
- if (NULL != mh->timeout_task)
- GNUNET_SCHEDULER_cancel (mh->timeout_task);
- if (NULL != mh->reset_task)
- GNUNET_SCHEDULER_cancel (mh->reset_task);
- GNUNET_assert (0 ==
- GNUNET_CONTAINER_multihashmap_size (mh->waiting_map));
- GNUNET_CONTAINER_multihashmap_destroy (mh->waiting_map);
- GNUNET_free (mh);
-}
-
-
-/**
- * Initialize subsystem for non-anonymous file-sharing.
- */
-void
-GSF_cadet_start_client ()
-{
- static const struct GNUNET_CADET_MessageHandler handlers[] = {
- { &reply_cb, GNUNET_MESSAGE_TYPE_FS_CADET_REPLY, 0 },
- { NULL, 0, 0 }
- };
-
- cadet_map = GNUNET_CONTAINER_multipeermap_create (16, GNUNET_YES);
- cadet_handle = GNUNET_CADET_connect (GSF_cfg,
- NULL,
- &cleaner_cb,
- handlers);
-}
-
-
-/**
* Function called on each active cadets to shut them down.
*
* @param cls NULL
@@ -740,10 +756,10 @@ GSF_cadet_start_client ()
* @param value the `struct CadetHandle` to destroy
* @return #GNUNET_YES (continue to iterate)
*/
-static int
-release_cadets (void *cls,
- const struct GNUNET_PeerIdentity *key,
- void *value)
+int
+GSF_cadet_release_clients (void *cls,
+ const struct GNUNET_PeerIdentity *key,
+ void *value)
{
struct CadetHandle *mh = value;
@@ -756,23 +772,5 @@ release_cadets (void *cls,
}
-/**
- * Shutdown subsystem for non-anonymous file-sharing.
- */
-void
-GSF_cadet_stop_client ()
-{
- GNUNET_CONTAINER_multipeermap_iterate (cadet_map,
- &release_cadets,
- NULL);
- GNUNET_CONTAINER_multipeermap_destroy (cadet_map);
- cadet_map = NULL;
- if (NULL != cadet_handle)
- {
- GNUNET_CADET_disconnect (cadet_handle);
- cadet_handle = NULL;
- }
-}
-
/* end of gnunet-service-fs_cadet_client.c */
diff --git a/src/fs/gnunet-service-fs_cadet_server.c b/src/fs/gnunet-service-fs_cadet_server.c
index ac86537c3e..0a72a82793 100644
--- a/src/fs/gnunet-service-fs_cadet_server.c
+++ b/src/fs/gnunet-service-fs_cadet_server.c
@@ -124,9 +124,9 @@ struct CadetClient
/**
- * Listen channel for incoming requests.
+ * Listen port for incoming requests.
*/
-static struct GNUNET_CADET_Handle *listen_channel;
+static struct GNUNET_CADET_Port *cadet_port;
/**
* Head of DLL of cadet clients.
@@ -188,121 +188,29 @@ refresh_timeout_task (struct CadetClient *sc)
/**
- * We're done handling a request from a client, read the next one.
+ * Check if we are done with the write queue, and if so tell CADET
+ * that we are ready to read more.
*
- * @param sc client to continue reading requests from
+ * @param cls where to process the write queue
*/
static void
-continue_reading (struct CadetClient *sc)
-{
- refresh_timeout_task (sc);
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Finished processing cadet request from client %p, ready to receive the next one\n",
- sc);
- GNUNET_CADET_receive_done (sc->channel);
-}
-
-
-/**
- * Transmit the next entry from the write queue.
- *
- * @param sc where to process the write queue
- */
-static void
-continue_writing (struct CadetClient *sc);
-
-
-/**
- * Send a reply now, cadet is ready.
- *
- * @param cls closure with the `struct CadetClient` which sent the query
- * @param size number of bytes available in @a buf
- * @param buf where to write the message
- * @return number of bytes written to @a buf
- */
-static size_t
-write_continuation (void *cls,
- size_t size,
- void *buf)
+continue_writing (void *cls)
{
struct CadetClient *sc = cls;
- struct GNUNET_CADET_Channel *tun;
- struct WriteQueueItem *wqi;
- size_t ret;
-
- sc->wh = NULL;
- if (NULL == (wqi = sc->wqi_head))
- {
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Write queue empty, reading more requests\n");
- return 0;
- }
- if ( (0 == size) ||
- (size < wqi->msize) )
- {
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Transmission of reply failed, terminating cadet\n");
- tun = sc->channel;
- sc->channel = NULL;
- GNUNET_CADET_channel_destroy (tun);
- return 0;
- }
- GNUNET_CONTAINER_DLL_remove (sc->wqi_head,
- sc->wqi_tail,
- wqi);
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Transmitted %u byte reply via cadet to %p\n",
- (unsigned int) size,
- sc);
- GNUNET_STATISTICS_update (GSF_stats,
- gettext_noop ("# Blocks transferred via cadet"), 1,
- GNUNET_NO);
- ret = wqi->msize;
- GNUNET_memcpy (buf, &wqi[1], ret);
- GNUNET_free (wqi);
- continue_writing (sc);
- return ret;
-}
-
-
-/**
- * Transmit the next entry from the write queue.
- *
- * @param sc where to process the write queue
- */
-static void
-continue_writing (struct CadetClient *sc)
-{
- struct WriteQueueItem *wqi;
- struct GNUNET_CADET_Channel *tun;
+ struct GNUNET_MQ_Handle *mq;
- if (NULL != sc->wh)
+ mq = GNUNET_CADET_get_mq (sc->channel);
+ if (0 != GNUNET_MQ_get_length (mq))
{
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Write pending, waiting for it to complete\n");
- return; /* write already pending */
- }
- if (NULL == (wqi = sc->wqi_head))
- {
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Write queue empty, reading more requests\n");
- continue_reading (sc);
- return;
- }
- sc->wh = GNUNET_CADET_notify_transmit_ready (sc->channel, GNUNET_NO,
- GNUNET_TIME_UNIT_FOREVER_REL,
- wqi->msize,
- &write_continuation,
- sc);
- if (NULL == sc->wh)
- {
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Write failed; terminating cadet\n");
- tun = sc->channel;
- sc->channel = NULL;
- GNUNET_CADET_channel_destroy (tun);
return;
}
+ refresh_timeout_task (sc);
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Finished processing cadet request from client %p, ready to receive the next one\n",
+ sc);
+ GNUNET_CADET_receive_done (sc->channel);
}
@@ -333,7 +241,7 @@ handle_datastore_reply (void *cls,
{
struct CadetClient *sc = cls;
size_t msize = size + sizeof (struct CadetReplyMessage);
- struct WriteQueueItem *wqi;
+ struct GNUNET_MQ_Envelope *env;
struct CadetReplyMessage *srm;
sc->qe = NULL;
@@ -357,7 +265,8 @@ handle_datastore_reply (void *cls,
GNUNET_h2s (key));
}
GNUNET_STATISTICS_update (GSF_stats,
- gettext_noop ("# queries received via CADET not answered"), 1,
+ gettext_noop ("# queries received via CADET not answered"),
+ 1,
GNUNET_NO);
continue_writing (sc);
return;
@@ -369,9 +278,13 @@ handle_datastore_reply (void *cls,
GNUNET_h2s (key));
if (GNUNET_OK !=
GNUNET_FS_handle_on_demand_block (key,
- size, data, type,
- priority, anonymity,
- expiration, uid,
+ size,
+ data,
+ type,
+ priority,
+ anonymity,
+ expiration,
+ uid,
&handle_datastore_reply,
sc))
{
@@ -394,19 +307,23 @@ handle_datastore_reply (void *cls,
(unsigned int) type,
GNUNET_h2s (key),
sc);
- wqi = GNUNET_malloc (sizeof (struct WriteQueueItem) + msize);
- wqi->msize = msize;
- srm = (struct CadetReplyMessage *) &wqi[1];
- srm->header.size = htons ((uint16_t) msize);
- srm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_CADET_REPLY);
+ env = GNUNET_MQ_msg_extra (srm,
+ size,
+ GNUNET_MESSAGE_TYPE_FS_CADET_REPLY);
srm->type = htonl (type);
srm->expiration = GNUNET_TIME_absolute_hton (expiration);
- GNUNET_memcpy (&srm[1], data, size);
- sc->reply_size = msize;
- GNUNET_CONTAINER_DLL_insert (sc->wqi_head,
- sc->wqi_tail,
- wqi);
- continue_writing (sc);
+ GNUNET_memcpy (&srm[1],
+ data,
+ size);
+ GNUNET_MQ_notify_sent (env,
+ &continue_writing,
+ sc);
+ GNUNET_STATISTICS_update (GSF_stats,
+ gettext_noop ("# Blocks transferred via cadet"),
+ 1,
+ GNUNET_NO);
+ GNUNET_MQ_send (GNUNET_CADET_get_mq (sc->channel),
+ env);
}
@@ -414,30 +331,22 @@ handle_datastore_reply (void *cls,
* Functions with this signature are called whenever a
* complete query message is received.
*
- * Do not call #GNUNET_SERVER_mst_destroy() in callback
- *
* @param cls closure with the `struct CadetClient`
- * @param channel channel handle
- * @param channel_ctx channel context
- * @param message the actual message
- * @return #GNUNET_OK on success, #GNUNET_SYSERR to stop further processing
+ * @param sqm the actual message
*/
-static int
-request_cb (void *cls,
- struct GNUNET_CADET_Channel *channel,
- void **channel_ctx,
- const struct GNUNET_MessageHeader *message)
+static void
+handle_request (void *cls,
+ const struct CadetQueryMessage *sqm)
{
- struct CadetClient *sc = *channel_ctx;
- const struct CadetQueryMessage *sqm;
+ struct CadetClient *sc = cls;
- sqm = (const struct CadetQueryMessage *) message;
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Received query for `%s' via cadet from client %p\n",
GNUNET_h2s (&sqm->query),
sc);
GNUNET_STATISTICS_update (GSF_stats,
- gettext_noop ("# queries received via cadet"), 1,
+ gettext_noop ("# queries received via cadet"),
+ 1,
GNUNET_NO);
refresh_timeout_task (sc);
sc->qe = GNUNET_DATASTORE_get_key (GSF_dsh,
@@ -446,14 +355,14 @@ request_cb (void *cls,
ntohl (sqm->type),
0 /* priority */,
GSF_datastore_queue_size,
- &handle_datastore_reply, sc);
+ &handle_datastore_reply,
+ sc);
if (NULL == sc->qe)
{
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Queueing request with datastore failed (queue full?)\n");
continue_writing (sc);
}
- return GNUNET_OK;
}
@@ -464,16 +373,12 @@ request_cb (void *cls,
* @param channel the channel representing the cadet
* @param initiator the identity of the peer who wants to establish a cadet
* with us; NULL on binding error
- * @param port cadet port used for the incoming connection
- * @param options channel option flags
- * @return initial channel context (our 'struct CadetClient')
+ * @return initial channel context (our `struct CadetClient`)
*/
static void *
-accept_cb (void *cls,
- struct GNUNET_CADET_Channel *channel,
- const struct GNUNET_PeerIdentity *initiator,
- const struct GNUNET_HashCode *port,
- enum GNUNET_CADET_ChannelOption options)
+connect_cb (void *cls,
+ struct GNUNET_CADET_Channel *channel,
+ const struct GNUNET_PeerIdentity *initiator)
{
struct CadetClient *sc;
@@ -481,13 +386,15 @@ accept_cb (void *cls,
if (sc_count >= sc_count_max)
{
GNUNET_STATISTICS_update (GSF_stats,
- gettext_noop ("# cadet client connections rejected"), 1,
+ gettext_noop ("# cadet client connections rejected"),
+ 1,
GNUNET_NO);
GNUNET_CADET_channel_destroy (channel);
return NULL;
}
GNUNET_STATISTICS_update (GSF_stats,
- gettext_noop ("# cadet connections active"), 1,
+ gettext_noop ("# cadet connections active"),
+ 1,
GNUNET_NO);
sc = GNUNET_new (struct CadetClient);
sc->channel = channel;
@@ -506,18 +413,17 @@ accept_cb (void *cls,
/**
* Function called by cadet when a client disconnects.
- * Cleans up our 'struct CadetClient' of that channel.
+ * Cleans up our `struct CadetClient` of that channel.
*
- * @param cls NULL
+ * @param cls our `struct CadetClient`
* @param channel channel of the disconnecting client
- * @param channel_ctx our 'struct CadetClient'
+ * @param channel_ctx
*/
static void
-cleaner_cb (void *cls,
- const struct GNUNET_CADET_Channel *channel,
- void *channel_ctx)
+disconnect_cb (void *cls,
+ const struct GNUNET_CADET_Channel *channel)
{
- struct CadetClient *sc = channel_ctx;
+ struct CadetClient *sc = cls;
struct WriteQueueItem *wqi;
if (NULL == sc)
@@ -552,15 +458,42 @@ cleaner_cb (void *cls,
}
+
+/**
+ * Function called whenever an MQ-channel's transmission window size changes.
+ *
+ * The first callback in an outgoing channel will be with a non-zero value
+ * and will mean the channel is connected to the destination.
+ *
+ * For an incoming channel it will be called immediately after the
+ * #GNUNET_CADET_ConnectEventHandler, also with a non-zero value.
+ *
+ * @param cls Channel closure.
+ * @param channel Connection to the other end (henceforth invalid).
+ * @param window_size New window size. If the is more messages than buffer size
+ * this value will be negative..
+ */
+static void
+window_change_cb (void *cls,
+ const struct GNUNET_CADET_Channel *channel,
+ int window_size)
+{
+ /* FIXME: could do flow control here... */
+}
+
+
/**
* Initialize subsystem for non-anonymous file-sharing.
*/
void
GSF_cadet_start_server ()
{
- static const struct GNUNET_CADET_MessageHandler handlers[] = {
- { &request_cb, GNUNET_MESSAGE_TYPE_FS_CADET_QUERY, sizeof (struct CadetQueryMessage)},
- { NULL, 0, 0 }
+ struct GNUNET_MQ_MessageHandler handlers[] = {
+ GNUNET_MQ_hd_fixed_size (request,
+ GNUNET_MESSAGE_TYPE_FS_CADET_QUERY,
+ struct CadetQueryMessage,
+ NULL),
+ GNUNET_MQ_handler_end ()
};
struct GNUNET_HashCode port;
@@ -573,18 +506,19 @@ GSF_cadet_start_server ()
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Initializing cadet FS server with a limit of %llu connections\n",
sc_count_max);
- listen_channel = GNUNET_CADET_connect (GSF_cfg,
- NULL,
- &cleaner_cb,
- handlers);
- GNUNET_assert (NULL != listen_channel);
+ cadet_map = GNUNET_CONTAINER_multipeermap_create (16, GNUNET_YES);
+ cadet_handle = GNUNET_CADET_connecT (GSF_cfg);
+ GNUNET_assert (NULL != cadet_handle);
GNUNET_CRYPTO_hash (GNUNET_APPLICATION_PORT_FS_BLOCK_TRANSFER,
strlen (GNUNET_APPLICATION_PORT_FS_BLOCK_TRANSFER),
&port);
- GNUNET_CADET_open_port (listen_channel,
- &port,
- &accept_cb,
- NULL);
+ cadet_port = GNUNET_CADET_open_porT (cadet_handle,
+ &port,
+ &connect_cb,
+ NULL,
+ &window_change_cb,
+ &disconnect_cb,
+ handlers);
}
@@ -594,10 +528,20 @@ GSF_cadet_start_server ()
void
GSF_cadet_stop_server ()
{
- if (NULL != listen_channel)
+ GNUNET_CONTAINER_multipeermap_iterate (cadet_map,
+ &GSF_cadet_release_clients,
+ NULL);
+ GNUNET_CONTAINER_multipeermap_destroy (cadet_map);
+ cadet_map = NULL;
+ if (NULL != cadet_port)
+ {
+ GNUNET_CADET_close_port (cadet_port);
+ cadet_port = NULL;
+ }
+ if (NULL != cadet_handle)
{
- GNUNET_CADET_disconnect (listen_channel);
- listen_channel = NULL;
+ GNUNET_CADET_disconnect (cadet_handle);
+ cadet_handle = NULL;
}
GNUNET_assert (NULL == sc_head);
GNUNET_assert (0 == sc_count);