aboutsummaryrefslogtreecommitdiff
path: root/src/fs/gnunet-service-fs_stream.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/fs/gnunet-service-fs_stream.c')
-rw-r--r--src/fs/gnunet-service-fs_stream.c1466
1 files changed, 1466 insertions, 0 deletions
diff --git a/src/fs/gnunet-service-fs_stream.c b/src/fs/gnunet-service-fs_stream.c
new file mode 100644
index 0000000..3838b1c
--- /dev/null
+++ b/src/fs/gnunet-service-fs_stream.c
@@ -0,0 +1,1466 @@
+/*
+ This file is part of GNUnet.
+ (C) 2012 Christian Grothoff (and other contributing authors)
+
+ GNUnet is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published
+ by the Free Software Foundation; either version 3, or (at your
+ option) any later version.
+
+ GNUnet is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with GNUnet; see the file COPYING. If not, write to the
+ Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ Boston, MA 02111-1307, USA.
+*/
+
+/**
+ * @file fs/gnunet-service-fs_stream.c
+ * @brief non-anonymous file-transfer
+ * @author Christian Grothoff
+ */
+#include "platform.h"
+#include "gnunet_constants.h"
+#include "gnunet_util_lib.h"
+#include "gnunet_stream_lib.h"
+#include "gnunet_protocols.h"
+#include "gnunet_applications.h"
+#include "gnunet-service-fs.h"
+#include "gnunet-service-fs_indexing.h"
+#include "gnunet-service-fs_stream.h"
+
+/**
+ * After how long do we termiante idle connections?
+ */
+#define IDLE_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 2)
+
+/**
+ * After how long do we reset connections without replies?
+ */
+#define CLIENT_RETRY_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 30)
+
+
+/**
+ * A message in the queue to be written to the stream.
+ */
+struct WriteQueueItem
+{
+ /**
+ * Kept in a DLL.
+ */
+ struct WriteQueueItem *next;
+
+ /**
+ * Kept in a DLL.
+ */
+ struct WriteQueueItem *prev;
+
+ /**
+ * Number of bytes of payload, allocated at the end of this struct.
+ */
+ size_t msize;
+};
+
+
+/**
+ * Information we keep around for each active streaming client.
+ */
+struct StreamClient
+{
+ /**
+ * DLL
+ */
+ struct StreamClient *next;
+
+ /**
+ * DLL
+ */
+ struct StreamClient *prev;
+
+ /**
+ * Socket for communication.
+ */
+ struct GNUNET_STREAM_Socket *socket;
+
+ /**
+ * Handle for active read operation, or NULL.
+ */
+ struct GNUNET_STREAM_ReadHandle *rh;
+
+ /**
+ * Handle for active write operation, or NULL.
+ */
+ struct GNUNET_STREAM_WriteHandle *wh;
+
+ /**
+ * Head of write queue.
+ */
+ struct WriteQueueItem *wqi_head;
+
+ /**
+ * Tail of write queue.
+ */
+ struct WriteQueueItem *wqi_tail;
+
+ /**
+ * Tokenizer for requests.
+ */
+ struct GNUNET_SERVER_MessageStreamTokenizer *mst;
+
+ /**
+ * Current active request to the datastore, if we have one pending.
+ */
+ struct GNUNET_DATASTORE_QueueEntry *qe;
+
+ /**
+ * Task that is scheduled to asynchronously terminate the connection.
+ */
+ GNUNET_SCHEDULER_TaskIdentifier terminate_task;
+
+ /**
+ * Task that is scheduled to terminate idle connections.
+ */
+ GNUNET_SCHEDULER_TaskIdentifier timeout_task;
+
+ /**
+ * Size of the last write that was initiated.
+ */
+ size_t reply_size;
+
+};
+
+
+/**
+ * Query from one peer, asking the other for CHK-data.
+ */
+struct StreamQueryMessage
+{
+
+ /**
+ * Type is GNUNET_MESSAGE_TYPE_FS_STREAM_QUERY.
+ */
+ struct GNUNET_MessageHeader header;
+
+ /**
+ * Block type must be DBLOCK or IBLOCK.
+ */
+ uint32_t type;
+
+ /**
+ * Query hash from CHK (hash of encrypted block).
+ */
+ struct GNUNET_HashCode query;
+
+};
+
+
+/**
+ * Reply to a StreamQueryMessage.
+ */
+struct StreamReplyMessage
+{
+
+ /**
+ * Type is GNUNET_MESSAGE_TYPE_FS_STREAM_REPLY.
+ */
+ struct GNUNET_MessageHeader header;
+
+ /**
+ * Block type must be DBLOCK or IBLOCK.
+ */
+ uint32_t type;
+
+ /**
+ * Expiration time for the block.
+ */
+ struct GNUNET_TIME_AbsoluteNBO expiration;
+
+ /* followed by the encrypted block */
+
+};
+
+
+/**
+ * Handle for a stream to another peer.
+ */
+struct StreamHandle;
+
+
+/**
+ * Handle for a request that is going out via stream API.
+ */
+struct GSF_StreamRequest
+{
+
+ /**
+ * DLL.
+ */
+ struct GSF_StreamRequest *next;
+
+ /**
+ * DLL.
+ */
+ struct GSF_StreamRequest *prev;
+
+ /**
+ * Which stream is this request associated with?
+ */
+ struct StreamHandle *sh;
+
+ /**
+ * Function to call with the result.
+ */
+ GSF_StreamReplyProcessor proc;
+
+ /**
+ * Closure for 'proc'
+ */
+ void *proc_cls;
+
+ /**
+ * Query to transmit to the other peer.
+ */
+ struct GNUNET_HashCode query;
+
+ /**
+ * Desired type for the reply.
+ */
+ enum GNUNET_BLOCK_Type type;
+
+ /**
+ * Did we transmit this request already? YES if we are
+ * in the 'waiting' DLL, NO if we are in the 'pending' DLL.
+ */
+ int was_transmitted;
+};
+
+
+/**
+ * Handle for a stream to another peer.
+ */
+struct StreamHandle
+{
+ /**
+ * Head of DLL of pending requests on this stream.
+ */
+ struct GSF_StreamRequest *pending_head;
+
+ /**
+ * Tail of DLL of pending requests on this stream.
+ */
+ struct GSF_StreamRequest *pending_tail;
+
+ /**
+ * Map from query to 'struct GSF_StreamRequest's waiting for
+ * a reply.
+ */
+ struct GNUNET_CONTAINER_MultiHashMap *waiting_map;
+
+ /**
+ * Connection to the other peer.
+ */
+ struct GNUNET_STREAM_Socket *stream;
+
+ /**
+ * Handle for active read operation, or NULL.
+ */
+ struct GNUNET_STREAM_ReadHandle *rh;
+
+ /**
+ * Handle for active write operation, or NULL.
+ */
+ struct GNUNET_STREAM_WriteHandle *wh;
+
+ /**
+ * Tokenizer for replies.
+ */
+ struct GNUNET_SERVER_MessageStreamTokenizer *mst;
+
+ /**
+ * Which peer does this stream go to?
+ */
+ struct GNUNET_PeerIdentity target;
+
+ /**
+ * Task to kill inactive streams (we keep them around for
+ * a few seconds to give the application a chance to give
+ * us another query).
+ */
+ GNUNET_SCHEDULER_TaskIdentifier timeout_task;
+
+ /**
+ * Task to reset streams that had errors (asynchronously,
+ * as we may not be able to do it immediately during a
+ * callback from the stream API).
+ */
+ GNUNET_SCHEDULER_TaskIdentifier reset_task;
+
+ /**
+ * Is this stream ready for transmission?
+ */
+ int is_ready;
+
+};
+
+
+/**
+ * Listen socket for incoming requests.
+ */
+static struct GNUNET_STREAM_ListenSocket *listen_socket;
+
+/**
+ * Head of DLL of stream clients.
+ */
+static struct StreamClient *sc_head;
+
+/**
+ * Tail of DLL of stream clients.
+ */
+static struct StreamClient *sc_tail;
+
+/**
+ * Number of active stream clients in the 'sc_*'-DLL.
+ */
+static unsigned int sc_count;
+
+/**
+ * Maximum allowed number of stream clients.
+ */
+static unsigned long long sc_count_max;
+
+/**
+ * Map from peer identities to 'struct StreamHandles' with streams to
+ * those peers.
+ */
+static struct GNUNET_CONTAINER_MultiHashMap *stream_map;
+
+
+/* ********************* client-side code ************************* */
+
+/**
+ * Iterator called on each entry in a waiting map to
+ * call the 'proc' continuation and release associated
+ * resources.
+ *
+ * @param cls the 'struct StreamHandle'
+ * @param key the key of the entry in the map (the query)
+ * @param value the 'struct GSF_StreamRequest' to clean up
+ * @return GNUNET_YES (continue to iterate)
+ */
+static int
+free_waiting_entry (void *cls,
+ const struct GNUNET_HashCode *key,
+ void *value)
+{
+ struct GSF_StreamRequest *sr = value;
+
+ sr->proc (sr->proc_cls, GNUNET_BLOCK_TYPE_ANY,
+ GNUNET_TIME_UNIT_FOREVER_ABS,
+ 0, NULL);
+ GSF_stream_query_cancel (sr);
+ return GNUNET_YES;
+}
+
+
+/**
+ * Destroy a stream handle.
+ *
+ * @param sh stream to process
+ */
+static void
+destroy_stream_handle (struct StreamHandle *sh)
+{
+ struct GSF_StreamRequest *sr;
+
+ while (NULL != (sr = sh->pending_head))
+ {
+ sr->proc (sr->proc_cls, GNUNET_BLOCK_TYPE_ANY,
+ GNUNET_TIME_UNIT_FOREVER_ABS,
+ 0, NULL);
+ GSF_stream_query_cancel (sr);
+ }
+ GNUNET_CONTAINER_multihashmap_iterate (sh->waiting_map,
+ &free_waiting_entry,
+ sh);
+ if (NULL != sh->wh)
+ GNUNET_STREAM_write_cancel (sh->wh);
+ if (NULL != sh->rh)
+ GNUNET_STREAM_read_cancel (sh->rh);
+ if (GNUNET_SCHEDULER_NO_TASK != sh->timeout_task)
+ GNUNET_SCHEDULER_cancel (sh->timeout_task);
+ if (GNUNET_SCHEDULER_NO_TASK != sh->reset_task)
+ GNUNET_SCHEDULER_cancel (sh->reset_task);
+ GNUNET_STREAM_close (sh->stream);
+ GNUNET_assert (GNUNET_OK ==
+ GNUNET_CONTAINER_multihashmap_remove (stream_map,
+ &sh->target.hashPubKey,
+ sh));
+ GNUNET_CONTAINER_multihashmap_destroy (sh->waiting_map);
+ GNUNET_free (sh);
+}
+
+
+/**
+ * Transmit pending requests via the stream.
+ *
+ * @param sh stream to process
+ */
+static void
+transmit_pending (struct StreamHandle *sh);
+
+
+/**
+ * Function called once the stream is ready for transmission.
+ *
+ * @param cls the 'struct StreamHandle'
+ * @param socket stream socket handle
+ */
+static void
+stream_ready_cb (void *cls,
+ struct GNUNET_STREAM_Socket *socket)
+{
+ struct StreamHandle *sh = cls;
+
+ sh->is_ready = GNUNET_YES;
+ transmit_pending (sh);
+}
+
+
+/**
+ * Iterator called on each entry in a waiting map to
+ * move it back to the pending list.
+ *
+ * @param cls the 'struct StreamHandle'
+ * @param key the key of the entry in the map (the query)
+ * @param value the 'struct GSF_StreamRequest' to move to pending
+ * @return GNUNET_YES (continue to iterate)
+ */
+static int
+move_to_pending (void *cls,
+ const struct GNUNET_HashCode *key,
+ void *value)
+{
+ struct StreamHandle *sh = cls;
+ struct GSF_StreamRequest *sr = value;
+
+ GNUNET_assert (GNUNET_YES ==
+ GNUNET_CONTAINER_multihashmap_remove (sh->waiting_map,
+ key,
+ value));
+ GNUNET_CONTAINER_DLL_insert (sh->pending_head,
+ sh->pending_tail,
+ sr);
+ sr->was_transmitted = GNUNET_NO;
+ return GNUNET_YES;
+}
+
+
+/**
+ * We had a serious error, tear down and re-create stream from scratch.
+ *
+ * @param sh stream to reset
+ */
+static void
+reset_stream (struct StreamHandle *sh)
+{
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Resetting stream to %s\n",
+ GNUNET_i2s (&sh->target));
+ if (NULL != sh->rh)
+ {
+ GNUNET_STREAM_read_cancel (sh->rh);
+ sh->rh = NULL;
+ }
+ GNUNET_STREAM_close (sh->stream);
+ sh->is_ready = GNUNET_NO;
+ GNUNET_CONTAINER_multihashmap_iterate (sh->waiting_map,
+ &move_to_pending,
+ sh);
+ sh->stream = GNUNET_STREAM_open (GSF_cfg,
+ &sh->target,
+ GNUNET_APPLICATION_TYPE_FS_BLOCK_TRANSFER,
+ &stream_ready_cb, sh,
+ GNUNET_STREAM_OPTION_END);
+}
+
+
+/**
+ * Task called when it is time to destroy an inactive stream.
+ *
+ * @param cls the 'struct StreamHandle' to tear down
+ * @param tc scheduler context, unused
+ */
+static void
+stream_timeout (void *cls,
+ const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+ struct StreamHandle *sh = cls;
+
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Timeout on stream to %s\n",
+ GNUNET_i2s (&sh->target));
+ sh->timeout_task = GNUNET_SCHEDULER_NO_TASK;
+ destroy_stream_handle (sh);
+}
+
+
+/**
+ * Task called when it is time to reset an stream.
+ *
+ * @param cls the 'struct StreamHandle' to tear down
+ * @param tc scheduler context, unused
+ */
+static void
+reset_stream_task (void *cls,
+ const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+ struct StreamHandle *sh = cls;
+
+ sh->reset_task = GNUNET_SCHEDULER_NO_TASK;
+ reset_stream (sh);
+}
+
+
+/**
+ * We had a serious error, tear down and re-create stream from scratch,
+ * but do so asynchronously.
+ *
+ * @param sh stream to reset
+ */
+static void
+reset_stream_async (struct StreamHandle *sh)
+{
+ if (GNUNET_SCHEDULER_NO_TASK != sh->reset_task)
+ GNUNET_SCHEDULER_cancel (sh->reset_task);
+ sh->reset_task = GNUNET_SCHEDULER_add_now (&reset_stream_task,
+ sh);
+}
+
+
+/**
+ * We got a reply from the stream. Process it.
+ *
+ * @param cls the struct StreamHandle
+ * @param status the status of the stream at the time this function is called
+ * @param data traffic from the other side
+ * @param size the number of bytes available in data read; will be 0 on timeout
+ * @return number of bytes of processed from 'data' (any data remaining should be
+ * given to the next time the read processor is called).
+ */
+static size_t
+handle_stream_reply (void *cls,
+ enum GNUNET_STREAM_Status status,
+ const void *data,
+ size_t size)
+{
+ struct StreamHandle *sh = cls;
+
+ sh->rh = NULL;
+ GNUNET_SCHEDULER_cancel (sh->reset_task);
+ sh->reset_task = GNUNET_SCHEDULER_add_delayed (CLIENT_RETRY_TIMEOUT,
+ &reset_stream_task,
+ sh);
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Received %u bytes from stream to %s\n",
+ (unsigned int) size,
+ GNUNET_i2s (&sh->target));
+ if (GNUNET_SYSERR ==
+ GNUNET_SERVER_mst_receive (sh->mst,
+ NULL,
+ data, size,
+ GNUNET_NO, GNUNET_NO))
+ {
+ GNUNET_break_op (0);
+ reset_stream_async (sh);
+ return size;
+ }
+ if (NULL == sh->rh)
+ sh->rh = GNUNET_STREAM_read (sh->stream,
+ GNUNET_TIME_UNIT_FOREVER_REL,
+ &handle_stream_reply,
+ sh);
+ return size;
+}
+
+
+/**
+ * Functions of this signature are called whenever we transmitted a
+ * query via a stream.
+ *
+ * @param cls the struct StreamHandle for which we did the write call
+ * @param status the status of the stream at the time this function is called;
+ * GNUNET_OK if writing to stream was completed successfully,
+ * GNUNET_STREAM_SHUTDOWN if the stream is shutdown for writing in the
+ * mean time.
+ * @param size the number of bytes written
+ */
+static void
+query_write_continuation (void *cls,
+ enum GNUNET_STREAM_Status status,
+ size_t size)
+{
+ struct StreamHandle *sh = cls;
+
+ sh->wh = NULL;
+ if ( (GNUNET_STREAM_OK != status) ||
+ (sizeof (struct StreamQueryMessage) != size) )
+ {
+ reset_stream (sh);
+ return;
+ }
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Successfully transmitted %u bytes via stream to %s\n",
+ (unsigned int) size,
+ GNUNET_i2s (&sh->target));
+ if (NULL == sh->rh)
+ sh->rh = GNUNET_STREAM_read (sh->stream,
+ GNUNET_TIME_UNIT_FOREVER_REL,
+ &handle_stream_reply,
+ sh);
+ transmit_pending (sh);
+}
+
+
+/**
+ * Transmit pending requests via the stream.
+ *
+ * @param sh stream to process
+ */
+static void
+transmit_pending (struct StreamHandle *sh)
+{
+ struct StreamQueryMessage sqm;
+ struct GSF_StreamRequest *sr;
+
+ if (NULL != sh->wh)
+ return;
+ sr = sh->pending_head;
+ if (NULL == sr)
+ return;
+ GNUNET_CONTAINER_DLL_remove (sh->pending_head,
+ sh->pending_tail,
+ sr);
+ GNUNET_CONTAINER_multihashmap_put (sh->waiting_map,
+ &sr->query,
+ sr,
+ GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Sending query via stream to %s\n",
+ GNUNET_i2s (&sh->target));
+ sr->was_transmitted = GNUNET_YES;
+ sqm.header.size = htons (sizeof (sqm));
+ sqm.header.type = htons (GNUNET_MESSAGE_TYPE_FS_STREAM_QUERY);
+ sqm.type = htonl (sr->type);
+ sqm.query = sr->query;
+ sh->wh = GNUNET_STREAM_write (sh->stream,
+ &sqm, sizeof (sqm),
+ GNUNET_TIME_UNIT_FOREVER_REL,
+ &query_write_continuation,
+ sh);
+}
+
+
+/**
+ * Closure for 'handle_reply'.
+ */
+struct HandleReplyClosure
+{
+
+ /**
+ * Reply payload.
+ */
+ const void *data;
+
+ /**
+ * Expiration time for the block.
+ */
+ struct GNUNET_TIME_Absolute expiration;
+
+ /**
+ * Number of bytes in 'data'.
+ */
+ size_t data_size;
+
+ /**
+ * Type of the block.
+ */
+ enum GNUNET_BLOCK_Type type;
+
+ /**
+ * Did we have a matching query?
+ */
+ int found;
+};
+
+
+/**
+ * Iterator called on each entry in a waiting map to
+ * process a result.
+ *
+ * @param cls the 'struct HandleReplyClosure'
+ * @param key the key of the entry in the map (the query)
+ * @param value the 'struct GSF_StreamRequest' to handle result for
+ * @return GNUNET_YES (continue to iterate)
+ */
+static int
+handle_reply (void *cls,
+ const struct GNUNET_HashCode *key,
+ void *value)
+{
+ struct HandleReplyClosure *hrc = cls;
+ struct GSF_StreamRequest *sr = value;
+
+ sr->proc (sr->proc_cls,
+ hrc->type,
+ hrc->expiration,
+ hrc->data_size,
+ hrc->data);
+ GSF_stream_query_cancel (sr);
+ hrc->found = GNUNET_YES;
+ return GNUNET_YES;
+}
+
+
+/**
+ * Functions with this signature are called whenever a
+ * complete reply is received.
+ *
+ * Do not call GNUNET_SERVER_mst_destroy in callback
+ *
+ * @param cls closure with the 'struct StreamHandle'
+ * @param client identification of the client, NULL
+ * @param message the actual message
+ * @return GNUNET_OK on success, GNUNET_SYSERR to stop further processing
+ */
+static int
+reply_cb (void *cls,
+ void *client,
+ const struct GNUNET_MessageHeader *message)
+{
+ struct StreamHandle *sh = cls;
+ const struct StreamReplyMessage *srm;
+ struct HandleReplyClosure hrc;
+ uint16_t msize;
+ enum GNUNET_BLOCK_Type type;
+ struct GNUNET_HashCode query;
+
+ msize = ntohs (message->size);
+ switch (ntohs (message->type))
+ {
+ case GNUNET_MESSAGE_TYPE_FS_STREAM_REPLY:
+ if (sizeof (struct StreamReplyMessage) > msize)
+ {
+ GNUNET_break_op (0);
+ reset_stream_async (sh);
+ return GNUNET_SYSERR;
+ }
+ srm = (const struct StreamReplyMessage *) message;
+ msize -= sizeof (struct StreamReplyMessage);
+ type = (enum GNUNET_BLOCK_Type) ntohl (srm->type);
+ if (GNUNET_YES !=
+ GNUNET_BLOCK_get_key (GSF_block_ctx,
+ type,
+ &srm[1], msize, &query))
+ {
+ GNUNET_break_op (0);
+ reset_stream_async (sh);
+ return GNUNET_SYSERR;
+ }
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Received reply `%s' via stream\n",
+ GNUNET_h2s (&query));
+ GNUNET_STATISTICS_update (GSF_stats,
+ gettext_noop ("# replies received via stream"), 1,
+ GNUNET_NO);
+ hrc.data = &srm[1];
+ hrc.data_size = msize;
+ hrc.expiration = GNUNET_TIME_absolute_ntoh (srm->expiration);
+ hrc.type = type;
+ hrc.found = GNUNET_NO;
+ GNUNET_CONTAINER_multihashmap_get_multiple (sh->waiting_map,
+ &query,
+ &handle_reply,
+ &hrc);
+ if (GNUNET_NO == hrc.found)
+ {
+ GNUNET_STATISTICS_update (GSF_stats,
+ gettext_noop ("# replies received via stream dropped"), 1,
+ GNUNET_NO);
+ return GNUNET_OK;
+ }
+ return GNUNET_OK;
+ default:
+ GNUNET_break_op (0);
+ reset_stream_async (sh);
+ return GNUNET_SYSERR;
+ }
+}
+
+
+/**
+ * Get (or create) a stream to talk to the given peer.
+ *
+ * @param target peer we want to communicate with
+ */
+static struct StreamHandle *
+get_stream (const struct GNUNET_PeerIdentity *target)
+{
+ struct StreamHandle *sh;
+
+ sh = GNUNET_CONTAINER_multihashmap_get (stream_map,
+ &target->hashPubKey);
+ if (NULL != sh)
+ {
+ if (GNUNET_SCHEDULER_NO_TASK != sh->timeout_task)
+ {
+ GNUNET_SCHEDULER_cancel (sh->timeout_task);
+ sh->timeout_task = GNUNET_SCHEDULER_NO_TASK;
+ }
+ return sh;
+ }
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Creating stream to %s\n",
+ GNUNET_i2s (target));
+ sh = GNUNET_malloc (sizeof (struct StreamHandle));
+ sh->reset_task = GNUNET_SCHEDULER_add_delayed (CLIENT_RETRY_TIMEOUT,
+ &reset_stream_task,
+ sh);
+ sh->mst = GNUNET_SERVER_mst_create (&reply_cb,
+ sh);
+ sh->waiting_map = GNUNET_CONTAINER_multihashmap_create (512, GNUNET_YES);
+ sh->target = *target;
+ sh->stream = GNUNET_STREAM_open (GSF_cfg,
+ &sh->target,
+ GNUNET_APPLICATION_TYPE_FS_BLOCK_TRANSFER,
+ &stream_ready_cb, sh,
+ GNUNET_STREAM_OPTION_END);
+ GNUNET_assert (GNUNET_OK ==
+ GNUNET_CONTAINER_multihashmap_put (stream_map,
+ &sh->target.hashPubKey,
+ sh,
+ GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
+ return sh;
+}
+
+
+/**
+ * Look for a block by directly contacting a particular peer.
+ *
+ * @param target peer that should have the block
+ * @param query hash to query for the block
+ * @param type desired type for the block
+ * @param proc function to call with result
+ * @param proc_cls closure for 'proc'
+ * @return handle to cancel the operation
+ */
+struct GSF_StreamRequest *
+GSF_stream_query (const struct GNUNET_PeerIdentity *target,
+ const struct GNUNET_HashCode *query,
+ enum GNUNET_BLOCK_Type type,
+ GSF_StreamReplyProcessor proc, void *proc_cls)
+{
+ struct StreamHandle *sh;
+ struct GSF_StreamRequest *sr;
+
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Preparing to send query for %s via stream to %s\n",
+ GNUNET_h2s (query),
+ GNUNET_i2s (target));
+ sh = get_stream (target);
+ sr = GNUNET_malloc (sizeof (struct GSF_StreamRequest));
+ sr->sh = sh;
+ sr->proc = proc;
+ sr->proc_cls = proc_cls;
+ sr->type = type;
+ sr->query = *query;
+ GNUNET_CONTAINER_DLL_insert (sh->pending_head,
+ sh->pending_tail,
+ sr);
+ if (GNUNET_YES == sh->is_ready)
+ transmit_pending (sh);
+ return sr;
+}
+
+
+/**
+ * Cancel an active request; must not be called after 'proc'
+ * was calld.
+ *
+ * @param sr request to cancel
+ */
+void
+GSF_stream_query_cancel (struct GSF_StreamRequest *sr)
+{
+ struct StreamHandle *sh = sr->sh;
+
+ if (GNUNET_YES == sr->was_transmitted)
+ GNUNET_assert (GNUNET_OK ==
+ GNUNET_CONTAINER_multihashmap_remove (sh->waiting_map,
+ &sr->query,
+ sr));
+ else
+ GNUNET_CONTAINER_DLL_remove (sh->pending_head,
+ sh->pending_tail,
+ sr);
+ GNUNET_free (sr);
+ if ( (0 == GNUNET_CONTAINER_multihashmap_size (sh->waiting_map)) &&
+ (NULL == sh->pending_head) )
+ sh->timeout_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_SECONDS,
+ &stream_timeout,
+ sh);
+}
+
+
+/* ********************* server-side code ************************* */
+
+
+/**
+ * We're done with a particular client, clean up.
+ *
+ * @param sc client to clean up
+ */
+static void
+terminate_stream (struct StreamClient *sc)
+{
+ GNUNET_STATISTICS_update (GSF_stats,
+ gettext_noop ("# stream connections active"), -1,
+ GNUNET_NO);
+ if (GNUNET_SCHEDULER_NO_TASK != sc->terminate_task)
+ GNUNET_SCHEDULER_cancel (sc->terminate_task);
+ if (GNUNET_SCHEDULER_NO_TASK != sc->timeout_task)
+ GNUNET_SCHEDULER_cancel (sc->timeout_task);
+ if (NULL != sc->rh)
+ GNUNET_STREAM_read_cancel (sc->rh);
+ if (NULL != sc->wh)
+ GNUNET_STREAM_write_cancel (sc->wh);
+ if (NULL != sc->qe)
+ GNUNET_DATASTORE_cancel (sc->qe);
+ GNUNET_SERVER_mst_destroy (sc->mst);
+ GNUNET_STREAM_close (sc->socket);
+ struct WriteQueueItem *wqi;
+ while (NULL != (wqi = sc->wqi_head))
+ {
+ GNUNET_CONTAINER_DLL_remove (sc->wqi_head,
+ sc->wqi_tail,
+ wqi);
+ GNUNET_free (wqi);
+ }
+
+
+ GNUNET_CONTAINER_DLL_remove (sc_head,
+ sc_tail,
+ sc);
+ sc_count--;
+ GNUNET_free (sc);
+}
+
+
+/**
+ * Task run to asynchronously terminate the stream.
+ *
+ * @param cls the 'struct StreamClient'
+ * @param tc scheduler context
+ */
+static void
+terminate_stream_task (void *cls,
+ const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+ struct StreamClient *sc = cls;
+
+ sc->terminate_task = GNUNET_SCHEDULER_NO_TASK;
+ terminate_stream (sc);
+}
+
+
+/**
+ * Task run to asynchronously terminate the stream due to timeout.
+ *
+ * @param cls the 'struct StreamClient'
+ * @param tc scheduler context
+ */
+static void
+timeout_stream_task (void *cls,
+ const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+ struct StreamClient *sc = cls;
+
+ sc->timeout_task = GNUNET_SCHEDULER_NO_TASK;
+ terminate_stream (sc);
+}
+
+
+/**
+ * Reset the timeout for the stream client (due to activity).
+ *
+ * @param sc client handle to reset timeout for
+ */
+static void
+refresh_timeout_task (struct StreamClient *sc)
+{
+ if (GNUNET_SCHEDULER_NO_TASK != sc->timeout_task)
+ GNUNET_SCHEDULER_cancel (sc->timeout_task);
+ sc->timeout_task = GNUNET_SCHEDULER_add_delayed (IDLE_TIMEOUT,
+ &timeout_stream_task,
+ sc);
+}
+
+
+/**
+ * We had a serious error, termiante stream,
+ * but do so asynchronously.
+ *
+ * @param sc stream to reset
+ */
+static void
+terminate_stream_async (struct StreamClient *sc)
+{
+ if (GNUNET_SCHEDULER_NO_TASK == sc->terminate_task)
+ sc->terminate_task = GNUNET_SCHEDULER_add_now (&terminate_stream_task,
+ sc);
+}
+
+
+/**
+ * Functions of this signature are called whenever data is available from the
+ * stream.
+ *
+ * @param cls the closure from GNUNET_STREAM_read
+ * @param status the status of the stream at the time this function is called
+ * @param data traffic from the other side
+ * @param size the number of bytes available in data read; will be 0 on timeout
+ * @return number of bytes of processed from 'data' (any data remaining should be
+ * given to the next time the read processor is called).
+ */
+static size_t
+process_request (void *cls,
+ enum GNUNET_STREAM_Status status,
+ const void *data,
+ size_t size);
+
+
+/**
+ * We're done handling a request from a client, read the next one.
+ *
+ * @param sc client to continue reading requests from
+ */
+static void
+continue_reading (struct StreamClient *sc)
+{
+ int ret;
+
+ ret =
+ GNUNET_SERVER_mst_receive (sc->mst,
+ NULL,
+ NULL, 0,
+ GNUNET_NO, GNUNET_NO);
+ if (GNUNET_NO == ret)
+ return;
+ refresh_timeout_task (sc);
+ if (NULL != sc->rh)
+ return;
+ sc->rh = GNUNET_STREAM_read (sc->socket,
+ GNUNET_TIME_UNIT_FOREVER_REL,
+ &process_request,
+ sc);
+}
+
+
+/**
+ * Transmit the next entry from the write queue.
+ *
+ * @param sc where to process the write queue
+ */
+static void
+continue_writing (struct StreamClient *sc);
+
+
+/**
+ * Functions of this signature are called whenever data is available from the
+ * stream.
+ *
+ * @param cls the closure from GNUNET_STREAM_read
+ * @param status the status of the stream at the time this function is called
+ * @param data traffic from the other side
+ * @param size the number of bytes available in data read; will be 0 on timeout
+ * @return number of bytes of processed from 'data' (any data remaining should be
+ * given to the next time the read processor is called).
+ */
+static size_t
+process_request (void *cls,
+ enum GNUNET_STREAM_Status status,
+ const void *data,
+ size_t size)
+{
+ struct StreamClient *sc = cls;
+ int ret;
+
+ sc->rh = NULL;
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Received %u byte query via stream\n",
+ (unsigned int) size);
+ switch (status)
+ {
+ case GNUNET_STREAM_OK:
+ ret =
+ GNUNET_SERVER_mst_receive (sc->mst,
+ NULL,
+ data, size,
+ GNUNET_NO, GNUNET_NO);
+ if (GNUNET_NO == ret)
+ return size; /* more messages in MST */
+ if (GNUNET_SYSERR == ret)
+ {
+ GNUNET_break_op (0);
+ terminate_stream_async (sc);
+ return size;
+ }
+ break;
+ case GNUNET_STREAM_TIMEOUT:
+ case GNUNET_STREAM_SHUTDOWN:
+ case GNUNET_STREAM_SYSERR:
+ terminate_stream_async (sc);
+ return size;
+ default:
+ GNUNET_break (0);
+ return size;
+ }
+ continue_writing (sc);
+ return size;
+}
+
+
+/**
+ * Sending a reply was completed, continue processing.
+ *
+ * @param cls closure with the struct StreamClient which sent the query
+ * @param status result code for the operation
+ * @param size number of bytes that were transmitted
+ */
+static void
+write_continuation (void *cls,
+ enum GNUNET_STREAM_Status status,
+ size_t size)
+{
+ struct StreamClient *sc = cls;
+
+ sc->wh = NULL;
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Write continuation called on 'server' side with status %d\n",
+ status);
+ if ( (GNUNET_STREAM_OK != status) ||
+ (size != sc->reply_size) )
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Transmission of reply failed, terminating stream\n");
+ terminate_stream (sc);
+ return;
+ }
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Transmitted %u byte reply via stream\n",
+ (unsigned int) size);
+ GNUNET_STATISTICS_update (GSF_stats,
+ gettext_noop ("# Blocks transferred via stream"), 1,
+ GNUNET_NO);
+ continue_writing (sc);
+}
+
+
+/**
+ * Transmit the next entry from the write queue.
+ *
+ * @param sc where to process the write queue
+ */
+static void
+continue_writing (struct StreamClient *sc)
+{
+ struct WriteQueueItem *wqi;
+
+ if (NULL != sc->wh)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Write pending, waiting for it to complete\n");
+ return; /* write already pending */
+ }
+ if (NULL == (wqi = sc->wqi_head))
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Write queue empty, reading more requests\n");
+ continue_reading (sc);
+ return;
+ }
+ GNUNET_CONTAINER_DLL_remove (sc->wqi_head,
+ sc->wqi_tail,
+ wqi);
+ sc->wh = GNUNET_STREAM_write (sc->socket,
+ &wqi[1], wqi->msize,
+ GNUNET_TIME_UNIT_FOREVER_REL,
+ &write_continuation,
+ sc);
+ if (NULL != sc->wh)
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Gave %u bytes for stream for transmission\n",
+ (unsigned int) wqi->msize);
+ GNUNET_free (wqi);
+ if (NULL == sc->wh)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Write failed; terminating stream\n");
+ terminate_stream (sc);
+ return;
+ }
+}
+
+
+/**
+ * Process a datum that was stored in the datastore.
+ *
+ * @param cls closure with the struct StreamClient which sent the query
+ * @param key key for the content
+ * @param size number of bytes in data
+ * @param data content stored
+ * @param type type of the content
+ * @param priority priority of the content
+ * @param anonymity anonymity-level for the content
+ * @param expiration expiration time for the content
+ * @param uid unique identifier for the datum;
+ * maybe 0 if no unique identifier is available
+ */
+static void
+handle_datastore_reply (void *cls,
+ const struct GNUNET_HashCode * key,
+ size_t size, const void *data,
+ enum GNUNET_BLOCK_Type type,
+ uint32_t priority,
+ uint32_t anonymity,
+ struct GNUNET_TIME_Absolute
+ expiration, uint64_t uid)
+{
+ struct StreamClient *sc = cls;
+ size_t msize = size + sizeof (struct StreamReplyMessage);
+ struct WriteQueueItem *wqi;
+ struct StreamReplyMessage *srm;
+
+ sc->qe = NULL;
+ if (GNUNET_BLOCK_TYPE_FS_ONDEMAND == type)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Performing on-demand encoding\n");
+ if (GNUNET_OK !=
+ GNUNET_FS_handle_on_demand_block (key,
+ size, data, type,
+ priority, anonymity,
+ expiration, uid,
+ &handle_datastore_reply,
+ sc))
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "On-demand encoding request failed\n");
+ continue_writing (sc);
+ }
+ return;
+ }
+ if (msize > GNUNET_SERVER_MAX_MESSAGE_SIZE)
+ {
+ GNUNET_break (0);
+ continue_writing (sc);
+ return;
+ }
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Starting transmission of %u byte reply for query `%s' via stream\n",
+ (unsigned int) size,
+ GNUNET_h2s (key));
+ wqi = GNUNET_malloc (sizeof (struct WriteQueueItem) + msize);
+ wqi->msize = msize;
+ srm = (struct StreamReplyMessage *) &wqi[1];
+ srm->header.size = htons ((uint16_t) msize);
+ srm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_STREAM_REPLY);
+ srm->type = htonl (type);
+ srm->expiration = GNUNET_TIME_absolute_hton (expiration);
+ memcpy (&srm[1], data, size);
+ sc->reply_size = msize;
+ GNUNET_CONTAINER_DLL_insert (sc->wqi_head,
+ sc->wqi_tail,
+ wqi);
+ continue_writing (sc);
+}
+
+
+/**
+ * Functions with this signature are called whenever a
+ * complete query message is received.
+ *
+ * Do not call GNUNET_SERVER_mst_destroy in callback
+ *
+ * @param cls closure with the 'struct StreamClient'
+ * @param client identification of the client, NULL
+ * @param message the actual message
+ * @return GNUNET_OK on success, GNUNET_SYSERR to stop further processing
+ */
+static int
+request_cb (void *cls,
+ void *client,
+ const struct GNUNET_MessageHeader *message)
+{
+ struct StreamClient *sc = cls;
+ const struct StreamQueryMessage *sqm;
+
+ switch (ntohs (message->type))
+ {
+ case GNUNET_MESSAGE_TYPE_FS_STREAM_QUERY:
+ if (sizeof (struct StreamQueryMessage) !=
+ ntohs (message->size))
+ {
+ GNUNET_break_op (0);
+ terminate_stream_async (sc);
+ return GNUNET_SYSERR;
+ }
+ sqm = (const struct StreamQueryMessage *) message;
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Received query for `%s' via stream\n",
+ GNUNET_h2s (&sqm->query));
+ GNUNET_STATISTICS_update (GSF_stats,
+ gettext_noop ("# queries received via stream"), 1,
+ GNUNET_NO);
+ refresh_timeout_task (sc);
+ sc->qe = GNUNET_DATASTORE_get_key (GSF_dsh,
+ 0,
+ &sqm->query,
+ ntohl (sqm->type),
+ 0 /* priority */,
+ GSF_datastore_queue_size,
+ GNUNET_TIME_UNIT_FOREVER_REL,
+ &handle_datastore_reply, sc);
+ if (NULL == sc->qe)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Queueing request with datastore failed (queue full?)\n");
+ continue_writing (sc);
+ }
+ return GNUNET_OK;
+ default:
+ GNUNET_break_op (0);
+ terminate_stream_async (sc);
+ return GNUNET_SYSERR;
+ }
+}
+
+
+/**
+ * Functions of this type are called upon new stream connection from other peers
+ * or upon binding error which happen when the app_port given in
+ * GNUNET_STREAM_listen() is already taken.
+ *
+ * @param cls the closure from GNUNET_STREAM_listen
+ * @param socket the socket representing the stream; NULL on binding error
+ * @param initiator the identity of the peer who wants to establish a stream
+ * with us; NULL on binding error
+ * @return GNUNET_OK to keep the socket open, GNUNET_SYSERR to close the
+ * stream (the socket will be invalid after the call)
+ */
+static int
+accept_cb (void *cls,
+ struct GNUNET_STREAM_Socket *socket,
+ const struct GNUNET_PeerIdentity *initiator)
+{
+ struct StreamClient *sc;
+
+ if (NULL == socket)
+ return GNUNET_SYSERR;
+ if (sc_count >= sc_count_max)
+ {
+ GNUNET_STATISTICS_update (GSF_stats,
+ gettext_noop ("# stream client connections rejected"), 1,
+ GNUNET_NO);
+ return GNUNET_SYSERR;
+ }
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Accepting inbound stream connection from `%s'\n",
+ GNUNET_i2s (initiator));
+ GNUNET_STATISTICS_update (GSF_stats,
+ gettext_noop ("# stream connections active"), 1,
+ GNUNET_NO);
+ sc = GNUNET_malloc (sizeof (struct StreamClient));
+ sc->socket = socket;
+ sc->mst = GNUNET_SERVER_mst_create (&request_cb,
+ sc);
+ sc->rh = GNUNET_STREAM_read (sc->socket,
+ GNUNET_TIME_UNIT_FOREVER_REL,
+ &process_request,
+ sc);
+ GNUNET_CONTAINER_DLL_insert (sc_head,
+ sc_tail,
+ sc);
+ sc_count++;
+ refresh_timeout_task (sc);
+ return GNUNET_OK;
+}
+
+
+/**
+ * Initialize subsystem for non-anonymous file-sharing.
+ */
+void
+GSF_stream_start ()
+{
+ stream_map = GNUNET_CONTAINER_multihashmap_create (16, GNUNET_YES);
+ if (GNUNET_YES ==
+ GNUNET_CONFIGURATION_get_value_number (GSF_cfg,
+ "fs",
+ "MAX_STREAM_CLIENTS",
+ &sc_count_max))
+ {
+ listen_socket = GNUNET_STREAM_listen (GSF_cfg,
+ GNUNET_APPLICATION_TYPE_FS_BLOCK_TRANSFER,
+ &accept_cb, NULL,
+ GNUNET_STREAM_OPTION_END);
+ }
+}
+
+
+/**
+ * Function called on each active streams to shut them down.
+ *
+ * @param cls NULL
+ * @param key target peer, unused
+ * @param value the 'struct StreamHandle' to destroy
+ * @return GNUNET_YES (continue to iterate)
+ */
+static int
+release_streams (void *cls,
+ const struct GNUNET_HashCode *key,
+ void *value)
+{
+ struct StreamHandle *sh = value;
+
+ destroy_stream_handle (sh);
+ return GNUNET_YES;
+}
+
+
+/**
+ * Shutdown subsystem for non-anonymous file-sharing.
+ */
+void
+GSF_stream_stop ()
+{
+ struct StreamClient *sc;
+
+ while (NULL != (sc = sc_head))
+ terminate_stream (sc);
+ if (NULL != listen_socket)
+ {
+ GNUNET_STREAM_listen_close (listen_socket);
+ listen_socket = NULL;
+ }
+ GNUNET_CONTAINER_multihashmap_iterate (stream_map,
+ &release_streams,
+ NULL);
+ GNUNET_CONTAINER_multihashmap_destroy (stream_map);
+ stream_map = NULL;
+}
+
+/* end of gnunet-service-fs_stream.c */