aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorgrothoff <grothoff@140774ce-b5e7-0310-ab8b-a85725594a96>2012-11-22 15:49:49 +0000
committergrothoff <grothoff@140774ce-b5e7-0310-ab8b-a85725594a96>2012-11-22 15:49:49 +0000
commit8a41d98a8c38bd0da5806e91095c3a192f726b2a (patch)
treef2d6c6059e12ed80ef0ec2276d383e8f566b2de4
parent57c802fabc10e68ce291dfde52078c64434914b7 (diff)
starting to use stream in fs
git-svn-id: https://gnunet.org/svn/gnunet@25106 140774ce-b5e7-0310-ab8b-a85725594a96
-rw-r--r--src/Makefile.am2
-rw-r--r--src/fs/Makefile.am4
-rw-r--r--src/fs/gnunet-service-fs.c3
-rw-r--r--src/fs/gnunet-service-fs_stream.c219
-rw-r--r--src/fs/gnunet-service-fs_stream.h42
-rw-r--r--src/include/gnunet_applications.h4
-rw-r--r--src/include/gnunet_stream_lib.h1
-rw-r--r--src/stream/stream_api.c7
8 files changed, 278 insertions, 4 deletions
diff --git a/src/Makefile.am b/src/Makefile.am
index 08b93efb0b..9a240c8c67 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -52,11 +52,11 @@ SUBDIRS = \
dht \
hostlist \
topology \
- fs \
regex \
mesh \
lockmanager \
stream \
+ fs \
$(LINUX_DIR) \
$(MINGW_DIR) \
integration-tests \
diff --git a/src/fs/Makefile.am b/src/fs/Makefile.am
index f7af076823..419e878d6d 100644
--- a/src/fs/Makefile.am
+++ b/src/fs/Makefile.am
@@ -164,13 +164,15 @@ gnunet_service_fs_SOURCES = \
gnunet-service-fs_pe.c gnunet-service-fs_pe.h \
gnunet-service-fs_pr.c gnunet-service-fs_pr.h \
gnunet-service-fs_push.c gnunet-service-fs_push.h \
- gnunet-service-fs_put.c gnunet-service-fs_put.h
+ gnunet-service-fs_put.c gnunet-service-fs_put.h \
+ gnunet-service-fs_stream.c gnunet-service-fs_sream.h
gnunet_service_fs_LDADD = \
$(top_builddir)/src/fs/libgnunetfs.la \
$(top_builddir)/src/dht/libgnunetdht.la \
$(top_builddir)/src/block/libgnunetblock.la \
$(top_builddir)/src/datastore/libgnunetdatastore.la \
$(top_builddir)/src/statistics/libgnunetstatistics.la \
+ $(top_builddir)/src/stream/libgnunetstream.la \
$(top_builddir)/src/ats/libgnunetats.la \
$(top_builddir)/src/core/libgnunetcore.la \
$(top_builddir)/src/util/libgnunetutil.la \
diff --git a/src/fs/gnunet-service-fs.c b/src/fs/gnunet-service-fs.c
index a129288abc..3a8a076f65 100644
--- a/src/fs/gnunet-service-fs.c
+++ b/src/fs/gnunet-service-fs.c
@@ -46,6 +46,7 @@
#include "gnunet-service-fs_pr.h"
#include "gnunet-service-fs_push.h"
#include "gnunet-service-fs_put.h"
+#include "gnunet-service-fs_stream.h"
#include "fs.h"
/**
@@ -439,6 +440,7 @@ handle_start_search (void *cls, struct GNUNET_SERVER_Client *client,
static void
shutdown_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
{
+ GSF_stream_stop ();
if (NULL != GSF_core)
{
GNUNET_CORE_disconnect (GSF_core);
@@ -595,6 +597,7 @@ main_init (struct GNUNET_SERVER_Handle *server,
GNUNET_SCHEDULER_add_delayed (COVER_AGE_FREQUENCY, &age_cover_counters,
NULL);
datastore_get_load = GNUNET_LOAD_value_init (DATASTORE_LOAD_AUTODECLINE);
+ GSF_stream_start ();
GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL, &shutdown_task,
NULL);
return GNUNET_OK;
diff --git a/src/fs/gnunet-service-fs_stream.c b/src/fs/gnunet-service-fs_stream.c
new file mode 100644
index 0000000000..befa90dd5d
--- /dev/null
+++ b/src/fs/gnunet-service-fs_stream.c
@@ -0,0 +1,219 @@
+/*
+ This file is part of GNUnet.
+ (C) 2012 Christian Grothoff (and other contributing authors)
+
+ GNUnet is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published
+ by the Free Software Foundation; either version 3, or (at your
+ option) any later version.
+
+ GNUnet is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with GNUnet; see the file COPYING. If not, write to the
+ Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ Boston, MA 02111-1307, USA.
+*/
+
+/**
+ * @file fs/gnunet-service-fs_stream.c
+ * @brief non-anonymous file-transfer
+ * @author Christian Grothoff
+ *
+ * TODO:
+ * - limit # concurrent clients, timeout for read
+ */
+#include "platform.h"
+#include "gnunet_constants.h"
+#include "gnunet_util_lib.h"
+#include "gnunet_stream_lib.h"
+#include "gnunet_protocols.h"
+#include "gnunet_applications.h"
+#include "gnunet-service-fs.h"
+#include "gnunet-service-fs_stream.h"
+
+/**
+ * Information we keep around for each active streaming client.
+ */
+struct StreamClient
+{
+ /**
+ * DLL
+ */
+ struct StreamClient *next;
+
+ /**
+ * DLL
+ */
+ struct StreamClient *prev;
+
+ /**
+ * Socket for communication.
+ */
+ struct GNUNET_STREAM_Socket *socket;
+
+ /**
+ * Handle for active read operation, or NULL.
+ */
+ struct GNUNET_STREAM_IOReadHandle *rh;
+
+ /**
+ * Handle for active write operation, or NULL.
+ */
+ struct GNUNET_STREAM_IOWriteHandle *wh;
+
+ /**
+ * Size of the last write that was initiated.
+ */
+ size_t reply_size;
+
+};
+
+
+/**
+ * Listen socket for incoming requests.
+ */
+static struct GNUNET_STREAM_ListenSocket *listen_socket;
+
+/**
+ * Head of DLL of stream clients.
+ */
+static struct StreamClient *sc_head;
+
+/**
+ * Tail of DLL of stream clients.
+ */
+static struct StreamClient *sc_tail;
+
+
+/**
+ * We're done with a particular client, clean up.
+ *
+ * @param sc client to clean up
+ */
+static void
+terminate_stream (struct StreamClient *sc)
+{
+ if (NULL != sc->rh)
+ GNUNET_STREAM_io_read_cancel (sc->rh);
+ if (NULL != sc->wh)
+ GNUNET_STREAM_io_write_cancel (sc->wh);
+ GNUNET_STREAM_close (sc->socket);
+ GNUNET_CONTAINER_DLL_remove (sc_head,
+ sc_tail,
+ sc);
+ GNUNET_free (sc);
+}
+
+
+/**
+ * Functions of this signature are called whenever data is available from the
+ * stream.
+ *
+ * @param cls the closure from GNUNET_STREAM_read
+ * @param status the status of the stream at the time this function is called
+ * @param data traffic from the other side
+ * @param size the number of bytes available in data read; will be 0 on timeout
+ * @return number of bytes of processed from 'data' (any data remaining should be
+ * given to the next time the read processor is called).
+ */
+static size_t
+process_request (void *cls,
+ enum GNUNET_STREAM_Status status,
+ const void *data,
+ size_t size)
+{
+ struct StreamClient *sc = cls;
+
+ sc->rh = NULL;
+ switch (status)
+ {
+ case GNUNET_STREAM_OK:
+ // fixme: handle request...
+ break;
+ case GNUNET_STREAM_TIMEOUT:
+ case GNUNET_STREAM_SHUTDOWN:
+ case GNUNET_STREAM_SYSERR:
+ case GNUNET_STREAM_BROKEN:
+ terminate_stream (sc);
+ return size;
+ default:
+ GNUNET_break (0);
+ return size;
+ }
+ sc->rh = GNUNET_STREAM_read (sc->socket,
+ GNUNET_TIME_UNIT_FOREVER_REL,
+ &process_request,
+ sc);
+ return size;
+}
+
+
+/**
+ * Functions of this type are called upon new stream connection from other peers
+ * or upon binding error which happen when the app_port given in
+ * GNUNET_STREAM_listen() is already taken.
+ *
+ * @param cls the closure from GNUNET_STREAM_listen
+ * @param socket the socket representing the stream; NULL on binding error
+ * @param initiator the identity of the peer who wants to establish a stream
+ * with us; NULL on binding error
+ * @return GNUNET_OK to keep the socket open, GNUNET_SYSERR to close the
+ * stream (the socket will be invalid after the call)
+ */
+static int
+accept_cb (void *cls,
+ struct GNUNET_STREAM_Socket *socket,
+ const struct GNUNET_PeerIdentity *initiator)
+{
+ struct StreamClient *sc;
+
+ if (NULL == socket)
+ return GNUNET_SYSERR;
+ sc = GNUNET_malloc (sizeof (struct StreamClient));
+ sc->socket = socket;
+ sc->rh = GNUNET_STREAM_read (sc->socket,
+ GNUNET_TIME_UNIT_FOREVER_REL,
+ &process_request,
+ sc);
+ GNUNET_CONTAINER_DLL_insert (sc_head,
+ sc_tail,
+ sc);
+ return GNUNET_OK;
+}
+
+
+/**
+ * Initialize subsystem for non-anonymous file-sharing.
+ */
+void
+GSF_stream_start ()
+{
+ listen_socket = GNUNET_STREAM_listen (GSF_cfg,
+ GNUNET_APPLICATION_TYPE_FS_BLOCK_TRANSFER,
+ &accept_cb, NULL,
+ GNUNET_STREAM_OPTION_END);
+}
+
+
+/**
+ * Shutdown subsystem for non-anonymous file-sharing.
+ */
+void
+GSF_stream_stop ()
+{
+ struct StreamClient *sc;
+
+ while (NULL != (sc = sc_head))
+ terminate_stream (sc);
+ if (NULL != listen_socket)
+ {
+ GNUNET_STREAM_listen_close (listen_socket);
+ listen_socket = NULL;
+ }
+}
+
+/* end of gnunet-service-fs_stream.c */
diff --git a/src/fs/gnunet-service-fs_stream.h b/src/fs/gnunet-service-fs_stream.h
new file mode 100644
index 0000000000..daa617290e
--- /dev/null
+++ b/src/fs/gnunet-service-fs_stream.h
@@ -0,0 +1,42 @@
+/*
+ This file is part of GNUnet.
+ (C) 2012 Christian Grothoff (and other contributing authors)
+
+ GNUnet is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published
+ by the Free Software Foundation; either version 3, or (at your
+ option) any later version.
+
+ GNUnet is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with GNUnet; see the file COPYING. If not, write to the
+ Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ Boston, MA 02111-1307, USA.
+*/
+
+/**
+ * @file fs/gnunet-service-fs_stream.h
+ * @brief non-anonymous file-transfer
+ * @author Christian Grothoff
+ */
+#ifndef GNUNET_SERVICE_FS_STREAM_H
+#define GNUNET_SERVICE_FS_STREAM_H
+
+/**
+ * Initialize subsystem for non-anonymous file-sharing.
+ */
+void
+GSF_stream_start (void);
+
+
+/**
+ * Shutdown subsystem for non-anonymous file-sharing.
+ */
+void
+GSF_stream_stop (void);
+
+#endif
diff --git a/src/include/gnunet_applications.h b/src/include/gnunet_applications.h
index 1e9db3e724..142194ec0f 100644
--- a/src/include/gnunet_applications.h
+++ b/src/include/gnunet_applications.h
@@ -50,6 +50,10 @@ extern "C"
*/
#define GNUNET_APPLICATION_TYPE_INTERNET_RESOLVER 2
+/**
+ * Transfer of blocks for non-anonymmous file-sharing.
+ */
+#define GNUNET_APPLICATION_TYPE_FS_BLOCK_TRANSFER 3
/**
* Internet IPv4 gateway (any TCP/UDP/ICMP).
diff --git a/src/include/gnunet_stream_lib.h b/src/include/gnunet_stream_lib.h
index 5d5307f5fd..b7e3e4ce27 100644
--- a/src/include/gnunet_stream_lib.h
+++ b/src/include/gnunet_stream_lib.h
@@ -66,6 +66,7 @@ enum GNUNET_STREAM_Status
/**
* An error resulted in an unusable stream
+ * FIXME: status code unused?
*/
GNUNET_STREAM_BROKEN
};
diff --git a/src/stream/stream_api.c b/src/stream/stream_api.c
index fd2f86e51b..46f7abb476 100644
--- a/src/stream/stream_api.c
+++ b/src/stream/stream_api.c
@@ -34,7 +34,6 @@
* @author Sree Harsha Totakura
*/
-
#include "platform.h"
#include "gnunet_common.h"
#include "gnunet_crypto_lib.h"
@@ -1918,6 +1917,8 @@ handle_receive_close (struct GNUNET_STREAM_Socket *socket,
that that stream has been shutdown */
if (NULL != socket->write_handle)
{
+ // FIXME: this breaks if 'write_cont' decides to
+ // call SOCKET_close!
if (NULL != socket->write_handle->write_cont)
socket->write_handle->write_cont (socket->write_handle->write_cont_cls,
GNUNET_STREAM_SHUTDOWN, 0);
@@ -2040,6 +2041,8 @@ handle_close (struct GNUNET_STREAM_Socket *socket,
that that stream has been shutdown */
if (NULL != socket->write_handle)
{
+ // FIXME: this breaks if 'write_cont' decides to
+ // call SOCKET_close!
if (NULL != socket->write_handle->write_cont)
socket->write_handle->write_cont (socket->write_handle->write_cont_cls,
GNUNET_STREAM_SHUTDOWN, 0);
@@ -3543,11 +3546,11 @@ GNUNET_STREAM_read (struct GNUNET_STREAM_Socket *socket,
case STATE_RECEIVE_CLOSE_WAIT:
case STATE_CLOSED:
case STATE_CLOSE_WAIT:
- proc (proc_cls, GNUNET_STREAM_SHUTDOWN, NULL, 0);
LOG (GNUNET_ERROR_TYPE_DEBUG,
"%s: %s() END\n",
GNUNET_i2s (&socket->other_peer),
__func__);
+ proc (proc_cls, GNUNET_STREAM_SHUTDOWN, NULL, 0);
return NULL;
default:
break;