diff options
author | Christian Grothoff <christian@grothoff.org> | 2017-02-17 11:06:15 +0100 |
---|---|---|
committer | Christian Grothoff <christian@grothoff.org> | 2017-02-17 11:06:15 +0100 |
commit | 9727e5e53721dace7abbcc5bcd28c838af4291cc (patch) | |
tree | ca32ed19cf0d4129d3497261531aa40a19599280 /src/fs | |
parent | c793bffc39fe1445616c9d0cb071d62575dea217 (diff) |
convert to new CADET API, not working due to CADET-API internal bugs
Diffstat (limited to 'src/fs')
-rw-r--r-- | src/fs/gnunet-service-fs.c | 2 | ||||
-rw-r--r-- | src/fs/gnunet-service-fs_cadet.h | 49 | ||||
-rw-r--r-- | src/fs/gnunet-service-fs_cadet_client.c | 290 | ||||
-rw-r--r-- | src/fs/gnunet-service-fs_cadet_server.c | 286 |
4 files changed, 290 insertions, 337 deletions
diff --git a/src/fs/gnunet-service-fs.c b/src/fs/gnunet-service-fs.c index e38fdb0323..8c605c6a2c 100644 --- a/src/fs/gnunet-service-fs.c +++ b/src/fs/gnunet-service-fs.c @@ -1177,7 +1177,6 @@ handle_client_unindex (void *cls, static void shutdown_task (void *cls) { - GSF_cadet_stop_client (); GSF_cadet_stop_server (); if (NULL != GSF_core) { @@ -1320,7 +1319,6 @@ main_init (const struct GNUNET_CONFIGURATION_Handle *c) NULL); datastore_get_load = GNUNET_LOAD_value_init (DATASTORE_LOAD_AUTODECLINE); GSF_cadet_start_server (); - GSF_cadet_start_client (); GNUNET_SCHEDULER_add_shutdown (&shutdown_task, NULL); return GNUNET_OK; diff --git a/src/fs/gnunet-service-fs_cadet.h b/src/fs/gnunet-service-fs_cadet.h index 060a3993c9..1fbd3a4060 100644 --- a/src/fs/gnunet-service-fs_cadet.h +++ b/src/fs/gnunet-service-fs_cadet.h @@ -1,6 +1,6 @@ /* This file is part of GNUnet. - Copyright (C) 2012 GNUnet e.V. + Copyright (C) 2012, 2017 GNUnet e.V. GNUnet is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published @@ -38,14 +38,15 @@ struct GSF_CadetRequest; * @param cls closure * @param type type of the block, ANY on error * @param expiration expiration time for the block - * @param data_size number of bytes in 'data', 0 on error + * @param data_size number of bytes in @a data, 0 on error * @param data reply block data, NULL on error */ -typedef void (*GSF_CadetReplyProcessor)(void *cls, - enum GNUNET_BLOCK_Type type, - struct GNUNET_TIME_Absolute expiration, - size_t data_size, - const void *data); +typedef void +(*GSF_CadetReplyProcessor)(void *cls, + enum GNUNET_BLOCK_Type type, + struct GNUNET_TIME_Absolute expiration, + size_t data_size, + const void *data); /** @@ -55,14 +56,28 @@ typedef void (*GSF_CadetReplyProcessor)(void *cls, * @param query hash to query for the block * @param type desired type for the block * @param proc function to call with result - * @param proc_cls closure for 'proc' + * @param proc_cls closure for @a proc * @return handle to cancel the operation */ struct GSF_CadetRequest * GSF_cadet_query (const struct GNUNET_PeerIdentity *target, - const struct GNUNET_HashCode *query, - enum GNUNET_BLOCK_Type type, - GSF_CadetReplyProcessor proc, void *proc_cls); + const struct GNUNET_HashCode *query, + enum GNUNET_BLOCK_Type type, + GSF_CadetReplyProcessor proc, + void *proc_cls); + +/** + * Function called on each active cadets to shut them down. + * + * @param cls NULL + * @param key target peer, unused + * @param value the `struct CadetHandle` to destroy + * @return #GNUNET_YES (continue to iterate) + */ +int +GSF_cadet_release_clients (void *cls, + const struct GNUNET_PeerIdentity *key, + void *value); /** @@ -89,17 +104,15 @@ void GSF_cadet_stop_server (void); /** - * Initialize subsystem for non-anonymous file-sharing. + * Cadet channel for creating outbound channels. */ -void -GSF_cadet_start_client (void); - +extern struct GNUNET_CADET_Handle *cadet_handle; /** - * Shutdown subsystem for non-anonymous file-sharing. + * Map from peer identities to 'struct CadetHandles' with cadet + * channels to those peers. */ -void -GSF_cadet_stop_client (void); +extern struct GNUNET_CONTAINER_MultiPeerMap *cadet_map; GNUNET_NETWORK_STRUCT_BEGIN diff --git a/src/fs/gnunet-service-fs_cadet_client.c b/src/fs/gnunet-service-fs_cadet_client.c index 4e268b93c2..193fe2263f 100644 --- a/src/fs/gnunet-service-fs_cadet_client.c +++ b/src/fs/gnunet-service-fs_cadet_client.c @@ -155,13 +155,13 @@ struct CadetHandle /** * Cadet channel for creating outbound channels. */ -static struct GNUNET_CADET_Handle *cadet_handle; +struct GNUNET_CADET_Handle *cadet_handle; /** * Map from peer identities to 'struct CadetHandles' with cadet * channels to those peers. */ -static struct GNUNET_CONTAINER_MultiPeerMap *cadet_map; +struct GNUNET_CONTAINER_MultiPeerMap *cadet_map; /* ********************* client-side code ************************* */ @@ -419,9 +419,9 @@ struct HandleReplyClosure * @return #GNUNET_YES (continue to iterate) */ static int -handle_reply (void *cls, - const struct GNUNET_HashCode *key, - void *value) +process_reply (void *cls, + const struct GNUNET_HashCode *key, + void *value) { struct HandleReplyClosure *hrc = cls; struct GSF_CadetRequest *sr = value; @@ -443,38 +443,43 @@ handle_reply (void *cls, * is received. * * @param cls closure with the `struct CadetHandle` - * @param channel channel handle - * @param channel_ctx channel context - * @param message the actual message + * @param srm the actual message * @return #GNUNET_OK on success, #GNUNET_SYSERR to stop further processing */ static int -reply_cb (void *cls, - struct GNUNET_CADET_Channel *channel, - void **channel_ctx, - const struct GNUNET_MessageHeader *message) +check_reply (void *cls, + const struct CadetReplyMessage *srm) { - struct CadetHandle *mh = *channel_ctx; - const struct CadetReplyMessage *srm; + /* We check later... */ + return GNUNET_OK; +} + + +/** + * Functions with this signature are called whenever a complete reply + * is received. + * + * @param cls closure with the `struct CadetHandle` + * @param srm the actual message + */ +static void +handle_reply (void *cls, + const struct CadetReplyMessage *srm) +{ + struct CadetHandle *mh = cls; struct HandleReplyClosure hrc; uint16_t msize; enum GNUNET_BLOCK_Type type; struct GNUNET_HashCode query; - msize = ntohs (message->size); - if (sizeof (struct CadetReplyMessage) > msize) - { - GNUNET_break_op (0); - reset_cadet_async (mh); - return GNUNET_SYSERR; - } - srm = (const struct CadetReplyMessage *) message; - msize -= sizeof (struct CadetReplyMessage); + msize = ntohs (srm->header.size) - sizeof (struct CadetReplyMessage); type = (enum GNUNET_BLOCK_Type) ntohl (srm->type); if (GNUNET_YES != GNUNET_BLOCK_get_key (GSF_block_ctx, type, - &srm[1], msize, &query)) + &srm[1], + msize, + &query)) { GNUNET_break_op (0); GNUNET_log (GNUNET_ERROR_TYPE_WARNING, @@ -483,13 +488,13 @@ reply_cb (void *cls, msize, GNUNET_i2s (&mh->target)); reset_cadet_async (mh); - return GNUNET_SYSERR; + return; } GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Received reply `%s' via cadet from peer %s\n", GNUNET_h2s (&query), GNUNET_i2s (&mh->target)); - GNUNET_CADET_receive_done (channel); + GNUNET_CADET_receive_done (mh->channel); GNUNET_STATISTICS_update (GSF_stats, gettext_noop ("# replies received via cadet"), 1, GNUNET_NO); @@ -500,16 +505,103 @@ reply_cb (void *cls, hrc.found = GNUNET_NO; GNUNET_CONTAINER_multihashmap_get_multiple (mh->waiting_map, &query, - &handle_reply, + &process_reply, &hrc); if (GNUNET_NO == hrc.found) { GNUNET_STATISTICS_update (GSF_stats, gettext_noop ("# replies received via cadet dropped"), 1, GNUNET_NO); - return GNUNET_OK; } - return GNUNET_OK; +} + + +/** + * Iterator called on each entry in a waiting map to + * call the 'proc' continuation and release associated + * resources. + * + * @param cls the `struct CadetHandle` + * @param key the key of the entry in the map (the query) + * @param value the `struct GSF_CadetRequest` to clean up + * @return #GNUNET_YES (continue to iterate) + */ +static int +free_waiting_entry (void *cls, + const struct GNUNET_HashCode *key, + void *value) +{ + struct GSF_CadetRequest *sr = value; + + GSF_cadet_query_cancel (sr); + return GNUNET_YES; +} + + +/** + * Function called by cadet when a client disconnects. + * Cleans up our `struct CadetClient` of that channel. + * + * @param cls our `struct CadetClient` + * @param channel channel of the disconnecting client + */ +static void +disconnect_cb (void *cls, + const struct GNUNET_CADET_Channel *channel) +{ + struct CadetHandle *mh = cls; + struct GSF_CadetRequest *sr; + + if (NULL == mh->channel) + return; /* being destroyed elsewhere */ + GNUNET_assert (channel == mh->channel); + mh->channel = NULL; + while (NULL != (sr = mh->pending_head)) + GSF_cadet_query_cancel (sr); + /* first remove `mh` from the `cadet_map`, so that if the + callback from `free_waiting_entry()` happens to re-issue + the request, we don't immediately have it back in the + `waiting_map`. */ + GNUNET_assert (GNUNET_OK == + GNUNET_CONTAINER_multipeermap_remove (cadet_map, + &mh->target, + mh)); + GNUNET_CONTAINER_multihashmap_iterate (mh->waiting_map, + &free_waiting_entry, + mh); + if (NULL != mh->wh) + GNUNET_CADET_notify_transmit_ready_cancel (mh->wh); + if (NULL != mh->timeout_task) + GNUNET_SCHEDULER_cancel (mh->timeout_task); + if (NULL != mh->reset_task) + GNUNET_SCHEDULER_cancel (mh->reset_task); + GNUNET_assert (0 == + GNUNET_CONTAINER_multihashmap_size (mh->waiting_map)); + GNUNET_CONTAINER_multihashmap_destroy (mh->waiting_map); + GNUNET_free (mh); +} + + +/** + * Function called whenever an MQ-channel's transmission window size changes. + * + * The first callback in an outgoing channel will be with a non-zero value + * and will mean the channel is connected to the destination. + * + * For an incoming channel it will be called immediately after the + * #GNUNET_CADET_ConnectEventHandler, also with a non-zero value. + * + * @param cls Channel closure. + * @param channel Connection to the other end (henceforth invalid). + * @param window_size New window size. If the is more messages than buffer size + * this value will be negative.. + */ +static void +window_change_cb (void *cls, + const struct GNUNET_CADET_Channel *channel, + int window_size) +{ + /* FIXME: for flow control, implement? */ } @@ -552,14 +644,25 @@ get_cadet (const struct GNUNET_PeerIdentity *target) GNUNET_CRYPTO_hash (GNUNET_APPLICATION_PORT_FS_BLOCK_TRANSFER, strlen (GNUNET_APPLICATION_PORT_FS_BLOCK_TRANSFER), &port); - mh->channel = GNUNET_CADET_channel_create (cadet_handle, - mh, - &mh->target, - &port, - GNUNET_CADET_OPTION_RELIABLE); - GNUNET_assert (mh == - GNUNET_CONTAINER_multipeermap_get (cadet_map, - target)); + + { + struct GNUNET_MQ_MessageHandler handlers[] = { + GNUNET_MQ_hd_var_size (reply, + GNUNET_MESSAGE_TYPE_FS_CADET_REPLY, + struct CadetReplyMessage, + mh), + GNUNET_MQ_handler_end () + }; + + mh->channel = GNUNET_CADET_channel_creatE (cadet_handle, + mh, + &mh->target, + &port, + GNUNET_CADET_OPTION_RELIABLE, + &window_change_cb, + &disconnect_cb, + handlers); + } return mh; } @@ -646,93 +749,6 @@ GSF_cadet_query_cancel (struct GSF_CadetRequest *sr) /** - * Iterator called on each entry in a waiting map to - * call the 'proc' continuation and release associated - * resources. - * - * @param cls the `struct CadetHandle` - * @param key the key of the entry in the map (the query) - * @param value the `struct GSF_CadetRequest` to clean up - * @return #GNUNET_YES (continue to iterate) - */ -static int -free_waiting_entry (void *cls, - const struct GNUNET_HashCode *key, - void *value) -{ - struct GSF_CadetRequest *sr = value; - - GSF_cadet_query_cancel (sr); - return GNUNET_YES; -} - - -/** - * Function called by cadet when a client disconnects. - * Cleans up our `struct CadetClient` of that channel. - * - * @param cls NULL - * @param channel channel of the disconnecting client - * @param channel_ctx our `struct CadetClient` - */ -static void -cleaner_cb (void *cls, - const struct GNUNET_CADET_Channel *channel, - void *channel_ctx) -{ - struct CadetHandle *mh = channel_ctx; - struct GSF_CadetRequest *sr; - - if (NULL == mh->channel) - return; /* being destroyed elsewhere */ - GNUNET_assert (channel == mh->channel); - mh->channel = NULL; - while (NULL != (sr = mh->pending_head)) - GSF_cadet_query_cancel (sr); - /* first remove `mh` from the `cadet_map`, so that if the - callback from `free_waiting_entry()` happens to re-issue - the request, we don't immediately have it back in the - `waiting_map`. */ - GNUNET_assert (GNUNET_OK == - GNUNET_CONTAINER_multipeermap_remove (cadet_map, - &mh->target, - mh)); - GNUNET_CONTAINER_multihashmap_iterate (mh->waiting_map, - &free_waiting_entry, - mh); - if (NULL != mh->wh) - GNUNET_CADET_notify_transmit_ready_cancel (mh->wh); - if (NULL != mh->timeout_task) - GNUNET_SCHEDULER_cancel (mh->timeout_task); - if (NULL != mh->reset_task) - GNUNET_SCHEDULER_cancel (mh->reset_task); - GNUNET_assert (0 == - GNUNET_CONTAINER_multihashmap_size (mh->waiting_map)); - GNUNET_CONTAINER_multihashmap_destroy (mh->waiting_map); - GNUNET_free (mh); -} - - -/** - * Initialize subsystem for non-anonymous file-sharing. - */ -void -GSF_cadet_start_client () -{ - static const struct GNUNET_CADET_MessageHandler handlers[] = { - { &reply_cb, GNUNET_MESSAGE_TYPE_FS_CADET_REPLY, 0 }, - { NULL, 0, 0 } - }; - - cadet_map = GNUNET_CONTAINER_multipeermap_create (16, GNUNET_YES); - cadet_handle = GNUNET_CADET_connect (GSF_cfg, - NULL, - &cleaner_cb, - handlers); -} - - -/** * Function called on each active cadets to shut them down. * * @param cls NULL @@ -740,10 +756,10 @@ GSF_cadet_start_client () * @param value the `struct CadetHandle` to destroy * @return #GNUNET_YES (continue to iterate) */ -static int -release_cadets (void *cls, - const struct GNUNET_PeerIdentity *key, - void *value) +int +GSF_cadet_release_clients (void *cls, + const struct GNUNET_PeerIdentity *key, + void *value) { struct CadetHandle *mh = value; @@ -756,23 +772,5 @@ release_cadets (void *cls, } -/** - * Shutdown subsystem for non-anonymous file-sharing. - */ -void -GSF_cadet_stop_client () -{ - GNUNET_CONTAINER_multipeermap_iterate (cadet_map, - &release_cadets, - NULL); - GNUNET_CONTAINER_multipeermap_destroy (cadet_map); - cadet_map = NULL; - if (NULL != cadet_handle) - { - GNUNET_CADET_disconnect (cadet_handle); - cadet_handle = NULL; - } -} - /* end of gnunet-service-fs_cadet_client.c */ diff --git a/src/fs/gnunet-service-fs_cadet_server.c b/src/fs/gnunet-service-fs_cadet_server.c index ac86537c3e..0a72a82793 100644 --- a/src/fs/gnunet-service-fs_cadet_server.c +++ b/src/fs/gnunet-service-fs_cadet_server.c @@ -124,9 +124,9 @@ struct CadetClient /** - * Listen channel for incoming requests. + * Listen port for incoming requests. */ -static struct GNUNET_CADET_Handle *listen_channel; +static struct GNUNET_CADET_Port *cadet_port; /** * Head of DLL of cadet clients. @@ -188,121 +188,29 @@ refresh_timeout_task (struct CadetClient *sc) /** - * We're done handling a request from a client, read the next one. + * Check if we are done with the write queue, and if so tell CADET + * that we are ready to read more. * - * @param sc client to continue reading requests from + * @param cls where to process the write queue */ static void -continue_reading (struct CadetClient *sc) -{ - refresh_timeout_task (sc); - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Finished processing cadet request from client %p, ready to receive the next one\n", - sc); - GNUNET_CADET_receive_done (sc->channel); -} - - -/** - * Transmit the next entry from the write queue. - * - * @param sc where to process the write queue - */ -static void -continue_writing (struct CadetClient *sc); - - -/** - * Send a reply now, cadet is ready. - * - * @param cls closure with the `struct CadetClient` which sent the query - * @param size number of bytes available in @a buf - * @param buf where to write the message - * @return number of bytes written to @a buf - */ -static size_t -write_continuation (void *cls, - size_t size, - void *buf) +continue_writing (void *cls) { struct CadetClient *sc = cls; - struct GNUNET_CADET_Channel *tun; - struct WriteQueueItem *wqi; - size_t ret; - - sc->wh = NULL; - if (NULL == (wqi = sc->wqi_head)) - { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Write queue empty, reading more requests\n"); - return 0; - } - if ( (0 == size) || - (size < wqi->msize) ) - { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Transmission of reply failed, terminating cadet\n"); - tun = sc->channel; - sc->channel = NULL; - GNUNET_CADET_channel_destroy (tun); - return 0; - } - GNUNET_CONTAINER_DLL_remove (sc->wqi_head, - sc->wqi_tail, - wqi); - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Transmitted %u byte reply via cadet to %p\n", - (unsigned int) size, - sc); - GNUNET_STATISTICS_update (GSF_stats, - gettext_noop ("# Blocks transferred via cadet"), 1, - GNUNET_NO); - ret = wqi->msize; - GNUNET_memcpy (buf, &wqi[1], ret); - GNUNET_free (wqi); - continue_writing (sc); - return ret; -} - - -/** - * Transmit the next entry from the write queue. - * - * @param sc where to process the write queue - */ -static void -continue_writing (struct CadetClient *sc) -{ - struct WriteQueueItem *wqi; - struct GNUNET_CADET_Channel *tun; + struct GNUNET_MQ_Handle *mq; - if (NULL != sc->wh) + mq = GNUNET_CADET_get_mq (sc->channel); + if (0 != GNUNET_MQ_get_length (mq)) { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Write pending, waiting for it to complete\n"); - return; /* write already pending */ - } - if (NULL == (wqi = sc->wqi_head)) - { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Write queue empty, reading more requests\n"); - continue_reading (sc); - return; - } - sc->wh = GNUNET_CADET_notify_transmit_ready (sc->channel, GNUNET_NO, - GNUNET_TIME_UNIT_FOREVER_REL, - wqi->msize, - &write_continuation, - sc); - if (NULL == sc->wh) - { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Write failed; terminating cadet\n"); - tun = sc->channel; - sc->channel = NULL; - GNUNET_CADET_channel_destroy (tun); return; } + refresh_timeout_task (sc); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Finished processing cadet request from client %p, ready to receive the next one\n", + sc); + GNUNET_CADET_receive_done (sc->channel); } @@ -333,7 +241,7 @@ handle_datastore_reply (void *cls, { struct CadetClient *sc = cls; size_t msize = size + sizeof (struct CadetReplyMessage); - struct WriteQueueItem *wqi; + struct GNUNET_MQ_Envelope *env; struct CadetReplyMessage *srm; sc->qe = NULL; @@ -357,7 +265,8 @@ handle_datastore_reply (void *cls, GNUNET_h2s (key)); } GNUNET_STATISTICS_update (GSF_stats, - gettext_noop ("# queries received via CADET not answered"), 1, + gettext_noop ("# queries received via CADET not answered"), + 1, GNUNET_NO); continue_writing (sc); return; @@ -369,9 +278,13 @@ handle_datastore_reply (void *cls, GNUNET_h2s (key)); if (GNUNET_OK != GNUNET_FS_handle_on_demand_block (key, - size, data, type, - priority, anonymity, - expiration, uid, + size, + data, + type, + priority, + anonymity, + expiration, + uid, &handle_datastore_reply, sc)) { @@ -394,19 +307,23 @@ handle_datastore_reply (void *cls, (unsigned int) type, GNUNET_h2s (key), sc); - wqi = GNUNET_malloc (sizeof (struct WriteQueueItem) + msize); - wqi->msize = msize; - srm = (struct CadetReplyMessage *) &wqi[1]; - srm->header.size = htons ((uint16_t) msize); - srm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_CADET_REPLY); + env = GNUNET_MQ_msg_extra (srm, + size, + GNUNET_MESSAGE_TYPE_FS_CADET_REPLY); srm->type = htonl (type); srm->expiration = GNUNET_TIME_absolute_hton (expiration); - GNUNET_memcpy (&srm[1], data, size); - sc->reply_size = msize; - GNUNET_CONTAINER_DLL_insert (sc->wqi_head, - sc->wqi_tail, - wqi); - continue_writing (sc); + GNUNET_memcpy (&srm[1], + data, + size); + GNUNET_MQ_notify_sent (env, + &continue_writing, + sc); + GNUNET_STATISTICS_update (GSF_stats, + gettext_noop ("# Blocks transferred via cadet"), + 1, + GNUNET_NO); + GNUNET_MQ_send (GNUNET_CADET_get_mq (sc->channel), + env); } @@ -414,30 +331,22 @@ handle_datastore_reply (void *cls, * Functions with this signature are called whenever a * complete query message is received. * - * Do not call #GNUNET_SERVER_mst_destroy() in callback - * * @param cls closure with the `struct CadetClient` - * @param channel channel handle - * @param channel_ctx channel context - * @param message the actual message - * @return #GNUNET_OK on success, #GNUNET_SYSERR to stop further processing + * @param sqm the actual message */ -static int -request_cb (void *cls, - struct GNUNET_CADET_Channel *channel, - void **channel_ctx, - const struct GNUNET_MessageHeader *message) +static void +handle_request (void *cls, + const struct CadetQueryMessage *sqm) { - struct CadetClient *sc = *channel_ctx; - const struct CadetQueryMessage *sqm; + struct CadetClient *sc = cls; - sqm = (const struct CadetQueryMessage *) message; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Received query for `%s' via cadet from client %p\n", GNUNET_h2s (&sqm->query), sc); GNUNET_STATISTICS_update (GSF_stats, - gettext_noop ("# queries received via cadet"), 1, + gettext_noop ("# queries received via cadet"), + 1, GNUNET_NO); refresh_timeout_task (sc); sc->qe = GNUNET_DATASTORE_get_key (GSF_dsh, @@ -446,14 +355,14 @@ request_cb (void *cls, ntohl (sqm->type), 0 /* priority */, GSF_datastore_queue_size, - &handle_datastore_reply, sc); + &handle_datastore_reply, + sc); if (NULL == sc->qe) { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Queueing request with datastore failed (queue full?)\n"); continue_writing (sc); } - return GNUNET_OK; } @@ -464,16 +373,12 @@ request_cb (void *cls, * @param channel the channel representing the cadet * @param initiator the identity of the peer who wants to establish a cadet * with us; NULL on binding error - * @param port cadet port used for the incoming connection - * @param options channel option flags - * @return initial channel context (our 'struct CadetClient') + * @return initial channel context (our `struct CadetClient`) */ static void * -accept_cb (void *cls, - struct GNUNET_CADET_Channel *channel, - const struct GNUNET_PeerIdentity *initiator, - const struct GNUNET_HashCode *port, - enum GNUNET_CADET_ChannelOption options) +connect_cb (void *cls, + struct GNUNET_CADET_Channel *channel, + const struct GNUNET_PeerIdentity *initiator) { struct CadetClient *sc; @@ -481,13 +386,15 @@ accept_cb (void *cls, if (sc_count >= sc_count_max) { GNUNET_STATISTICS_update (GSF_stats, - gettext_noop ("# cadet client connections rejected"), 1, + gettext_noop ("# cadet client connections rejected"), + 1, GNUNET_NO); GNUNET_CADET_channel_destroy (channel); return NULL; } GNUNET_STATISTICS_update (GSF_stats, - gettext_noop ("# cadet connections active"), 1, + gettext_noop ("# cadet connections active"), + 1, GNUNET_NO); sc = GNUNET_new (struct CadetClient); sc->channel = channel; @@ -506,18 +413,17 @@ accept_cb (void *cls, /** * Function called by cadet when a client disconnects. - * Cleans up our 'struct CadetClient' of that channel. + * Cleans up our `struct CadetClient` of that channel. * - * @param cls NULL + * @param cls our `struct CadetClient` * @param channel channel of the disconnecting client - * @param channel_ctx our 'struct CadetClient' + * @param channel_ctx */ static void -cleaner_cb (void *cls, - const struct GNUNET_CADET_Channel *channel, - void *channel_ctx) +disconnect_cb (void *cls, + const struct GNUNET_CADET_Channel *channel) { - struct CadetClient *sc = channel_ctx; + struct CadetClient *sc = cls; struct WriteQueueItem *wqi; if (NULL == sc) @@ -552,15 +458,42 @@ cleaner_cb (void *cls, } + +/** + * Function called whenever an MQ-channel's transmission window size changes. + * + * The first callback in an outgoing channel will be with a non-zero value + * and will mean the channel is connected to the destination. + * + * For an incoming channel it will be called immediately after the + * #GNUNET_CADET_ConnectEventHandler, also with a non-zero value. + * + * @param cls Channel closure. + * @param channel Connection to the other end (henceforth invalid). + * @param window_size New window size. If the is more messages than buffer size + * this value will be negative.. + */ +static void +window_change_cb (void *cls, + const struct GNUNET_CADET_Channel *channel, + int window_size) +{ + /* FIXME: could do flow control here... */ +} + + /** * Initialize subsystem for non-anonymous file-sharing. */ void GSF_cadet_start_server () { - static const struct GNUNET_CADET_MessageHandler handlers[] = { - { &request_cb, GNUNET_MESSAGE_TYPE_FS_CADET_QUERY, sizeof (struct CadetQueryMessage)}, - { NULL, 0, 0 } + struct GNUNET_MQ_MessageHandler handlers[] = { + GNUNET_MQ_hd_fixed_size (request, + GNUNET_MESSAGE_TYPE_FS_CADET_QUERY, + struct CadetQueryMessage, + NULL), + GNUNET_MQ_handler_end () }; struct GNUNET_HashCode port; @@ -573,18 +506,19 @@ GSF_cadet_start_server () GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Initializing cadet FS server with a limit of %llu connections\n", sc_count_max); - listen_channel = GNUNET_CADET_connect (GSF_cfg, - NULL, - &cleaner_cb, - handlers); - GNUNET_assert (NULL != listen_channel); + cadet_map = GNUNET_CONTAINER_multipeermap_create (16, GNUNET_YES); + cadet_handle = GNUNET_CADET_connecT (GSF_cfg); + GNUNET_assert (NULL != cadet_handle); GNUNET_CRYPTO_hash (GNUNET_APPLICATION_PORT_FS_BLOCK_TRANSFER, strlen (GNUNET_APPLICATION_PORT_FS_BLOCK_TRANSFER), &port); - GNUNET_CADET_open_port (listen_channel, - &port, - &accept_cb, - NULL); + cadet_port = GNUNET_CADET_open_porT (cadet_handle, + &port, + &connect_cb, + NULL, + &window_change_cb, + &disconnect_cb, + handlers); } @@ -594,10 +528,20 @@ GSF_cadet_start_server () void GSF_cadet_stop_server () { - if (NULL != listen_channel) + GNUNET_CONTAINER_multipeermap_iterate (cadet_map, + &GSF_cadet_release_clients, + NULL); + GNUNET_CONTAINER_multipeermap_destroy (cadet_map); + cadet_map = NULL; + if (NULL != cadet_port) + { + GNUNET_CADET_close_port (cadet_port); + cadet_port = NULL; + } + if (NULL != cadet_handle) { - GNUNET_CADET_disconnect (listen_channel); - listen_channel = NULL; + GNUNET_CADET_disconnect (cadet_handle); + cadet_handle = NULL; } GNUNET_assert (NULL == sc_head); GNUNET_assert (0 == sc_count); |