diff options
Diffstat (limited to 'src/fs/gnunet-service-fs_stream.c')
-rw-r--r-- | src/fs/gnunet-service-fs_stream.c | 1466 |
1 files changed, 1466 insertions, 0 deletions
diff --git a/src/fs/gnunet-service-fs_stream.c b/src/fs/gnunet-service-fs_stream.c new file mode 100644 index 0000000..3838b1c --- /dev/null +++ b/src/fs/gnunet-service-fs_stream.c @@ -0,0 +1,1466 @@ +/* + This file is part of GNUnet. + (C) 2012 Christian Grothoff (and other contributing authors) + + GNUnet is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published + by the Free Software Foundation; either version 3, or (at your + option) any later version. + + GNUnet is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + General Public License for more details. + + You should have received a copy of the GNU General Public License + along with GNUnet; see the file COPYING. If not, write to the + Free Software Foundation, Inc., 59 Temple Place - Suite 330, + Boston, MA 02111-1307, USA. +*/ + +/** + * @file fs/gnunet-service-fs_stream.c + * @brief non-anonymous file-transfer + * @author Christian Grothoff + */ +#include "platform.h" +#include "gnunet_constants.h" +#include "gnunet_util_lib.h" +#include "gnunet_stream_lib.h" +#include "gnunet_protocols.h" +#include "gnunet_applications.h" +#include "gnunet-service-fs.h" +#include "gnunet-service-fs_indexing.h" +#include "gnunet-service-fs_stream.h" + +/** + * After how long do we termiante idle connections? + */ +#define IDLE_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 2) + +/** + * After how long do we reset connections without replies? + */ +#define CLIENT_RETRY_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 30) + + +/** + * A message in the queue to be written to the stream. + */ +struct WriteQueueItem +{ + /** + * Kept in a DLL. + */ + struct WriteQueueItem *next; + + /** + * Kept in a DLL. + */ + struct WriteQueueItem *prev; + + /** + * Number of bytes of payload, allocated at the end of this struct. + */ + size_t msize; +}; + + +/** + * Information we keep around for each active streaming client. + */ +struct StreamClient +{ + /** + * DLL + */ + struct StreamClient *next; + + /** + * DLL + */ + struct StreamClient *prev; + + /** + * Socket for communication. + */ + struct GNUNET_STREAM_Socket *socket; + + /** + * Handle for active read operation, or NULL. + */ + struct GNUNET_STREAM_ReadHandle *rh; + + /** + * Handle for active write operation, or NULL. + */ + struct GNUNET_STREAM_WriteHandle *wh; + + /** + * Head of write queue. + */ + struct WriteQueueItem *wqi_head; + + /** + * Tail of write queue. + */ + struct WriteQueueItem *wqi_tail; + + /** + * Tokenizer for requests. + */ + struct GNUNET_SERVER_MessageStreamTokenizer *mst; + + /** + * Current active request to the datastore, if we have one pending. + */ + struct GNUNET_DATASTORE_QueueEntry *qe; + + /** + * Task that is scheduled to asynchronously terminate the connection. + */ + GNUNET_SCHEDULER_TaskIdentifier terminate_task; + + /** + * Task that is scheduled to terminate idle connections. + */ + GNUNET_SCHEDULER_TaskIdentifier timeout_task; + + /** + * Size of the last write that was initiated. + */ + size_t reply_size; + +}; + + +/** + * Query from one peer, asking the other for CHK-data. + */ +struct StreamQueryMessage +{ + + /** + * Type is GNUNET_MESSAGE_TYPE_FS_STREAM_QUERY. + */ + struct GNUNET_MessageHeader header; + + /** + * Block type must be DBLOCK or IBLOCK. + */ + uint32_t type; + + /** + * Query hash from CHK (hash of encrypted block). + */ + struct GNUNET_HashCode query; + +}; + + +/** + * Reply to a StreamQueryMessage. + */ +struct StreamReplyMessage +{ + + /** + * Type is GNUNET_MESSAGE_TYPE_FS_STREAM_REPLY. + */ + struct GNUNET_MessageHeader header; + + /** + * Block type must be DBLOCK or IBLOCK. + */ + uint32_t type; + + /** + * Expiration time for the block. + */ + struct GNUNET_TIME_AbsoluteNBO expiration; + + /* followed by the encrypted block */ + +}; + + +/** + * Handle for a stream to another peer. + */ +struct StreamHandle; + + +/** + * Handle for a request that is going out via stream API. + */ +struct GSF_StreamRequest +{ + + /** + * DLL. + */ + struct GSF_StreamRequest *next; + + /** + * DLL. + */ + struct GSF_StreamRequest *prev; + + /** + * Which stream is this request associated with? + */ + struct StreamHandle *sh; + + /** + * Function to call with the result. + */ + GSF_StreamReplyProcessor proc; + + /** + * Closure for 'proc' + */ + void *proc_cls; + + /** + * Query to transmit to the other peer. + */ + struct GNUNET_HashCode query; + + /** + * Desired type for the reply. + */ + enum GNUNET_BLOCK_Type type; + + /** + * Did we transmit this request already? YES if we are + * in the 'waiting' DLL, NO if we are in the 'pending' DLL. + */ + int was_transmitted; +}; + + +/** + * Handle for a stream to another peer. + */ +struct StreamHandle +{ + /** + * Head of DLL of pending requests on this stream. + */ + struct GSF_StreamRequest *pending_head; + + /** + * Tail of DLL of pending requests on this stream. + */ + struct GSF_StreamRequest *pending_tail; + + /** + * Map from query to 'struct GSF_StreamRequest's waiting for + * a reply. + */ + struct GNUNET_CONTAINER_MultiHashMap *waiting_map; + + /** + * Connection to the other peer. + */ + struct GNUNET_STREAM_Socket *stream; + + /** + * Handle for active read operation, or NULL. + */ + struct GNUNET_STREAM_ReadHandle *rh; + + /** + * Handle for active write operation, or NULL. + */ + struct GNUNET_STREAM_WriteHandle *wh; + + /** + * Tokenizer for replies. + */ + struct GNUNET_SERVER_MessageStreamTokenizer *mst; + + /** + * Which peer does this stream go to? + */ + struct GNUNET_PeerIdentity target; + + /** + * Task to kill inactive streams (we keep them around for + * a few seconds to give the application a chance to give + * us another query). + */ + GNUNET_SCHEDULER_TaskIdentifier timeout_task; + + /** + * Task to reset streams that had errors (asynchronously, + * as we may not be able to do it immediately during a + * callback from the stream API). + */ + GNUNET_SCHEDULER_TaskIdentifier reset_task; + + /** + * Is this stream ready for transmission? + */ + int is_ready; + +}; + + +/** + * Listen socket for incoming requests. + */ +static struct GNUNET_STREAM_ListenSocket *listen_socket; + +/** + * Head of DLL of stream clients. + */ +static struct StreamClient *sc_head; + +/** + * Tail of DLL of stream clients. + */ +static struct StreamClient *sc_tail; + +/** + * Number of active stream clients in the 'sc_*'-DLL. + */ +static unsigned int sc_count; + +/** + * Maximum allowed number of stream clients. + */ +static unsigned long long sc_count_max; + +/** + * Map from peer identities to 'struct StreamHandles' with streams to + * those peers. + */ +static struct GNUNET_CONTAINER_MultiHashMap *stream_map; + + +/* ********************* client-side code ************************* */ + +/** + * Iterator called on each entry in a waiting map to + * call the 'proc' continuation and release associated + * resources. + * + * @param cls the 'struct StreamHandle' + * @param key the key of the entry in the map (the query) + * @param value the 'struct GSF_StreamRequest' to clean up + * @return GNUNET_YES (continue to iterate) + */ +static int +free_waiting_entry (void *cls, + const struct GNUNET_HashCode *key, + void *value) +{ + struct GSF_StreamRequest *sr = value; + + sr->proc (sr->proc_cls, GNUNET_BLOCK_TYPE_ANY, + GNUNET_TIME_UNIT_FOREVER_ABS, + 0, NULL); + GSF_stream_query_cancel (sr); + return GNUNET_YES; +} + + +/** + * Destroy a stream handle. + * + * @param sh stream to process + */ +static void +destroy_stream_handle (struct StreamHandle *sh) +{ + struct GSF_StreamRequest *sr; + + while (NULL != (sr = sh->pending_head)) + { + sr->proc (sr->proc_cls, GNUNET_BLOCK_TYPE_ANY, + GNUNET_TIME_UNIT_FOREVER_ABS, + 0, NULL); + GSF_stream_query_cancel (sr); + } + GNUNET_CONTAINER_multihashmap_iterate (sh->waiting_map, + &free_waiting_entry, + sh); + if (NULL != sh->wh) + GNUNET_STREAM_write_cancel (sh->wh); + if (NULL != sh->rh) + GNUNET_STREAM_read_cancel (sh->rh); + if (GNUNET_SCHEDULER_NO_TASK != sh->timeout_task) + GNUNET_SCHEDULER_cancel (sh->timeout_task); + if (GNUNET_SCHEDULER_NO_TASK != sh->reset_task) + GNUNET_SCHEDULER_cancel (sh->reset_task); + GNUNET_STREAM_close (sh->stream); + GNUNET_assert (GNUNET_OK == + GNUNET_CONTAINER_multihashmap_remove (stream_map, + &sh->target.hashPubKey, + sh)); + GNUNET_CONTAINER_multihashmap_destroy (sh->waiting_map); + GNUNET_free (sh); +} + + +/** + * Transmit pending requests via the stream. + * + * @param sh stream to process + */ +static void +transmit_pending (struct StreamHandle *sh); + + +/** + * Function called once the stream is ready for transmission. + * + * @param cls the 'struct StreamHandle' + * @param socket stream socket handle + */ +static void +stream_ready_cb (void *cls, + struct GNUNET_STREAM_Socket *socket) +{ + struct StreamHandle *sh = cls; + + sh->is_ready = GNUNET_YES; + transmit_pending (sh); +} + + +/** + * Iterator called on each entry in a waiting map to + * move it back to the pending list. + * + * @param cls the 'struct StreamHandle' + * @param key the key of the entry in the map (the query) + * @param value the 'struct GSF_StreamRequest' to move to pending + * @return GNUNET_YES (continue to iterate) + */ +static int +move_to_pending (void *cls, + const struct GNUNET_HashCode *key, + void *value) +{ + struct StreamHandle *sh = cls; + struct GSF_StreamRequest *sr = value; + + GNUNET_assert (GNUNET_YES == + GNUNET_CONTAINER_multihashmap_remove (sh->waiting_map, + key, + value)); + GNUNET_CONTAINER_DLL_insert (sh->pending_head, + sh->pending_tail, + sr); + sr->was_transmitted = GNUNET_NO; + return GNUNET_YES; +} + + +/** + * We had a serious error, tear down and re-create stream from scratch. + * + * @param sh stream to reset + */ +static void +reset_stream (struct StreamHandle *sh) +{ + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Resetting stream to %s\n", + GNUNET_i2s (&sh->target)); + if (NULL != sh->rh) + { + GNUNET_STREAM_read_cancel (sh->rh); + sh->rh = NULL; + } + GNUNET_STREAM_close (sh->stream); + sh->is_ready = GNUNET_NO; + GNUNET_CONTAINER_multihashmap_iterate (sh->waiting_map, + &move_to_pending, + sh); + sh->stream = GNUNET_STREAM_open (GSF_cfg, + &sh->target, + GNUNET_APPLICATION_TYPE_FS_BLOCK_TRANSFER, + &stream_ready_cb, sh, + GNUNET_STREAM_OPTION_END); +} + + +/** + * Task called when it is time to destroy an inactive stream. + * + * @param cls the 'struct StreamHandle' to tear down + * @param tc scheduler context, unused + */ +static void +stream_timeout (void *cls, + const struct GNUNET_SCHEDULER_TaskContext *tc) +{ + struct StreamHandle *sh = cls; + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Timeout on stream to %s\n", + GNUNET_i2s (&sh->target)); + sh->timeout_task = GNUNET_SCHEDULER_NO_TASK; + destroy_stream_handle (sh); +} + + +/** + * Task called when it is time to reset an stream. + * + * @param cls the 'struct StreamHandle' to tear down + * @param tc scheduler context, unused + */ +static void +reset_stream_task (void *cls, + const struct GNUNET_SCHEDULER_TaskContext *tc) +{ + struct StreamHandle *sh = cls; + + sh->reset_task = GNUNET_SCHEDULER_NO_TASK; + reset_stream (sh); +} + + +/** + * We had a serious error, tear down and re-create stream from scratch, + * but do so asynchronously. + * + * @param sh stream to reset + */ +static void +reset_stream_async (struct StreamHandle *sh) +{ + if (GNUNET_SCHEDULER_NO_TASK != sh->reset_task) + GNUNET_SCHEDULER_cancel (sh->reset_task); + sh->reset_task = GNUNET_SCHEDULER_add_now (&reset_stream_task, + sh); +} + + +/** + * We got a reply from the stream. Process it. + * + * @param cls the struct StreamHandle + * @param status the status of the stream at the time this function is called + * @param data traffic from the other side + * @param size the number of bytes available in data read; will be 0 on timeout + * @return number of bytes of processed from 'data' (any data remaining should be + * given to the next time the read processor is called). + */ +static size_t +handle_stream_reply (void *cls, + enum GNUNET_STREAM_Status status, + const void *data, + size_t size) +{ + struct StreamHandle *sh = cls; + + sh->rh = NULL; + GNUNET_SCHEDULER_cancel (sh->reset_task); + sh->reset_task = GNUNET_SCHEDULER_add_delayed (CLIENT_RETRY_TIMEOUT, + &reset_stream_task, + sh); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Received %u bytes from stream to %s\n", + (unsigned int) size, + GNUNET_i2s (&sh->target)); + if (GNUNET_SYSERR == + GNUNET_SERVER_mst_receive (sh->mst, + NULL, + data, size, + GNUNET_NO, GNUNET_NO)) + { + GNUNET_break_op (0); + reset_stream_async (sh); + return size; + } + if (NULL == sh->rh) + sh->rh = GNUNET_STREAM_read (sh->stream, + GNUNET_TIME_UNIT_FOREVER_REL, + &handle_stream_reply, + sh); + return size; +} + + +/** + * Functions of this signature are called whenever we transmitted a + * query via a stream. + * + * @param cls the struct StreamHandle for which we did the write call + * @param status the status of the stream at the time this function is called; + * GNUNET_OK if writing to stream was completed successfully, + * GNUNET_STREAM_SHUTDOWN if the stream is shutdown for writing in the + * mean time. + * @param size the number of bytes written + */ +static void +query_write_continuation (void *cls, + enum GNUNET_STREAM_Status status, + size_t size) +{ + struct StreamHandle *sh = cls; + + sh->wh = NULL; + if ( (GNUNET_STREAM_OK != status) || + (sizeof (struct StreamQueryMessage) != size) ) + { + reset_stream (sh); + return; + } + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Successfully transmitted %u bytes via stream to %s\n", + (unsigned int) size, + GNUNET_i2s (&sh->target)); + if (NULL == sh->rh) + sh->rh = GNUNET_STREAM_read (sh->stream, + GNUNET_TIME_UNIT_FOREVER_REL, + &handle_stream_reply, + sh); + transmit_pending (sh); +} + + +/** + * Transmit pending requests via the stream. + * + * @param sh stream to process + */ +static void +transmit_pending (struct StreamHandle *sh) +{ + struct StreamQueryMessage sqm; + struct GSF_StreamRequest *sr; + + if (NULL != sh->wh) + return; + sr = sh->pending_head; + if (NULL == sr) + return; + GNUNET_CONTAINER_DLL_remove (sh->pending_head, + sh->pending_tail, + sr); + GNUNET_CONTAINER_multihashmap_put (sh->waiting_map, + &sr->query, + sr, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Sending query via stream to %s\n", + GNUNET_i2s (&sh->target)); + sr->was_transmitted = GNUNET_YES; + sqm.header.size = htons (sizeof (sqm)); + sqm.header.type = htons (GNUNET_MESSAGE_TYPE_FS_STREAM_QUERY); + sqm.type = htonl (sr->type); + sqm.query = sr->query; + sh->wh = GNUNET_STREAM_write (sh->stream, + &sqm, sizeof (sqm), + GNUNET_TIME_UNIT_FOREVER_REL, + &query_write_continuation, + sh); +} + + +/** + * Closure for 'handle_reply'. + */ +struct HandleReplyClosure +{ + + /** + * Reply payload. + */ + const void *data; + + /** + * Expiration time for the block. + */ + struct GNUNET_TIME_Absolute expiration; + + /** + * Number of bytes in 'data'. + */ + size_t data_size; + + /** + * Type of the block. + */ + enum GNUNET_BLOCK_Type type; + + /** + * Did we have a matching query? + */ + int found; +}; + + +/** + * Iterator called on each entry in a waiting map to + * process a result. + * + * @param cls the 'struct HandleReplyClosure' + * @param key the key of the entry in the map (the query) + * @param value the 'struct GSF_StreamRequest' to handle result for + * @return GNUNET_YES (continue to iterate) + */ +static int +handle_reply (void *cls, + const struct GNUNET_HashCode *key, + void *value) +{ + struct HandleReplyClosure *hrc = cls; + struct GSF_StreamRequest *sr = value; + + sr->proc (sr->proc_cls, + hrc->type, + hrc->expiration, + hrc->data_size, + hrc->data); + GSF_stream_query_cancel (sr); + hrc->found = GNUNET_YES; + return GNUNET_YES; +} + + +/** + * Functions with this signature are called whenever a + * complete reply is received. + * + * Do not call GNUNET_SERVER_mst_destroy in callback + * + * @param cls closure with the 'struct StreamHandle' + * @param client identification of the client, NULL + * @param message the actual message + * @return GNUNET_OK on success, GNUNET_SYSERR to stop further processing + */ +static int +reply_cb (void *cls, + void *client, + const struct GNUNET_MessageHeader *message) +{ + struct StreamHandle *sh = cls; + const struct StreamReplyMessage *srm; + struct HandleReplyClosure hrc; + uint16_t msize; + enum GNUNET_BLOCK_Type type; + struct GNUNET_HashCode query; + + msize = ntohs (message->size); + switch (ntohs (message->type)) + { + case GNUNET_MESSAGE_TYPE_FS_STREAM_REPLY: + if (sizeof (struct StreamReplyMessage) > msize) + { + GNUNET_break_op (0); + reset_stream_async (sh); + return GNUNET_SYSERR; + } + srm = (const struct StreamReplyMessage *) message; + msize -= sizeof (struct StreamReplyMessage); + type = (enum GNUNET_BLOCK_Type) ntohl (srm->type); + if (GNUNET_YES != + GNUNET_BLOCK_get_key (GSF_block_ctx, + type, + &srm[1], msize, &query)) + { + GNUNET_break_op (0); + reset_stream_async (sh); + return GNUNET_SYSERR; + } + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Received reply `%s' via stream\n", + GNUNET_h2s (&query)); + GNUNET_STATISTICS_update (GSF_stats, + gettext_noop ("# replies received via stream"), 1, + GNUNET_NO); + hrc.data = &srm[1]; + hrc.data_size = msize; + hrc.expiration = GNUNET_TIME_absolute_ntoh (srm->expiration); + hrc.type = type; + hrc.found = GNUNET_NO; + GNUNET_CONTAINER_multihashmap_get_multiple (sh->waiting_map, + &query, + &handle_reply, + &hrc); + if (GNUNET_NO == hrc.found) + { + GNUNET_STATISTICS_update (GSF_stats, + gettext_noop ("# replies received via stream dropped"), 1, + GNUNET_NO); + return GNUNET_OK; + } + return GNUNET_OK; + default: + GNUNET_break_op (0); + reset_stream_async (sh); + return GNUNET_SYSERR; + } +} + + +/** + * Get (or create) a stream to talk to the given peer. + * + * @param target peer we want to communicate with + */ +static struct StreamHandle * +get_stream (const struct GNUNET_PeerIdentity *target) +{ + struct StreamHandle *sh; + + sh = GNUNET_CONTAINER_multihashmap_get (stream_map, + &target->hashPubKey); + if (NULL != sh) + { + if (GNUNET_SCHEDULER_NO_TASK != sh->timeout_task) + { + GNUNET_SCHEDULER_cancel (sh->timeout_task); + sh->timeout_task = GNUNET_SCHEDULER_NO_TASK; + } + return sh; + } + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Creating stream to %s\n", + GNUNET_i2s (target)); + sh = GNUNET_malloc (sizeof (struct StreamHandle)); + sh->reset_task = GNUNET_SCHEDULER_add_delayed (CLIENT_RETRY_TIMEOUT, + &reset_stream_task, + sh); + sh->mst = GNUNET_SERVER_mst_create (&reply_cb, + sh); + sh->waiting_map = GNUNET_CONTAINER_multihashmap_create (512, GNUNET_YES); + sh->target = *target; + sh->stream = GNUNET_STREAM_open (GSF_cfg, + &sh->target, + GNUNET_APPLICATION_TYPE_FS_BLOCK_TRANSFER, + &stream_ready_cb, sh, + GNUNET_STREAM_OPTION_END); + GNUNET_assert (GNUNET_OK == + GNUNET_CONTAINER_multihashmap_put (stream_map, + &sh->target.hashPubKey, + sh, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); + return sh; +} + + +/** + * Look for a block by directly contacting a particular peer. + * + * @param target peer that should have the block + * @param query hash to query for the block + * @param type desired type for the block + * @param proc function to call with result + * @param proc_cls closure for 'proc' + * @return handle to cancel the operation + */ +struct GSF_StreamRequest * +GSF_stream_query (const struct GNUNET_PeerIdentity *target, + const struct GNUNET_HashCode *query, + enum GNUNET_BLOCK_Type type, + GSF_StreamReplyProcessor proc, void *proc_cls) +{ + struct StreamHandle *sh; + struct GSF_StreamRequest *sr; + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Preparing to send query for %s via stream to %s\n", + GNUNET_h2s (query), + GNUNET_i2s (target)); + sh = get_stream (target); + sr = GNUNET_malloc (sizeof (struct GSF_StreamRequest)); + sr->sh = sh; + sr->proc = proc; + sr->proc_cls = proc_cls; + sr->type = type; + sr->query = *query; + GNUNET_CONTAINER_DLL_insert (sh->pending_head, + sh->pending_tail, + sr); + if (GNUNET_YES == sh->is_ready) + transmit_pending (sh); + return sr; +} + + +/** + * Cancel an active request; must not be called after 'proc' + * was calld. + * + * @param sr request to cancel + */ +void +GSF_stream_query_cancel (struct GSF_StreamRequest *sr) +{ + struct StreamHandle *sh = sr->sh; + + if (GNUNET_YES == sr->was_transmitted) + GNUNET_assert (GNUNET_OK == + GNUNET_CONTAINER_multihashmap_remove (sh->waiting_map, + &sr->query, + sr)); + else + GNUNET_CONTAINER_DLL_remove (sh->pending_head, + sh->pending_tail, + sr); + GNUNET_free (sr); + if ( (0 == GNUNET_CONTAINER_multihashmap_size (sh->waiting_map)) && + (NULL == sh->pending_head) ) + sh->timeout_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_SECONDS, + &stream_timeout, + sh); +} + + +/* ********************* server-side code ************************* */ + + +/** + * We're done with a particular client, clean up. + * + * @param sc client to clean up + */ +static void +terminate_stream (struct StreamClient *sc) +{ + GNUNET_STATISTICS_update (GSF_stats, + gettext_noop ("# stream connections active"), -1, + GNUNET_NO); + if (GNUNET_SCHEDULER_NO_TASK != sc->terminate_task) + GNUNET_SCHEDULER_cancel (sc->terminate_task); + if (GNUNET_SCHEDULER_NO_TASK != sc->timeout_task) + GNUNET_SCHEDULER_cancel (sc->timeout_task); + if (NULL != sc->rh) + GNUNET_STREAM_read_cancel (sc->rh); + if (NULL != sc->wh) + GNUNET_STREAM_write_cancel (sc->wh); + if (NULL != sc->qe) + GNUNET_DATASTORE_cancel (sc->qe); + GNUNET_SERVER_mst_destroy (sc->mst); + GNUNET_STREAM_close (sc->socket); + struct WriteQueueItem *wqi; + while (NULL != (wqi = sc->wqi_head)) + { + GNUNET_CONTAINER_DLL_remove (sc->wqi_head, + sc->wqi_tail, + wqi); + GNUNET_free (wqi); + } + + + GNUNET_CONTAINER_DLL_remove (sc_head, + sc_tail, + sc); + sc_count--; + GNUNET_free (sc); +} + + +/** + * Task run to asynchronously terminate the stream. + * + * @param cls the 'struct StreamClient' + * @param tc scheduler context + */ +static void +terminate_stream_task (void *cls, + const struct GNUNET_SCHEDULER_TaskContext *tc) +{ + struct StreamClient *sc = cls; + + sc->terminate_task = GNUNET_SCHEDULER_NO_TASK; + terminate_stream (sc); +} + + +/** + * Task run to asynchronously terminate the stream due to timeout. + * + * @param cls the 'struct StreamClient' + * @param tc scheduler context + */ +static void +timeout_stream_task (void *cls, + const struct GNUNET_SCHEDULER_TaskContext *tc) +{ + struct StreamClient *sc = cls; + + sc->timeout_task = GNUNET_SCHEDULER_NO_TASK; + terminate_stream (sc); +} + + +/** + * Reset the timeout for the stream client (due to activity). + * + * @param sc client handle to reset timeout for + */ +static void +refresh_timeout_task (struct StreamClient *sc) +{ + if (GNUNET_SCHEDULER_NO_TASK != sc->timeout_task) + GNUNET_SCHEDULER_cancel (sc->timeout_task); + sc->timeout_task = GNUNET_SCHEDULER_add_delayed (IDLE_TIMEOUT, + &timeout_stream_task, + sc); +} + + +/** + * We had a serious error, termiante stream, + * but do so asynchronously. + * + * @param sc stream to reset + */ +static void +terminate_stream_async (struct StreamClient *sc) +{ + if (GNUNET_SCHEDULER_NO_TASK == sc->terminate_task) + sc->terminate_task = GNUNET_SCHEDULER_add_now (&terminate_stream_task, + sc); +} + + +/** + * Functions of this signature are called whenever data is available from the + * stream. + * + * @param cls the closure from GNUNET_STREAM_read + * @param status the status of the stream at the time this function is called + * @param data traffic from the other side + * @param size the number of bytes available in data read; will be 0 on timeout + * @return number of bytes of processed from 'data' (any data remaining should be + * given to the next time the read processor is called). + */ +static size_t +process_request (void *cls, + enum GNUNET_STREAM_Status status, + const void *data, + size_t size); + + +/** + * We're done handling a request from a client, read the next one. + * + * @param sc client to continue reading requests from + */ +static void +continue_reading (struct StreamClient *sc) +{ + int ret; + + ret = + GNUNET_SERVER_mst_receive (sc->mst, + NULL, + NULL, 0, + GNUNET_NO, GNUNET_NO); + if (GNUNET_NO == ret) + return; + refresh_timeout_task (sc); + if (NULL != sc->rh) + return; + sc->rh = GNUNET_STREAM_read (sc->socket, + GNUNET_TIME_UNIT_FOREVER_REL, + &process_request, + sc); +} + + +/** + * Transmit the next entry from the write queue. + * + * @param sc where to process the write queue + */ +static void +continue_writing (struct StreamClient *sc); + + +/** + * Functions of this signature are called whenever data is available from the + * stream. + * + * @param cls the closure from GNUNET_STREAM_read + * @param status the status of the stream at the time this function is called + * @param data traffic from the other side + * @param size the number of bytes available in data read; will be 0 on timeout + * @return number of bytes of processed from 'data' (any data remaining should be + * given to the next time the read processor is called). + */ +static size_t +process_request (void *cls, + enum GNUNET_STREAM_Status status, + const void *data, + size_t size) +{ + struct StreamClient *sc = cls; + int ret; + + sc->rh = NULL; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Received %u byte query via stream\n", + (unsigned int) size); + switch (status) + { + case GNUNET_STREAM_OK: + ret = + GNUNET_SERVER_mst_receive (sc->mst, + NULL, + data, size, + GNUNET_NO, GNUNET_NO); + if (GNUNET_NO == ret) + return size; /* more messages in MST */ + if (GNUNET_SYSERR == ret) + { + GNUNET_break_op (0); + terminate_stream_async (sc); + return size; + } + break; + case GNUNET_STREAM_TIMEOUT: + case GNUNET_STREAM_SHUTDOWN: + case GNUNET_STREAM_SYSERR: + terminate_stream_async (sc); + return size; + default: + GNUNET_break (0); + return size; + } + continue_writing (sc); + return size; +} + + +/** + * Sending a reply was completed, continue processing. + * + * @param cls closure with the struct StreamClient which sent the query + * @param status result code for the operation + * @param size number of bytes that were transmitted + */ +static void +write_continuation (void *cls, + enum GNUNET_STREAM_Status status, + size_t size) +{ + struct StreamClient *sc = cls; + + sc->wh = NULL; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Write continuation called on 'server' side with status %d\n", + status); + if ( (GNUNET_STREAM_OK != status) || + (size != sc->reply_size) ) + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Transmission of reply failed, terminating stream\n"); + terminate_stream (sc); + return; + } + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Transmitted %u byte reply via stream\n", + (unsigned int) size); + GNUNET_STATISTICS_update (GSF_stats, + gettext_noop ("# Blocks transferred via stream"), 1, + GNUNET_NO); + continue_writing (sc); +} + + +/** + * Transmit the next entry from the write queue. + * + * @param sc where to process the write queue + */ +static void +continue_writing (struct StreamClient *sc) +{ + struct WriteQueueItem *wqi; + + if (NULL != sc->wh) + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Write pending, waiting for it to complete\n"); + return; /* write already pending */ + } + if (NULL == (wqi = sc->wqi_head)) + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Write queue empty, reading more requests\n"); + continue_reading (sc); + return; + } + GNUNET_CONTAINER_DLL_remove (sc->wqi_head, + sc->wqi_tail, + wqi); + sc->wh = GNUNET_STREAM_write (sc->socket, + &wqi[1], wqi->msize, + GNUNET_TIME_UNIT_FOREVER_REL, + &write_continuation, + sc); + if (NULL != sc->wh) + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Gave %u bytes for stream for transmission\n", + (unsigned int) wqi->msize); + GNUNET_free (wqi); + if (NULL == sc->wh) + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Write failed; terminating stream\n"); + terminate_stream (sc); + return; + } +} + + +/** + * Process a datum that was stored in the datastore. + * + * @param cls closure with the struct StreamClient which sent the query + * @param key key for the content + * @param size number of bytes in data + * @param data content stored + * @param type type of the content + * @param priority priority of the content + * @param anonymity anonymity-level for the content + * @param expiration expiration time for the content + * @param uid unique identifier for the datum; + * maybe 0 if no unique identifier is available + */ +static void +handle_datastore_reply (void *cls, + const struct GNUNET_HashCode * key, + size_t size, const void *data, + enum GNUNET_BLOCK_Type type, + uint32_t priority, + uint32_t anonymity, + struct GNUNET_TIME_Absolute + expiration, uint64_t uid) +{ + struct StreamClient *sc = cls; + size_t msize = size + sizeof (struct StreamReplyMessage); + struct WriteQueueItem *wqi; + struct StreamReplyMessage *srm; + + sc->qe = NULL; + if (GNUNET_BLOCK_TYPE_FS_ONDEMAND == type) + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Performing on-demand encoding\n"); + if (GNUNET_OK != + GNUNET_FS_handle_on_demand_block (key, + size, data, type, + priority, anonymity, + expiration, uid, + &handle_datastore_reply, + sc)) + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "On-demand encoding request failed\n"); + continue_writing (sc); + } + return; + } + if (msize > GNUNET_SERVER_MAX_MESSAGE_SIZE) + { + GNUNET_break (0); + continue_writing (sc); + return; + } + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Starting transmission of %u byte reply for query `%s' via stream\n", + (unsigned int) size, + GNUNET_h2s (key)); + wqi = GNUNET_malloc (sizeof (struct WriteQueueItem) + msize); + wqi->msize = msize; + srm = (struct StreamReplyMessage *) &wqi[1]; + srm->header.size = htons ((uint16_t) msize); + srm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_STREAM_REPLY); + srm->type = htonl (type); + srm->expiration = GNUNET_TIME_absolute_hton (expiration); + memcpy (&srm[1], data, size); + sc->reply_size = msize; + GNUNET_CONTAINER_DLL_insert (sc->wqi_head, + sc->wqi_tail, + wqi); + continue_writing (sc); +} + + +/** + * Functions with this signature are called whenever a + * complete query message is received. + * + * Do not call GNUNET_SERVER_mst_destroy in callback + * + * @param cls closure with the 'struct StreamClient' + * @param client identification of the client, NULL + * @param message the actual message + * @return GNUNET_OK on success, GNUNET_SYSERR to stop further processing + */ +static int +request_cb (void *cls, + void *client, + const struct GNUNET_MessageHeader *message) +{ + struct StreamClient *sc = cls; + const struct StreamQueryMessage *sqm; + + switch (ntohs (message->type)) + { + case GNUNET_MESSAGE_TYPE_FS_STREAM_QUERY: + if (sizeof (struct StreamQueryMessage) != + ntohs (message->size)) + { + GNUNET_break_op (0); + terminate_stream_async (sc); + return GNUNET_SYSERR; + } + sqm = (const struct StreamQueryMessage *) message; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Received query for `%s' via stream\n", + GNUNET_h2s (&sqm->query)); + GNUNET_STATISTICS_update (GSF_stats, + gettext_noop ("# queries received via stream"), 1, + GNUNET_NO); + refresh_timeout_task (sc); + sc->qe = GNUNET_DATASTORE_get_key (GSF_dsh, + 0, + &sqm->query, + ntohl (sqm->type), + 0 /* priority */, + GSF_datastore_queue_size, + GNUNET_TIME_UNIT_FOREVER_REL, + &handle_datastore_reply, sc); + if (NULL == sc->qe) + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Queueing request with datastore failed (queue full?)\n"); + continue_writing (sc); + } + return GNUNET_OK; + default: + GNUNET_break_op (0); + terminate_stream_async (sc); + return GNUNET_SYSERR; + } +} + + +/** + * Functions of this type are called upon new stream connection from other peers + * or upon binding error which happen when the app_port given in + * GNUNET_STREAM_listen() is already taken. + * + * @param cls the closure from GNUNET_STREAM_listen + * @param socket the socket representing the stream; NULL on binding error + * @param initiator the identity of the peer who wants to establish a stream + * with us; NULL on binding error + * @return GNUNET_OK to keep the socket open, GNUNET_SYSERR to close the + * stream (the socket will be invalid after the call) + */ +static int +accept_cb (void *cls, + struct GNUNET_STREAM_Socket *socket, + const struct GNUNET_PeerIdentity *initiator) +{ + struct StreamClient *sc; + + if (NULL == socket) + return GNUNET_SYSERR; + if (sc_count >= sc_count_max) + { + GNUNET_STATISTICS_update (GSF_stats, + gettext_noop ("# stream client connections rejected"), 1, + GNUNET_NO); + return GNUNET_SYSERR; + } + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Accepting inbound stream connection from `%s'\n", + GNUNET_i2s (initiator)); + GNUNET_STATISTICS_update (GSF_stats, + gettext_noop ("# stream connections active"), 1, + GNUNET_NO); + sc = GNUNET_malloc (sizeof (struct StreamClient)); + sc->socket = socket; + sc->mst = GNUNET_SERVER_mst_create (&request_cb, + sc); + sc->rh = GNUNET_STREAM_read (sc->socket, + GNUNET_TIME_UNIT_FOREVER_REL, + &process_request, + sc); + GNUNET_CONTAINER_DLL_insert (sc_head, + sc_tail, + sc); + sc_count++; + refresh_timeout_task (sc); + return GNUNET_OK; +} + + +/** + * Initialize subsystem for non-anonymous file-sharing. + */ +void +GSF_stream_start () +{ + stream_map = GNUNET_CONTAINER_multihashmap_create (16, GNUNET_YES); + if (GNUNET_YES == + GNUNET_CONFIGURATION_get_value_number (GSF_cfg, + "fs", + "MAX_STREAM_CLIENTS", + &sc_count_max)) + { + listen_socket = GNUNET_STREAM_listen (GSF_cfg, + GNUNET_APPLICATION_TYPE_FS_BLOCK_TRANSFER, + &accept_cb, NULL, + GNUNET_STREAM_OPTION_END); + } +} + + +/** + * Function called on each active streams to shut them down. + * + * @param cls NULL + * @param key target peer, unused + * @param value the 'struct StreamHandle' to destroy + * @return GNUNET_YES (continue to iterate) + */ +static int +release_streams (void *cls, + const struct GNUNET_HashCode *key, + void *value) +{ + struct StreamHandle *sh = value; + + destroy_stream_handle (sh); + return GNUNET_YES; +} + + +/** + * Shutdown subsystem for non-anonymous file-sharing. + */ +void +GSF_stream_stop () +{ + struct StreamClient *sc; + + while (NULL != (sc = sc_head)) + terminate_stream (sc); + if (NULL != listen_socket) + { + GNUNET_STREAM_listen_close (listen_socket); + listen_socket = NULL; + } + GNUNET_CONTAINER_multihashmap_iterate (stream_map, + &release_streams, + NULL); + GNUNET_CONTAINER_multihashmap_destroy (stream_map); + stream_map = NULL; +} + +/* end of gnunet-service-fs_stream.c */ |