diff options
Diffstat (limited to 'src/stream')
-rw-r--r-- | src/stream/Makefile.am | 29 | ||||
-rw-r--r-- | src/stream/Makefile.in | 67 | ||||
-rw-r--r-- | src/stream/README | 12 | ||||
-rw-r--r-- | src/stream/stream_api.c | 2185 | ||||
-rw-r--r-- | src/stream/stream_protocol.h | 8 | ||||
-rw-r--r-- | src/stream/test_stream_2peers.c | 560 | ||||
-rw-r--r-- | src/stream/test_stream_2peers_halfclose.c | 786 | ||||
-rw-r--r-- | src/stream/test_stream_local.c | 272 | ||||
-rw-r--r-- | src/stream/test_stream_local.conf | 14 |
9 files changed, 3258 insertions, 675 deletions
diff --git a/src/stream/Makefile.am b/src/stream/Makefile.am index 385c0cf..8d74417 100644 --- a/src/stream/Makefile.am +++ b/src/stream/Makefile.am @@ -20,8 +20,9 @@ libgnunetstream_la_LDFLAGS = \ $(GN_LIB_LDFLAGS) check_PROGRAMS = \ - test_stream_local -# test_stream_halfclose + test-stream-2peers \ + test-stream-2peers_halfclose \ + test-stream-local EXTRA_DIST = test_stream_local.conf @@ -29,15 +30,23 @@ if ENABLE_TEST_RUN TESTS = $(check_PROGRAMS) endif +test_stream_2peers_SOURCES = \ + test_stream_2peers.c +test_stream_2peers_LDADD = \ + $(top_builddir)/src/stream/libgnunetstream.la \ + $(top_builddir)/src/util/libgnunetutil.la \ + $(top_builddir)/src/testing/libgnunettesting.la + +test_stream_2peers_halfclose_SOURCES = \ + test_stream_2peers_halfclose.c +test_stream_2peers_halfclose_LDADD = \ + $(top_builddir)/src/stream/libgnunetstream.la \ + $(top_builddir)/src/util/libgnunetutil.la \ + $(top_builddir)/src/testing/libgnunettesting.la + test_stream_local_SOURCES = \ test_stream_local.c test_stream_local_LDADD = \ $(top_builddir)/src/stream/libgnunetstream.la \ - $(top_builddir)/src/util/libgnunetutil.la - -#test_stream_halfclose_SOURCES = \ -# test_stream_halfclose.c -#test_stream_halfclose_LDADD = \ -# $(top_builddir)/src/stream/libgnunetstream.la \ -# $(top_builddir)/src/util/libgnunetutil.la - + $(top_builddir)/src/util/libgnunetutil.la \ + $(top_builddir)/src/testing/libgnunettesting.la
\ No newline at end of file diff --git a/src/stream/Makefile.in b/src/stream/Makefile.in index 44b63f7..9f1c278 100644 --- a/src/stream/Makefile.in +++ b/src/stream/Makefile.in @@ -35,7 +35,9 @@ POST_UNINSTALL = : build_triplet = @build@ host_triplet = @host@ target_triplet = @target@ -check_PROGRAMS = test_stream_local$(EXEEXT) +check_PROGRAMS = test-stream-2peers$(EXEEXT) \ + test-stream-2peers_halfclose$(EXEEXT) \ + test-stream-local$(EXEEXT) subdir = src/stream DIST_COMMON = README $(srcdir)/Makefile.am $(srcdir)/Makefile.in ACLOCAL_M4 = $(top_srcdir)/aclocal.m4 @@ -94,11 +96,26 @@ libgnunetstream_la_LINK = $(LIBTOOL) $(AM_V_lt) --tag=CC \ $(AM_LIBTOOLFLAGS) $(LIBTOOLFLAGS) --mode=link $(CCLD) \ $(AM_CFLAGS) $(CFLAGS) $(libgnunetstream_la_LDFLAGS) \ $(LDFLAGS) -o $@ +am_test_stream_2peers_OBJECTS = test_stream_2peers.$(OBJEXT) +test_stream_2peers_OBJECTS = $(am_test_stream_2peers_OBJECTS) +test_stream_2peers_DEPENDENCIES = \ + $(top_builddir)/src/stream/libgnunetstream.la \ + $(top_builddir)/src/util/libgnunetutil.la \ + $(top_builddir)/src/testing/libgnunettesting.la +am_test_stream_2peers_halfclose_OBJECTS = \ + test_stream_2peers_halfclose.$(OBJEXT) +test_stream_2peers_halfclose_OBJECTS = \ + $(am_test_stream_2peers_halfclose_OBJECTS) +test_stream_2peers_halfclose_DEPENDENCIES = \ + $(top_builddir)/src/stream/libgnunetstream.la \ + $(top_builddir)/src/util/libgnunetutil.la \ + $(top_builddir)/src/testing/libgnunettesting.la am_test_stream_local_OBJECTS = test_stream_local.$(OBJEXT) test_stream_local_OBJECTS = $(am_test_stream_local_OBJECTS) test_stream_local_DEPENDENCIES = \ $(top_builddir)/src/stream/libgnunetstream.la \ - $(top_builddir)/src/util/libgnunetutil.la + $(top_builddir)/src/util/libgnunetutil.la \ + $(top_builddir)/src/testing/libgnunettesting.la DEFAULT_INCLUDES = -I.@am__isrc@ -I$(top_builddir) depcomp = $(SHELL) $(top_srcdir)/depcomp am__depfiles_maybe = depfiles @@ -125,8 +142,12 @@ am__v_CCLD_0 = @echo " CCLD " $@; AM_V_GEN = $(am__v_GEN_$(V)) am__v_GEN_ = $(am__v_GEN_$(AM_DEFAULT_VERBOSITY)) am__v_GEN_0 = @echo " GEN " $@; -SOURCES = $(libgnunetstream_la_SOURCES) $(test_stream_local_SOURCES) +SOURCES = $(libgnunetstream_la_SOURCES) $(test_stream_2peers_SOURCES) \ + $(test_stream_2peers_halfclose_SOURCES) \ + $(test_stream_local_SOURCES) DIST_SOURCES = $(libgnunetstream_la_SOURCES) \ + $(test_stream_2peers_SOURCES) \ + $(test_stream_2peers_halfclose_SOURCES) \ $(test_stream_local_SOURCES) ETAGS = etags CTAGS = ctags @@ -188,6 +209,7 @@ INSTALL_SCRIPT = @INSTALL_SCRIPT@ INSTALL_STRIP_PROGRAM = @INSTALL_STRIP_PROGRAM@ INTLLIBS = @INTLLIBS@ INTL_MACOSX_LIBS = @INTL_MACOSX_LIBS@ +JAVAPORT = @JAVAPORT@ LD = @LD@ LDFLAGS = @LDFLAGS@ LIBADD_DL = @LIBADD_DL@ @@ -221,6 +243,7 @@ LT_DLLOADERS = @LT_DLLOADERS@ LT_DLPREOPEN = @LT_DLPREOPEN@ MAKEINFO = @MAKEINFO@ MKDIR_P = @MKDIR_P@ +MONKEYPREFIX = @MONKEYPREFIX@ MSGFMT = @MSGFMT@ MSGFMT_015 = @MSGFMT_015@ MSGMERGE = @MSGMERGE@ @@ -352,15 +375,31 @@ libgnunetstream_la_LIBADD = \ libgnunetstream_la_LDFLAGS = \ $(GN_LIB_LDFLAGS) -# test_stream_halfclose EXTRA_DIST = test_stream_local.conf @ENABLE_TEST_RUN_TRUE@TESTS = $(check_PROGRAMS) +test_stream_2peers_SOURCES = \ + test_stream_2peers.c + +test_stream_2peers_LDADD = \ + $(top_builddir)/src/stream/libgnunetstream.la \ + $(top_builddir)/src/util/libgnunetutil.la \ + $(top_builddir)/src/testing/libgnunettesting.la + +test_stream_2peers_halfclose_SOURCES = \ + test_stream_2peers_halfclose.c + +test_stream_2peers_halfclose_LDADD = \ + $(top_builddir)/src/stream/libgnunetstream.la \ + $(top_builddir)/src/util/libgnunetutil.la \ + $(top_builddir)/src/testing/libgnunettesting.la + test_stream_local_SOURCES = \ test_stream_local.c test_stream_local_LDADD = \ $(top_builddir)/src/stream/libgnunetstream.la \ - $(top_builddir)/src/util/libgnunetutil.la + $(top_builddir)/src/util/libgnunetutil.la \ + $(top_builddir)/src/testing/libgnunettesting.la all: all-am @@ -438,8 +477,14 @@ clean-checkPROGRAMS: list=`for p in $$list; do echo "$$p"; done | sed 's/$(EXEEXT)$$//'`; \ echo " rm -f" $$list; \ rm -f $$list -test_stream_local$(EXEEXT): $(test_stream_local_OBJECTS) $(test_stream_local_DEPENDENCIES) - @rm -f test_stream_local$(EXEEXT) +test-stream-2peers$(EXEEXT): $(test_stream_2peers_OBJECTS) $(test_stream_2peers_DEPENDENCIES) + @rm -f test-stream-2peers$(EXEEXT) + $(AM_V_CCLD)$(LINK) $(test_stream_2peers_OBJECTS) $(test_stream_2peers_LDADD) $(LIBS) +test-stream-2peers_halfclose$(EXEEXT): $(test_stream_2peers_halfclose_OBJECTS) $(test_stream_2peers_halfclose_DEPENDENCIES) + @rm -f test-stream-2peers_halfclose$(EXEEXT) + $(AM_V_CCLD)$(LINK) $(test_stream_2peers_halfclose_OBJECTS) $(test_stream_2peers_halfclose_LDADD) $(LIBS) +test-stream-local$(EXEEXT): $(test_stream_local_OBJECTS) $(test_stream_local_DEPENDENCIES) + @rm -f test-stream-local$(EXEEXT) $(AM_V_CCLD)$(LINK) $(test_stream_local_OBJECTS) $(test_stream_local_LDADD) $(LIBS) mostlyclean-compile: @@ -449,6 +494,8 @@ distclean-compile: -rm -f *.tab.c @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/stream_api.Plo@am__quote@ +@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/test_stream_2peers.Po@am__quote@ +@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/test_stream_2peers_halfclose.Po@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/test_stream_local.Po@am__quote@ .c.o: @@ -778,12 +825,6 @@ uninstall-am: uninstall-libLTLIBRARIES uninstall-am uninstall-libLTLIBRARIES -#test_stream_halfclose_SOURCES = \ -# test_stream_halfclose.c -#test_stream_halfclose_LDADD = \ -# $(top_builddir)/src/stream/libgnunetstream.la \ -# $(top_builddir)/src/util/libgnunetutil.la - # Tell versions [3.59,3.63) of GNU make to not export all variables. # Otherwise a system limit (for SysV at least) may be exceeded. .NOEXPORT: diff --git a/src/stream/README b/src/stream/README index 9b550b0..977ca2d 100644 --- a/src/stream/README +++ b/src/stream/README @@ -1,11 +1,11 @@ -The aim of the stream library is to provide stream connections between peers in -GNUnet. This is a convenience library which hides the complexity of dividing -data stream into packets, transmitting them and retransmitting them in case of +Stream library provides stream connections between peers in GNUnet. This is a +convenience library which hides the complexity of dividing data stream into +packets, transmitting them and retransmitting them in case of communication errors. This library's API are similar to unix PIPE API. The user is expected to open a stream to a listening target peer. Once the stream is established, the user can -use it as a pipe. Any data written into the stream will be readable by the -target peer. +use it as a pipe. Any data written into the stream at one peer will be readable +by the other peer and vice versa. -This library uses mesh API for establishing streams between peers.
\ No newline at end of file +This library uses mesh API for establishing tunnels between peers.
\ No newline at end of file diff --git a/src/stream/stream_api.c b/src/stream/stream_api.c index 84fcdfd..dadba33 100644 --- a/src/stream/stream_api.c +++ b/src/stream/stream_api.c @@ -18,17 +18,31 @@ Boston, MA 02111-1307, USA. */ +/* TODO: + * + * Checks for matching the sender and socket->other_peer in server + * message handlers + * + * Add code for write io timeout + * + * Include retransmission for control messages + **/ + /** * @file stream/stream_api.c * @brief Implementation of the stream library * @author Sree Harsha Totakura */ + + #include "platform.h" #include "gnunet_common.h" #include "gnunet_crypto_lib.h" #include "gnunet_stream_lib.h" #include "stream_protocol.h" +#define LOG(kind,...) \ + GNUNET_log_from (kind, "stream-api", __VA_ARGS__) /** * The maximum packet size of a stream packet @@ -36,15 +50,15 @@ #define MAX_PACKET_SIZE 64000 /** - * The maximum payload a data message packet can carry + * Receive buffer */ -static size_t max_payload_size = - MAX_PACKET_SIZE - sizeof (struct GNUNET_STREAM_DataMessage); +#define RECEIVE_BUFFER_SIZE 4096000 /** - * Receive buffer + * The maximum payload a data message packet can carry */ -#define RECEIVE_BUFFER_SIZE 4096000 +static size_t max_payload_size = + MAX_PACKET_SIZE - sizeof (struct GNUNET_STREAM_DataMessage); /** * states in the Protocol @@ -150,12 +164,6 @@ struct MessageQueue */ struct GNUNET_STREAM_Socket { - - /** - * The peer identity of the peer at the other end of the stream - */ - struct GNUNET_PeerIdentity other_peer; - /** * Retransmission timeout */ @@ -177,16 +185,6 @@ struct GNUNET_STREAM_Socket struct GNUNET_TIME_Relative ack_time_deadline; /** - * The task for sending timely Acks - */ - GNUNET_SCHEDULER_TaskIdentifier ack_task_id; - - /** - * Task scheduled to continue a read operation. - */ - GNUNET_SCHEDULER_TaskIdentifier read_task; - - /** * The mesh handle */ struct GNUNET_MESH_Handle *mesh; @@ -212,6 +210,16 @@ struct GNUNET_STREAM_Socket struct GNUNET_MESH_TransmitHandle *transmit_handle; /** + * The current act transmit handle (if a pending ack transmit request exists) + */ + struct GNUNET_MESH_TransmitHandle *ack_transmit_handle; + + /** + * Pointer to the current ack message using in ack_task + */ + struct GNUNET_STREAM_AckMessage *ack_msg; + + /** * The current message associated with the transmit handle */ struct MessageQueue *queue_head; @@ -232,14 +240,45 @@ struct GNUNET_STREAM_Socket struct GNUNET_STREAM_IOReadHandle *read_handle; /** + * The shutdown handle associated with this socket + */ + struct GNUNET_STREAM_ShutdownHandle *shutdown_handle; + + /** * Buffer for storing received messages */ void *receive_buffer; /** + * The listen socket from which this socket is derived. Should be NULL if it + * is not a derived socket + */ + struct GNUNET_STREAM_ListenSocket *lsocket; + + /** + * The peer identity of the peer at the other end of the stream + */ + struct GNUNET_PeerIdentity other_peer; + + /** * Task identifier for the read io timeout task */ - GNUNET_SCHEDULER_TaskIdentifier read_io_timeout_task; + GNUNET_SCHEDULER_TaskIdentifier read_io_timeout_task_id; + + /** + * Task identifier for retransmission task after timeout + */ + GNUNET_SCHEDULER_TaskIdentifier retransmission_timeout_task_id; + + /** + * The task for sending timely Acks + */ + GNUNET_SCHEDULER_TaskIdentifier ack_task_id; + + /** + * Task scheduled to continue a read operation. + */ + GNUNET_SCHEDULER_TaskIdentifier read_task_id; /** * The state of the protocol associated with this socket @@ -257,6 +296,11 @@ struct GNUNET_STREAM_Socket unsigned int retries; /** + * The application port number (type: uint32_t) + */ + GNUNET_MESH_ApplicationType app_port; + + /** * The session id associated with this stream connection * FIXME: Not used currently, may be removed */ @@ -286,7 +330,7 @@ struct GNUNET_STREAM_Socket /** * receiver's available buffer after the last acknowledged packet */ - uint32_t receive_window_available; + uint32_t receiver_window_available; /** * The offset pointer used during write operation @@ -310,7 +354,6 @@ struct GNUNET_STREAM_Socket */ struct GNUNET_STREAM_ListenSocket { - /** * The mesh handle */ @@ -328,6 +371,7 @@ struct GNUNET_STREAM_ListenSocket /** * The service port + * FIXME: Remove if not required! */ GNUNET_MESH_ApplicationType port; }; @@ -339,9 +383,24 @@ struct GNUNET_STREAM_ListenSocket struct GNUNET_STREAM_IOWriteHandle { /** + * The socket to which this write handle is associated + */ + struct GNUNET_STREAM_Socket *socket; + + /** * The packet_buffers associated with this Handle */ - struct GNUNET_STREAM_DataMessage *messages[64]; + struct GNUNET_STREAM_DataMessage *messages[GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH]; + + /** + * The write continuation callback + */ + GNUNET_STREAM_CompletionContinuation write_cont; + + /** + * Write continuation closure + */ + void *write_cont_cls; /** * The bitmap of this IOHandle; Corresponding bit for a message is set when @@ -350,11 +409,9 @@ struct GNUNET_STREAM_IOWriteHandle GNUNET_STREAM_AckBitmap ack_bitmap; /** - * Number of packets sent before waiting for an ack - * - * FIXME: Do we need this? + * Number of bytes in this write handle */ - unsigned int sent_packets; + size_t size; }; @@ -376,13 +433,45 @@ struct GNUNET_STREAM_IOReadHandle /** + * Handle for Shutdown + */ +struct GNUNET_STREAM_ShutdownHandle +{ + /** + * The socket associated with this shutdown handle + */ + struct GNUNET_STREAM_Socket *socket; + + /** + * Shutdown completion callback + */ + GNUNET_STREAM_ShutdownCompletion completion_cb; + + /** + * Closure for completion callback + */ + void *completion_cls; + + /** + * Close message retransmission task id + */ + GNUNET_SCHEDULER_TaskIdentifier close_msg_retransmission_task_id; + + /** + * Which operation to shutdown? SHUT_RD, SHUT_WR or SHUT_RDWR + */ + int operation; +}; + + +/** * Default value in seconds for various timeouts */ -static unsigned int default_timeout = 300; +static unsigned int default_timeout = 10; /** - * Callback function for sending hello message + * Callback function for sending queued message * * @param cls closure the socket * @param size number of bytes available in buf @@ -401,31 +490,32 @@ send_message_notify (void *cls, size_t size, void *buf) if (NULL == head) return 0; /* just to be safe */ if (0 == size) /* request timed out */ - { - socket->retries++; - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Message sending timed out. Retry %d \n", - socket->retries); - socket->transmit_handle = - GNUNET_MESH_notify_transmit_ready (socket->tunnel, - 0, /* Corking */ - 1, /* Priority */ - /* FIXME: exponential backoff */ - socket->retransmit_timeout, - &socket->other_peer, - ntohs (head->message->header.size), - &send_message_notify, - socket); - return 0; - } + { + socket->retries++; + LOG (GNUNET_ERROR_TYPE_DEBUG, + "%s: Message sending timed out. Retry %d \n", + GNUNET_i2s (&socket->other_peer), + socket->retries); + socket->transmit_handle = + GNUNET_MESH_notify_transmit_ready (socket->tunnel, + 0, /* Corking */ + 1, /* Priority */ + /* FIXME: exponential backoff */ + socket->retransmit_timeout, + &socket->other_peer, + ntohs (head->message->header.size), + &send_message_notify, + socket); + return 0; + } ret = ntohs (head->message->header.size); GNUNET_assert (size >= ret); memcpy (buf, head->message, ret); if (NULL != head->finish_cb) - { - head->finish_cb (socket, head->finish_cb_cls); - } + { + head->finish_cb (head->finish_cb_cls, socket); + } GNUNET_CONTAINER_DLL_remove (socket->queue_head, socket->queue_tail, head); @@ -433,19 +523,19 @@ send_message_notify (void *cls, size_t size, void *buf) GNUNET_free (head); head = socket->queue_head; if (NULL != head) /* more pending messages to send */ - { - socket->retries = 0; - socket->transmit_handle = - GNUNET_MESH_notify_transmit_ready (socket->tunnel, - 0, /* Corking */ - 1, /* Priority */ - /* FIXME: exponential backoff */ - socket->retransmit_timeout, - &socket->other_peer, - ntohs (head->message->header.size), - &send_message_notify, - socket); - } + { + socket->retries = 0; + socket->transmit_handle = + GNUNET_MESH_notify_transmit_ready (socket->tunnel, + 0, /* Corking */ + 1, /* Priority */ + /* FIXME: exponential backoff */ + socket->retransmit_timeout, + &socket->other_peer, + ntohs (head->message->header.size), + &send_message_notify, + socket); + } return ret; } @@ -466,6 +556,16 @@ queue_message (struct GNUNET_STREAM_Socket *socket, { struct MessageQueue *queue_entity; + GNUNET_assert + ((ntohs (message->header.type) >= GNUNET_MESSAGE_TYPE_STREAM_DATA) + && (ntohs (message->header.type) <= GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK)); + + LOG (GNUNET_ERROR_TYPE_DEBUG, + "%s: Queueing message of type %d and size %d\n", + GNUNET_i2s (&socket->other_peer), + ntohs (message->header.type), + ntohs (message->header.size)); + GNUNET_assert (NULL != message); queue_entity = GNUNET_malloc (sizeof (struct MessageQueue)); queue_entity->message = message; queue_entity->finish_cb = finish_cb; @@ -490,6 +590,31 @@ queue_message (struct GNUNET_STREAM_Socket *socket, /** + * Copies a message and queues it for sending using the mesh connection of + * given socket + * + * @param socket the socket whose mesh connection is used + * @param message the message to be sent + * @param finish_cb the callback to be called when the message is sent + * @param finish_cb_cls the closure for the callback + */ +static void +copy_and_queue_message (struct GNUNET_STREAM_Socket *socket, + const struct GNUNET_STREAM_MessageHeader *message, + SendFinishCallback finish_cb, + void *finish_cb_cls) +{ + struct GNUNET_STREAM_MessageHeader *msg_copy; + uint16_t size; + + size = ntohs (message->header.size); + msg_copy = GNUNET_malloc (size); + memcpy (msg_copy, message, size); + queue_message (socket, msg_copy, finish_cb, finish_cb_cls); +} + + +/** * Callback function for sending ack message * * @param cls closure the ACK message created in ack_task @@ -500,21 +625,56 @@ queue_message (struct GNUNET_STREAM_Socket *socket, static size_t send_ack_notify (void *cls, size_t size, void *buf) { - struct GNUNET_STREAM_AckMessage *ack_msg = cls; + struct GNUNET_STREAM_Socket *socket = cls; if (0 == size) - { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "%s called with size 0\n", __func__); - return 0; - } - GNUNET_assert (ack_msg->header.header.size <= size); + { + LOG (GNUNET_ERROR_TYPE_DEBUG, + "%s called with size 0\n", __func__); + return 0; + } + GNUNET_assert (ntohs (socket->ack_msg->header.header.size) <= size); + + size = ntohs (socket->ack_msg->header.header.size); + memcpy (buf, socket->ack_msg, size); - size = ack_msg->header.header.size; - memcpy (buf, ack_msg, size); + GNUNET_free (socket->ack_msg); + socket->ack_msg = NULL; + socket->ack_transmit_handle = NULL; return size; } +/** + * Writes data using the given socket. The amount of data written is limited by + * the receiver_window_size + * + * @param socket the socket to use + */ +static void +write_data (struct GNUNET_STREAM_Socket *socket); + +/** + * Task for retransmitting data messages if they aren't ACK before their ack + * deadline + * + * @param cls the socket + * @param tc the Task context + */ +static void +retransmission_timeout_task (void *cls, + const struct GNUNET_SCHEDULER_TaskContext *tc) +{ + struct GNUNET_STREAM_Socket *socket = cls; + + if (GNUNET_SCHEDULER_REASON_SHUTDOWN == tc->reason) + return; + + LOG (GNUNET_ERROR_TYPE_DEBUG, + "%s: Retransmitting DATA...\n", GNUNET_i2s (&socket->other_peer)); + socket->retransmission_timeout_task_id = GNUNET_SCHEDULER_NO_TASK; + write_data (socket); +} + /** * Task for sending ACK message @@ -530,12 +690,10 @@ ack_task (void *cls, struct GNUNET_STREAM_AckMessage *ack_msg; if (GNUNET_SCHEDULER_REASON_SHUTDOWN == tc->reason) - { - return; - } - - socket->ack_task_id = 0; - + { + return; + } + socket->ack_task_id = GNUNET_SCHEDULER_NO_TASK; /* Create the ACK Message */ ack_msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_AckMessage)); ack_msg->header.header.size = htons (sizeof (struct @@ -545,18 +703,61 @@ ack_task (void *cls, ack_msg->base_sequence_number = htonl (socket->read_sequence_number); ack_msg->receive_window_remaining = htonl (RECEIVE_BUFFER_SIZE - socket->receive_buffer_size); - + socket->ack_msg = ack_msg; /* Request MESH for sending ACK */ - GNUNET_MESH_notify_transmit_ready (socket->tunnel, - 0, /* Corking */ - 1, /* Priority */ - socket->retransmit_timeout, - &socket->other_peer, - ntohs (ack_msg->header.header.size), - &send_ack_notify, - ack_msg); + socket->ack_transmit_handle = + GNUNET_MESH_notify_transmit_ready (socket->tunnel, + 0, /* Corking */ + 1, /* Priority */ + socket->retransmit_timeout, + &socket->other_peer, + ntohs (ack_msg->header.header.size), + &send_ack_notify, + socket); +} - + +/** + * Retransmission task for shutdown messages + * + * @param cls the shutdown handle + * @param tc the Task Context + */ +static void +close_msg_retransmission_task (void *cls, + const struct GNUNET_SCHEDULER_TaskContext *tc) +{ + struct GNUNET_STREAM_ShutdownHandle *shutdown_handle = cls; + struct GNUNET_STREAM_MessageHeader *msg; + struct GNUNET_STREAM_Socket *socket; + + GNUNET_assert (NULL != shutdown_handle); + socket = shutdown_handle->socket; + + msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader)); + msg->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader)); + switch (shutdown_handle->operation) + { + case SHUT_RDWR: + msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_CLOSE); + break; + case SHUT_RD: + msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE); + break; + case SHUT_WR: + msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE); + break; + default: + GNUNET_free (msg); + shutdown_handle->close_msg_retransmission_task_id = + GNUNET_SCHEDULER_NO_TASK; + return; + } + queue_message (socket, msg, NULL, NULL); + shutdown_handle->close_msg_retransmission_task_id = + GNUNET_SCHEDULER_add_delayed (socket->retransmit_timeout, + &close_msg_retransmission_task, + shutdown_handle); } @@ -572,7 +773,7 @@ ackbitmap_modify_bit (GNUNET_STREAM_AckBitmap *bitmap, unsigned int bit, int value) { - GNUNET_assert (bit < 64); + GNUNET_assert (bit < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH); if (GNUNET_YES == value) *bitmap |= (1LL << bit); else @@ -591,31 +792,14 @@ static uint8_t ackbitmap_is_bit_set (const GNUNET_STREAM_AckBitmap *bitmap, unsigned int bit) { - GNUNET_assert (bit < 64); + GNUNET_assert (bit < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH); return 0 != (*bitmap & (1LL << bit)); } - -/** - * Function called when Data Message is sent - * - * @param cls the io_handle corresponding to the Data Message - * @param socket the socket which was used - */ -static void -write_data_finish_cb (void *cls, - struct GNUNET_STREAM_Socket *socket) -{ - struct GNUNET_STREAM_IOWriteHandle *io_handle = cls; - - io_handle->sent_packets++; -} - - /** * Writes data using the given socket. The amount of data written is limited by - * the receive_window_size + * the receiver_window_size * * @param socket the socket to use */ @@ -623,43 +807,59 @@ static void write_data (struct GNUNET_STREAM_Socket *socket) { struct GNUNET_STREAM_IOWriteHandle *io_handle = socket->write_handle; - unsigned int packet; + int packet; /* Although an int, should never be negative */ int ack_packet; ack_packet = -1; /* Find the last acknowledged packet */ - for (packet=0; packet < 64; packet++) - { - if (GNUNET_YES == ackbitmap_is_bit_set (&io_handle->ack_bitmap, - packet)) - ack_packet = packet; - else if (NULL == io_handle->messages[packet]) - break; - } + for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++) + { + if (GNUNET_YES == ackbitmap_is_bit_set (&io_handle->ack_bitmap, + packet)) + ack_packet = packet; + else if (NULL == io_handle->messages[packet]) + break; + } /* Resend packets which weren't ack'ed */ for (packet=0; packet < ack_packet; packet++) + { + if (GNUNET_NO == ackbitmap_is_bit_set (&io_handle->ack_bitmap, + packet)) { - if (GNUNET_NO == ackbitmap_is_bit_set (&io_handle->ack_bitmap, - packet)) - { - queue_message (socket, - &io_handle->messages[packet]->header, - NULL, - NULL); - } + LOG (GNUNET_ERROR_TYPE_DEBUG, + "%s: Placing DATA message with sequence %u in send queue\n", + GNUNET_i2s (&socket->other_peer), + ntohl (io_handle->messages[packet]->sequence_number)); + copy_and_queue_message (socket, + &io_handle->messages[packet]->header, + NULL, + NULL); } + } packet = ack_packet + 1; /* Now send new packets if there is enough buffer space */ while ( (NULL != io_handle->messages[packet]) && - (socket->receive_window_available >= ntohs (io_handle->messages[packet]->header.header.size)) ) - { - socket->receive_window_available -= ntohs (io_handle->messages[packet]->header.header.size); - queue_message (socket, - &io_handle->messages[packet]->header, - &write_data_finish_cb, - io_handle); - packet++; - } + (socket->receiver_window_available + >= ntohs (io_handle->messages[packet]->header.header.size)) ) + { + socket->receiver_window_available -= + ntohs (io_handle->messages[packet]->header.header.size); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "%s: Placing DATA message with sequence %u in send queue\n", + GNUNET_i2s (&socket->other_peer), + ntohl (io_handle->messages[packet]->sequence_number)); + copy_and_queue_message (socket, + &io_handle->messages[packet]->header, + NULL, + NULL); + packet++; + } + if (GNUNET_SCHEDULER_NO_TASK == socket->retransmission_timeout_task_id) + socket->retransmission_timeout_task_id = + GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply + (GNUNET_TIME_UNIT_SECONDS, 8), + &retransmission_timeout_task, + socket); } @@ -671,7 +871,7 @@ write_data (struct GNUNET_STREAM_Socket *socket) */ static void call_read_processor (void *cls, - const struct GNUNET_SCHEDULER_TaskContext *tc) + const struct GNUNET_SCHEDULER_TaskContext *tc) { struct GNUNET_STREAM_Socket *socket = cls; size_t read_size; @@ -680,87 +880,95 @@ call_read_processor (void *cls, uint32_t sequence_increase; uint32_t offset_increase; - socket->read_task = GNUNET_SCHEDULER_NO_TASK; + socket->read_task_id = GNUNET_SCHEDULER_NO_TASK; if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN)) return; + if (NULL == socket->receive_buffer) + return; + GNUNET_assert (NULL != socket->read_handle); GNUNET_assert (NULL != socket->read_handle->proc); /* Check the bitmap for any holes */ for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++) - { - if (GNUNET_NO == ackbitmap_is_bit_set (&socket->ack_bitmap, - packet)) - break; - } + { + if (GNUNET_NO == ackbitmap_is_bit_set (&socket->ack_bitmap, + packet)) + break; + } /* We only call read processor if we have the first packet */ GNUNET_assert (0 < packet); - valid_read_size = socket->receive_buffer_boundaries[packet-1] - socket->copy_offset; - GNUNET_assert (0 != valid_read_size); - /* Cancel the read_io_timeout_task */ - GNUNET_SCHEDULER_cancel (socket->read_io_timeout_task); - socket->read_io_timeout_task = GNUNET_SCHEDULER_NO_TASK; - + GNUNET_SCHEDULER_cancel (socket->read_io_timeout_task_id); + socket->read_io_timeout_task_id = GNUNET_SCHEDULER_NO_TASK; /* Call the data processor */ + LOG (GNUNET_ERROR_TYPE_DEBUG, + "%s: Calling read processor\n", + GNUNET_i2s (&socket->other_peer)); read_size = socket->read_handle->proc (socket->read_handle->proc_cls, socket->status, socket->receive_buffer + socket->copy_offset, valid_read_size); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "%s: Read processor read %d bytes\n", + GNUNET_i2s (&socket->other_peer), read_size); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "%s: Read processor completed successfully\n", + GNUNET_i2s (&socket->other_peer)); /* Free the read handle */ GNUNET_free (socket->read_handle); socket->read_handle = NULL; - GNUNET_assert (read_size <= valid_read_size); socket->copy_offset += read_size; - /* Determine upto which packet we can remove from the buffer */ for (packet = 0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++) + { + if (socket->copy_offset == socket->receive_buffer_boundaries[packet]) + { packet++; break; } if (socket->copy_offset < socket->receive_buffer_boundaries[packet]) break; + } /* If no packets can be removed we can't move the buffer */ if (0 == packet) return; - sequence_increase = packet; + LOG (GNUNET_ERROR_TYPE_DEBUG, + "%s: Sequence increase after read processor completion: %u\n", + GNUNET_i2s (&socket->other_peer), sequence_increase); /* Shift the data in the receive buffer */ memmove (socket->receive_buffer, socket->receive_buffer + socket->receive_buffer_boundaries[sequence_increase-1], - socket->receive_buffer_size - socket->receive_buffer_boundaries[sequence_increase-1]); - + socket->receive_buffer_size + - socket->receive_buffer_boundaries[sequence_increase-1]); /* Shift the bitmap */ socket->ack_bitmap = socket->ack_bitmap >> sequence_increase; - /* Set read_sequence_number */ socket->read_sequence_number += sequence_increase; - /* Set read_offset */ offset_increase = socket->receive_buffer_boundaries[sequence_increase-1]; socket->read_offset += offset_increase; - /* Fix copy_offset */ GNUNET_assert (offset_increase <= socket->copy_offset); socket->copy_offset -= offset_increase; - /* Fix relative boundaries */ for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++) + { + if (packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH - sequence_increase) { - if (packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH - sequence_increase) - { - socket->receive_buffer_boundaries[packet] = - socket->receive_buffer_boundaries[packet + sequence_increase] - - offset_increase; - } - else - socket->receive_buffer_boundaries[packet] = 0; + socket->receive_buffer_boundaries[packet] = + socket->receive_buffer_boundaries[packet + sequence_increase] + - offset_increase; } + else + socket->receive_buffer_boundaries[packet] = 0; + } } @@ -772,20 +980,32 @@ call_read_processor (void *cls, */ static void read_io_timeout (void *cls, - const struct GNUNET_SCHEDULER_TaskContext *tc) + const struct GNUNET_SCHEDULER_TaskContext *tc) { struct GNUNET_STREAM_Socket *socket = cls; + GNUNET_STREAM_DataProcessor proc; + void *proc_cls; - socket->read_io_timeout_task = GNUNET_SCHEDULER_NO_TASK; - if (socket->read_task != GNUNET_SCHEDULER_NO_TASK) + socket->read_io_timeout_task_id = GNUNET_SCHEDULER_NO_TASK; + if (socket->read_task_id != GNUNET_SCHEDULER_NO_TASK) { - GNUNET_SCHEDULER_cancel (socket->read_task); - socket->read_task = GNUNET_SCHEDULER_NO_TASK; + LOG (GNUNET_ERROR_TYPE_DEBUG, + "%s: Read task timedout - Cancelling it\n", + GNUNET_i2s (&socket->other_peer)); + GNUNET_SCHEDULER_cancel (socket->read_task_id); + socket->read_task_id = GNUNET_SCHEDULER_NO_TASK; } GNUNET_assert (NULL != socket->read_handle); - + proc = socket->read_handle->proc; + proc_cls = socket->read_handle->proc_cls; + GNUNET_free (socket->read_handle); socket->read_handle = NULL; + /* Call the read processor to signal timeout */ + proc (proc_cls, + GNUNET_STREAM_TIMEOUT, + NULL, + 0); } @@ -815,96 +1035,150 @@ handle_data (struct GNUNET_STREAM_Socket *socket, size = htons (msg->header.header.size); if (size < sizeof (struct GNUNET_STREAM_DataMessage)) - { - GNUNET_break_op (0); - return GNUNET_SYSERR; - } + { + GNUNET_break_op (0); + return GNUNET_SYSERR; + } + + if (0 != memcmp (sender, + &socket->other_peer, + sizeof (struct GNUNET_PeerIdentity))) + { + LOG (GNUNET_ERROR_TYPE_DEBUG, + "%s: Received DATA from non-confirming peer\n", + GNUNET_i2s (&socket->other_peer)); + return GNUNET_YES; + } switch (socket->state) + { + case STATE_ESTABLISHED: + case STATE_TRANSMIT_CLOSED: + case STATE_TRANSMIT_CLOSE_WAIT: + + /* check if the message's sequence number is in the range we are + expecting */ + relative_sequence_number = + ntohl (msg->sequence_number) - socket->read_sequence_number; + if ( relative_sequence_number > GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH) { - case STATE_ESTABLISHED: - case STATE_TRANSMIT_CLOSED: - case STATE_TRANSMIT_CLOSE_WAIT: + LOG (GNUNET_ERROR_TYPE_DEBUG, + "%s: Ignoring received message with sequence number %u\n", + GNUNET_i2s (&socket->other_peer), + ntohl (msg->sequence_number)); + /* Start ACK sending task if one is not already present */ + if (GNUNET_SCHEDULER_NO_TASK == socket->ack_task_id) + { + socket->ack_task_id = + GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh + (msg->ack_deadline), + &ack_task, + socket); + } + return GNUNET_YES; + } + + /* Check if we have already seen this message */ + if (GNUNET_YES == ackbitmap_is_bit_set (&socket->ack_bitmap, + relative_sequence_number)) + { + LOG (GNUNET_ERROR_TYPE_DEBUG, + "%s: Ignoring already received message with sequence number %u\n", + GNUNET_i2s (&socket->other_peer), + ntohl (msg->sequence_number)); + /* Start ACK sending task if one is not already present */ + if (GNUNET_SCHEDULER_NO_TASK == socket->ack_task_id) + { + socket->ack_task_id = + GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh + (msg->ack_deadline), + &ack_task, + socket); + } + return GNUNET_YES; + } - /* check if the message's sequence number is in the range we are - expecting */ - relative_sequence_number = - ntohl (msg->sequence_number) - socket->read_sequence_number; - if ( relative_sequence_number > 64) - { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Ignoring received message with sequence number %d", - ntohl (msg->sequence_number)); - return GNUNET_YES; - } - - /* Check if we have to allocate the buffer */ - size -= sizeof (struct GNUNET_STREAM_DataMessage); - relative_offset = ntohl (msg->offset) - socket->read_offset; - bytes_needed = relative_offset + size; + LOG (GNUNET_ERROR_TYPE_DEBUG, + "%s: Receiving DATA with sequence number: %u and size: %d from %s\n", + GNUNET_i2s (&socket->other_peer), + ntohl (msg->sequence_number), + ntohs (msg->header.header.size), + GNUNET_i2s (&socket->other_peer)); - if (bytes_needed > socket->receive_buffer_size) - { - if (bytes_needed <= RECEIVE_BUFFER_SIZE) - { - socket->receive_buffer = GNUNET_realloc (socket->receive_buffer, - bytes_needed); - socket->receive_buffer_size = bytes_needed; - } - else - { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Cannot accommodate packet %d as buffer is full\n", - ntohl (msg->sequence_number)); - return GNUNET_YES; - } - } + /* Check if we have to allocate the buffer */ + size -= sizeof (struct GNUNET_STREAM_DataMessage); + relative_offset = ntohl (msg->offset) - socket->read_offset; + bytes_needed = relative_offset + size; + if (bytes_needed > socket->receive_buffer_size) + { + if (bytes_needed <= RECEIVE_BUFFER_SIZE) + { + socket->receive_buffer = GNUNET_realloc (socket->receive_buffer, + bytes_needed); + socket->receive_buffer_size = bytes_needed; + } + else + { + LOG (GNUNET_ERROR_TYPE_DEBUG, + "%s: Cannot accommodate packet %d as buffer is full\n", + GNUNET_i2s (&socket->other_peer), + ntohl (msg->sequence_number)); + return GNUNET_YES; + } + } - /* Copy Data to buffer */ - payload = &msg[1]; - GNUNET_assert (relative_offset + size <= socket->receive_buffer_size); - memcpy (socket->receive_buffer + relative_offset, - payload, - size); - socket->receive_buffer_boundaries[relative_sequence_number] = - relative_offset + size; + /* Copy Data to buffer */ + payload = &msg[1]; + GNUNET_assert (relative_offset + size <= socket->receive_buffer_size); + memcpy (socket->receive_buffer + relative_offset, + payload, + size); + socket->receive_buffer_boundaries[relative_sequence_number] = + relative_offset + size; - /* Modify the ACK bitmap */ - ackbitmap_modify_bit (&socket->ack_bitmap, - relative_sequence_number, - GNUNET_YES); + /* Modify the ACK bitmap */ + ackbitmap_modify_bit (&socket->ack_bitmap, + relative_sequence_number, + GNUNET_YES); - /* Start ACK sending task if one is not already present */ - if (0 == socket->ack_task_id) - { - socket->ack_task_id = - GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh - (msg->ack_deadline), - &ack_task, - socket); - } - - if ((NULL != socket->read_handle) /* A read handle is waiting */ - /* There is no current read task */ - && (GNUNET_SCHEDULER_NO_TASK == socket->read_task) - /* We have the first packet */ - && (GNUNET_YES == ackbitmap_is_bit_set(&socket->ack_bitmap, - 0))) - { - socket->read_task = - GNUNET_SCHEDULER_add_now (&call_read_processor, + /* Start ACK sending task if one is not already present */ + if (GNUNET_SCHEDULER_NO_TASK == socket->ack_task_id) + { + socket->ack_task_id = + GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh + (msg->ack_deadline), + &ack_task, socket); - } - - break; + } - default: - /* FIXME: call statistics */ - break; + if ((NULL != socket->read_handle) /* A read handle is waiting */ + /* There is no current read task */ + && (GNUNET_SCHEDULER_NO_TASK == socket->read_task_id) + /* We have the first packet */ + && (GNUNET_YES == ackbitmap_is_bit_set(&socket->ack_bitmap, + 0))) + { + LOG (GNUNET_ERROR_TYPE_DEBUG, + "%s: Scheduling read processor\n", + GNUNET_i2s (&socket->other_peer)); + + socket->read_task_id = + GNUNET_SCHEDULER_add_now (&call_read_processor, + socket); } + + break; + + default: + LOG (GNUNET_ERROR_TYPE_DEBUG, + "%s: Received data message when it cannot be handled\n", + GNUNET_i2s (&socket->other_peer)); + break; + } return GNUNET_YES; } + /** * Client's message Handler for GNUNET_MESSAGE_TYPE_STREAM_DATA * @@ -919,11 +1193,11 @@ handle_data (struct GNUNET_STREAM_Socket *socket, */ static int client_handle_data (void *cls, - struct GNUNET_MESH_Tunnel *tunnel, - void **tunnel_ctx, - const struct GNUNET_PeerIdentity *sender, - const struct GNUNET_MessageHeader *message, - const struct GNUNET_ATS_Information*atsi) + struct GNUNET_MESH_Tunnel *tunnel, + void **tunnel_ctx, + const struct GNUNET_PeerIdentity *sender, + const struct GNUNET_MessageHeader *message, + const struct GNUNET_ATS_Information*atsi) { struct GNUNET_STREAM_Socket *socket = cls; @@ -945,10 +1219,31 @@ static void set_state_established (void *cls, struct GNUNET_STREAM_Socket *socket) { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Attaining ESTABLISHED state\n"); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "%s: Attaining ESTABLISHED state\n", + GNUNET_i2s (&socket->other_peer)); socket->write_offset = 0; socket->read_offset = 0; socket->state = STATE_ESTABLISHED; + /* FIXME: What if listen_cb is NULL */ + if (NULL != socket->lsocket) + { + LOG (GNUNET_ERROR_TYPE_DEBUG, + "%s: Calling listen callback\n", + GNUNET_i2s (&socket->other_peer)); + if (GNUNET_SYSERR == + socket->lsocket->listen_cb (socket->lsocket->listen_cb_cls, + socket, + &socket->other_peer)) + { + socket->state = STATE_CLOSED; + /* FIXME: We should close in a decent way */ + GNUNET_MESH_tunnel_destroy (socket->tunnel); /* Destroy the tunnel */ + GNUNET_free (socket); + } + } + else if (socket->open_cb) + socket->open_cb (socket->open_cls, socket); } @@ -963,12 +1258,115 @@ set_state_hello_wait (void *cls, struct GNUNET_STREAM_Socket *socket) { GNUNET_assert (STATE_INIT == socket->state); - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Attaining HELLO_WAIT state\n"); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "%s: Attaining HELLO_WAIT state\n", + GNUNET_i2s (&socket->other_peer)); socket->state = STATE_HELLO_WAIT; } /** + * Callback to set state to CLOSE_WAIT + * + * @param cls the closure from queue_message + * @param socket the socket requiring state change + */ +static void +set_state_close_wait (void *cls, + struct GNUNET_STREAM_Socket *socket) +{ + LOG (GNUNET_ERROR_TYPE_DEBUG, + "%s: Attaing CLOSE_WAIT state\n", + GNUNET_i2s (&socket->other_peer)); + socket->state = STATE_CLOSE_WAIT; + GNUNET_free_non_null (socket->receive_buffer); /* Free the receive buffer */ + socket->receive_buffer = NULL; + socket->receive_buffer_size = 0; +} + + +/** + * Callback to set state to RECEIVE_CLOSE_WAIT + * + * @param cls the closure from queue_message + * @param socket the socket requiring state change + */ +static void +set_state_receive_close_wait (void *cls, + struct GNUNET_STREAM_Socket *socket) +{ + LOG (GNUNET_ERROR_TYPE_DEBUG, + "%s: Attaing RECEIVE_CLOSE_WAIT state\n", + GNUNET_i2s (&socket->other_peer)); + socket->state = STATE_RECEIVE_CLOSE_WAIT; + GNUNET_free_non_null (socket->receive_buffer); /* Free the receive buffer */ + socket->receive_buffer = NULL; + socket->receive_buffer_size = 0; +} + + +/** + * Callback to set state to TRANSMIT_CLOSE_WAIT + * + * @param cls the closure from queue_message + * @param socket the socket requiring state change + */ +static void +set_state_transmit_close_wait (void *cls, + struct GNUNET_STREAM_Socket *socket) +{ + LOG (GNUNET_ERROR_TYPE_DEBUG, + "%s: Attaing TRANSMIT_CLOSE_WAIT state\n", + GNUNET_i2s (&socket->other_peer)); + socket->state = STATE_TRANSMIT_CLOSE_WAIT; +} + + +/** + * Callback to set state to CLOSED + * + * @param cls the closure from queue_message + * @param socket the socket requiring state change + */ +static void +set_state_closed (void *cls, + struct GNUNET_STREAM_Socket *socket) +{ + socket->state = STATE_CLOSED; +} + +/** + * Returns a new HelloAckMessage. Also sets the write sequence number for the + * socket + * + * @param socket the socket for which this HelloAckMessage has to be generated + * @return the HelloAckMessage + */ +static struct GNUNET_STREAM_HelloAckMessage * +generate_hello_ack_msg (struct GNUNET_STREAM_Socket *socket) +{ + struct GNUNET_STREAM_HelloAckMessage *msg; + + /* Get the random sequence number */ + socket->write_sequence_number = + GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, UINT32_MAX); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "%s: Generated write sequence number %u\n", + GNUNET_i2s (&socket->other_peer), + (unsigned int) socket->write_sequence_number); + + msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_HelloAckMessage)); + msg->header.header.size = + htons (sizeof (struct GNUNET_STREAM_HelloAckMessage)); + msg->header.header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK); + msg->sequence_number = htonl (socket->write_sequence_number); + msg->receiver_window_size = htonl (RECEIVE_BUFFER_SIZE); + + return msg; +} + + +/** * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK * * @param cls the socket (set from GNUNET_MESH_connect) @@ -992,37 +1390,48 @@ client_handle_hello_ack (void *cls, const struct GNUNET_STREAM_HelloAckMessage *ack_msg; struct GNUNET_STREAM_HelloAckMessage *reply; + if (0 != memcmp (sender, + &socket->other_peer, + sizeof (struct GNUNET_PeerIdentity))) + { + LOG (GNUNET_ERROR_TYPE_DEBUG, + "%s: Received HELLO_ACK from non-confirming peer\n", + GNUNET_i2s (&socket->other_peer)); + return GNUNET_YES; + } ack_msg = (const struct GNUNET_STREAM_HelloAckMessage *) message; + LOG (GNUNET_ERROR_TYPE_DEBUG, + "%s: Received HELLO_ACK from %s\n", + GNUNET_i2s (&socket->other_peer), + GNUNET_i2s (&socket->other_peer)); + GNUNET_assert (socket->tunnel == tunnel); switch (socket->state) { case STATE_HELLO_WAIT: - socket->read_sequence_number = ntohl (ack_msg->sequence_number); - socket->receive_window_available = ntohl (ack_msg->receive_window_size); - /* Get the random sequence number */ - socket->write_sequence_number = - GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, UINT32_MAX); - reply = - GNUNET_malloc (sizeof (struct GNUNET_STREAM_HelloAckMessage)); - reply->header.header.size = - htons (sizeof (struct GNUNET_STREAM_MessageHeader)); - reply->header.header.type = - htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK); - reply->sequence_number = htonl (socket->write_sequence_number); - reply->receive_window_size = htonl (RECEIVE_BUFFER_SIZE); - queue_message (socket, - &reply->header, - &set_state_established, - NULL); - return GNUNET_OK; + socket->read_sequence_number = ntohl (ack_msg->sequence_number); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "%s: Read sequence number %u\n", + GNUNET_i2s (&socket->other_peer), + (unsigned int) socket->read_sequence_number); + socket->receiver_window_available = ntohl (ack_msg->receiver_window_size); + reply = generate_hello_ack_msg (socket); + queue_message (socket, + &reply->header, + &set_state_established, + NULL); + return GNUNET_OK; case STATE_ESTABLISHED: case STATE_RECEIVE_CLOSE_WAIT: // call statistics (# ACKs ignored++) return GNUNET_OK; case STATE_INIT: default: - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Server sent HELLO_ACK when in state %d\n", socket->state); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "%s: Server %s sent HELLO_ACK when in state %d\n", + GNUNET_i2s (&socket->other_peer), + GNUNET_i2s (&socket->other_peer), + socket->state); socket->state = STATE_CLOSED; // introduce STATE_ERROR? return GNUNET_SYSERR; } @@ -1050,7 +1459,7 @@ client_handle_reset (void *cls, const struct GNUNET_MessageHeader *message, const struct GNUNET_ATS_Information*atsi) { - struct GNUNET_STREAM_Socket *socket = cls; + // struct GNUNET_STREAM_Socket *socket = cls; return GNUNET_OK; } @@ -1077,22 +1486,22 @@ handle_transmit_close (struct GNUNET_STREAM_Socket *socket, struct GNUNET_STREAM_MessageHeader *reply; switch (socket->state) - { - case STATE_ESTABLISHED: - socket->state = STATE_RECEIVE_CLOSED; + { + case STATE_ESTABLISHED: + socket->state = STATE_RECEIVE_CLOSED; - /* Send TRANSMIT_CLOSE_ACK */ - reply = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader)); - reply->header.type = - htons (GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK); - reply->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader)); - queue_message (socket, reply, NULL, NULL); - break; + /* Send TRANSMIT_CLOSE_ACK */ + reply = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader)); + reply->header.type = + htons (GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK); + reply->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader)); + queue_message (socket, reply, NULL, NULL); + break; - default: - /* FIXME: Call statistics? */ - break; - } + default: + /* FIXME: Call statistics? */ + break; + } return GNUNET_YES; } @@ -1128,6 +1537,141 @@ client_handle_transmit_close (void *cls, /** + * Generic handler for GNUNET_MESSAGE_TYPE_STREAM_*_CLOSE_ACK messages + * + * @param socket the socket + * @param tunnel connection to the other end + * @param sender who sent the message + * @param message the actual message + * @param atsi performance data for the connection + * @param operation the close operation which is being ACK'ed + * @return GNUNET_OK to keep the connection open, + * GNUNET_SYSERR to close it (signal serious error) + */ +static int +handle_generic_close_ack (struct GNUNET_STREAM_Socket *socket, + struct GNUNET_MESH_Tunnel *tunnel, + const struct GNUNET_PeerIdentity *sender, + const struct GNUNET_STREAM_MessageHeader *message, + const struct GNUNET_ATS_Information *atsi, + int operation) +{ + struct GNUNET_STREAM_ShutdownHandle *shutdown_handle; + + shutdown_handle = socket->shutdown_handle; + if (NULL == shutdown_handle) + { + LOG (GNUNET_ERROR_TYPE_DEBUG, + "%s: Received CLOSE_ACK when shutdown handle is NULL\n", + GNUNET_i2s (&socket->other_peer)); + return GNUNET_OK; + } + + switch (operation) + { + case SHUT_RDWR: + switch (socket->state) + { + case STATE_CLOSE_WAIT: + if (SHUT_RDWR != shutdown_handle->operation) + { + LOG (GNUNET_ERROR_TYPE_DEBUG, + "%s: Received CLOSE_ACK when shutdown handle is not for " + "SHUT_RDWR\n", + GNUNET_i2s (&socket->other_peer)); + return GNUNET_OK; + } + + LOG (GNUNET_ERROR_TYPE_DEBUG, + "%s: Received CLOSE_ACK from %s\n", + GNUNET_i2s (&socket->other_peer), + GNUNET_i2s (&socket->other_peer)); + socket->state = STATE_CLOSED; + break; + default: + LOG (GNUNET_ERROR_TYPE_DEBUG, + "%s: Received CLOSE_ACK when in it not expected\n", + GNUNET_i2s (&socket->other_peer)); + return GNUNET_OK; + } + break; + + case SHUT_RD: + switch (socket->state) + { + case STATE_RECEIVE_CLOSE_WAIT: + if (SHUT_RD != shutdown_handle->operation) + { + LOG (GNUNET_ERROR_TYPE_DEBUG, + "%s: Received RECEIVE_CLOSE_ACK when shutdown handle " + "is not for SHUT_RD\n", + GNUNET_i2s (&socket->other_peer)); + return GNUNET_OK; + } + + LOG (GNUNET_ERROR_TYPE_DEBUG, + "%s: Received RECEIVE_CLOSE_ACK from %s\n", + GNUNET_i2s (&socket->other_peer), + GNUNET_i2s (&socket->other_peer)); + socket->state = STATE_RECEIVE_CLOSED; + break; + default: + LOG (GNUNET_ERROR_TYPE_DEBUG, + "%s: Received RECEIVE_CLOSE_ACK when in it not expected\n", + GNUNET_i2s (&socket->other_peer)); + return GNUNET_OK; + } + + break; + case SHUT_WR: + switch (socket->state) + { + case STATE_TRANSMIT_CLOSE_WAIT: + if (SHUT_WR != shutdown_handle->operation) + { + LOG (GNUNET_ERROR_TYPE_DEBUG, + "%s: Received TRANSMIT_CLOSE_ACK when shutdown handle " + "is not for SHUT_WR\n", + GNUNET_i2s (&socket->other_peer)); + return GNUNET_OK; + } + + LOG (GNUNET_ERROR_TYPE_DEBUG, + "%s: Received TRANSMIT_CLOSE_ACK from %s\n", + GNUNET_i2s (&socket->other_peer), + GNUNET_i2s (&socket->other_peer)); + socket->state = STATE_TRANSMIT_CLOSED; + break; + default: + LOG (GNUNET_ERROR_TYPE_DEBUG, + "%s: Received TRANSMIT_CLOSE_ACK when in it not expected\n", + GNUNET_i2s (&socket->other_peer)); + + return GNUNET_OK; + } + break; + default: + GNUNET_assert (0); + } + + if (NULL != shutdown_handle->completion_cb) /* Shutdown completion */ + shutdown_handle->completion_cb(shutdown_handle->completion_cls, + operation); + GNUNET_free (shutdown_handle); /* Free shutdown handle */ + socket->shutdown_handle = NULL; + if (GNUNET_SCHEDULER_NO_TASK + != shutdown_handle->close_msg_retransmission_task_id) + { + GNUNET_SCHEDULER_cancel + (shutdown_handle->close_msg_retransmission_task_id); + shutdown_handle->close_msg_retransmission_task_id = + GNUNET_SCHEDULER_NO_TASK; + } + return GNUNET_OK; +} + + +/** * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK * * @param cls the socket (set from GNUNET_MESH_connect) @@ -1149,6 +1693,67 @@ client_handle_transmit_close_ack (void *cls, { struct GNUNET_STREAM_Socket *socket = cls; + return handle_generic_close_ack (socket, + tunnel, + sender, + (const struct GNUNET_STREAM_MessageHeader *) + message, + atsi, + SHUT_WR); +} + + +/** + * Generic handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE + * + * @param socket the socket + * @param tunnel connection to the other end + * @param sender who sent the message + * @param message the actual message + * @param atsi performance data for the connection + * @return GNUNET_OK to keep the connection open, + * GNUNET_SYSERR to close it (signal serious error) + */ +static int +handle_receive_close (struct GNUNET_STREAM_Socket *socket, + struct GNUNET_MESH_Tunnel *tunnel, + const struct GNUNET_PeerIdentity *sender, + const struct GNUNET_STREAM_MessageHeader *message, + const struct GNUNET_ATS_Information *atsi) +{ + struct GNUNET_STREAM_MessageHeader *receive_close_ack; + + switch (socket->state) + { + case STATE_INIT: + case STATE_LISTEN: + case STATE_HELLO_WAIT: + LOG (GNUNET_ERROR_TYPE_DEBUG, + "%s: Ignoring RECEIVE_CLOSE as it cannot be handled now\n", + GNUNET_i2s (&socket->other_peer)); + return GNUNET_OK; + default: + break; + } + + LOG (GNUNET_ERROR_TYPE_DEBUG, + "%s: Received RECEIVE_CLOSE from %s\n", + GNUNET_i2s (&socket->other_peer), + GNUNET_i2s (&socket->other_peer)); + receive_close_ack = + GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader)); + receive_close_ack->header.size = + htons (sizeof (struct GNUNET_STREAM_MessageHeader)); + receive_close_ack->header.type = + htons (GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK); + queue_message (socket, + receive_close_ack, + &set_state_closed, + NULL); + + /* FIXME: Handle the case where write handle is present; the write operation + should be deemed as finised and the write continuation callback + has to be called with the stream status GNUNET_STREAM_SHUTDOWN */ return GNUNET_OK; } @@ -1175,7 +1780,12 @@ client_handle_receive_close (void *cls, { struct GNUNET_STREAM_Socket *socket = cls; - return GNUNET_OK; + return + handle_receive_close (socket, + tunnel, + sender, + (const struct GNUNET_STREAM_MessageHeader *) message, + atsi); } @@ -1201,6 +1811,66 @@ client_handle_receive_close_ack (void *cls, { struct GNUNET_STREAM_Socket *socket = cls; + return handle_generic_close_ack (socket, + tunnel, + sender, + (const struct GNUNET_STREAM_MessageHeader *) + message, + atsi, + SHUT_RD); +} + + +/** + * Generic handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE + * + * @param socket the socket + * @param tunnel connection to the other end + * @param sender who sent the message + * @param message the actual message + * @param atsi performance data for the connection + * @return GNUNET_OK to keep the connection open, + * GNUNET_SYSERR to close it (signal serious error) + */ +static int +handle_close (struct GNUNET_STREAM_Socket *socket, + struct GNUNET_MESH_Tunnel *tunnel, + const struct GNUNET_PeerIdentity *sender, + const struct GNUNET_STREAM_MessageHeader *message, + const struct GNUNET_ATS_Information*atsi) +{ + struct GNUNET_STREAM_MessageHeader *close_ack; + + switch (socket->state) + { + case STATE_INIT: + case STATE_LISTEN: + case STATE_HELLO_WAIT: + LOG (GNUNET_ERROR_TYPE_DEBUG, + "%s: Ignoring RECEIVE_CLOSE as it cannot be handled now\n", + GNUNET_i2s (&socket->other_peer)); + return GNUNET_OK; + default: + break; + } + + LOG (GNUNET_ERROR_TYPE_DEBUG, + "%s: Received CLOSE from %s\n", + GNUNET_i2s (&socket->other_peer), + GNUNET_i2s (&socket->other_peer)); + close_ack = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader)); + close_ack->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader)); + close_ack->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK); + queue_message (socket, + close_ack, + &set_state_closed, + NULL); + if (socket->state == STATE_CLOSED) + return GNUNET_OK; + + GNUNET_free_non_null (socket->receive_buffer); /* Free the receive buffer */ + socket->receive_buffer = NULL; + socket->receive_buffer_size = 0; return GNUNET_OK; } @@ -1227,7 +1897,11 @@ client_handle_close (void *cls, { struct GNUNET_STREAM_Socket *socket = cls; - return GNUNET_OK; + return handle_close (socket, + tunnel, + sender, + (const struct GNUNET_STREAM_MessageHeader *) message, + atsi); } @@ -1249,11 +1923,17 @@ client_handle_close_ack (void *cls, void **tunnel_ctx, const struct GNUNET_PeerIdentity *sender, const struct GNUNET_MessageHeader *message, - const struct GNUNET_ATS_Information*atsi) + const struct GNUNET_ATS_Information *atsi) { struct GNUNET_STREAM_Socket *socket = cls; - return GNUNET_OK; + return handle_generic_close_ack (socket, + tunnel, + sender, + (const struct GNUNET_STREAM_MessageHeader *) + message, + atsi, + SHUT_RDWR); } /*****************************/ @@ -1313,31 +1993,39 @@ server_handle_hello (void *cls, struct GNUNET_STREAM_Socket *socket = *tunnel_ctx; struct GNUNET_STREAM_HelloAckMessage *reply; + if (0 != memcmp (sender, + &socket->other_peer, + sizeof (struct GNUNET_PeerIdentity))) + { + LOG (GNUNET_ERROR_TYPE_DEBUG, + "%s: Received HELLO from non-confirming peer\n", + GNUNET_i2s (&socket->other_peer)); + return GNUNET_YES; + } + + GNUNET_assert (GNUNET_MESSAGE_TYPE_STREAM_HELLO == + ntohs (message->type)); GNUNET_assert (socket->tunnel == tunnel); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "%s: Received HELLO from %s\n", + GNUNET_i2s (&socket->other_peer), + GNUNET_i2s (&socket->other_peer)); + if (STATE_INIT == socket->state) - { - /* Get the random sequence number */ - socket->write_sequence_number = - GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, UINT32_MAX); - reply = - GNUNET_malloc (sizeof (struct GNUNET_STREAM_HelloAckMessage)); - reply->header.header.size = - htons (sizeof (struct GNUNET_STREAM_MessageHeader)); - reply->header.header.type = - htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK); - reply->sequence_number = htonl (socket->write_sequence_number); - queue_message (socket, - &reply->header, - &set_state_hello_wait, - NULL); - } + { + reply = generate_hello_ack_msg (socket); + queue_message (socket, + &reply->header, + &set_state_hello_wait, + NULL); + } else - { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Client sent HELLO when in state %d\n", socket->state); - /* FIXME: Send RESET? */ + { + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Client sent HELLO when in state %d\n", socket->state); + /* FIXME: Send RESET? */ - } + } return GNUNET_OK; } @@ -1365,23 +2053,33 @@ server_handle_hello_ack (void *cls, struct GNUNET_STREAM_Socket *socket = *tunnel_ctx; const struct GNUNET_STREAM_HelloAckMessage *ack_message; - ack_message = (struct GNUNET_STREAM_HelloAckMessage *) message; + GNUNET_assert (GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK == + ntohs (message->type)); GNUNET_assert (socket->tunnel == tunnel); + ack_message = (struct GNUNET_STREAM_HelloAckMessage *) message; if (STATE_HELLO_WAIT == socket->state) - { - socket->read_sequence_number = ntohl (ack_message->sequence_number); - socket->receive_window_available = - ntohl (ack_message->receive_window_size); - /* Attain ESTABLISHED state */ - set_state_established (NULL, socket); - } + { + LOG (GNUNET_ERROR_TYPE_DEBUG, + "%s: Received HELLO_ACK from %s\n", + GNUNET_i2s (&socket->other_peer), + GNUNET_i2s (&socket->other_peer)); + socket->read_sequence_number = ntohl (ack_message->sequence_number); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "%s: Read sequence number %u\n", + GNUNET_i2s (&socket->other_peer), + (unsigned int) socket->read_sequence_number); + socket->receiver_window_available = + ntohl (ack_message->receiver_window_size); + /* Attain ESTABLISHED state */ + set_state_established (NULL, socket); + } else - { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Client sent HELLO_ACK when in state %d\n", socket->state); - /* FIXME: Send RESET? */ + { + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Client sent HELLO_ACK when in state %d\n", socket->state); + /* FIXME: Send RESET? */ - } + } return GNUNET_OK; } @@ -1406,7 +2104,7 @@ server_handle_reset (void *cls, const struct GNUNET_MessageHeader *message, const struct GNUNET_ATS_Information*atsi) { - struct GNUNET_STREAM_Socket *socket = *tunnel_ctx; + // struct GNUNET_STREAM_Socket *socket = *tunnel_ctx; return GNUNET_OK; } @@ -1464,7 +2162,13 @@ server_handle_transmit_close_ack (void *cls, { struct GNUNET_STREAM_Socket *socket = *tunnel_ctx; - return GNUNET_OK; + return handle_generic_close_ack (socket, + tunnel, + sender, + (const struct GNUNET_STREAM_MessageHeader *) + message, + atsi, + SHUT_WR); } @@ -1490,7 +2194,12 @@ server_handle_receive_close (void *cls, { struct GNUNET_STREAM_Socket *socket = *tunnel_ctx; - return GNUNET_OK; + return + handle_receive_close (socket, + tunnel, + sender, + (const struct GNUNET_STREAM_MessageHeader *) message, + atsi); } @@ -1516,14 +2225,21 @@ server_handle_receive_close_ack (void *cls, { struct GNUNET_STREAM_Socket *socket = *tunnel_ctx; - return GNUNET_OK; + return handle_generic_close_ack (socket, + tunnel, + sender, + (const struct GNUNET_STREAM_MessageHeader *) + message, + atsi, + SHUT_RD); } /** * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE * - * @param cls the closure + * @param cls the listen socket (from GNUNET_MESH_connect in + * GNUNET_STREAM_listen) * @param tunnel connection to the other end * @param tunnel_ctx the socket * @param sender who sent the message @@ -1541,8 +2257,12 @@ server_handle_close (void *cls, const struct GNUNET_ATS_Information*atsi) { struct GNUNET_STREAM_Socket *socket = *tunnel_ctx; - - return GNUNET_OK; + + return handle_close (socket, + tunnel, + sender, + (const struct GNUNET_STREAM_MessageHeader *) message, + atsi); } @@ -1568,12 +2288,18 @@ server_handle_close_ack (void *cls, { struct GNUNET_STREAM_Socket *socket = *tunnel_ctx; - return GNUNET_OK; + return handle_generic_close_ack (socket, + tunnel, + sender, + (const struct GNUNET_STREAM_MessageHeader *) + message, + atsi, + SHUT_RDWR); } /** - * Message Handler for mesh + * Handler for DATA_ACK messages * * @param socket the socket through which the ack was received * @param tunnel connection to the other end @@ -1590,30 +2316,132 @@ handle_ack (struct GNUNET_STREAM_Socket *socket, const struct GNUNET_STREAM_AckMessage *ack, const struct GNUNET_ATS_Information*atsi) { + unsigned int packet; + int need_retransmission; + + + if (0 != memcmp (sender, + &socket->other_peer, + sizeof (struct GNUNET_PeerIdentity))) + { + LOG (GNUNET_ERROR_TYPE_DEBUG, + "%s: Received ACK from non-confirming peer\n", + GNUNET_i2s (&socket->other_peer)); + return GNUNET_YES; + } + switch (socket->state) + { + case (STATE_ESTABLISHED): + case (STATE_RECEIVE_CLOSED): + case (STATE_RECEIVE_CLOSE_WAIT): + if (NULL == socket->write_handle) + { + LOG (GNUNET_ERROR_TYPE_DEBUG, + "%s: Received DATA_ACK when write_handle is NULL\n", + GNUNET_i2s (&socket->other_peer)); + return GNUNET_OK; + } + /* FIXME: increment in the base sequence number is breaking current flow + */ + if (!((socket->write_sequence_number + - ntohl (ack->base_sequence_number)) < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH)) + { + LOG (GNUNET_ERROR_TYPE_DEBUG, + "%s: Received DATA_ACK with unexpected base sequence number\n", + GNUNET_i2s (&socket->other_peer)); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "%s: Current write sequence: %u; Ack's base sequence: %u\n", + GNUNET_i2s (&socket->other_peer), + socket->write_sequence_number, + ntohl (ack->base_sequence_number)); + return GNUNET_OK; + } + /* FIXME: include the case when write_handle is cancelled - ignore the + acks */ + + LOG (GNUNET_ERROR_TYPE_DEBUG, + "%s: Received DATA_ACK from %s\n", + GNUNET_i2s (&socket->other_peer), + GNUNET_i2s (&socket->other_peer)); + + /* Cancel the retransmission task */ + if (GNUNET_SCHEDULER_NO_TASK != socket->retransmission_timeout_task_id) + { + GNUNET_SCHEDULER_cancel (socket->retransmission_timeout_task_id); + socket->retransmission_timeout_task_id = + GNUNET_SCHEDULER_NO_TASK; + } + + for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++) + { + if (NULL == socket->write_handle->messages[packet]) break; + if (ntohl (ack->base_sequence_number) + >= ntohl (socket->write_handle->messages[packet]->sequence_number)) + ackbitmap_modify_bit (&socket->write_handle->ack_bitmap, + packet, + GNUNET_YES); + else + if (GNUNET_YES == + ackbitmap_is_bit_set (&socket->write_handle->ack_bitmap, + ntohl (socket->write_handle->messages[packet]->sequence_number) + - ntohl (ack->base_sequence_number))) + ackbitmap_modify_bit (&socket->write_handle->ack_bitmap, + packet, + GNUNET_YES); + } + + /* Update the receive window remaining + FIXME : Should update with the value from a data ack with greater + sequence number */ + socket->receiver_window_available = + ntohl (ack->receive_window_remaining); + + /* Check if we have received all acknowledgements */ + need_retransmission = GNUNET_NO; + for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++) + { + if (NULL == socket->write_handle->messages[packet]) break; + if (GNUNET_YES != ackbitmap_is_bit_set + (&socket->write_handle->ack_bitmap,packet)) + { + need_retransmission = GNUNET_YES; + break; + } + } + if (GNUNET_YES == need_retransmission) { - case (STATE_ESTABLISHED): - if (NULL == socket->write_handle) - { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Received DATA ACK when write_handle is NULL\n"); - return GNUNET_OK; - } - - socket->write_handle->ack_bitmap = GNUNET_ntohll (ack->bitmap); - socket->receive_window_available = - ntohl (ack->receive_window_remaining); write_data (socket); - break; - default: - break; } + else /* We have to call the write continuation callback now */ + { + /* Free the packets */ + for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++) + { + GNUNET_free_non_null (socket->write_handle->messages[packet]); + } + if (NULL != socket->write_handle->write_cont) + socket->write_handle->write_cont + (socket->write_handle->write_cont_cls, + socket->status, + socket->write_handle->size); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "%s: Write completion callback completed\n", + GNUNET_i2s (&socket->other_peer)); + /* We are done with the write handle - Freeing it */ + GNUNET_free (socket->write_handle); + socket->write_handle = NULL; + } + break; + default: + break; + } return GNUNET_OK; } /** - * Message Handler for mesh + * Handler for DATA_ACK messages * * @param cls the 'struct GNUNET_STREAM_Socket' * @param tunnel connection to the other end @@ -1640,7 +2468,7 @@ client_handle_ack (void *cls, /** - * Message Handler for mesh + * Handler for DATA_ACK messages * * @param cls the server's listen socket * @param tunnel connection to the other end @@ -1738,19 +2566,21 @@ mesh_peer_connect_callback (void *cls, { struct GNUNET_STREAM_Socket *socket = cls; struct GNUNET_STREAM_MessageHeader *message; - - if (0 != memcmp (&socket->other_peer, - peer, + + if (0 != memcmp (peer, + &socket->other_peer, sizeof (struct GNUNET_PeerIdentity))) - { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "A peer (%s) which is not our target has connected to our tunnel", - GNUNET_i2s (peer)); - return; - } + { + LOG (GNUNET_ERROR_TYPE_DEBUG, + "%s: A peer which is not our target has connected to our tunnel\n", + GNUNET_i2s(peer)); + return; + } - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Target peer %s connected\n", GNUNET_i2s (peer)); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "%s: Target peer %s connected\n", + GNUNET_i2s (&socket->other_peer), + GNUNET_i2s (&socket->other_peer)); /* Set state to INIT */ socket->state = STATE_INIT; @@ -1766,14 +2596,10 @@ mesh_peer_connect_callback (void *cls, /* Call open callback */ if (NULL == socket->open_cb) - { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "STREAM_open callback is NULL\n"); - } - else - { - socket->open_cb (socket->open_cls, socket); - } + { + LOG (GNUNET_ERROR_TYPE_DEBUG, + "STREAM_open callback is NULL\n"); + } } @@ -1787,7 +2613,112 @@ static void mesh_peer_disconnect_callback (void *cls, const struct GNUNET_PeerIdentity *peer) { + struct GNUNET_STREAM_Socket *socket=cls; + + /* If the state is SHUTDOWN its ok; else set the state of the socket to SYSERR */ + LOG (GNUNET_ERROR_TYPE_DEBUG, + "%s: Other peer %s disconnected \n", + GNUNET_i2s (&socket->other_peer), + GNUNET_i2s (&socket->other_peer)); +} + +/** + * Method called whenever a peer creates a tunnel to us + * + * @param cls closure + * @param tunnel new handle to the tunnel + * @param initiator peer that started the tunnel + * @param atsi performance information for the tunnel + * @return initial tunnel context for the tunnel + * (can be NULL -- that's not an error) + */ +static void * +new_tunnel_notify (void *cls, + struct GNUNET_MESH_Tunnel *tunnel, + const struct GNUNET_PeerIdentity *initiator, + const struct GNUNET_ATS_Information *atsi) +{ + struct GNUNET_STREAM_ListenSocket *lsocket = cls; + struct GNUNET_STREAM_Socket *socket; + + /* FIXME: If a tunnel is already created, we should not accept new tunnels + from the same peer again until the socket is closed */ + + socket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_Socket)); + socket->other_peer = *initiator; + socket->tunnel = tunnel; + socket->session_id = 0; /* FIXME */ + socket->state = STATE_INIT; + socket->lsocket = lsocket; + + LOG (GNUNET_ERROR_TYPE_DEBUG, + "%s: Peer %s initiated tunnel to us\n", + GNUNET_i2s (&socket->other_peer), + GNUNET_i2s (&socket->other_peer)); + + /* FIXME: Copy MESH handle from lsocket to socket */ + + return socket; +} + + +/** + * Function called whenever an inbound tunnel is destroyed. Should clean up + * any associated state. This function is NOT called if the client has + * explicitly asked for the tunnel to be destroyed using + * GNUNET_MESH_tunnel_destroy. It must NOT call GNUNET_MESH_tunnel_destroy on + * the tunnel. + * + * @param cls closure (set from GNUNET_MESH_connect) + * @param tunnel connection to the other end (henceforth invalid) + * @param tunnel_ctx place where local state associated + * with the tunnel is stored + */ +static void +tunnel_cleaner (void *cls, + const struct GNUNET_MESH_Tunnel *tunnel, + void *tunnel_ctx) +{ + struct GNUNET_STREAM_Socket *socket = tunnel_ctx; + + if (tunnel != socket->tunnel) + return; + + GNUNET_break_op(0); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "%s: Peer %s has terminated connection abruptly\n", + GNUNET_i2s (&socket->other_peer), + GNUNET_i2s (&socket->other_peer)); + + socket->status = GNUNET_STREAM_SHUTDOWN; + + /* Clear Transmit handles */ + if (NULL != socket->transmit_handle) + { + GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle); + socket->transmit_handle = NULL; + } + if (NULL != socket->ack_transmit_handle) + { + GNUNET_MESH_notify_transmit_ready_cancel (socket->ack_transmit_handle); + GNUNET_free (socket->ack_msg); + socket->ack_msg = NULL; + socket->ack_transmit_handle = NULL; + } + /* Stop Tasks using socket->tunnel */ + if (GNUNET_SCHEDULER_NO_TASK != socket->ack_task_id) + { + GNUNET_SCHEDULER_cancel (socket->ack_task_id); + socket->ack_task_id = GNUNET_SCHEDULER_NO_TASK; + } + if (GNUNET_SCHEDULER_NO_TASK != socket->retransmission_timeout_task_id) + { + GNUNET_SCHEDULER_cancel (socket->retransmission_timeout_task_id); + socket->retransmission_timeout_task_id = GNUNET_SCHEDULER_NO_TASK; + } + /* FIXME: Cancel all other tasks using socket->tunnel */ + socket->tunnel = NULL; } @@ -1819,55 +2750,167 @@ GNUNET_STREAM_open (const struct GNUNET_CONFIGURATION_Handle *cfg, { struct GNUNET_STREAM_Socket *socket; enum GNUNET_STREAM_Option option; + GNUNET_MESH_ApplicationType ports[] = {app_port, 0}; va_list vargs; /* Variable arguments */ + LOG (GNUNET_ERROR_TYPE_DEBUG, + "%s\n", __func__); socket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_Socket)); socket->other_peer = *target; socket->open_cb = open_cb; socket->open_cls = open_cb_cls; - /* Set defaults */ socket->retransmit_timeout = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, default_timeout); - va_start (vargs, open_cb_cls); /* Parse variable args */ do { option = va_arg (vargs, enum GNUNET_STREAM_Option); switch (option) - { - case GNUNET_STREAM_OPTION_INITIAL_RETRANSMIT_TIMEOUT: - /* Expect struct GNUNET_TIME_Relative */ - socket->retransmit_timeout = va_arg (vargs, - struct GNUNET_TIME_Relative); - break; - case GNUNET_STREAM_OPTION_END: - break; - } + { + case GNUNET_STREAM_OPTION_INITIAL_RETRANSMIT_TIMEOUT: + /* Expect struct GNUNET_TIME_Relative */ + socket->retransmit_timeout = va_arg (vargs, + struct GNUNET_TIME_Relative); + break; + case GNUNET_STREAM_OPTION_END: + break; + } } while (GNUNET_STREAM_OPTION_END != option); va_end (vargs); /* End of variable args parsing */ - socket->mesh = GNUNET_MESH_connect (cfg, /* the configuration handle */ - 1, /* QUEUE size as parameter? */ + 10, /* QUEUE size as parameter? */ socket, /* cls */ NULL, /* No inbound tunnel handler */ - NULL, /* No inbound tunnel cleaner */ + NULL, /* No in-tunnel cleaner */ client_message_handlers, - NULL); /* We don't get inbound tunnels */ - // FIXME: if (NULL == socket->mesh) ... + ports); /* We don't get inbound tunnels */ + if (NULL == socket->mesh) /* Fail if we cannot connect to mesh */ + { + GNUNET_free (socket); + return NULL; + } /* Now create the mesh tunnel to target */ + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Creating MESH Tunnel\n"); socket->tunnel = GNUNET_MESH_tunnel_create (socket->mesh, NULL, /* Tunnel context */ &mesh_peer_connect_callback, &mesh_peer_disconnect_callback, socket); - // FIXME: if (NULL == socket->tunnel) ... - + GNUNET_assert (NULL != socket->tunnel); + GNUNET_MESH_peer_request_connect_add (socket->tunnel, + &socket->other_peer); + + LOG (GNUNET_ERROR_TYPE_DEBUG, + "%s() END\n", __func__); return socket; } /** + * Shutdown the stream for reading or writing (similar to man 2 shutdown). + * + * @param socket the stream socket + * @param operation SHUT_RD, SHUT_WR or SHUT_RDWR + * @param completion_cb the callback that will be called upon successful + * shutdown of given operation + * @param completion_cls the closure for the completion callback + * @return the shutdown handle + */ +struct GNUNET_STREAM_ShutdownHandle * +GNUNET_STREAM_shutdown (struct GNUNET_STREAM_Socket *socket, + int operation, + GNUNET_STREAM_ShutdownCompletion completion_cb, + void *completion_cls) +{ + struct GNUNET_STREAM_ShutdownHandle *handle; + struct GNUNET_STREAM_MessageHeader *msg; + + GNUNET_assert (NULL == socket->shutdown_handle); + + handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_ShutdownHandle)); + handle->socket = socket; + handle->completion_cb = completion_cb; + handle->completion_cls = completion_cls; + socket->shutdown_handle = handle; + + msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader)); + msg->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader)); + switch (operation) + { + case SHUT_RD: + handle->operation = SHUT_RD; + if (NULL != socket->read_handle) + LOG (GNUNET_ERROR_TYPE_WARNING, + "Existing read handle should be cancelled before shutting" + " down reading\n"); + msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE); + queue_message (socket, + msg, + &set_state_receive_close_wait, + NULL); + break; + case SHUT_WR: + handle->operation = SHUT_WR; + if (NULL != socket->write_handle) + LOG (GNUNET_ERROR_TYPE_WARNING, + "Existing write handle should be cancelled before shutting" + " down writing\n"); + msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE); + queue_message (socket, + msg, + &set_state_transmit_close_wait, + NULL); + break; + case SHUT_RDWR: + handle->operation = SHUT_RDWR; + if (NULL != socket->write_handle) + LOG (GNUNET_ERROR_TYPE_WARNING, + "Existing write handle should be cancelled before shutting" + " down writing\n"); + if (NULL != socket->read_handle) + LOG (GNUNET_ERROR_TYPE_WARNING, + "Existing read handle should be cancelled before shutting" + " down reading\n"); + msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_CLOSE); + queue_message (socket, + msg, + &set_state_close_wait, + NULL); + break; + default: + LOG (GNUNET_ERROR_TYPE_WARNING, + "GNUNET_STREAM_shutdown called with invalid value for " + "parameter operation -- Ignoring\n"); + GNUNET_free (msg); + GNUNET_free (handle); + return NULL; + } + handle->close_msg_retransmission_task_id = + GNUNET_SCHEDULER_add_delayed (socket->retransmit_timeout, + &close_msg_retransmission_task, + handle); + return handle; +} + + +/** + * Cancels a pending shutdown + * + * @param handle the shutdown handle returned from GNUNET_STREAM_shutdown + */ +void +GNUNET_STREAM_shutdown_cancel (struct GNUNET_STREAM_ShutdownHandle *handle) +{ + if (GNUNET_SCHEDULER_NO_TASK != handle->close_msg_retransmission_task_id) + GNUNET_SCHEDULER_cancel (handle->close_msg_retransmission_task_id); + GNUNET_free (handle); + return; +} + + +/** * Closes the stream * * @param socket the stream socket @@ -1877,20 +2920,45 @@ GNUNET_STREAM_close (struct GNUNET_STREAM_Socket *socket) { struct MessageQueue *head; - if (socket->read_task != GNUNET_SCHEDULER_NO_TASK) + if (NULL != socket->read_handle) + { + LOG (GNUNET_ERROR_TYPE_WARNING, + "Closing STREAM socket when a read handle is pending\n"); + } + if (NULL != socket->write_handle) + { + LOG (GNUNET_ERROR_TYPE_WARNING, + "Closing STREAM socket when a write handle is pending\n"); + } + + if (socket->read_task_id != GNUNET_SCHEDULER_NO_TASK) { /* socket closed with read task pending!? */ GNUNET_break (0); - GNUNET_SCHEDULER_cancel (socket->read_task); - socket->read_task = GNUNET_SCHEDULER_NO_TASK; + GNUNET_SCHEDULER_cancel (socket->read_task_id); + socket->read_task_id = GNUNET_SCHEDULER_NO_TASK; + } + + /* Terminate the ack'ing tasks if they are still present */ + if (socket->ack_task_id != GNUNET_SCHEDULER_NO_TASK) + { + GNUNET_SCHEDULER_cancel (socket->ack_task_id); + socket->ack_task_id = GNUNET_SCHEDULER_NO_TASK; } /* Clear Transmit handles */ if (NULL != socket->transmit_handle) - { - GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle); - socket->transmit_handle = NULL; - } + { + GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle); + socket->transmit_handle = NULL; + } + if (NULL != socket->ack_transmit_handle) + { + GNUNET_MESH_notify_transmit_ready_cancel (socket->ack_transmit_handle); + GNUNET_free (socket->ack_msg); + socket->ack_msg = NULL; + socket->ack_transmit_handle = NULL; + } /* Clear existing message queue */ while (NULL != (head = socket->queue_head)) { @@ -1903,101 +2971,29 @@ GNUNET_STREAM_close (struct GNUNET_STREAM_Socket *socket) /* Close associated tunnel */ if (NULL != socket->tunnel) - { - GNUNET_MESH_tunnel_destroy (socket->tunnel); - socket->tunnel = NULL; - } + { + GNUNET_MESH_tunnel_destroy (socket->tunnel); + socket->tunnel = NULL; + } /* Close mesh connection */ - if (NULL != socket->mesh) - { - GNUNET_MESH_disconnect (socket->mesh); - socket->mesh = NULL; - } + if (NULL != socket->mesh && NULL == socket->lsocket) + { + GNUNET_MESH_disconnect (socket->mesh); + socket->mesh = NULL; + } /* Release receive buffer */ if (NULL != socket->receive_buffer) - { - GNUNET_free (socket->receive_buffer); - } + { + GNUNET_free (socket->receive_buffer); + } GNUNET_free (socket); } /** - * Method called whenever a peer creates a tunnel to us - * - * @param cls closure - * @param tunnel new handle to the tunnel - * @param initiator peer that started the tunnel - * @param atsi performance information for the tunnel - * @return initial tunnel context for the tunnel - * (can be NULL -- that's not an error) - */ -static void * -new_tunnel_notify (void *cls, - struct GNUNET_MESH_Tunnel *tunnel, - const struct GNUNET_PeerIdentity *initiator, - const struct GNUNET_ATS_Information *atsi) -{ - struct GNUNET_STREAM_ListenSocket *lsocket = cls; - struct GNUNET_STREAM_Socket *socket; - - socket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_Socket)); - socket->tunnel = tunnel; - socket->session_id = 0; /* FIXME */ - socket->other_peer = *initiator; - socket->state = STATE_INIT; - - if (GNUNET_SYSERR == lsocket->listen_cb (lsocket->listen_cb_cls, - socket, - &socket->other_peer)) - { - socket->state = STATE_CLOSED; - /* FIXME: Send CLOSE message and then free */ - GNUNET_free (socket); - GNUNET_MESH_tunnel_destroy (tunnel); /* Destroy the tunnel */ - } - return socket; -} - - -/** - * Function called whenever an inbound tunnel is destroyed. Should clean up - * any associated state. This function is NOT called if the client has - * explicitly asked for the tunnel to be destroyed using - * GNUNET_MESH_tunnel_destroy. It must NOT call GNUNET_MESH_tunnel_destroy on - * the tunnel. - * - * @param cls closure (set from GNUNET_MESH_connect) - * @param tunnel connection to the other end (henceforth invalid) - * @param tunnel_ctx place where local state associated - * with the tunnel is stored - */ -static void -tunnel_cleaner (void *cls, - const struct GNUNET_MESH_Tunnel *tunnel, - void *tunnel_ctx) -{ - struct GNUNET_STREAM_Socket *socket = tunnel_ctx; - - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Peer %s has terminated connection abruptly\n", - GNUNET_i2s (&socket->other_peer)); - - socket->status = GNUNET_STREAM_SHUTDOWN; - /* Clear Transmit handles */ - if (NULL != socket->transmit_handle) - { - GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle); - socket->transmit_handle = NULL; - } - socket->tunnel = NULL; -} - - -/** * Listens for stream connections for a specific application ports * * @param cfg the configuration to use @@ -2015,10 +3011,8 @@ GNUNET_STREAM_listen (const struct GNUNET_CONFIGURATION_Handle *cfg, { /* FIXME: Add variable args for passing configration options? */ struct GNUNET_STREAM_ListenSocket *lsocket; - GNUNET_MESH_ApplicationType app_types[2]; + GNUNET_MESH_ApplicationType ports[] = {app_port, 0}; - app_types[0] = app_port; - app_types[1] = 0; lsocket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_ListenSocket)); lsocket->port = app_port; lsocket->listen_cb = listen_cb; @@ -2029,7 +3023,8 @@ GNUNET_STREAM_listen (const struct GNUNET_CONFIGURATION_Handle *cfg, &new_tunnel_notify, &tunnel_cleaner, server_message_handlers, - app_types); + ports); + GNUNET_assert (NULL != lsocket->mesh); return lsocket; } @@ -2043,6 +3038,7 @@ void GNUNET_STREAM_listen_close (struct GNUNET_STREAM_ListenSocket *lsocket) { /* Close MESH connection */ + GNUNET_assert (NULL != lsocket->mesh); GNUNET_MESH_disconnect (lsocket->mesh); GNUNET_free (lsocket); @@ -2050,15 +3046,22 @@ GNUNET_STREAM_listen_close (struct GNUNET_STREAM_ListenSocket *lsocket) /** - * Tries to write the given data to the stream + * Tries to write the given data to the stream. The maximum size of data that + * can be written as part of a write operation is (64 * (64000 - sizeof (struct + * GNUNET_STREAM_DataMessage))). If size is greater than this it is not an API + * violation, however only the said number of maximum bytes will be written. * * @param socket the socket representing a stream * @param data the data buffer from where the data is written into the stream * @param size the number of bytes to be written from the data buffer * @param timeout the timeout period - * @param write_cont the function to call upon writing some bytes into the stream + * @param write_cont the function to call upon writing some bytes into the + * stream * @param write_cont_cls the closure - * @return handle to cancel the operation + * + * @return handle to cancel the operation; if a previous write is pending or + * the stream has been shutdown for this operation then write_cont is + * immediately called and NULL is returned. */ struct GNUNET_STREAM_IOWriteHandle * GNUNET_STREAM_write (struct GNUNET_STREAM_Socket *socket, @@ -2075,6 +3078,10 @@ GNUNET_STREAM_write (struct GNUNET_STREAM_Socket *socket, uint32_t payload_size; struct GNUNET_STREAM_DataMessage *data_msg; const void *sweep; + struct GNUNET_TIME_Relative ack_deadline; + + LOG (GNUNET_ERROR_TYPE_DEBUG, + "%s\n", __func__); /* Return NULL if there is already a write request pending */ if (NULL != socket->write_handle) @@ -2082,70 +3089,101 @@ GNUNET_STREAM_write (struct GNUNET_STREAM_Socket *socket, GNUNET_break (0); return NULL; } - if (!((STATE_ESTABLISHED == socket->state) - || (STATE_RECEIVE_CLOSE_WAIT == socket->state) - || (STATE_RECEIVE_CLOSED == socket->state))) - { - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, - "Attempting to write on a closed (OR) not-yet-established" - "stream\n"); - return NULL; - } + + switch (socket->state) + { + case STATE_TRANSMIT_CLOSED: + case STATE_TRANSMIT_CLOSE_WAIT: + case STATE_CLOSED: + case STATE_CLOSE_WAIT: + if (NULL != write_cont) + write_cont (write_cont_cls, GNUNET_STREAM_SHUTDOWN, 0); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "%s() END\n", __func__); + return NULL; + case STATE_INIT: + case STATE_LISTEN: + case STATE_HELLO_WAIT: + if (NULL != write_cont) + /* FIXME: GNUNET_STREAM_SYSERR?? */ + write_cont (write_cont_cls, GNUNET_STREAM_SYSERR, 0); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "%s() END\n", __func__); + return NULL; + case STATE_ESTABLISHED: + case STATE_RECEIVE_CLOSED: + case STATE_RECEIVE_CLOSE_WAIT: + break; + } + if (GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH * max_payload_size < size) size = GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH * max_payload_size; num_needed_packets = (size + (max_payload_size - 1)) / max_payload_size; io_handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_IOWriteHandle)); + io_handle->socket = socket; + io_handle->write_cont = write_cont; + io_handle->write_cont_cls = write_cont_cls; + io_handle->size = size; sweep = data; + /* FIXME: Remove the fixed delay for ack deadline; Set it to the value + determined from RTT */ + ack_deadline = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5); /* Divide the given buffer into packets for sending */ for (packet=0; packet < num_needed_packets; packet++) + { + if ((packet + 1) * max_payload_size < size) + { + payload_size = max_payload_size; + packet_size = MAX_PACKET_SIZE; + } + else { - if ((packet + 1) * max_payload_size < size) - { - payload_size = max_payload_size; - packet_size = MAX_PACKET_SIZE; - } - else - { - payload_size = size - packet * max_payload_size; - packet_size = payload_size + sizeof (struct - GNUNET_STREAM_DataMessage); - } - io_handle->messages[packet] = GNUNET_malloc (packet_size); - io_handle->messages[packet]->header.header.size = htons (packet_size); - io_handle->messages[packet]->header.header.type = - htons (GNUNET_MESSAGE_TYPE_STREAM_DATA); - io_handle->messages[packet]->sequence_number = - htons (socket->write_sequence_number++); - io_handle->messages[packet]->offset = htons (socket->write_offset); - - /* FIXME: Remove the fixed delay for ack deadline; Set it to the value - determined from RTT */ - io_handle->messages[packet]->ack_deadline = - GNUNET_TIME_relative_hton (GNUNET_TIME_relative_multiply - (GNUNET_TIME_UNIT_SECONDS, 5)); - data_msg = io_handle->messages[packet]; - /* Copy data from given buffer to the packet */ - memcpy (&data_msg[1], - sweep, - payload_size); - sweep += payload_size; - socket->write_offset += payload_size; + payload_size = size - packet * max_payload_size; + packet_size = payload_size + sizeof (struct + GNUNET_STREAM_DataMessage); } + io_handle->messages[packet] = GNUNET_malloc (packet_size); + io_handle->messages[packet]->header.header.size = htons (packet_size); + io_handle->messages[packet]->header.header.type = + htons (GNUNET_MESSAGE_TYPE_STREAM_DATA); + io_handle->messages[packet]->sequence_number = + htonl (socket->write_sequence_number++); + io_handle->messages[packet]->offset = htonl (socket->write_offset); + + /* FIXME: Remove the fixed delay for ack deadline; Set it to the value + determined from RTT */ + io_handle->messages[packet]->ack_deadline = + GNUNET_TIME_relative_hton (ack_deadline); + data_msg = io_handle->messages[packet]; + /* Copy data from given buffer to the packet */ + memcpy (&data_msg[1], + sweep, + payload_size); + sweep += payload_size; + socket->write_offset += payload_size; + } socket->write_handle = io_handle; write_data (socket); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "%s() END\n", __func__); + return io_handle; } + /** - * Tries to read data from the stream + * Tries to read data from the stream. * * @param socket the socket representing a stream * @param timeout the timeout period * @param proc function to call with data (once only) * @param proc_cls the closure for proc - * @return handle to cancel the operation + * + * @return handle to cancel the operation; if the stream has been shutdown for + * this type of opeartion then the DataProcessor is immediately + * called with GNUNET_STREAM_SHUTDOWN as status and NULL if returned */ struct GNUNET_STREAM_IOReadHandle * GNUNET_STREAM_read (struct GNUNET_STREAM_Socket *socket, @@ -2155,26 +3193,99 @@ GNUNET_STREAM_read (struct GNUNET_STREAM_Socket *socket, { struct GNUNET_STREAM_IOReadHandle *read_handle; + LOG (GNUNET_ERROR_TYPE_DEBUG, + "%s: %s()\n", + GNUNET_i2s (&socket->other_peer), + __func__); + /* Return NULL if there is already a read handle; the user has to cancel that - first before continuing or has to wait until it is completed */ + first before continuing or has to wait until it is completed */ if (NULL != socket->read_handle) return NULL; + GNUNET_assert (NULL != proc); + + switch (socket->state) + { + case STATE_RECEIVE_CLOSED: + case STATE_RECEIVE_CLOSE_WAIT: + case STATE_CLOSED: + case STATE_CLOSE_WAIT: + proc (proc_cls, GNUNET_STREAM_SHUTDOWN, NULL, 0); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "%s: %s() END\n", + GNUNET_i2s (&socket->other_peer), + __func__); + return NULL; + default: + break; + } + read_handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_IOReadHandle)); read_handle->proc = proc; + read_handle->proc_cls = proc_cls; socket->read_handle = read_handle; /* Check if we have a packet at bitmap 0 */ if (GNUNET_YES == ackbitmap_is_bit_set (&socket->ack_bitmap, 0)) - { - socket->read_task = GNUNET_SCHEDULER_add_now (&call_read_processor, - socket); + { + socket->read_task_id = GNUNET_SCHEDULER_add_now (&call_read_processor, + socket); - } + } /* Setup the read timeout task */ - socket->read_io_timeout_task = GNUNET_SCHEDULER_add_delayed (timeout, - &read_io_timeout, - socket); + socket->read_io_timeout_task_id = + GNUNET_SCHEDULER_add_delayed (timeout, + &read_io_timeout, + socket); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "%s: %s() END\n", + GNUNET_i2s (&socket->other_peer), + __func__); return read_handle; } + + +/** + * Cancel pending write operation. + * + * @param ioh handle to operation to cancel + */ +void +GNUNET_STREAM_io_write_cancel (struct GNUNET_STREAM_IOWriteHandle *ioh) +{ + struct GNUNET_STREAM_Socket *socket = ioh->socket; + unsigned int packet; + + GNUNET_assert (NULL != socket->write_handle); + GNUNET_assert (socket->write_handle == ioh); + + if (GNUNET_SCHEDULER_NO_TASK != socket->retransmission_timeout_task_id) + { + GNUNET_SCHEDULER_cancel (socket->retransmission_timeout_task_id); + socket->retransmission_timeout_task_id = GNUNET_SCHEDULER_NO_TASK; + } + + for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++) + { + if (NULL == ioh->messages[packet]) break; + GNUNET_free (ioh->messages[packet]); + } + + GNUNET_free (socket->write_handle); + socket->write_handle = NULL; + return; +} + + +/** + * Cancel pending read operation. + * + * @param ioh handle to operation to cancel + */ +void +GNUNET_STREAM_io_read_cancel (struct GNUNET_STREAM_IOReadHandle *ioh) +{ + return; +} diff --git a/src/stream/stream_protocol.h b/src/stream/stream_protocol.h index 0c1987e..0c5376f 100644 --- a/src/stream/stream_protocol.h +++ b/src/stream/stream_protocol.h @@ -127,10 +127,8 @@ struct GNUNET_STREAM_AckMessage GNUNET_STREAM_AckBitmap bitmap GNUNET_PACKED; /** - * The sequence number of the Data Message upto which the receiver has filled - * its buffer without any missing packets - * - * FIXME: Do we need this? + * The sequence number of the next Data Message receiver is + * anticipating. Data messages less than this number are received by receiver */ uint32_t base_sequence_number GNUNET_PACKED; @@ -163,7 +161,7 @@ struct GNUNET_STREAM_HelloAckMessage * * FIXME: Remove if not needed */ - uint32_t receive_window_size; + uint32_t receiver_window_size; }; diff --git a/src/stream/test_stream_2peers.c b/src/stream/test_stream_2peers.c new file mode 100644 index 0000000..1fdc0ee --- /dev/null +++ b/src/stream/test_stream_2peers.c @@ -0,0 +1,560 @@ +/* + This file is part of GNUnet. + (C) 2011, 2012 Christian Grothoff (and other contributing authors) + + GNUnet is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published + by the Free Software Foundation; either version 3, or (at your + option) any later version. + + GNUnet is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + General Public License for more details. + + You should have received a copy of the GNU General Public License + along with GNUnet; see the file COPYING. If not, write to the + Free Software Foundation, Inc., 59 Temple Place - Suite 330, + Boston, MA 02111-1307, USA. +*/ + +/** + * @file stream/test_stream_2peers.c + * @brief Stream API testing between 2 peers using testing API + * @author Sree Harsha Totakura + */ + +#include <string.h> + +#include "platform.h" +#include "gnunet_util_lib.h" +#include "gnunet_mesh_service.h" +#include "gnunet_stream_lib.h" +#include "gnunet_testing_lib.h" + +#define VERBOSE 1 + +/** + * Number of peers + */ +#define NUM_PEERS 2 + +/** + * Structure for holding peer's sockets and IO Handles + */ +struct PeerData +{ + /** + * Peer's stream socket + */ + struct GNUNET_STREAM_Socket *socket; + + /** + * Peer's io write handle + */ + struct GNUNET_STREAM_IOWriteHandle *io_write_handle; + + /** + * Peer's io read handle + */ + struct GNUNET_STREAM_IOReadHandle *io_read_handle; + + /** + * Peer's shutdown handle + */ + struct GNUNET_STREAM_ShutdownHandle *shutdown_handle; + + /** + * Our Peer id + */ + struct GNUNET_PeerIdentity our_id; + + /** + * Bytes the peer has written + */ + unsigned int bytes_wrote; + + /** + * Byte the peer has read + */ + unsigned int bytes_read; +}; + +/** + * The current peer group + */ +static struct GNUNET_TESTING_PeerGroup *pg; + +/** + * Peer 1 daemon + */ +static struct GNUNET_TESTING_Daemon *d1; + +/** + * Peer 2 daemon + */ +static struct GNUNET_TESTING_Daemon *d2; + +static struct PeerData peer1; +static struct PeerData peer2; +static struct GNUNET_STREAM_ListenSocket *peer2_listen_socket; +static struct GNUNET_CONFIGURATION_Handle *config; + +static GNUNET_SCHEDULER_TaskIdentifier abort_task; + +static char *data = "ABCD"; +static int result; + +static int writing_success; +static int reading_success; + + +/** + * Input processor + * + * @param cls the closure from GNUNET_STREAM_write/read + * @param status the status of the stream at the time this function is called + * @param data traffic from the other side + * @param size the number of bytes available in data read + * @return number of bytes of processed from 'data' (any data remaining should be + * given to the next time the read processor is called). + */ +static size_t +input_processor (void *cls, + enum GNUNET_STREAM_Status status, + const void *input_data, + size_t size); + +/** + * Task for calling STREAM_read + * + * @param cls the peer data entity + * @param tc the task context + */ +static void +stream_read_task (void *cls, + const struct GNUNET_SCHEDULER_TaskContext *tc) +{ + struct PeerData *peer = cls; + + peer->io_read_handle = GNUNET_STREAM_read (peer->socket, + GNUNET_TIME_relative_multiply + (GNUNET_TIME_UNIT_SECONDS, 5), + &input_processor, + peer); + GNUNET_assert (NULL != peer->io_read_handle); +} + +/** + * The write completion function; called upon writing some data to stream or + * upon error + * + * @param cls the closure from GNUNET_STREAM_write/read + * @param status the status of the stream at the time this function is called + * @param size the number of bytes read or written + */ +static void +write_completion (void *cls, + enum GNUNET_STREAM_Status status, + size_t size); + + +/** + * Task for calling STREAM_write + * + * @param cls the peer data entity + * @param tc the task context + */ +static void +stream_write_task (void *cls, + const struct GNUNET_SCHEDULER_TaskContext *tc) +{ + struct PeerData *peer = cls; + + peer->io_write_handle = + GNUNET_STREAM_write (peer->socket, + (void *) data, + strlen(data) - peer->bytes_wrote, + GNUNET_TIME_relative_multiply + (GNUNET_TIME_UNIT_SECONDS, 5), + &write_completion, + peer); + + GNUNET_assert (NULL != peer->io_write_handle); + } + + +/** + * Check whether peers successfully shut down. + */ +static void +peergroup_shutdown_callback (void *cls, const char *emsg) +{ + if (emsg != NULL) + { + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "Shutdown of peers failed!\n"); + } + else + { + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "All peers successfully shut down!\n"); + } + GNUNET_CONFIGURATION_destroy (config); +} + + +/** + * Close sockets and stop testing deamons nicely + */ +static void +do_close (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) +{ + if (NULL != peer1.socket) + GNUNET_STREAM_close (peer1.socket); + if (NULL != peer2.socket) + GNUNET_STREAM_close (peer2.socket); + if (NULL != peer2_listen_socket) + GNUNET_STREAM_listen_close (peer2_listen_socket); + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "test: shutdown\n"); + if (0 != abort_task) + { + GNUNET_SCHEDULER_cancel (abort_task); + } + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "test: Wait\n"); + + GNUNET_TESTING_daemons_stop (pg, + GNUNET_TIME_relative_multiply + (GNUNET_TIME_UNIT_SECONDS, 5), + &peergroup_shutdown_callback, + NULL); +} + + +/** + * Completion callback for shutdown + * + * @param cls the closure from GNUNET_STREAM_shutdown call + * @param operation the operation that was shutdown (SHUT_RD, SHUT_WR, + * SHUT_RDWR) + */ +static void +shutdown_completion (void *cls, + int operation) +{ + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "STREAM shutdown successful\n"); + GNUNET_SCHEDULER_add_now (&do_close, + cls); +} + + + +/** + * Shutdown sockets gracefully + */ +static void +do_shutdown (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) +{ + peer1.shutdown_handle = GNUNET_STREAM_shutdown (peer1.socket, + SHUT_RDWR, + &shutdown_completion, + cls); +} + + +/** + * Something went wrong and timed out. Kill everything and set error flag + */ +static void +do_abort (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) +{ + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "test: ABORT\n"); + result = GNUNET_SYSERR; + abort_task = 0; + do_close (cls, tc); +} + + +/** + * The write completion function; called upon writing some data to stream or + * upon error + * + * @param cls the closure from GNUNET_STREAM_write/read + * @param status the status of the stream at the time this function is called + * @param size the number of bytes read or written + */ +static void +write_completion (void *cls, + enum GNUNET_STREAM_Status status, + size_t size) +{ + struct PeerData *peer=cls; + + GNUNET_assert (GNUNET_STREAM_OK == status); + GNUNET_assert (size <= strlen (data)); + peer->bytes_wrote += size; + + if (peer->bytes_wrote < strlen(data)) /* Have more data to send */ + { + GNUNET_SCHEDULER_add_now (&stream_write_task, peer); + } + else + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Writing completed\n"); + + if (&peer1 == peer) /* Peer1 has finished writing; should read now */ + { + peer->bytes_read = 0; + GNUNET_SCHEDULER_add_now (&stream_read_task, peer); + } + else + { + writing_success = GNUNET_YES; + if (GNUNET_YES == reading_success) + GNUNET_SCHEDULER_add_now (&do_shutdown, NULL); + } + } +} + + +/** + * Function executed after stream has been established + * + * @param cls the closure from GNUNET_STREAM_open + * @param socket socket to use to communicate with the other side (read/write) + */ +static void +stream_open_cb (void *cls, + struct GNUNET_STREAM_Socket *socket) +{ + struct PeerData *peer=cls; + + GNUNET_assert (&peer1 == peer); + GNUNET_assert (socket == peer1.socket); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "%s: Stream established from peer1\n", + GNUNET_i2s (&peer1.our_id)); + peer->bytes_wrote = 0; + GNUNET_SCHEDULER_add_now (&stream_write_task, peer); +} + + +/** + * Input processor + * + * @param cls the closure from GNUNET_STREAM_write/read + * @param status the status of the stream at the time this function is called + * @param data traffic from the other side + * @param size the number of bytes available in data read + * @return number of bytes of processed from 'data' (any data remaining should be + * given to the next time the read processor is called). + */ +static size_t +input_processor (void *cls, + enum GNUNET_STREAM_Status status, + const void *input_data, + size_t size) +{ + struct PeerData *peer; + + peer = (struct PeerData *) cls; + + if (GNUNET_STREAM_TIMEOUT == status) + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Read operation timedout - reading again!\n"); + GNUNET_assert (0 == size); + GNUNET_SCHEDULER_add_now (&stream_read_task, peer); + return 0; + } + + GNUNET_assert (GNUNET_STREAM_OK == status); + GNUNET_assert (size <= strlen (data)); + GNUNET_assert (0 == strncmp ((const char *) data + peer->bytes_read, + (const char *) input_data, + size)); + peer->bytes_read += size; + + if (peer->bytes_read < strlen (data)) + { + GNUNET_SCHEDULER_add_now (&stream_read_task, peer); + } + else + { + if (&peer2 == peer) /* Peer2 has completed reading; should write */ + { + peer->bytes_wrote = 0; + GNUNET_SCHEDULER_add_now (&stream_write_task, peer); + } + else /* Peer1 has completed reading. End of tests */ + { + reading_success = GNUNET_YES; + if (GNUNET_YES == writing_success) + GNUNET_SCHEDULER_add_now (&do_shutdown, NULL); + } + } + return size; +} + + +/** + * Functions of this type are called upon new stream connection from other peers + * + * @param cls the closure from GNUNET_STREAM_listen + * @param socket the socket representing the stream + * @param initiator the identity of the peer who wants to establish a stream + * with us + * @return GNUNET_OK to keep the socket open, GNUNET_SYSERR to close the + * stream (the socket will be invalid after the call) + */ +static int +stream_listen_cb (void *cls, + struct GNUNET_STREAM_Socket *socket, + const struct GNUNET_PeerIdentity *initiator) +{ + GNUNET_assert (NULL != socket); + GNUNET_assert (NULL != initiator); + GNUNET_assert (socket != peer1.socket); + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "%s: Peer connected: %s\n", + GNUNET_i2s (&peer2.our_id), + GNUNET_i2s(initiator)); + + peer2.socket = socket; + peer2.bytes_read = 0; + GNUNET_SCHEDULER_add_now (&stream_read_task, &peer2); + return GNUNET_OK; +} + + +/** + * Callback to be called when testing peer group is ready + * + * @param cls NULL + * @param emsg NULL on success + */ +void +peergroup_ready (void *cls, const char *emsg) +{ + if (NULL != emsg) + { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Starting peer group failed: %s\n", emsg); + return; + } + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Peer group is now ready\n"); + + GNUNET_assert (2 == GNUNET_TESTING_daemons_running (pg)); + + d1 = GNUNET_TESTING_daemon_get (pg, 0); + GNUNET_assert (NULL != d1); + + d2 = GNUNET_TESTING_daemon_get (pg, 1); + GNUNET_assert (NULL != d2); + + GNUNET_TESTING_get_peer_identity (d1->cfg, + &peer1.our_id); + GNUNET_TESTING_get_peer_identity (d2->cfg, + &peer2.our_id); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "%s : %s\n", + GNUNET_i2s (&peer1.our_id), + GNUNET_i2s (&d1->id)); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "%s : %s\n", + GNUNET_i2s (&peer2.our_id), + GNUNET_i2s (&d2->id)); + + peer2_listen_socket = GNUNET_STREAM_listen (d2->cfg, + 10, /* App port */ + &stream_listen_cb, + NULL); + GNUNET_assert (NULL != peer2_listen_socket); + + /* Connect to stream library */ + peer1.socket = GNUNET_STREAM_open (d1->cfg, + &d2->id, /* Null for local peer? */ + 10, /* App port */ + &stream_open_cb, + &peer1, + GNUNET_STREAM_OPTION_END); + GNUNET_assert (NULL != peer1.socket); +} + + +/** + * Initialize framework and start test + */ +static void +run (void *cls, char *const *args, const char *cfgfile, + const struct GNUNET_CONFIGURATION_Handle *cfg) +{ + struct GNUNET_TESTING_Host *hosts; /* FIXME: free hosts (DLL) */ + + GNUNET_log_setup ("test_stream_2peers", + "DEBUG", + NULL); + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Starting test\n"); + /* Duplicate the configuration */ + config = GNUNET_CONFIGURATION_dup (cfg); + + hosts = GNUNET_TESTING_hosts_load (config); + + pg = GNUNET_TESTING_peergroup_start (config, + 2, + GNUNET_TIME_relative_multiply + (GNUNET_TIME_UNIT_SECONDS, 3), + NULL, + &peergroup_ready, + NULL, + hosts); + GNUNET_assert (NULL != pg); + + abort_task = + GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply + (GNUNET_TIME_UNIT_SECONDS, 40), &do_abort, + NULL); +} + +/** + * Main function + */ +int main (int argc, char **argv) +{ + int ret; + + char *argv2[] = { "test-stream-2peers", + "-L", "DEBUG", + "-c", "test_stream_local.conf", + NULL}; + + struct GNUNET_GETOPT_CommandLineOption options[] = { + GNUNET_GETOPT_OPTION_END + }; + + ret = + GNUNET_PROGRAM_run ((sizeof (argv2) / sizeof (char *)) - 1, argv2, + "test-stream-2peers", "nohelp", options, &run, NULL); + + if (GNUNET_OK != ret) + { + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "run failed with error code %d\n", + ret); + return 1; + } + if (GNUNET_SYSERR == result) + { + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "test failed\n"); + return 1; + } + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "test ok\n"); + return 0; +} diff --git a/src/stream/test_stream_2peers_halfclose.c b/src/stream/test_stream_2peers_halfclose.c new file mode 100644 index 0000000..7997c20 --- /dev/null +++ b/src/stream/test_stream_2peers_halfclose.c @@ -0,0 +1,786 @@ +/* + This file is part of GNUnet. + (C) 2011, 2012 Christian Grothoff (and other contributing authors) + + GNUnet is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published + by the Free Software Foundation; either version 3, or (at your + option) any later version. + + GNUnet is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + General Public License for more details. + + You should have received a copy of the GNU General Public License + along with GNUnet; see the file COPYING. If not, write to the + Free Software Foundation, Inc., 59 Temple Place - Suite 330, + Boston, MA 02111-1307, USA. +*/ + +/** + * @file stream/test_stream_2peers_halfclose.c + * @brief Testcases for Stream API halfclosed connections between 2 peers + * @author Sree Harsha Totakura + */ + +#include <string.h> + +#include "platform.h" +#include "gnunet_util_lib.h" +#include "gnunet_mesh_service.h" +#include "gnunet_stream_lib.h" +#include "gnunet_testing_lib.h" +#include "gnunet_scheduler_lib.h" + +#define VERBOSE 1 + +/** + * Number of peers + */ +#define NUM_PEERS 2 + +/** + * Structure for holding peer's sockets and IO Handles + */ +struct PeerData +{ + /** + * Peer's stream socket + */ + struct GNUNET_STREAM_Socket *socket; + + /** + * Peer's io write handle + */ + struct GNUNET_STREAM_IOWriteHandle *io_write_handle; + + /** + * Peer's io read handle + */ + struct GNUNET_STREAM_IOReadHandle *io_read_handle; + + /** + * Peer's shutdown handle + */ + struct GNUNET_STREAM_ShutdownHandle *shutdown_handle; + + /** + * Our Peer id + */ + struct GNUNET_PeerIdentity our_id; + + /** + * Bytes the peer has written + */ + unsigned int bytes_wrote; + + /** + * Byte the peer has read + */ + unsigned int bytes_read; + + /** + * GNUNET_YES if the peer has successfully completed the current test + */ + unsigned int test_ok; + + /** + * The shutdown operation that has to be used by the stream_shutdown_task + */ + int shutdown_operation; +}; + +/** + * The current peer group + */ +static struct GNUNET_TESTING_PeerGroup *pg; + +/** + * Peer 1 daemon + */ +static struct GNUNET_TESTING_Daemon *d1; + +/** + * Peer 2 daemon + */ +static struct GNUNET_TESTING_Daemon *d2; + + +/** + * Peer1 writes first and then calls for SHUT_WR + * Peer2 reads first and then calls for SHUT_RD + * Attempt to write again by Peer1 should be rejected + * Attempt to read again by Peer2 should be rejected + * Peer1 then reads from Peer2 which writes + */ +static struct PeerData peer1; +static struct PeerData peer2; +static struct GNUNET_STREAM_ListenSocket *peer2_listen_socket; +static struct GNUNET_CONFIGURATION_Handle *config; + +static GNUNET_SCHEDULER_TaskIdentifier abort_task; +static GNUNET_SCHEDULER_TaskIdentifier read_task; + +static char *data = "ABCD"; +static int result; + +/** + * Enumeration for various tests that are to be passed in the same order as + * below + */ +enum Test + { + /** + * Peer1 writing; Peer2 reading + */ + PEER1_WRITE, + + /** + * Peer1 write shutdown; Peer2 should get an error when it tries to read; + */ + PEER1_WRITE_SHUTDOWN, + + /** + * Peer1 reads; Peer2 writes (connection is halfclosed) + */ + PEER1_HALFCLOSE_READ, + + /** + * Peer1 attempts to write; Should fail with stream already shutdown error + */ + PEER1_HALFCLOSE_WRITE_FAIL, + + /** + * Peer1 read shutdown; Peer2 should get stream shutdown error during write + */ + PEER1_READ_SHUTDOWN, + + /** + * All tests successfully finished + */ + SUCCESS + }; + +/** + * Current running test + */ +enum Test current_test; + +/** + * Input processor + * + * @param cls the closure from GNUNET_STREAM_write/read + * @param status the status of the stream at the time this function is called + * @param data traffic from the other side + * @param size the number of bytes available in data read + * @return number of bytes of processed from 'data' (any data remaining should be + * given to the next time the read processor is called). + */ +static size_t +input_processor (void *cls, + enum GNUNET_STREAM_Status status, + const void *input_data, + size_t size); + + +/** + * The transition function; responsible for the transitions among tests + */ +static void +transition(); + + +/** + * Task for calling STREAM_read + * + * @param cls the peer data entity + * @param tc the task context + */ +static void +stream_read_task (void *cls, + const struct GNUNET_SCHEDULER_TaskContext *tc) +{ + struct PeerData *peer = cls; + + peer->io_read_handle = GNUNET_STREAM_read (peer->socket, + GNUNET_TIME_relative_multiply + (GNUNET_TIME_UNIT_SECONDS, 5), + &input_processor, + cls); + switch (current_test) + { + case PEER1_WRITE_SHUTDOWN: + GNUNET_assert (&peer2 == peer); + GNUNET_assert (NULL == peer->io_read_handle); + transition (); /* to PEER1_HALFCLOSE_READ */ + break; + default: + GNUNET_assert (NULL != peer->io_read_handle); + } +} + + +/** + * The write completion function; called upon writing some data to stream or + * upon error + * + * @param cls the closure from GNUNET_STREAM_write/read + * @param status the status of the stream at the time this function is called + * @param size the number of bytes read or written + */ +static void +write_completion (void *cls, + enum GNUNET_STREAM_Status status, + size_t size); + + +/** + * Task for calling STREAM_write + * + * @param cls the peer data entity + * @param tc the task context + */ +static void +stream_write_task (void *cls, + const struct GNUNET_SCHEDULER_TaskContext *tc) +{ + struct PeerData *peer = cls; + + peer->io_write_handle = + GNUNET_STREAM_write (peer->socket, + (void *) data, + strlen(data) - peer->bytes_wrote, + GNUNET_TIME_relative_multiply + (GNUNET_TIME_UNIT_SECONDS, 5), + &write_completion, + peer); + switch (current_test) + { + case PEER1_HALFCLOSE_WRITE_FAIL: + GNUNET_assert (&peer1 == peer); + GNUNET_assert (NULL == peer->io_write_handle); + transition(); /* To PEER1_READ_SHUTDOWN */ + break; + case PEER1_READ_SHUTDOWN: + GNUNET_assert (&peer2 == peer); + GNUNET_assert (NULL == peer->io_write_handle); + transition (); /* To SUCCESS */ + break; + default: + GNUNET_assert (NULL != peer->io_write_handle); + } +} + + +/** + * Check whether peers successfully shut down. + */ +static void +peergroup_shutdown_callback (void *cls, const char *emsg) +{ + if (emsg != NULL) + { + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "Shutdown of peers failed!\n"); + } + else + { + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "All peers successfully shut down!\n"); + } + GNUNET_CONFIGURATION_destroy (config); +} + + +/** + * Close sockets and stop testing deamons nicely + */ +static void +do_close (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) +{ + if (NULL != peer1.socket) + GNUNET_STREAM_close (peer1.socket); + if (NULL != peer2.socket) + GNUNET_STREAM_close (peer2.socket); + if (NULL != peer2_listen_socket) + GNUNET_STREAM_listen_close (peer2_listen_socket); + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "test: shutdown\n"); + if (0 != abort_task) + { + GNUNET_SCHEDULER_cancel (abort_task); + } + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "test: Wait\n"); + + GNUNET_TESTING_daemons_stop (pg, + GNUNET_TIME_relative_multiply + (GNUNET_TIME_UNIT_SECONDS, 5), + &peergroup_shutdown_callback, + NULL); +} + + +/** + * Completion callback for shutdown + * + * @param cls the closure from GNUNET_STREAM_shutdown call + * @param operation the operation that was shutdown (SHUT_RD, SHUT_WR, + * SHUT_RDWR) + */ +void +shutdown_completion (void *cls, + int operation) +{ + switch (current_test) + { + case PEER1_WRITE: + GNUNET_assert (0); + case PEER1_WRITE_SHUTDOWN: + GNUNET_assert (cls == &peer1); + GNUNET_assert (SHUT_WR == operation); + peer1.test_ok = GNUNET_YES; + /* Peer2 should read with error */ + peer2.bytes_read = 0; + GNUNET_SCHEDULER_add_now (&stream_read_task, &peer2); + break; + case PEER1_READ_SHUTDOWN: + peer1.test_ok = GNUNET_YES; + peer2.bytes_wrote = 0; + GNUNET_SCHEDULER_add_now (&stream_write_task, &peer2); + break; + case PEER1_HALFCLOSE_READ: + case PEER1_HALFCLOSE_WRITE_FAIL: + case SUCCESS: + GNUNET_assert (0); /* We shouldn't reach here */ + } +} + + +/** + * Task for calling STREAM_shutdown + * + * @param cls the peer entity + * @param tc the TaskContext + */ +static void +stream_shutdown_task (void *cls, + const struct GNUNET_SCHEDULER_TaskContext *tc) +{ + struct PeerData *peer = cls; + + peer->shutdown_handle = GNUNET_STREAM_shutdown (peer->socket, + peer->shutdown_operation, + &shutdown_completion, + peer); + GNUNET_assert (NULL != peer->shutdown_handle); +} + + +/** + * Something went wrong and timed out. Kill everything and set error flag + */ +static void +do_abort (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) +{ + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "test: ABORT\n"); + if (0 != read_task) + { + GNUNET_SCHEDULER_cancel (read_task); + } + result = GNUNET_SYSERR; + abort_task = 0; + do_close (cls, tc); +} + + +/** + * The transition function; responsible for the transitions among tests + */ +static void +transition() +{ + if ((GNUNET_YES == peer1.test_ok) && (GNUNET_YES == peer2.test_ok)) + { + peer1.test_ok = GNUNET_NO; + peer2.test_ok = GNUNET_NO; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "TEST %d SUCCESSFULL\n", current_test); + switch (current_test) + { + case PEER1_WRITE: + current_test = PEER1_WRITE_SHUTDOWN; + /* Peer1 should shutdown writing */ + peer1.shutdown_operation = SHUT_WR; + GNUNET_SCHEDULER_add_now (&stream_shutdown_task, &peer1); + break; + case PEER1_WRITE_SHUTDOWN: + current_test = PEER1_HALFCLOSE_READ; + /* Peer2 should be able to write successfully */ + peer2.bytes_wrote = 0; + GNUNET_SCHEDULER_add_now (&stream_write_task, &peer2); + + /* Peer1 should be able to read successfully */ + peer1.bytes_read = 0; + GNUNET_SCHEDULER_add_now (&stream_read_task, &peer1); + break; + case PEER1_HALFCLOSE_READ: + current_test = PEER1_HALFCLOSE_WRITE_FAIL; + peer1.bytes_wrote = 0; + peer2.bytes_read = 0; + peer2.test_ok = GNUNET_YES; + GNUNET_SCHEDULER_add_now (&stream_write_task, &peer1); + break; + case PEER1_HALFCLOSE_WRITE_FAIL: + current_test = PEER1_READ_SHUTDOWN; + peer1.shutdown_operation = SHUT_RD; + GNUNET_SCHEDULER_add_now (&stream_shutdown_task, &peer1); + break; + case PEER1_READ_SHUTDOWN: + current_test = SUCCESS; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "All tests successful\n"); + GNUNET_SCHEDULER_add_now (&do_close, NULL); + break; + case SUCCESS: + GNUNET_assert (0); /* We shouldn't reach here */ + + } + } +} + +/** + * The write completion function; called upon writing some data to stream or + * upon error + * + * @param cls the closure from GNUNET_STREAM_write/read + * @param status the status of the stream at the time this function is called + * @param size the number of bytes read or written + */ +static void +write_completion (void *cls, + enum GNUNET_STREAM_Status status, + size_t size) +{ + struct PeerData *peer = cls; + + switch (current_test) + { + case PEER1_WRITE: + case PEER1_HALFCLOSE_READ: + + GNUNET_assert (GNUNET_STREAM_OK == status); + GNUNET_assert (size <= strlen (data)); + peer->bytes_wrote += size; + + if (peer->bytes_wrote < strlen(data)) /* Have more data to send */ + { + GNUNET_SCHEDULER_add_now (&stream_write_task, peer); + } + else + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Writing completed\n"); + + if (&peer1 == peer) + { + peer1.test_ok = GNUNET_YES; + transition (); /* to PEER1_WRITE_SHUTDOWN */ + } + else /* This will happen during PEER1_HALFCLOSE_READ */ + { + peer2.test_ok = GNUNET_YES; + transition (); /* to PEER1_HALFCLOSE_WRITE_FAIL */ + } + } + break; + case PEER1_HALFCLOSE_WRITE_FAIL: + GNUNET_assert (peer == &peer1); + GNUNET_assert (GNUNET_STREAM_SHUTDOWN == status); + GNUNET_assert (0 == size); + peer1.test_ok = GNUNET_YES; + break; + case PEER1_READ_SHUTDOWN: + GNUNET_assert (peer == &peer2); + GNUNET_assert (GNUNET_STREAM_SHUTDOWN == status); + GNUNET_assert (0 == size); + peer2.test_ok = GNUNET_YES; + break; + case PEER1_WRITE_SHUTDOWN: + case SUCCESS: + GNUNET_assert (0); /* We shouldn't reach here */ + } +} + + +/** + * Function executed after stream has been established + * + * @param cls the closure from GNUNET_STREAM_open + * @param socket socket to use to communicate with the other side (read/write) + */ +static void +stream_open_cb (void *cls, + struct GNUNET_STREAM_Socket *socket) +{ + struct PeerData *peer; + + GNUNET_assert (socket == peer1.socket); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "%s: Stream established from peer1\n", + GNUNET_i2s (&peer1.our_id)); + peer = (struct PeerData *) cls; + peer->bytes_wrote = 0; + GNUNET_assert (socket == peer1.socket); + GNUNET_assert (socket == peer->socket); + peer1.test_ok = GNUNET_NO; + peer2.test_ok = GNUNET_NO; + current_test = PEER1_WRITE; + GNUNET_SCHEDULER_add_now (&stream_write_task, peer); +} + + +/** + * Input processor + * + * @param cls the closure from GNUNET_STREAM_write/read + * @param status the status of the stream at the time this function is called + * @param data traffic from the other side + * @param size the number of bytes available in data read + * @return number of bytes of processed from 'data' (any data remaining should be + * given to the next time the read processor is called). + */ +static size_t +input_processor (void *cls, + enum GNUNET_STREAM_Status status, + const void *input_data, + size_t size) +{ + struct PeerData *peer; + + peer = (struct PeerData *) cls; + + switch (current_test) + { + case PEER1_WRITE: + case PEER1_HALFCLOSE_READ: + if (GNUNET_STREAM_TIMEOUT == status) + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Read operation timedout - reading again!\n"); + GNUNET_assert (0 == size); + GNUNET_SCHEDULER_add_now (&stream_read_task, peer); + return 0; + } + + GNUNET_assert (GNUNET_STREAM_OK == status); + GNUNET_assert (size <= strlen (data)); + GNUNET_assert (0 == strncmp ((const char *) data + peer->bytes_read, + (const char *) input_data, + size)); + peer->bytes_read += size; + + if (peer->bytes_read < strlen (data)) + { + GNUNET_SCHEDULER_add_now (&stream_read_task, peer); + } + else + { + if (&peer2 == peer) /* Peer2 has completed reading; should write */ + { + peer2.test_ok = GNUNET_YES; + transition (); /* Transition to PEER1_WRITE_SHUTDOWN */ + } + else /* Peer1 has completed reading. End of tests */ + { + peer1.test_ok = GNUNET_YES; + transition (); /* to PEER1_HALFCLOSE_WRITE_FAIL */ + } + } + break; + case PEER1_WRITE_SHUTDOWN: + GNUNET_assert (GNUNET_STREAM_SHUTDOWN == status); + peer2.test_ok = GNUNET_YES; + break; + case PEER1_HALFCLOSE_WRITE_FAIL: + case PEER1_READ_SHUTDOWN: + case SUCCESS: + GNUNET_assert (0); /* We shouldn't reach here */ + } + + return size; +} + + +/** + * Scheduler call back; to be executed when a new stream is connected + * Called from listen connect for peer2 + */ +static void +stream_read (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) +{ + read_task = GNUNET_SCHEDULER_NO_TASK; + GNUNET_assert (NULL != cls); + peer2.bytes_read = 0; + GNUNET_SCHEDULER_add_now (&stream_read_task, &peer2); +} + + +/** + * Functions of this type are called upon new stream connection from other peers + * + * @param cls the closure from GNUNET_STREAM_listen + * @param socket the socket representing the stream + * @param initiator the identity of the peer who wants to establish a stream + * with us + * @return GNUNET_OK to keep the socket open, GNUNET_SYSERR to close the + * stream (the socket will be invalid after the call) + */ +static int +stream_listen_cb (void *cls, + struct GNUNET_STREAM_Socket *socket, + const struct GNUNET_PeerIdentity *initiator) +{ + GNUNET_assert (NULL != socket); + GNUNET_assert (NULL != initiator); + GNUNET_assert (socket != peer1.socket); + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "%s: Peer connected: %s\n", + GNUNET_i2s (&peer2.our_id), + GNUNET_i2s(initiator)); + + peer2.socket = socket; + /* FIXME: reading should be done right now instead of a scheduled call */ + read_task = GNUNET_SCHEDULER_add_now (&stream_read, (void *) socket); + return GNUNET_OK; +} + + +/** + * Callback to be called when testing peer group is ready + * + * @param cls NULL + * @param emsg NULL on success + */ +void +peergroup_ready (void *cls, const char *emsg) +{ + if (NULL != emsg) + { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Starting peer group failed: %s\n", emsg); + return; + } + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Peer group is now ready\n"); + + GNUNET_assert (2 == GNUNET_TESTING_daemons_running (pg)); + + d1 = GNUNET_TESTING_daemon_get (pg, 0); + GNUNET_assert (NULL != d1); + + d2 = GNUNET_TESTING_daemon_get (pg, 1); + GNUNET_assert (NULL != d2); + + GNUNET_TESTING_get_peer_identity (d1->cfg, + &peer1.our_id); + GNUNET_TESTING_get_peer_identity (d2->cfg, + &peer2.our_id); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "%s : %s\n", + GNUNET_i2s (&peer1.our_id), + GNUNET_i2s (&d1->id)); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "%s : %s\n", + GNUNET_i2s (&peer2.our_id), + GNUNET_i2s (&d2->id)); + + peer2_listen_socket = GNUNET_STREAM_listen (d2->cfg, + 10, /* App port */ + &stream_listen_cb, + NULL); + GNUNET_assert (NULL != peer2_listen_socket); + + /* Connect to stream library */ + peer1.socket = GNUNET_STREAM_open (d1->cfg, + &d2->id, /* Null for local peer? */ + 10, /* App port */ + &stream_open_cb, + &peer1, + GNUNET_STREAM_OPTION_END); + GNUNET_assert (NULL != peer1.socket); +} + + +/** + * Initialize framework and start test + */ +static void +run (void *cls, char *const *args, const char *cfgfile, + const struct GNUNET_CONFIGURATION_Handle *cfg) +{ + struct GNUNET_TESTING_Host *hosts; /* FIXME: free hosts (DLL) */ + + /* GNUNET_log_setup ("test_stream_local", */ + /* "DEBUG", */ + /* NULL); */ + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Starting test\n"); + /* Duplicate the configuration */ + config = GNUNET_CONFIGURATION_dup (cfg); + + hosts = GNUNET_TESTING_hosts_load (config); + + pg = GNUNET_TESTING_peergroup_start (config, + 2, + GNUNET_TIME_relative_multiply + (GNUNET_TIME_UNIT_SECONDS, 3), + NULL, + &peergroup_ready, + NULL, + hosts); + GNUNET_assert (NULL != pg); + + abort_task = + GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply + (GNUNET_TIME_UNIT_SECONDS, 40), &do_abort, + NULL); +} + +/** + * Main function + */ +int main (int argc, char **argv) +{ + int ret; + + char *argv2[] = { "test-stream-2peers-halfclose", + "-L", "DEBUG", + "-c", "test_stream_local.conf", + NULL}; + + struct GNUNET_GETOPT_CommandLineOption options[] = { + GNUNET_GETOPT_OPTION_END + }; + + ret = + GNUNET_PROGRAM_run ((sizeof (argv2) / sizeof (char *)) - 1, argv2, + "test-stream-2peers-halfclose", "nohelp", options, &run, NULL); + + if (GNUNET_OK != ret) + { + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "run failed with error code %d\n", + ret); + return 1; + } + if (GNUNET_SYSERR == result) + { + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "test failed\n"); + return 1; + } + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "test ok\n"); + return 0; +} diff --git a/src/stream/test_stream_local.c b/src/stream/test_stream_local.c index 4da1258..c9fab84 100644 --- a/src/stream/test_stream_local.c +++ b/src/stream/test_stream_local.c @@ -30,6 +30,7 @@ #include "gnunet_util_lib.h" #include "gnunet_mesh_service.h" #include "gnunet_stream_lib.h" +#include "gnunet_testing_lib.h" #define VERBOSE 1 @@ -44,9 +45,14 @@ struct PeerData struct GNUNET_STREAM_Socket *socket; /** - * Peer's io handle + * Peer's io write handle */ - struct GNUNET_STREAM_IOHandle *io_handle; + struct GNUNET_STREAM_IOWriteHandle *io_write_handle; + + /** + * Peer's io read handle + */ + struct GNUNET_STREAM_IOReadHandle *io_read_handle; /** * Bytes the peer has written @@ -63,14 +69,92 @@ static struct GNUNET_OS_Process *arm_pid; static struct PeerData peer1; static struct PeerData peer2; static struct GNUNET_STREAM_ListenSocket *peer2_listen_socket; +static struct GNUNET_CONFIGURATION_Handle *config_peer1; +static struct GNUNET_CONFIGURATION_Handle *config_peer2; static GNUNET_SCHEDULER_TaskIdentifier abort_task; static GNUNET_SCHEDULER_TaskIdentifier test_task; -static GNUNET_SCHEDULER_TaskIdentifier read_task; static char *data = "ABCD"; static int result; +static int writing_success; +static int reading_success; + +/** + * Input processor + * + * @param cls the closure from GNUNET_STREAM_write/read + * @param status the status of the stream at the time this function is called + * @param data traffic from the other side + * @param size the number of bytes available in data read + * @return number of bytes of processed from 'data' (any data remaining should be + * given to the next time the read processor is called). + */ +static size_t +input_processor (void *cls, + enum GNUNET_STREAM_Status status, + const void *input_data, + size_t size); + +/** + * Task for calling STREAM_read + * + * @param cls the peer data entity + * @param tc the task context + */ +static void +stream_read_task (void *cls, + const struct GNUNET_SCHEDULER_TaskContext *tc) +{ + struct PeerData *peer = cls; + + peer->io_read_handle = GNUNET_STREAM_read (peer->socket, + GNUNET_TIME_relative_multiply + (GNUNET_TIME_UNIT_SECONDS, 5), + &input_processor, + peer); + GNUNET_assert (NULL != peer->io_read_handle); +} + +/** + * The write completion function; called upon writing some data to stream or + * upon error + * + * @param cls the closure from GNUNET_STREAM_write/read + * @param status the status of the stream at the time this function is called + * @param size the number of bytes read or written + */ +static void +write_completion (void *cls, + enum GNUNET_STREAM_Status status, + size_t size); + + +/** + * Task for calling STREAM_write + * + * @param cls the peer data entity + * @param tc the task context + */ +static void +stream_write_task (void *cls, + const struct GNUNET_SCHEDULER_TaskContext *tc) +{ + struct PeerData *peer = cls; + + peer->io_write_handle = + GNUNET_STREAM_write (peer->socket, + (void *) data, + strlen(data) - peer->bytes_wrote, + GNUNET_TIME_relative_multiply + (GNUNET_TIME_UNIT_SECONDS, 5), + &write_completion, + peer); + + GNUNET_assert (NULL != peer->io_write_handle); + } + /** * Shutdown nicely */ @@ -78,7 +162,10 @@ static void do_shutdown (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) { GNUNET_STREAM_close (peer1.socket); - GNUNET_STREAM_close (peer2.socket); + if (NULL != peer2.socket) + GNUNET_STREAM_close (peer2.socket); + if (NULL != peer2_listen_socket) + GNUNET_STREAM_listen_close (peer2_listen_socket); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "test: shutdown\n"); if (0 != abort_task) { @@ -90,8 +177,11 @@ do_shutdown (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) GNUNET_log_strerror (GNUNET_ERROR_TYPE_WARNING, "kill"); } GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "test: Wait\n"); + /* Free the duplicated configuration */ + GNUNET_CONFIGURATION_destroy (config_peer1); + GNUNET_CONFIGURATION_destroy (config_peer2); GNUNET_assert (GNUNET_OK == GNUNET_OS_process_wait (arm_pid)); - GNUNET_OS_process_close (arm_pid); + GNUNET_OS_process_destroy (arm_pid); } @@ -106,10 +196,7 @@ do_abort (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) { GNUNET_SCHEDULER_cancel (test_task); } - if (0 != read_task) - { - GNUNET_SCHEDULER_cancel (read_task); - } + result = GNUNET_SYSERR; abort_task = 0; do_shutdown (cls, tc); @@ -129,35 +216,31 @@ write_completion (void *cls, enum GNUNET_STREAM_Status status, size_t size) { - struct PeerData *peer; + struct PeerData *peer=cls; - peer = (struct PeerData *) cls; GNUNET_assert (GNUNET_STREAM_OK == status); - GNUNET_assert (size < strlen (data)); + GNUNET_assert (size <= strlen (data)); peer->bytes_wrote += size; if (peer->bytes_wrote < strlen(data)) /* Have more data to send */ { - peer->io_handle = GNUNET_STREAM_write (peer->socket, - (void *) data, - strlen(data) - peer->bytes_wrote, - GNUNET_TIME_relative_multiply - (GNUNET_TIME_UNIT_SECONDS, 5), - &write_completion, - cls); - GNUNET_assert (NULL != peer->io_handle); + GNUNET_SCHEDULER_add_now (&stream_write_task, peer); } else { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Writing completed\n"); + if (&peer1 == peer) /* Peer1 has finished writing; should read now */ { - peer->io_handle = GNUNET_STREAM_read ((struct GNUNET_STREAM_Socket *) - peer->socket, - GNUNET_TIME_relative_multiply - (GNUNET_TIME_UNIT_SECONDS, 5), - &input_processor, - cls); - GNUNET_assert (NULL!=peer->io_handle); + peer->bytes_read = 0; + GNUNET_SCHEDULER_add_now (&stream_read_task, peer); + } + else + { + writing_success = GNUNET_YES; + if (GNUNET_YES == reading_success) + GNUNET_SCHEDULER_add_now (&do_shutdown, NULL); } } } @@ -173,20 +256,15 @@ static void stream_open_cb (void *cls, struct GNUNET_STREAM_Socket *socket) { - struct PeerData *peer; + struct PeerData *peer=cls; - peer = (struct PeerData *) cls; - peer->bytes_wrote = 0; + GNUNET_assert (&peer1 == peer); GNUNET_assert (socket == peer1.socket); GNUNET_assert (socket == peer->socket); - peer->io_handle = GNUNET_STREAM_write (peer->socket, /* socket */ - (void *) data, /* data */ - strlen(data), - GNUNET_TIME_relative_multiply - (GNUNET_TIME_UNIT_SECONDS, 5), - &write_completion, - cls); - GNUNET_assert (NULL != peer->io_handle); + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Stream established from peer1\n"); + peer->bytes_wrote = 0; + GNUNET_SCHEDULER_add_now (&stream_write_task, peer); } @@ -206,44 +284,31 @@ input_processor (void *cls, const void *input_data, size_t size) { - struct PeerData *peer; + struct PeerData *peer = cls; - peer = (struct PeerData *) cls; - - GNUNET_assert (GNUNET_STERAM_OK == status); - GNUNET_assert (size < strlen (data)); - GNUNET_assert (strncmp ((const char *) data + peer->bytes_read, - (const char *) input_data, - size)); + GNUNET_assert (GNUNET_STREAM_OK == status); + GNUNET_assert (size <= strlen (data)); + GNUNET_assert (0 == strncmp ((const char *) data + peer->bytes_read, + (const char *) input_data, + size)); peer->bytes_read += size; if (peer->bytes_read < strlen (data)) { - peer->io_handle = GNUNET_STREAM_read ((struct GNUNET_STREAM_Socket *) - peer->socket, - GNUNET_TIME_relative_multiply - (GNUNET_TIME_UNIT_SECONDS, 5), - &input_processor, - cls); - GNUNET_assert (NULL != peer->io_handle); + GNUNET_SCHEDULER_add_now (&stream_read_task, peer); } else { if (&peer2 == peer) /* Peer2 has completed reading; should write */ { peer->bytes_wrote = 0; - peer->io_handle = GNUNET_STREAM_write ((struct GNUNET_STREAM_Socket *) - peer->socket, - (void *) data, - strlen(data), - GNUNET_TIME_relative_multiply - (GNUNET_TIME_UNIT_SECONDS, 5), - &write_completion, - cls); + GNUNET_SCHEDULER_add_now (&stream_write_task, peer); } else /* Peer1 has completed reading. End of tests */ { - GNUNET_SCHEDULER_add_now (&do_shutdown, NULL); + reading_success = GNUNET_YES; + if (GNUNET_YES == writing_success) + GNUNET_SCHEDULER_add_now (&do_shutdown, NULL); } } return size; @@ -251,29 +316,9 @@ input_processor (void *cls, /** - * Scheduler call back; to be executed when a new stream is connected - * Called from listen connect for peer2 - */ -static void -stream_read (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) -{ - read_task = GNUNET_SCHEDULER_NO_TASK; - GNUNET_assert (NULL != cls); - peer2.bytes_read = 0; - GNUNET_STREAM_listen_close (peer2_listen_socket); /* Close listen socket */ - peer2.io_handle = GNUNET_STREAM_read ((struct GNUNET_STREAM_Socket *) cls, - GNUNET_TIME_relative_multiply - (GNUNET_TIME_UNIT_SECONDS, 5), - &input_processor, - (void *) &peer2); - GNUNET_assert (NULL != peer2.io_handle); -} - - -/** * Functions of this type are called upon new stream connection from other peers * - * @param cls the closure from GNUNET_STREAM_listen + * @param cls the PeerData of peer2 * @param socket the socket representing the stream * @param initiator the identity of the peer who wants to establish a stream * with us @@ -285,35 +330,60 @@ stream_listen_cb (void *cls, struct GNUNET_STREAM_Socket *socket, const struct GNUNET_PeerIdentity *initiator) { + struct PeerData *peer=cls; + struct GNUNET_PeerIdentity self; + GNUNET_assert (NULL != socket); - GNUNET_assert (NULL == initiator); /* Local peer=NULL? */ GNUNET_assert (socket != peer1.socket); + GNUNET_assert (&peer2 == peer); + + /* Get our identity */ + GNUNET_assert (GNUNET_OK == GNUNET_TESTING_get_peer_identity (config_peer1, + &self)); + GNUNET_assert (0 == memcmp (&self, + initiator, + sizeof (struct GNUNET_PeerIdentity))); + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Peer connected: %s\n", GNUNET_i2s(initiator)); - peer2.socket = socket; - read_task = GNUNET_SCHEDULER_add_now (&stream_read, (void *) socket); + peer->socket = socket; + peer->bytes_read = 0; + GNUNET_SCHEDULER_add_now (&stream_read_task, &peer2); return GNUNET_OK; } /** * Testing function + * + * @param cls NULL + * @param tc the task context */ static void test (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) { + struct GNUNET_PeerIdentity self; + test_task = GNUNET_SCHEDULER_NO_TASK; + /* Get our identity */ + GNUNET_assert (GNUNET_OK == GNUNET_TESTING_get_peer_identity (config_peer1, + &self)); + + peer2_listen_socket = GNUNET_STREAM_listen (config_peer2, + 10, /* App port */ + &stream_listen_cb, + &peer2); + GNUNET_assert (NULL != peer2_listen_socket); /* Connect to stream library */ - peer1.socket = GNUNET_STREAM_open (NULL, /* Null for local peer? */ + peer1.socket = GNUNET_STREAM_open (config_peer1, + &self, /* Null for local peer? */ 10, /* App port */ &stream_open_cb, - (void *) &peer1); + &peer1, + GNUNET_STREAM_OPTION_END); GNUNET_assert (NULL != peer1.socket); - peer2_listen_socket = GNUNET_STREAM_listen (10 /* App port */ - &stream_listen_cb, - NULL); - GNUNET_assert (NULL != peer2_listen_socket); - } /** @@ -330,6 +400,9 @@ run (void *cls, char *const *args, const char *cfgfile, "WARNING", #endif NULL); + /* Duplicate the configuration */ + config_peer1 = GNUNET_CONFIGURATION_dup (cfg); + config_peer2 = GNUNET_CONFIGURATION_dup (cfg); arm_pid = GNUNET_OS_start_process (GNUNET_YES, NULL, NULL, "gnunet-service-arm", "gnunet-service-arm", @@ -340,11 +413,10 @@ run (void *cls, char *const *args, const char *cfgfile, abort_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply - (GNUNET_TIME_UNIT_SECONDS, 20), &do_abort, + (GNUNET_TIME_UNIT_SECONDS, 60), &do_abort, NULL); - - test_task = GNUNET_SCHEDULER_add_now (&test, (void *) cfg); - + + test_task = GNUNET_SCHEDULER_add_now (&test, NULL); } /** @@ -355,7 +427,7 @@ int main (int argc, char **argv) int ret; char *const argv2[] = { "test-stream-local", - "-c", "test_stream.conf", + "-c", "test_stream_local.conf", #if VERBOSE "-L", "DEBUG", #endif @@ -365,7 +437,7 @@ int main (int argc, char **argv) struct GNUNET_GETOPT_CommandLineOption options[] = { GNUNET_GETOPT_OPTION_END }; - + ret = GNUNET_PROGRAM_run ((sizeof (argv2) / sizeof (char *)) - 1, argv2, "test-stream-local", "nohelp", options, &run, NULL); @@ -381,6 +453,6 @@ int main (int argc, char **argv) GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "test failed\n"); return 1; } - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "test ok\n"); + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "test ok\n"); return 0; } diff --git a/src/stream/test_stream_local.conf b/src/stream/test_stream_local.conf index 3d955f0..884ecbc 100644 --- a/src/stream/test_stream_local.conf +++ b/src/stream/test_stream_local.conf @@ -9,7 +9,7 @@ DEBUG = YES AUTOSTART = YES ACCEPT_FROM = 127.0.0.1; HOSTNAME = localhost -PORT = 10511 +PORT = 10700 # PREFIX = valgrind --leak-check=full # PREFIX = xterm -geometry 100x85 -T peer1 -e gdb --args @@ -19,7 +19,7 @@ AUTOSTART = YES ACCEPT_FROM6 = ::1; ACCEPT_FROM = 127.0.0.1; HOSTNAME = localhost -PORT = 2100 +PORT = 12100 [block] plugins = dht test @@ -53,14 +53,20 @@ TIMEOUT = 300 s PORT = 12368 [TESTING] +NUM_PEERS = 5 WEAKRANDOM = YES +DEBUG = YES +HOSTKEYSFILE = ../../contrib/testing_hostkeys.dat +MAX_CONCURRENT_SSH = 10 +USE_PROGRESSBARS = YES +PEERGROUP_TIMEOUT = 2400 s [gnunetd] HOSTKEY = $SERVICEHOME/.hostkey [PATHS] -DEFAULTCONFIG = test_mesh.conf -SERVICEHOME = /tmp/test-mesh/ +DEFAULTCONFIG = test_stream_local.conf +SERVICEHOME = /tmp/test-stream/ [dns] AUTOSTART = NO |