diff options
author | grothoff <grothoff@140774ce-b5e7-0310-ab8b-a85725594a96> | 2012-11-22 15:49:49 +0000 |
---|---|---|
committer | grothoff <grothoff@140774ce-b5e7-0310-ab8b-a85725594a96> | 2012-11-22 15:49:49 +0000 |
commit | 8a41d98a8c38bd0da5806e91095c3a192f726b2a (patch) | |
tree | f2d6c6059e12ed80ef0ec2276d383e8f566b2de4 | |
parent | 57c802fabc10e68ce291dfde52078c64434914b7 (diff) |
starting to use stream in fs
git-svn-id: https://gnunet.org/svn/gnunet@25106 140774ce-b5e7-0310-ab8b-a85725594a96
-rw-r--r-- | src/Makefile.am | 2 | ||||
-rw-r--r-- | src/fs/Makefile.am | 4 | ||||
-rw-r--r-- | src/fs/gnunet-service-fs.c | 3 | ||||
-rw-r--r-- | src/fs/gnunet-service-fs_stream.c | 219 | ||||
-rw-r--r-- | src/fs/gnunet-service-fs_stream.h | 42 | ||||
-rw-r--r-- | src/include/gnunet_applications.h | 4 | ||||
-rw-r--r-- | src/include/gnunet_stream_lib.h | 1 | ||||
-rw-r--r-- | src/stream/stream_api.c | 7 |
8 files changed, 278 insertions, 4 deletions
diff --git a/src/Makefile.am b/src/Makefile.am index 08b93efb0b..9a240c8c67 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -52,11 +52,11 @@ SUBDIRS = \ dht \ hostlist \ topology \ - fs \ regex \ mesh \ lockmanager \ stream \ + fs \ $(LINUX_DIR) \ $(MINGW_DIR) \ integration-tests \ diff --git a/src/fs/Makefile.am b/src/fs/Makefile.am index f7af076823..419e878d6d 100644 --- a/src/fs/Makefile.am +++ b/src/fs/Makefile.am @@ -164,13 +164,15 @@ gnunet_service_fs_SOURCES = \ gnunet-service-fs_pe.c gnunet-service-fs_pe.h \ gnunet-service-fs_pr.c gnunet-service-fs_pr.h \ gnunet-service-fs_push.c gnunet-service-fs_push.h \ - gnunet-service-fs_put.c gnunet-service-fs_put.h + gnunet-service-fs_put.c gnunet-service-fs_put.h \ + gnunet-service-fs_stream.c gnunet-service-fs_sream.h gnunet_service_fs_LDADD = \ $(top_builddir)/src/fs/libgnunetfs.la \ $(top_builddir)/src/dht/libgnunetdht.la \ $(top_builddir)/src/block/libgnunetblock.la \ $(top_builddir)/src/datastore/libgnunetdatastore.la \ $(top_builddir)/src/statistics/libgnunetstatistics.la \ + $(top_builddir)/src/stream/libgnunetstream.la \ $(top_builddir)/src/ats/libgnunetats.la \ $(top_builddir)/src/core/libgnunetcore.la \ $(top_builddir)/src/util/libgnunetutil.la \ diff --git a/src/fs/gnunet-service-fs.c b/src/fs/gnunet-service-fs.c index a129288abc..3a8a076f65 100644 --- a/src/fs/gnunet-service-fs.c +++ b/src/fs/gnunet-service-fs.c @@ -46,6 +46,7 @@ #include "gnunet-service-fs_pr.h" #include "gnunet-service-fs_push.h" #include "gnunet-service-fs_put.h" +#include "gnunet-service-fs_stream.h" #include "fs.h" /** @@ -439,6 +440,7 @@ handle_start_search (void *cls, struct GNUNET_SERVER_Client *client, static void shutdown_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) { + GSF_stream_stop (); if (NULL != GSF_core) { GNUNET_CORE_disconnect (GSF_core); @@ -595,6 +597,7 @@ main_init (struct GNUNET_SERVER_Handle *server, GNUNET_SCHEDULER_add_delayed (COVER_AGE_FREQUENCY, &age_cover_counters, NULL); datastore_get_load = GNUNET_LOAD_value_init (DATASTORE_LOAD_AUTODECLINE); + GSF_stream_start (); GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL, &shutdown_task, NULL); return GNUNET_OK; diff --git a/src/fs/gnunet-service-fs_stream.c b/src/fs/gnunet-service-fs_stream.c new file mode 100644 index 0000000000..befa90dd5d --- /dev/null +++ b/src/fs/gnunet-service-fs_stream.c @@ -0,0 +1,219 @@ +/* + This file is part of GNUnet. + (C) 2012 Christian Grothoff (and other contributing authors) + + GNUnet is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published + by the Free Software Foundation; either version 3, or (at your + option) any later version. + + GNUnet is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + General Public License for more details. + + You should have received a copy of the GNU General Public License + along with GNUnet; see the file COPYING. If not, write to the + Free Software Foundation, Inc., 59 Temple Place - Suite 330, + Boston, MA 02111-1307, USA. +*/ + +/** + * @file fs/gnunet-service-fs_stream.c + * @brief non-anonymous file-transfer + * @author Christian Grothoff + * + * TODO: + * - limit # concurrent clients, timeout for read + */ +#include "platform.h" +#include "gnunet_constants.h" +#include "gnunet_util_lib.h" +#include "gnunet_stream_lib.h" +#include "gnunet_protocols.h" +#include "gnunet_applications.h" +#include "gnunet-service-fs.h" +#include "gnunet-service-fs_stream.h" + +/** + * Information we keep around for each active streaming client. + */ +struct StreamClient +{ + /** + * DLL + */ + struct StreamClient *next; + + /** + * DLL + */ + struct StreamClient *prev; + + /** + * Socket for communication. + */ + struct GNUNET_STREAM_Socket *socket; + + /** + * Handle for active read operation, or NULL. + */ + struct GNUNET_STREAM_IOReadHandle *rh; + + /** + * Handle for active write operation, or NULL. + */ + struct GNUNET_STREAM_IOWriteHandle *wh; + + /** + * Size of the last write that was initiated. + */ + size_t reply_size; + +}; + + +/** + * Listen socket for incoming requests. + */ +static struct GNUNET_STREAM_ListenSocket *listen_socket; + +/** + * Head of DLL of stream clients. + */ +static struct StreamClient *sc_head; + +/** + * Tail of DLL of stream clients. + */ +static struct StreamClient *sc_tail; + + +/** + * We're done with a particular client, clean up. + * + * @param sc client to clean up + */ +static void +terminate_stream (struct StreamClient *sc) +{ + if (NULL != sc->rh) + GNUNET_STREAM_io_read_cancel (sc->rh); + if (NULL != sc->wh) + GNUNET_STREAM_io_write_cancel (sc->wh); + GNUNET_STREAM_close (sc->socket); + GNUNET_CONTAINER_DLL_remove (sc_head, + sc_tail, + sc); + GNUNET_free (sc); +} + + +/** + * Functions of this signature are called whenever data is available from the + * stream. + * + * @param cls the closure from GNUNET_STREAM_read + * @param status the status of the stream at the time this function is called + * @param data traffic from the other side + * @param size the number of bytes available in data read; will be 0 on timeout + * @return number of bytes of processed from 'data' (any data remaining should be + * given to the next time the read processor is called). + */ +static size_t +process_request (void *cls, + enum GNUNET_STREAM_Status status, + const void *data, + size_t size) +{ + struct StreamClient *sc = cls; + + sc->rh = NULL; + switch (status) + { + case GNUNET_STREAM_OK: + // fixme: handle request... + break; + case GNUNET_STREAM_TIMEOUT: + case GNUNET_STREAM_SHUTDOWN: + case GNUNET_STREAM_SYSERR: + case GNUNET_STREAM_BROKEN: + terminate_stream (sc); + return size; + default: + GNUNET_break (0); + return size; + } + sc->rh = GNUNET_STREAM_read (sc->socket, + GNUNET_TIME_UNIT_FOREVER_REL, + &process_request, + sc); + return size; +} + + +/** + * Functions of this type are called upon new stream connection from other peers + * or upon binding error which happen when the app_port given in + * GNUNET_STREAM_listen() is already taken. + * + * @param cls the closure from GNUNET_STREAM_listen + * @param socket the socket representing the stream; NULL on binding error + * @param initiator the identity of the peer who wants to establish a stream + * with us; NULL on binding error + * @return GNUNET_OK to keep the socket open, GNUNET_SYSERR to close the + * stream (the socket will be invalid after the call) + */ +static int +accept_cb (void *cls, + struct GNUNET_STREAM_Socket *socket, + const struct GNUNET_PeerIdentity *initiator) +{ + struct StreamClient *sc; + + if (NULL == socket) + return GNUNET_SYSERR; + sc = GNUNET_malloc (sizeof (struct StreamClient)); + sc->socket = socket; + sc->rh = GNUNET_STREAM_read (sc->socket, + GNUNET_TIME_UNIT_FOREVER_REL, + &process_request, + sc); + GNUNET_CONTAINER_DLL_insert (sc_head, + sc_tail, + sc); + return GNUNET_OK; +} + + +/** + * Initialize subsystem for non-anonymous file-sharing. + */ +void +GSF_stream_start () +{ + listen_socket = GNUNET_STREAM_listen (GSF_cfg, + GNUNET_APPLICATION_TYPE_FS_BLOCK_TRANSFER, + &accept_cb, NULL, + GNUNET_STREAM_OPTION_END); +} + + +/** + * Shutdown subsystem for non-anonymous file-sharing. + */ +void +GSF_stream_stop () +{ + struct StreamClient *sc; + + while (NULL != (sc = sc_head)) + terminate_stream (sc); + if (NULL != listen_socket) + { + GNUNET_STREAM_listen_close (listen_socket); + listen_socket = NULL; + } +} + +/* end of gnunet-service-fs_stream.c */ diff --git a/src/fs/gnunet-service-fs_stream.h b/src/fs/gnunet-service-fs_stream.h new file mode 100644 index 0000000000..daa617290e --- /dev/null +++ b/src/fs/gnunet-service-fs_stream.h @@ -0,0 +1,42 @@ +/* + This file is part of GNUnet. + (C) 2012 Christian Grothoff (and other contributing authors) + + GNUnet is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published + by the Free Software Foundation; either version 3, or (at your + option) any later version. + + GNUnet is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + General Public License for more details. + + You should have received a copy of the GNU General Public License + along with GNUnet; see the file COPYING. If not, write to the + Free Software Foundation, Inc., 59 Temple Place - Suite 330, + Boston, MA 02111-1307, USA. +*/ + +/** + * @file fs/gnunet-service-fs_stream.h + * @brief non-anonymous file-transfer + * @author Christian Grothoff + */ +#ifndef GNUNET_SERVICE_FS_STREAM_H +#define GNUNET_SERVICE_FS_STREAM_H + +/** + * Initialize subsystem for non-anonymous file-sharing. + */ +void +GSF_stream_start (void); + + +/** + * Shutdown subsystem for non-anonymous file-sharing. + */ +void +GSF_stream_stop (void); + +#endif diff --git a/src/include/gnunet_applications.h b/src/include/gnunet_applications.h index 1e9db3e724..142194ec0f 100644 --- a/src/include/gnunet_applications.h +++ b/src/include/gnunet_applications.h @@ -50,6 +50,10 @@ extern "C" */ #define GNUNET_APPLICATION_TYPE_INTERNET_RESOLVER 2 +/** + * Transfer of blocks for non-anonymmous file-sharing. + */ +#define GNUNET_APPLICATION_TYPE_FS_BLOCK_TRANSFER 3 /** * Internet IPv4 gateway (any TCP/UDP/ICMP). diff --git a/src/include/gnunet_stream_lib.h b/src/include/gnunet_stream_lib.h index 5d5307f5fd..b7e3e4ce27 100644 --- a/src/include/gnunet_stream_lib.h +++ b/src/include/gnunet_stream_lib.h @@ -66,6 +66,7 @@ enum GNUNET_STREAM_Status /** * An error resulted in an unusable stream + * FIXME: status code unused? */ GNUNET_STREAM_BROKEN }; diff --git a/src/stream/stream_api.c b/src/stream/stream_api.c index fd2f86e51b..46f7abb476 100644 --- a/src/stream/stream_api.c +++ b/src/stream/stream_api.c @@ -34,7 +34,6 @@ * @author Sree Harsha Totakura */ - #include "platform.h" #include "gnunet_common.h" #include "gnunet_crypto_lib.h" @@ -1918,6 +1917,8 @@ handle_receive_close (struct GNUNET_STREAM_Socket *socket, that that stream has been shutdown */ if (NULL != socket->write_handle) { + // FIXME: this breaks if 'write_cont' decides to + // call SOCKET_close! if (NULL != socket->write_handle->write_cont) socket->write_handle->write_cont (socket->write_handle->write_cont_cls, GNUNET_STREAM_SHUTDOWN, 0); @@ -2040,6 +2041,8 @@ handle_close (struct GNUNET_STREAM_Socket *socket, that that stream has been shutdown */ if (NULL != socket->write_handle) { + // FIXME: this breaks if 'write_cont' decides to + // call SOCKET_close! if (NULL != socket->write_handle->write_cont) socket->write_handle->write_cont (socket->write_handle->write_cont_cls, GNUNET_STREAM_SHUTDOWN, 0); @@ -3543,11 +3546,11 @@ GNUNET_STREAM_read (struct GNUNET_STREAM_Socket *socket, case STATE_RECEIVE_CLOSE_WAIT: case STATE_CLOSED: case STATE_CLOSE_WAIT: - proc (proc_cls, GNUNET_STREAM_SHUTDOWN, NULL, 0); LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: %s() END\n", GNUNET_i2s (&socket->other_peer), __func__); + proc (proc_cls, GNUNET_STREAM_SHUTDOWN, NULL, 0); return NULL; default: break; |