aboutsummaryrefslogtreecommitdiff
path: root/src/stream
diff options
context:
space:
mode:
authorBertrand Marc <beberking@gmail.com>2012-06-06 20:47:48 +0200
committerBertrand Marc <beberking@gmail.com>2012-06-06 20:47:48 +0200
commit740b30688bd745a527f96f9116c19acb3480971a (patch)
tree2709a3f4dba11c174aa9e1ba3612e30c578e76a9 /src/stream
parent2b81464a43485fcc8ce079fafdee7b7a171835f4 (diff)
Imported Upstream version 0.9.3upstream/0.9.3
Diffstat (limited to 'src/stream')
-rw-r--r--src/stream/Makefile.am29
-rw-r--r--src/stream/Makefile.in67
-rw-r--r--src/stream/README12
-rw-r--r--src/stream/stream_api.c2185
-rw-r--r--src/stream/stream_protocol.h8
-rw-r--r--src/stream/test_stream_2peers.c560
-rw-r--r--src/stream/test_stream_2peers_halfclose.c786
-rw-r--r--src/stream/test_stream_local.c272
-rw-r--r--src/stream/test_stream_local.conf14
9 files changed, 3258 insertions, 675 deletions
diff --git a/src/stream/Makefile.am b/src/stream/Makefile.am
index 385c0cf..8d74417 100644
--- a/src/stream/Makefile.am
+++ b/src/stream/Makefile.am
@@ -20,8 +20,9 @@ libgnunetstream_la_LDFLAGS = \
$(GN_LIB_LDFLAGS)
check_PROGRAMS = \
- test_stream_local
-# test_stream_halfclose
+ test-stream-2peers \
+ test-stream-2peers_halfclose \
+ test-stream-local
EXTRA_DIST = test_stream_local.conf
@@ -29,15 +30,23 @@ if ENABLE_TEST_RUN
TESTS = $(check_PROGRAMS)
endif
+test_stream_2peers_SOURCES = \
+ test_stream_2peers.c
+test_stream_2peers_LDADD = \
+ $(top_builddir)/src/stream/libgnunetstream.la \
+ $(top_builddir)/src/util/libgnunetutil.la \
+ $(top_builddir)/src/testing/libgnunettesting.la
+
+test_stream_2peers_halfclose_SOURCES = \
+ test_stream_2peers_halfclose.c
+test_stream_2peers_halfclose_LDADD = \
+ $(top_builddir)/src/stream/libgnunetstream.la \
+ $(top_builddir)/src/util/libgnunetutil.la \
+ $(top_builddir)/src/testing/libgnunettesting.la
+
test_stream_local_SOURCES = \
test_stream_local.c
test_stream_local_LDADD = \
$(top_builddir)/src/stream/libgnunetstream.la \
- $(top_builddir)/src/util/libgnunetutil.la
-
-#test_stream_halfclose_SOURCES = \
-# test_stream_halfclose.c
-#test_stream_halfclose_LDADD = \
-# $(top_builddir)/src/stream/libgnunetstream.la \
-# $(top_builddir)/src/util/libgnunetutil.la
-
+ $(top_builddir)/src/util/libgnunetutil.la \
+ $(top_builddir)/src/testing/libgnunettesting.la \ No newline at end of file
diff --git a/src/stream/Makefile.in b/src/stream/Makefile.in
index 44b63f7..9f1c278 100644
--- a/src/stream/Makefile.in
+++ b/src/stream/Makefile.in
@@ -35,7 +35,9 @@ POST_UNINSTALL = :
build_triplet = @build@
host_triplet = @host@
target_triplet = @target@
-check_PROGRAMS = test_stream_local$(EXEEXT)
+check_PROGRAMS = test-stream-2peers$(EXEEXT) \
+ test-stream-2peers_halfclose$(EXEEXT) \
+ test-stream-local$(EXEEXT)
subdir = src/stream
DIST_COMMON = README $(srcdir)/Makefile.am $(srcdir)/Makefile.in
ACLOCAL_M4 = $(top_srcdir)/aclocal.m4
@@ -94,11 +96,26 @@ libgnunetstream_la_LINK = $(LIBTOOL) $(AM_V_lt) --tag=CC \
$(AM_LIBTOOLFLAGS) $(LIBTOOLFLAGS) --mode=link $(CCLD) \
$(AM_CFLAGS) $(CFLAGS) $(libgnunetstream_la_LDFLAGS) \
$(LDFLAGS) -o $@
+am_test_stream_2peers_OBJECTS = test_stream_2peers.$(OBJEXT)
+test_stream_2peers_OBJECTS = $(am_test_stream_2peers_OBJECTS)
+test_stream_2peers_DEPENDENCIES = \
+ $(top_builddir)/src/stream/libgnunetstream.la \
+ $(top_builddir)/src/util/libgnunetutil.la \
+ $(top_builddir)/src/testing/libgnunettesting.la
+am_test_stream_2peers_halfclose_OBJECTS = \
+ test_stream_2peers_halfclose.$(OBJEXT)
+test_stream_2peers_halfclose_OBJECTS = \
+ $(am_test_stream_2peers_halfclose_OBJECTS)
+test_stream_2peers_halfclose_DEPENDENCIES = \
+ $(top_builddir)/src/stream/libgnunetstream.la \
+ $(top_builddir)/src/util/libgnunetutil.la \
+ $(top_builddir)/src/testing/libgnunettesting.la
am_test_stream_local_OBJECTS = test_stream_local.$(OBJEXT)
test_stream_local_OBJECTS = $(am_test_stream_local_OBJECTS)
test_stream_local_DEPENDENCIES = \
$(top_builddir)/src/stream/libgnunetstream.la \
- $(top_builddir)/src/util/libgnunetutil.la
+ $(top_builddir)/src/util/libgnunetutil.la \
+ $(top_builddir)/src/testing/libgnunettesting.la
DEFAULT_INCLUDES = -I.@am__isrc@ -I$(top_builddir)
depcomp = $(SHELL) $(top_srcdir)/depcomp
am__depfiles_maybe = depfiles
@@ -125,8 +142,12 @@ am__v_CCLD_0 = @echo " CCLD " $@;
AM_V_GEN = $(am__v_GEN_$(V))
am__v_GEN_ = $(am__v_GEN_$(AM_DEFAULT_VERBOSITY))
am__v_GEN_0 = @echo " GEN " $@;
-SOURCES = $(libgnunetstream_la_SOURCES) $(test_stream_local_SOURCES)
+SOURCES = $(libgnunetstream_la_SOURCES) $(test_stream_2peers_SOURCES) \
+ $(test_stream_2peers_halfclose_SOURCES) \
+ $(test_stream_local_SOURCES)
DIST_SOURCES = $(libgnunetstream_la_SOURCES) \
+ $(test_stream_2peers_SOURCES) \
+ $(test_stream_2peers_halfclose_SOURCES) \
$(test_stream_local_SOURCES)
ETAGS = etags
CTAGS = ctags
@@ -188,6 +209,7 @@ INSTALL_SCRIPT = @INSTALL_SCRIPT@
INSTALL_STRIP_PROGRAM = @INSTALL_STRIP_PROGRAM@
INTLLIBS = @INTLLIBS@
INTL_MACOSX_LIBS = @INTL_MACOSX_LIBS@
+JAVAPORT = @JAVAPORT@
LD = @LD@
LDFLAGS = @LDFLAGS@
LIBADD_DL = @LIBADD_DL@
@@ -221,6 +243,7 @@ LT_DLLOADERS = @LT_DLLOADERS@
LT_DLPREOPEN = @LT_DLPREOPEN@
MAKEINFO = @MAKEINFO@
MKDIR_P = @MKDIR_P@
+MONKEYPREFIX = @MONKEYPREFIX@
MSGFMT = @MSGFMT@
MSGFMT_015 = @MSGFMT_015@
MSGMERGE = @MSGMERGE@
@@ -352,15 +375,31 @@ libgnunetstream_la_LIBADD = \
libgnunetstream_la_LDFLAGS = \
$(GN_LIB_LDFLAGS)
-# test_stream_halfclose
EXTRA_DIST = test_stream_local.conf
@ENABLE_TEST_RUN_TRUE@TESTS = $(check_PROGRAMS)
+test_stream_2peers_SOURCES = \
+ test_stream_2peers.c
+
+test_stream_2peers_LDADD = \
+ $(top_builddir)/src/stream/libgnunetstream.la \
+ $(top_builddir)/src/util/libgnunetutil.la \
+ $(top_builddir)/src/testing/libgnunettesting.la
+
+test_stream_2peers_halfclose_SOURCES = \
+ test_stream_2peers_halfclose.c
+
+test_stream_2peers_halfclose_LDADD = \
+ $(top_builddir)/src/stream/libgnunetstream.la \
+ $(top_builddir)/src/util/libgnunetutil.la \
+ $(top_builddir)/src/testing/libgnunettesting.la
+
test_stream_local_SOURCES = \
test_stream_local.c
test_stream_local_LDADD = \
$(top_builddir)/src/stream/libgnunetstream.la \
- $(top_builddir)/src/util/libgnunetutil.la
+ $(top_builddir)/src/util/libgnunetutil.la \
+ $(top_builddir)/src/testing/libgnunettesting.la
all: all-am
@@ -438,8 +477,14 @@ clean-checkPROGRAMS:
list=`for p in $$list; do echo "$$p"; done | sed 's/$(EXEEXT)$$//'`; \
echo " rm -f" $$list; \
rm -f $$list
-test_stream_local$(EXEEXT): $(test_stream_local_OBJECTS) $(test_stream_local_DEPENDENCIES)
- @rm -f test_stream_local$(EXEEXT)
+test-stream-2peers$(EXEEXT): $(test_stream_2peers_OBJECTS) $(test_stream_2peers_DEPENDENCIES)
+ @rm -f test-stream-2peers$(EXEEXT)
+ $(AM_V_CCLD)$(LINK) $(test_stream_2peers_OBJECTS) $(test_stream_2peers_LDADD) $(LIBS)
+test-stream-2peers_halfclose$(EXEEXT): $(test_stream_2peers_halfclose_OBJECTS) $(test_stream_2peers_halfclose_DEPENDENCIES)
+ @rm -f test-stream-2peers_halfclose$(EXEEXT)
+ $(AM_V_CCLD)$(LINK) $(test_stream_2peers_halfclose_OBJECTS) $(test_stream_2peers_halfclose_LDADD) $(LIBS)
+test-stream-local$(EXEEXT): $(test_stream_local_OBJECTS) $(test_stream_local_DEPENDENCIES)
+ @rm -f test-stream-local$(EXEEXT)
$(AM_V_CCLD)$(LINK) $(test_stream_local_OBJECTS) $(test_stream_local_LDADD) $(LIBS)
mostlyclean-compile:
@@ -449,6 +494,8 @@ distclean-compile:
-rm -f *.tab.c
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/stream_api.Plo@am__quote@
+@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/test_stream_2peers.Po@am__quote@
+@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/test_stream_2peers_halfclose.Po@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/test_stream_local.Po@am__quote@
.c.o:
@@ -778,12 +825,6 @@ uninstall-am: uninstall-libLTLIBRARIES
uninstall-am uninstall-libLTLIBRARIES
-#test_stream_halfclose_SOURCES = \
-# test_stream_halfclose.c
-#test_stream_halfclose_LDADD = \
-# $(top_builddir)/src/stream/libgnunetstream.la \
-# $(top_builddir)/src/util/libgnunetutil.la
-
# Tell versions [3.59,3.63) of GNU make to not export all variables.
# Otherwise a system limit (for SysV at least) may be exceeded.
.NOEXPORT:
diff --git a/src/stream/README b/src/stream/README
index 9b550b0..977ca2d 100644
--- a/src/stream/README
+++ b/src/stream/README
@@ -1,11 +1,11 @@
-The aim of the stream library is to provide stream connections between peers in
-GNUnet. This is a convenience library which hides the complexity of dividing
-data stream into packets, transmitting them and retransmitting them in case of
+Stream library provides stream connections between peers in GNUnet. This is a
+convenience library which hides the complexity of dividing data stream into
+packets, transmitting them and retransmitting them in case of communication
errors.
This library's API are similar to unix PIPE API. The user is expected to open a
stream to a listening target peer. Once the stream is established, the user can
-use it as a pipe. Any data written into the stream will be readable by the
-target peer.
+use it as a pipe. Any data written into the stream at one peer will be readable
+by the other peer and vice versa.
-This library uses mesh API for establishing streams between peers. \ No newline at end of file
+This library uses mesh API for establishing tunnels between peers. \ No newline at end of file
diff --git a/src/stream/stream_api.c b/src/stream/stream_api.c
index 84fcdfd..dadba33 100644
--- a/src/stream/stream_api.c
+++ b/src/stream/stream_api.c
@@ -18,17 +18,31 @@
Boston, MA 02111-1307, USA.
*/
+/* TODO:
+ *
+ * Checks for matching the sender and socket->other_peer in server
+ * message handlers
+ *
+ * Add code for write io timeout
+ *
+ * Include retransmission for control messages
+ **/
+
/**
* @file stream/stream_api.c
* @brief Implementation of the stream library
* @author Sree Harsha Totakura
*/
+
+
#include "platform.h"
#include "gnunet_common.h"
#include "gnunet_crypto_lib.h"
#include "gnunet_stream_lib.h"
#include "stream_protocol.h"
+#define LOG(kind,...) \
+ GNUNET_log_from (kind, "stream-api", __VA_ARGS__)
/**
* The maximum packet size of a stream packet
@@ -36,15 +50,15 @@
#define MAX_PACKET_SIZE 64000
/**
- * The maximum payload a data message packet can carry
+ * Receive buffer
*/
-static size_t max_payload_size =
- MAX_PACKET_SIZE - sizeof (struct GNUNET_STREAM_DataMessage);
+#define RECEIVE_BUFFER_SIZE 4096000
/**
- * Receive buffer
+ * The maximum payload a data message packet can carry
*/
-#define RECEIVE_BUFFER_SIZE 4096000
+static size_t max_payload_size =
+ MAX_PACKET_SIZE - sizeof (struct GNUNET_STREAM_DataMessage);
/**
* states in the Protocol
@@ -150,12 +164,6 @@ struct MessageQueue
*/
struct GNUNET_STREAM_Socket
{
-
- /**
- * The peer identity of the peer at the other end of the stream
- */
- struct GNUNET_PeerIdentity other_peer;
-
/**
* Retransmission timeout
*/
@@ -177,16 +185,6 @@ struct GNUNET_STREAM_Socket
struct GNUNET_TIME_Relative ack_time_deadline;
/**
- * The task for sending timely Acks
- */
- GNUNET_SCHEDULER_TaskIdentifier ack_task_id;
-
- /**
- * Task scheduled to continue a read operation.
- */
- GNUNET_SCHEDULER_TaskIdentifier read_task;
-
- /**
* The mesh handle
*/
struct GNUNET_MESH_Handle *mesh;
@@ -212,6 +210,16 @@ struct GNUNET_STREAM_Socket
struct GNUNET_MESH_TransmitHandle *transmit_handle;
/**
+ * The current act transmit handle (if a pending ack transmit request exists)
+ */
+ struct GNUNET_MESH_TransmitHandle *ack_transmit_handle;
+
+ /**
+ * Pointer to the current ack message using in ack_task
+ */
+ struct GNUNET_STREAM_AckMessage *ack_msg;
+
+ /**
* The current message associated with the transmit handle
*/
struct MessageQueue *queue_head;
@@ -232,14 +240,45 @@ struct GNUNET_STREAM_Socket
struct GNUNET_STREAM_IOReadHandle *read_handle;
/**
+ * The shutdown handle associated with this socket
+ */
+ struct GNUNET_STREAM_ShutdownHandle *shutdown_handle;
+
+ /**
* Buffer for storing received messages
*/
void *receive_buffer;
/**
+ * The listen socket from which this socket is derived. Should be NULL if it
+ * is not a derived socket
+ */
+ struct GNUNET_STREAM_ListenSocket *lsocket;
+
+ /**
+ * The peer identity of the peer at the other end of the stream
+ */
+ struct GNUNET_PeerIdentity other_peer;
+
+ /**
* Task identifier for the read io timeout task
*/
- GNUNET_SCHEDULER_TaskIdentifier read_io_timeout_task;
+ GNUNET_SCHEDULER_TaskIdentifier read_io_timeout_task_id;
+
+ /**
+ * Task identifier for retransmission task after timeout
+ */
+ GNUNET_SCHEDULER_TaskIdentifier retransmission_timeout_task_id;
+
+ /**
+ * The task for sending timely Acks
+ */
+ GNUNET_SCHEDULER_TaskIdentifier ack_task_id;
+
+ /**
+ * Task scheduled to continue a read operation.
+ */
+ GNUNET_SCHEDULER_TaskIdentifier read_task_id;
/**
* The state of the protocol associated with this socket
@@ -257,6 +296,11 @@ struct GNUNET_STREAM_Socket
unsigned int retries;
/**
+ * The application port number (type: uint32_t)
+ */
+ GNUNET_MESH_ApplicationType app_port;
+
+ /**
* The session id associated with this stream connection
* FIXME: Not used currently, may be removed
*/
@@ -286,7 +330,7 @@ struct GNUNET_STREAM_Socket
/**
* receiver's available buffer after the last acknowledged packet
*/
- uint32_t receive_window_available;
+ uint32_t receiver_window_available;
/**
* The offset pointer used during write operation
@@ -310,7 +354,6 @@ struct GNUNET_STREAM_Socket
*/
struct GNUNET_STREAM_ListenSocket
{
-
/**
* The mesh handle
*/
@@ -328,6 +371,7 @@ struct GNUNET_STREAM_ListenSocket
/**
* The service port
+ * FIXME: Remove if not required!
*/
GNUNET_MESH_ApplicationType port;
};
@@ -339,9 +383,24 @@ struct GNUNET_STREAM_ListenSocket
struct GNUNET_STREAM_IOWriteHandle
{
/**
+ * The socket to which this write handle is associated
+ */
+ struct GNUNET_STREAM_Socket *socket;
+
+ /**
* The packet_buffers associated with this Handle
*/
- struct GNUNET_STREAM_DataMessage *messages[64];
+ struct GNUNET_STREAM_DataMessage *messages[GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH];
+
+ /**
+ * The write continuation callback
+ */
+ GNUNET_STREAM_CompletionContinuation write_cont;
+
+ /**
+ * Write continuation closure
+ */
+ void *write_cont_cls;
/**
* The bitmap of this IOHandle; Corresponding bit for a message is set when
@@ -350,11 +409,9 @@ struct GNUNET_STREAM_IOWriteHandle
GNUNET_STREAM_AckBitmap ack_bitmap;
/**
- * Number of packets sent before waiting for an ack
- *
- * FIXME: Do we need this?
+ * Number of bytes in this write handle
*/
- unsigned int sent_packets;
+ size_t size;
};
@@ -376,13 +433,45 @@ struct GNUNET_STREAM_IOReadHandle
/**
+ * Handle for Shutdown
+ */
+struct GNUNET_STREAM_ShutdownHandle
+{
+ /**
+ * The socket associated with this shutdown handle
+ */
+ struct GNUNET_STREAM_Socket *socket;
+
+ /**
+ * Shutdown completion callback
+ */
+ GNUNET_STREAM_ShutdownCompletion completion_cb;
+
+ /**
+ * Closure for completion callback
+ */
+ void *completion_cls;
+
+ /**
+ * Close message retransmission task id
+ */
+ GNUNET_SCHEDULER_TaskIdentifier close_msg_retransmission_task_id;
+
+ /**
+ * Which operation to shutdown? SHUT_RD, SHUT_WR or SHUT_RDWR
+ */
+ int operation;
+};
+
+
+/**
* Default value in seconds for various timeouts
*/
-static unsigned int default_timeout = 300;
+static unsigned int default_timeout = 10;
/**
- * Callback function for sending hello message
+ * Callback function for sending queued message
*
* @param cls closure the socket
* @param size number of bytes available in buf
@@ -401,31 +490,32 @@ send_message_notify (void *cls, size_t size, void *buf)
if (NULL == head)
return 0; /* just to be safe */
if (0 == size) /* request timed out */
- {
- socket->retries++;
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Message sending timed out. Retry %d \n",
- socket->retries);
- socket->transmit_handle =
- GNUNET_MESH_notify_transmit_ready (socket->tunnel,
- 0, /* Corking */
- 1, /* Priority */
- /* FIXME: exponential backoff */
- socket->retransmit_timeout,
- &socket->other_peer,
- ntohs (head->message->header.size),
- &send_message_notify,
- socket);
- return 0;
- }
+ {
+ socket->retries++;
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "%s: Message sending timed out. Retry %d \n",
+ GNUNET_i2s (&socket->other_peer),
+ socket->retries);
+ socket->transmit_handle =
+ GNUNET_MESH_notify_transmit_ready (socket->tunnel,
+ 0, /* Corking */
+ 1, /* Priority */
+ /* FIXME: exponential backoff */
+ socket->retransmit_timeout,
+ &socket->other_peer,
+ ntohs (head->message->header.size),
+ &send_message_notify,
+ socket);
+ return 0;
+ }
ret = ntohs (head->message->header.size);
GNUNET_assert (size >= ret);
memcpy (buf, head->message, ret);
if (NULL != head->finish_cb)
- {
- head->finish_cb (socket, head->finish_cb_cls);
- }
+ {
+ head->finish_cb (head->finish_cb_cls, socket);
+ }
GNUNET_CONTAINER_DLL_remove (socket->queue_head,
socket->queue_tail,
head);
@@ -433,19 +523,19 @@ send_message_notify (void *cls, size_t size, void *buf)
GNUNET_free (head);
head = socket->queue_head;
if (NULL != head) /* more pending messages to send */
- {
- socket->retries = 0;
- socket->transmit_handle =
- GNUNET_MESH_notify_transmit_ready (socket->tunnel,
- 0, /* Corking */
- 1, /* Priority */
- /* FIXME: exponential backoff */
- socket->retransmit_timeout,
- &socket->other_peer,
- ntohs (head->message->header.size),
- &send_message_notify,
- socket);
- }
+ {
+ socket->retries = 0;
+ socket->transmit_handle =
+ GNUNET_MESH_notify_transmit_ready (socket->tunnel,
+ 0, /* Corking */
+ 1, /* Priority */
+ /* FIXME: exponential backoff */
+ socket->retransmit_timeout,
+ &socket->other_peer,
+ ntohs (head->message->header.size),
+ &send_message_notify,
+ socket);
+ }
return ret;
}
@@ -466,6 +556,16 @@ queue_message (struct GNUNET_STREAM_Socket *socket,
{
struct MessageQueue *queue_entity;
+ GNUNET_assert
+ ((ntohs (message->header.type) >= GNUNET_MESSAGE_TYPE_STREAM_DATA)
+ && (ntohs (message->header.type) <= GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK));
+
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "%s: Queueing message of type %d and size %d\n",
+ GNUNET_i2s (&socket->other_peer),
+ ntohs (message->header.type),
+ ntohs (message->header.size));
+ GNUNET_assert (NULL != message);
queue_entity = GNUNET_malloc (sizeof (struct MessageQueue));
queue_entity->message = message;
queue_entity->finish_cb = finish_cb;
@@ -490,6 +590,31 @@ queue_message (struct GNUNET_STREAM_Socket *socket,
/**
+ * Copies a message and queues it for sending using the mesh connection of
+ * given socket
+ *
+ * @param socket the socket whose mesh connection is used
+ * @param message the message to be sent
+ * @param finish_cb the callback to be called when the message is sent
+ * @param finish_cb_cls the closure for the callback
+ */
+static void
+copy_and_queue_message (struct GNUNET_STREAM_Socket *socket,
+ const struct GNUNET_STREAM_MessageHeader *message,
+ SendFinishCallback finish_cb,
+ void *finish_cb_cls)
+{
+ struct GNUNET_STREAM_MessageHeader *msg_copy;
+ uint16_t size;
+
+ size = ntohs (message->header.size);
+ msg_copy = GNUNET_malloc (size);
+ memcpy (msg_copy, message, size);
+ queue_message (socket, msg_copy, finish_cb, finish_cb_cls);
+}
+
+
+/**
* Callback function for sending ack message
*
* @param cls closure the ACK message created in ack_task
@@ -500,21 +625,56 @@ queue_message (struct GNUNET_STREAM_Socket *socket,
static size_t
send_ack_notify (void *cls, size_t size, void *buf)
{
- struct GNUNET_STREAM_AckMessage *ack_msg = cls;
+ struct GNUNET_STREAM_Socket *socket = cls;
if (0 == size)
- {
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "%s called with size 0\n", __func__);
- return 0;
- }
- GNUNET_assert (ack_msg->header.header.size <= size);
+ {
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "%s called with size 0\n", __func__);
+ return 0;
+ }
+ GNUNET_assert (ntohs (socket->ack_msg->header.header.size) <= size);
+
+ size = ntohs (socket->ack_msg->header.header.size);
+ memcpy (buf, socket->ack_msg, size);
- size = ack_msg->header.header.size;
- memcpy (buf, ack_msg, size);
+ GNUNET_free (socket->ack_msg);
+ socket->ack_msg = NULL;
+ socket->ack_transmit_handle = NULL;
return size;
}
+/**
+ * Writes data using the given socket. The amount of data written is limited by
+ * the receiver_window_size
+ *
+ * @param socket the socket to use
+ */
+static void
+write_data (struct GNUNET_STREAM_Socket *socket);
+
+/**
+ * Task for retransmitting data messages if they aren't ACK before their ack
+ * deadline
+ *
+ * @param cls the socket
+ * @param tc the Task context
+ */
+static void
+retransmission_timeout_task (void *cls,
+ const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+ struct GNUNET_STREAM_Socket *socket = cls;
+
+ if (GNUNET_SCHEDULER_REASON_SHUTDOWN == tc->reason)
+ return;
+
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "%s: Retransmitting DATA...\n", GNUNET_i2s (&socket->other_peer));
+ socket->retransmission_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
+ write_data (socket);
+}
+
/**
* Task for sending ACK message
@@ -530,12 +690,10 @@ ack_task (void *cls,
struct GNUNET_STREAM_AckMessage *ack_msg;
if (GNUNET_SCHEDULER_REASON_SHUTDOWN == tc->reason)
- {
- return;
- }
-
- socket->ack_task_id = 0;
-
+ {
+ return;
+ }
+ socket->ack_task_id = GNUNET_SCHEDULER_NO_TASK;
/* Create the ACK Message */
ack_msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_AckMessage));
ack_msg->header.header.size = htons (sizeof (struct
@@ -545,18 +703,61 @@ ack_task (void *cls,
ack_msg->base_sequence_number = htonl (socket->read_sequence_number);
ack_msg->receive_window_remaining =
htonl (RECEIVE_BUFFER_SIZE - socket->receive_buffer_size);
-
+ socket->ack_msg = ack_msg;
/* Request MESH for sending ACK */
- GNUNET_MESH_notify_transmit_ready (socket->tunnel,
- 0, /* Corking */
- 1, /* Priority */
- socket->retransmit_timeout,
- &socket->other_peer,
- ntohs (ack_msg->header.header.size),
- &send_ack_notify,
- ack_msg);
+ socket->ack_transmit_handle =
+ GNUNET_MESH_notify_transmit_ready (socket->tunnel,
+ 0, /* Corking */
+ 1, /* Priority */
+ socket->retransmit_timeout,
+ &socket->other_peer,
+ ntohs (ack_msg->header.header.size),
+ &send_ack_notify,
+ socket);
+}
-
+
+/**
+ * Retransmission task for shutdown messages
+ *
+ * @param cls the shutdown handle
+ * @param tc the Task Context
+ */
+static void
+close_msg_retransmission_task (void *cls,
+ const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+ struct GNUNET_STREAM_ShutdownHandle *shutdown_handle = cls;
+ struct GNUNET_STREAM_MessageHeader *msg;
+ struct GNUNET_STREAM_Socket *socket;
+
+ GNUNET_assert (NULL != shutdown_handle);
+ socket = shutdown_handle->socket;
+
+ msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
+ msg->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
+ switch (shutdown_handle->operation)
+ {
+ case SHUT_RDWR:
+ msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_CLOSE);
+ break;
+ case SHUT_RD:
+ msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE);
+ break;
+ case SHUT_WR:
+ msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE);
+ break;
+ default:
+ GNUNET_free (msg);
+ shutdown_handle->close_msg_retransmission_task_id =
+ GNUNET_SCHEDULER_NO_TASK;
+ return;
+ }
+ queue_message (socket, msg, NULL, NULL);
+ shutdown_handle->close_msg_retransmission_task_id =
+ GNUNET_SCHEDULER_add_delayed (socket->retransmit_timeout,
+ &close_msg_retransmission_task,
+ shutdown_handle);
}
@@ -572,7 +773,7 @@ ackbitmap_modify_bit (GNUNET_STREAM_AckBitmap *bitmap,
unsigned int bit,
int value)
{
- GNUNET_assert (bit < 64);
+ GNUNET_assert (bit < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH);
if (GNUNET_YES == value)
*bitmap |= (1LL << bit);
else
@@ -591,31 +792,14 @@ static uint8_t
ackbitmap_is_bit_set (const GNUNET_STREAM_AckBitmap *bitmap,
unsigned int bit)
{
- GNUNET_assert (bit < 64);
+ GNUNET_assert (bit < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH);
return 0 != (*bitmap & (1LL << bit));
}
-
-/**
- * Function called when Data Message is sent
- *
- * @param cls the io_handle corresponding to the Data Message
- * @param socket the socket which was used
- */
-static void
-write_data_finish_cb (void *cls,
- struct GNUNET_STREAM_Socket *socket)
-{
- struct GNUNET_STREAM_IOWriteHandle *io_handle = cls;
-
- io_handle->sent_packets++;
-}
-
-
/**
* Writes data using the given socket. The amount of data written is limited by
- * the receive_window_size
+ * the receiver_window_size
*
* @param socket the socket to use
*/
@@ -623,43 +807,59 @@ static void
write_data (struct GNUNET_STREAM_Socket *socket)
{
struct GNUNET_STREAM_IOWriteHandle *io_handle = socket->write_handle;
- unsigned int packet;
+ int packet; /* Although an int, should never be negative */
int ack_packet;
ack_packet = -1;
/* Find the last acknowledged packet */
- for (packet=0; packet < 64; packet++)
- {
- if (GNUNET_YES == ackbitmap_is_bit_set (&io_handle->ack_bitmap,
- packet))
- ack_packet = packet;
- else if (NULL == io_handle->messages[packet])
- break;
- }
+ for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
+ {
+ if (GNUNET_YES == ackbitmap_is_bit_set (&io_handle->ack_bitmap,
+ packet))
+ ack_packet = packet;
+ else if (NULL == io_handle->messages[packet])
+ break;
+ }
/* Resend packets which weren't ack'ed */
for (packet=0; packet < ack_packet; packet++)
+ {
+ if (GNUNET_NO == ackbitmap_is_bit_set (&io_handle->ack_bitmap,
+ packet))
{
- if (GNUNET_NO == ackbitmap_is_bit_set (&io_handle->ack_bitmap,
- packet))
- {
- queue_message (socket,
- &io_handle->messages[packet]->header,
- NULL,
- NULL);
- }
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "%s: Placing DATA message with sequence %u in send queue\n",
+ GNUNET_i2s (&socket->other_peer),
+ ntohl (io_handle->messages[packet]->sequence_number));
+ copy_and_queue_message (socket,
+ &io_handle->messages[packet]->header,
+ NULL,
+ NULL);
}
+ }
packet = ack_packet + 1;
/* Now send new packets if there is enough buffer space */
while ( (NULL != io_handle->messages[packet]) &&
- (socket->receive_window_available >= ntohs (io_handle->messages[packet]->header.header.size)) )
- {
- socket->receive_window_available -= ntohs (io_handle->messages[packet]->header.header.size);
- queue_message (socket,
- &io_handle->messages[packet]->header,
- &write_data_finish_cb,
- io_handle);
- packet++;
- }
+ (socket->receiver_window_available
+ >= ntohs (io_handle->messages[packet]->header.header.size)) )
+ {
+ socket->receiver_window_available -=
+ ntohs (io_handle->messages[packet]->header.header.size);
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "%s: Placing DATA message with sequence %u in send queue\n",
+ GNUNET_i2s (&socket->other_peer),
+ ntohl (io_handle->messages[packet]->sequence_number));
+ copy_and_queue_message (socket,
+ &io_handle->messages[packet]->header,
+ NULL,
+ NULL);
+ packet++;
+ }
+ if (GNUNET_SCHEDULER_NO_TASK == socket->retransmission_timeout_task_id)
+ socket->retransmission_timeout_task_id =
+ GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply
+ (GNUNET_TIME_UNIT_SECONDS, 8),
+ &retransmission_timeout_task,
+ socket);
}
@@ -671,7 +871,7 @@ write_data (struct GNUNET_STREAM_Socket *socket)
*/
static void
call_read_processor (void *cls,
- const struct GNUNET_SCHEDULER_TaskContext *tc)
+ const struct GNUNET_SCHEDULER_TaskContext *tc)
{
struct GNUNET_STREAM_Socket *socket = cls;
size_t read_size;
@@ -680,87 +880,95 @@ call_read_processor (void *cls,
uint32_t sequence_increase;
uint32_t offset_increase;
- socket->read_task = GNUNET_SCHEDULER_NO_TASK;
+ socket->read_task_id = GNUNET_SCHEDULER_NO_TASK;
if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
return;
+ if (NULL == socket->receive_buffer)
+ return;
+
GNUNET_assert (NULL != socket->read_handle);
GNUNET_assert (NULL != socket->read_handle->proc);
/* Check the bitmap for any holes */
for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
- {
- if (GNUNET_NO == ackbitmap_is_bit_set (&socket->ack_bitmap,
- packet))
- break;
- }
+ {
+ if (GNUNET_NO == ackbitmap_is_bit_set (&socket->ack_bitmap,
+ packet))
+ break;
+ }
/* We only call read processor if we have the first packet */
GNUNET_assert (0 < packet);
-
valid_read_size =
socket->receive_buffer_boundaries[packet-1] - socket->copy_offset;
-
GNUNET_assert (0 != valid_read_size);
-
/* Cancel the read_io_timeout_task */
- GNUNET_SCHEDULER_cancel (socket->read_io_timeout_task);
- socket->read_io_timeout_task = GNUNET_SCHEDULER_NO_TASK;
-
+ GNUNET_SCHEDULER_cancel (socket->read_io_timeout_task_id);
+ socket->read_io_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
/* Call the data processor */
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "%s: Calling read processor\n",
+ GNUNET_i2s (&socket->other_peer));
read_size =
socket->read_handle->proc (socket->read_handle->proc_cls,
socket->status,
socket->receive_buffer + socket->copy_offset,
valid_read_size);
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "%s: Read processor read %d bytes\n",
+ GNUNET_i2s (&socket->other_peer), read_size);
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "%s: Read processor completed successfully\n",
+ GNUNET_i2s (&socket->other_peer));
/* Free the read handle */
GNUNET_free (socket->read_handle);
socket->read_handle = NULL;
-
GNUNET_assert (read_size <= valid_read_size);
socket->copy_offset += read_size;
-
/* Determine upto which packet we can remove from the buffer */
for (packet = 0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
+ {
+ if (socket->copy_offset == socket->receive_buffer_boundaries[packet])
+ { packet++; break; }
if (socket->copy_offset < socket->receive_buffer_boundaries[packet])
break;
+ }
/* If no packets can be removed we can't move the buffer */
if (0 == packet) return;
-
sequence_increase = packet;
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "%s: Sequence increase after read processor completion: %u\n",
+ GNUNET_i2s (&socket->other_peer), sequence_increase);
/* Shift the data in the receive buffer */
memmove (socket->receive_buffer,
socket->receive_buffer
+ socket->receive_buffer_boundaries[sequence_increase-1],
- socket->receive_buffer_size - socket->receive_buffer_boundaries[sequence_increase-1]);
-
+ socket->receive_buffer_size
+ - socket->receive_buffer_boundaries[sequence_increase-1]);
/* Shift the bitmap */
socket->ack_bitmap = socket->ack_bitmap >> sequence_increase;
-
/* Set read_sequence_number */
socket->read_sequence_number += sequence_increase;
-
/* Set read_offset */
offset_increase = socket->receive_buffer_boundaries[sequence_increase-1];
socket->read_offset += offset_increase;
-
/* Fix copy_offset */
GNUNET_assert (offset_increase <= socket->copy_offset);
socket->copy_offset -= offset_increase;
-
/* Fix relative boundaries */
for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
+ {
+ if (packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH - sequence_increase)
{
- if (packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH - sequence_increase)
- {
- socket->receive_buffer_boundaries[packet] =
- socket->receive_buffer_boundaries[packet + sequence_increase]
- - offset_increase;
- }
- else
- socket->receive_buffer_boundaries[packet] = 0;
+ socket->receive_buffer_boundaries[packet] =
+ socket->receive_buffer_boundaries[packet + sequence_increase]
+ - offset_increase;
}
+ else
+ socket->receive_buffer_boundaries[packet] = 0;
+ }
}
@@ -772,20 +980,32 @@ call_read_processor (void *cls,
*/
static void
read_io_timeout (void *cls,
- const struct GNUNET_SCHEDULER_TaskContext *tc)
+ const struct GNUNET_SCHEDULER_TaskContext *tc)
{
struct GNUNET_STREAM_Socket *socket = cls;
+ GNUNET_STREAM_DataProcessor proc;
+ void *proc_cls;
- socket->read_io_timeout_task = GNUNET_SCHEDULER_NO_TASK;
- if (socket->read_task != GNUNET_SCHEDULER_NO_TASK)
+ socket->read_io_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
+ if (socket->read_task_id != GNUNET_SCHEDULER_NO_TASK)
{
- GNUNET_SCHEDULER_cancel (socket->read_task);
- socket->read_task = GNUNET_SCHEDULER_NO_TASK;
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "%s: Read task timedout - Cancelling it\n",
+ GNUNET_i2s (&socket->other_peer));
+ GNUNET_SCHEDULER_cancel (socket->read_task_id);
+ socket->read_task_id = GNUNET_SCHEDULER_NO_TASK;
}
GNUNET_assert (NULL != socket->read_handle);
-
+ proc = socket->read_handle->proc;
+ proc_cls = socket->read_handle->proc_cls;
+
GNUNET_free (socket->read_handle);
socket->read_handle = NULL;
+ /* Call the read processor to signal timeout */
+ proc (proc_cls,
+ GNUNET_STREAM_TIMEOUT,
+ NULL,
+ 0);
}
@@ -815,96 +1035,150 @@ handle_data (struct GNUNET_STREAM_Socket *socket,
size = htons (msg->header.header.size);
if (size < sizeof (struct GNUNET_STREAM_DataMessage))
- {
- GNUNET_break_op (0);
- return GNUNET_SYSERR;
- }
+ {
+ GNUNET_break_op (0);
+ return GNUNET_SYSERR;
+ }
+
+ if (0 != memcmp (sender,
+ &socket->other_peer,
+ sizeof (struct GNUNET_PeerIdentity)))
+ {
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "%s: Received DATA from non-confirming peer\n",
+ GNUNET_i2s (&socket->other_peer));
+ return GNUNET_YES;
+ }
switch (socket->state)
+ {
+ case STATE_ESTABLISHED:
+ case STATE_TRANSMIT_CLOSED:
+ case STATE_TRANSMIT_CLOSE_WAIT:
+
+ /* check if the message's sequence number is in the range we are
+ expecting */
+ relative_sequence_number =
+ ntohl (msg->sequence_number) - socket->read_sequence_number;
+ if ( relative_sequence_number > GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH)
{
- case STATE_ESTABLISHED:
- case STATE_TRANSMIT_CLOSED:
- case STATE_TRANSMIT_CLOSE_WAIT:
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "%s: Ignoring received message with sequence number %u\n",
+ GNUNET_i2s (&socket->other_peer),
+ ntohl (msg->sequence_number));
+ /* Start ACK sending task if one is not already present */
+ if (GNUNET_SCHEDULER_NO_TASK == socket->ack_task_id)
+ {
+ socket->ack_task_id =
+ GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh
+ (msg->ack_deadline),
+ &ack_task,
+ socket);
+ }
+ return GNUNET_YES;
+ }
+
+ /* Check if we have already seen this message */
+ if (GNUNET_YES == ackbitmap_is_bit_set (&socket->ack_bitmap,
+ relative_sequence_number))
+ {
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "%s: Ignoring already received message with sequence number %u\n",
+ GNUNET_i2s (&socket->other_peer),
+ ntohl (msg->sequence_number));
+ /* Start ACK sending task if one is not already present */
+ if (GNUNET_SCHEDULER_NO_TASK == socket->ack_task_id)
+ {
+ socket->ack_task_id =
+ GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh
+ (msg->ack_deadline),
+ &ack_task,
+ socket);
+ }
+ return GNUNET_YES;
+ }
- /* check if the message's sequence number is in the range we are
- expecting */
- relative_sequence_number =
- ntohl (msg->sequence_number) - socket->read_sequence_number;
- if ( relative_sequence_number > 64)
- {
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Ignoring received message with sequence number %d",
- ntohl (msg->sequence_number));
- return GNUNET_YES;
- }
-
- /* Check if we have to allocate the buffer */
- size -= sizeof (struct GNUNET_STREAM_DataMessage);
- relative_offset = ntohl (msg->offset) - socket->read_offset;
- bytes_needed = relative_offset + size;
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "%s: Receiving DATA with sequence number: %u and size: %d from %s\n",
+ GNUNET_i2s (&socket->other_peer),
+ ntohl (msg->sequence_number),
+ ntohs (msg->header.header.size),
+ GNUNET_i2s (&socket->other_peer));
- if (bytes_needed > socket->receive_buffer_size)
- {
- if (bytes_needed <= RECEIVE_BUFFER_SIZE)
- {
- socket->receive_buffer = GNUNET_realloc (socket->receive_buffer,
- bytes_needed);
- socket->receive_buffer_size = bytes_needed;
- }
- else
- {
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Cannot accommodate packet %d as buffer is full\n",
- ntohl (msg->sequence_number));
- return GNUNET_YES;
- }
- }
+ /* Check if we have to allocate the buffer */
+ size -= sizeof (struct GNUNET_STREAM_DataMessage);
+ relative_offset = ntohl (msg->offset) - socket->read_offset;
+ bytes_needed = relative_offset + size;
+ if (bytes_needed > socket->receive_buffer_size)
+ {
+ if (bytes_needed <= RECEIVE_BUFFER_SIZE)
+ {
+ socket->receive_buffer = GNUNET_realloc (socket->receive_buffer,
+ bytes_needed);
+ socket->receive_buffer_size = bytes_needed;
+ }
+ else
+ {
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "%s: Cannot accommodate packet %d as buffer is full\n",
+ GNUNET_i2s (&socket->other_peer),
+ ntohl (msg->sequence_number));
+ return GNUNET_YES;
+ }
+ }
- /* Copy Data to buffer */
- payload = &msg[1];
- GNUNET_assert (relative_offset + size <= socket->receive_buffer_size);
- memcpy (socket->receive_buffer + relative_offset,
- payload,
- size);
- socket->receive_buffer_boundaries[relative_sequence_number] =
- relative_offset + size;
+ /* Copy Data to buffer */
+ payload = &msg[1];
+ GNUNET_assert (relative_offset + size <= socket->receive_buffer_size);
+ memcpy (socket->receive_buffer + relative_offset,
+ payload,
+ size);
+ socket->receive_buffer_boundaries[relative_sequence_number] =
+ relative_offset + size;
- /* Modify the ACK bitmap */
- ackbitmap_modify_bit (&socket->ack_bitmap,
- relative_sequence_number,
- GNUNET_YES);
+ /* Modify the ACK bitmap */
+ ackbitmap_modify_bit (&socket->ack_bitmap,
+ relative_sequence_number,
+ GNUNET_YES);
- /* Start ACK sending task if one is not already present */
- if (0 == socket->ack_task_id)
- {
- socket->ack_task_id =
- GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh
- (msg->ack_deadline),
- &ack_task,
- socket);
- }
-
- if ((NULL != socket->read_handle) /* A read handle is waiting */
- /* There is no current read task */
- && (GNUNET_SCHEDULER_NO_TASK == socket->read_task)
- /* We have the first packet */
- && (GNUNET_YES == ackbitmap_is_bit_set(&socket->ack_bitmap,
- 0)))
- {
- socket->read_task =
- GNUNET_SCHEDULER_add_now (&call_read_processor,
+ /* Start ACK sending task if one is not already present */
+ if (GNUNET_SCHEDULER_NO_TASK == socket->ack_task_id)
+ {
+ socket->ack_task_id =
+ GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh
+ (msg->ack_deadline),
+ &ack_task,
socket);
- }
-
- break;
+ }
- default:
- /* FIXME: call statistics */
- break;
+ if ((NULL != socket->read_handle) /* A read handle is waiting */
+ /* There is no current read task */
+ && (GNUNET_SCHEDULER_NO_TASK == socket->read_task_id)
+ /* We have the first packet */
+ && (GNUNET_YES == ackbitmap_is_bit_set(&socket->ack_bitmap,
+ 0)))
+ {
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "%s: Scheduling read processor\n",
+ GNUNET_i2s (&socket->other_peer));
+
+ socket->read_task_id =
+ GNUNET_SCHEDULER_add_now (&call_read_processor,
+ socket);
}
+
+ break;
+
+ default:
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "%s: Received data message when it cannot be handled\n",
+ GNUNET_i2s (&socket->other_peer));
+ break;
+ }
return GNUNET_YES;
}
+
/**
* Client's message Handler for GNUNET_MESSAGE_TYPE_STREAM_DATA
*
@@ -919,11 +1193,11 @@ handle_data (struct GNUNET_STREAM_Socket *socket,
*/
static int
client_handle_data (void *cls,
- struct GNUNET_MESH_Tunnel *tunnel,
- void **tunnel_ctx,
- const struct GNUNET_PeerIdentity *sender,
- const struct GNUNET_MessageHeader *message,
- const struct GNUNET_ATS_Information*atsi)
+ struct GNUNET_MESH_Tunnel *tunnel,
+ void **tunnel_ctx,
+ const struct GNUNET_PeerIdentity *sender,
+ const struct GNUNET_MessageHeader *message,
+ const struct GNUNET_ATS_Information*atsi)
{
struct GNUNET_STREAM_Socket *socket = cls;
@@ -945,10 +1219,31 @@ static void
set_state_established (void *cls,
struct GNUNET_STREAM_Socket *socket)
{
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Attaining ESTABLISHED state\n");
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "%s: Attaining ESTABLISHED state\n",
+ GNUNET_i2s (&socket->other_peer));
socket->write_offset = 0;
socket->read_offset = 0;
socket->state = STATE_ESTABLISHED;
+ /* FIXME: What if listen_cb is NULL */
+ if (NULL != socket->lsocket)
+ {
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "%s: Calling listen callback\n",
+ GNUNET_i2s (&socket->other_peer));
+ if (GNUNET_SYSERR ==
+ socket->lsocket->listen_cb (socket->lsocket->listen_cb_cls,
+ socket,
+ &socket->other_peer))
+ {
+ socket->state = STATE_CLOSED;
+ /* FIXME: We should close in a decent way */
+ GNUNET_MESH_tunnel_destroy (socket->tunnel); /* Destroy the tunnel */
+ GNUNET_free (socket);
+ }
+ }
+ else if (socket->open_cb)
+ socket->open_cb (socket->open_cls, socket);
}
@@ -963,12 +1258,115 @@ set_state_hello_wait (void *cls,
struct GNUNET_STREAM_Socket *socket)
{
GNUNET_assert (STATE_INIT == socket->state);
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Attaining HELLO_WAIT state\n");
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "%s: Attaining HELLO_WAIT state\n",
+ GNUNET_i2s (&socket->other_peer));
socket->state = STATE_HELLO_WAIT;
}
/**
+ * Callback to set state to CLOSE_WAIT
+ *
+ * @param cls the closure from queue_message
+ * @param socket the socket requiring state change
+ */
+static void
+set_state_close_wait (void *cls,
+ struct GNUNET_STREAM_Socket *socket)
+{
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "%s: Attaing CLOSE_WAIT state\n",
+ GNUNET_i2s (&socket->other_peer));
+ socket->state = STATE_CLOSE_WAIT;
+ GNUNET_free_non_null (socket->receive_buffer); /* Free the receive buffer */
+ socket->receive_buffer = NULL;
+ socket->receive_buffer_size = 0;
+}
+
+
+/**
+ * Callback to set state to RECEIVE_CLOSE_WAIT
+ *
+ * @param cls the closure from queue_message
+ * @param socket the socket requiring state change
+ */
+static void
+set_state_receive_close_wait (void *cls,
+ struct GNUNET_STREAM_Socket *socket)
+{
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "%s: Attaing RECEIVE_CLOSE_WAIT state\n",
+ GNUNET_i2s (&socket->other_peer));
+ socket->state = STATE_RECEIVE_CLOSE_WAIT;
+ GNUNET_free_non_null (socket->receive_buffer); /* Free the receive buffer */
+ socket->receive_buffer = NULL;
+ socket->receive_buffer_size = 0;
+}
+
+
+/**
+ * Callback to set state to TRANSMIT_CLOSE_WAIT
+ *
+ * @param cls the closure from queue_message
+ * @param socket the socket requiring state change
+ */
+static void
+set_state_transmit_close_wait (void *cls,
+ struct GNUNET_STREAM_Socket *socket)
+{
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "%s: Attaing TRANSMIT_CLOSE_WAIT state\n",
+ GNUNET_i2s (&socket->other_peer));
+ socket->state = STATE_TRANSMIT_CLOSE_WAIT;
+}
+
+
+/**
+ * Callback to set state to CLOSED
+ *
+ * @param cls the closure from queue_message
+ * @param socket the socket requiring state change
+ */
+static void
+set_state_closed (void *cls,
+ struct GNUNET_STREAM_Socket *socket)
+{
+ socket->state = STATE_CLOSED;
+}
+
+/**
+ * Returns a new HelloAckMessage. Also sets the write sequence number for the
+ * socket
+ *
+ * @param socket the socket for which this HelloAckMessage has to be generated
+ * @return the HelloAckMessage
+ */
+static struct GNUNET_STREAM_HelloAckMessage *
+generate_hello_ack_msg (struct GNUNET_STREAM_Socket *socket)
+{
+ struct GNUNET_STREAM_HelloAckMessage *msg;
+
+ /* Get the random sequence number */
+ socket->write_sequence_number =
+ GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, UINT32_MAX);
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "%s: Generated write sequence number %u\n",
+ GNUNET_i2s (&socket->other_peer),
+ (unsigned int) socket->write_sequence_number);
+
+ msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_HelloAckMessage));
+ msg->header.header.size =
+ htons (sizeof (struct GNUNET_STREAM_HelloAckMessage));
+ msg->header.header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK);
+ msg->sequence_number = htonl (socket->write_sequence_number);
+ msg->receiver_window_size = htonl (RECEIVE_BUFFER_SIZE);
+
+ return msg;
+}
+
+
+/**
* Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK
*
* @param cls the socket (set from GNUNET_MESH_connect)
@@ -992,37 +1390,48 @@ client_handle_hello_ack (void *cls,
const struct GNUNET_STREAM_HelloAckMessage *ack_msg;
struct GNUNET_STREAM_HelloAckMessage *reply;
+ if (0 != memcmp (sender,
+ &socket->other_peer,
+ sizeof (struct GNUNET_PeerIdentity)))
+ {
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "%s: Received HELLO_ACK from non-confirming peer\n",
+ GNUNET_i2s (&socket->other_peer));
+ return GNUNET_YES;
+ }
ack_msg = (const struct GNUNET_STREAM_HelloAckMessage *) message;
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "%s: Received HELLO_ACK from %s\n",
+ GNUNET_i2s (&socket->other_peer),
+ GNUNET_i2s (&socket->other_peer));
+
GNUNET_assert (socket->tunnel == tunnel);
switch (socket->state)
{
case STATE_HELLO_WAIT:
- socket->read_sequence_number = ntohl (ack_msg->sequence_number);
- socket->receive_window_available = ntohl (ack_msg->receive_window_size);
- /* Get the random sequence number */
- socket->write_sequence_number =
- GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, UINT32_MAX);
- reply =
- GNUNET_malloc (sizeof (struct GNUNET_STREAM_HelloAckMessage));
- reply->header.header.size =
- htons (sizeof (struct GNUNET_STREAM_MessageHeader));
- reply->header.header.type =
- htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK);
- reply->sequence_number = htonl (socket->write_sequence_number);
- reply->receive_window_size = htonl (RECEIVE_BUFFER_SIZE);
- queue_message (socket,
- &reply->header,
- &set_state_established,
- NULL);
- return GNUNET_OK;
+ socket->read_sequence_number = ntohl (ack_msg->sequence_number);
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "%s: Read sequence number %u\n",
+ GNUNET_i2s (&socket->other_peer),
+ (unsigned int) socket->read_sequence_number);
+ socket->receiver_window_available = ntohl (ack_msg->receiver_window_size);
+ reply = generate_hello_ack_msg (socket);
+ queue_message (socket,
+ &reply->header,
+ &set_state_established,
+ NULL);
+ return GNUNET_OK;
case STATE_ESTABLISHED:
case STATE_RECEIVE_CLOSE_WAIT:
// call statistics (# ACKs ignored++)
return GNUNET_OK;
case STATE_INIT:
default:
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Server sent HELLO_ACK when in state %d\n", socket->state);
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "%s: Server %s sent HELLO_ACK when in state %d\n",
+ GNUNET_i2s (&socket->other_peer),
+ GNUNET_i2s (&socket->other_peer),
+ socket->state);
socket->state = STATE_CLOSED; // introduce STATE_ERROR?
return GNUNET_SYSERR;
}
@@ -1050,7 +1459,7 @@ client_handle_reset (void *cls,
const struct GNUNET_MessageHeader *message,
const struct GNUNET_ATS_Information*atsi)
{
- struct GNUNET_STREAM_Socket *socket = cls;
+ // struct GNUNET_STREAM_Socket *socket = cls;
return GNUNET_OK;
}
@@ -1077,22 +1486,22 @@ handle_transmit_close (struct GNUNET_STREAM_Socket *socket,
struct GNUNET_STREAM_MessageHeader *reply;
switch (socket->state)
- {
- case STATE_ESTABLISHED:
- socket->state = STATE_RECEIVE_CLOSED;
+ {
+ case STATE_ESTABLISHED:
+ socket->state = STATE_RECEIVE_CLOSED;
- /* Send TRANSMIT_CLOSE_ACK */
- reply = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
- reply->header.type =
- htons (GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK);
- reply->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
- queue_message (socket, reply, NULL, NULL);
- break;
+ /* Send TRANSMIT_CLOSE_ACK */
+ reply = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
+ reply->header.type =
+ htons (GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK);
+ reply->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
+ queue_message (socket, reply, NULL, NULL);
+ break;
- default:
- /* FIXME: Call statistics? */
- break;
- }
+ default:
+ /* FIXME: Call statistics? */
+ break;
+ }
return GNUNET_YES;
}
@@ -1128,6 +1537,141 @@ client_handle_transmit_close (void *cls,
/**
+ * Generic handler for GNUNET_MESSAGE_TYPE_STREAM_*_CLOSE_ACK messages
+ *
+ * @param socket the socket
+ * @param tunnel connection to the other end
+ * @param sender who sent the message
+ * @param message the actual message
+ * @param atsi performance data for the connection
+ * @param operation the close operation which is being ACK'ed
+ * @return GNUNET_OK to keep the connection open,
+ * GNUNET_SYSERR to close it (signal serious error)
+ */
+static int
+handle_generic_close_ack (struct GNUNET_STREAM_Socket *socket,
+ struct GNUNET_MESH_Tunnel *tunnel,
+ const struct GNUNET_PeerIdentity *sender,
+ const struct GNUNET_STREAM_MessageHeader *message,
+ const struct GNUNET_ATS_Information *atsi,
+ int operation)
+{
+ struct GNUNET_STREAM_ShutdownHandle *shutdown_handle;
+
+ shutdown_handle = socket->shutdown_handle;
+ if (NULL == shutdown_handle)
+ {
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "%s: Received CLOSE_ACK when shutdown handle is NULL\n",
+ GNUNET_i2s (&socket->other_peer));
+ return GNUNET_OK;
+ }
+
+ switch (operation)
+ {
+ case SHUT_RDWR:
+ switch (socket->state)
+ {
+ case STATE_CLOSE_WAIT:
+ if (SHUT_RDWR != shutdown_handle->operation)
+ {
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "%s: Received CLOSE_ACK when shutdown handle is not for "
+ "SHUT_RDWR\n",
+ GNUNET_i2s (&socket->other_peer));
+ return GNUNET_OK;
+ }
+
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "%s: Received CLOSE_ACK from %s\n",
+ GNUNET_i2s (&socket->other_peer),
+ GNUNET_i2s (&socket->other_peer));
+ socket->state = STATE_CLOSED;
+ break;
+ default:
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "%s: Received CLOSE_ACK when in it not expected\n",
+ GNUNET_i2s (&socket->other_peer));
+ return GNUNET_OK;
+ }
+ break;
+
+ case SHUT_RD:
+ switch (socket->state)
+ {
+ case STATE_RECEIVE_CLOSE_WAIT:
+ if (SHUT_RD != shutdown_handle->operation)
+ {
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "%s: Received RECEIVE_CLOSE_ACK when shutdown handle "
+ "is not for SHUT_RD\n",
+ GNUNET_i2s (&socket->other_peer));
+ return GNUNET_OK;
+ }
+
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "%s: Received RECEIVE_CLOSE_ACK from %s\n",
+ GNUNET_i2s (&socket->other_peer),
+ GNUNET_i2s (&socket->other_peer));
+ socket->state = STATE_RECEIVE_CLOSED;
+ break;
+ default:
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "%s: Received RECEIVE_CLOSE_ACK when in it not expected\n",
+ GNUNET_i2s (&socket->other_peer));
+ return GNUNET_OK;
+ }
+
+ break;
+ case SHUT_WR:
+ switch (socket->state)
+ {
+ case STATE_TRANSMIT_CLOSE_WAIT:
+ if (SHUT_WR != shutdown_handle->operation)
+ {
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "%s: Received TRANSMIT_CLOSE_ACK when shutdown handle "
+ "is not for SHUT_WR\n",
+ GNUNET_i2s (&socket->other_peer));
+ return GNUNET_OK;
+ }
+
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "%s: Received TRANSMIT_CLOSE_ACK from %s\n",
+ GNUNET_i2s (&socket->other_peer),
+ GNUNET_i2s (&socket->other_peer));
+ socket->state = STATE_TRANSMIT_CLOSED;
+ break;
+ default:
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "%s: Received TRANSMIT_CLOSE_ACK when in it not expected\n",
+ GNUNET_i2s (&socket->other_peer));
+
+ return GNUNET_OK;
+ }
+ break;
+ default:
+ GNUNET_assert (0);
+ }
+
+ if (NULL != shutdown_handle->completion_cb) /* Shutdown completion */
+ shutdown_handle->completion_cb(shutdown_handle->completion_cls,
+ operation);
+ GNUNET_free (shutdown_handle); /* Free shutdown handle */
+ socket->shutdown_handle = NULL;
+ if (GNUNET_SCHEDULER_NO_TASK
+ != shutdown_handle->close_msg_retransmission_task_id)
+ {
+ GNUNET_SCHEDULER_cancel
+ (shutdown_handle->close_msg_retransmission_task_id);
+ shutdown_handle->close_msg_retransmission_task_id =
+ GNUNET_SCHEDULER_NO_TASK;
+ }
+ return GNUNET_OK;
+}
+
+
+/**
* Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK
*
* @param cls the socket (set from GNUNET_MESH_connect)
@@ -1149,6 +1693,67 @@ client_handle_transmit_close_ack (void *cls,
{
struct GNUNET_STREAM_Socket *socket = cls;
+ return handle_generic_close_ack (socket,
+ tunnel,
+ sender,
+ (const struct GNUNET_STREAM_MessageHeader *)
+ message,
+ atsi,
+ SHUT_WR);
+}
+
+
+/**
+ * Generic handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE
+ *
+ * @param socket the socket
+ * @param tunnel connection to the other end
+ * @param sender who sent the message
+ * @param message the actual message
+ * @param atsi performance data for the connection
+ * @return GNUNET_OK to keep the connection open,
+ * GNUNET_SYSERR to close it (signal serious error)
+ */
+static int
+handle_receive_close (struct GNUNET_STREAM_Socket *socket,
+ struct GNUNET_MESH_Tunnel *tunnel,
+ const struct GNUNET_PeerIdentity *sender,
+ const struct GNUNET_STREAM_MessageHeader *message,
+ const struct GNUNET_ATS_Information *atsi)
+{
+ struct GNUNET_STREAM_MessageHeader *receive_close_ack;
+
+ switch (socket->state)
+ {
+ case STATE_INIT:
+ case STATE_LISTEN:
+ case STATE_HELLO_WAIT:
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "%s: Ignoring RECEIVE_CLOSE as it cannot be handled now\n",
+ GNUNET_i2s (&socket->other_peer));
+ return GNUNET_OK;
+ default:
+ break;
+ }
+
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "%s: Received RECEIVE_CLOSE from %s\n",
+ GNUNET_i2s (&socket->other_peer),
+ GNUNET_i2s (&socket->other_peer));
+ receive_close_ack =
+ GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
+ receive_close_ack->header.size =
+ htons (sizeof (struct GNUNET_STREAM_MessageHeader));
+ receive_close_ack->header.type =
+ htons (GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK);
+ queue_message (socket,
+ receive_close_ack,
+ &set_state_closed,
+ NULL);
+
+ /* FIXME: Handle the case where write handle is present; the write operation
+ should be deemed as finised and the write continuation callback
+ has to be called with the stream status GNUNET_STREAM_SHUTDOWN */
return GNUNET_OK;
}
@@ -1175,7 +1780,12 @@ client_handle_receive_close (void *cls,
{
struct GNUNET_STREAM_Socket *socket = cls;
- return GNUNET_OK;
+ return
+ handle_receive_close (socket,
+ tunnel,
+ sender,
+ (const struct GNUNET_STREAM_MessageHeader *) message,
+ atsi);
}
@@ -1201,6 +1811,66 @@ client_handle_receive_close_ack (void *cls,
{
struct GNUNET_STREAM_Socket *socket = cls;
+ return handle_generic_close_ack (socket,
+ tunnel,
+ sender,
+ (const struct GNUNET_STREAM_MessageHeader *)
+ message,
+ atsi,
+ SHUT_RD);
+}
+
+
+/**
+ * Generic handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE
+ *
+ * @param socket the socket
+ * @param tunnel connection to the other end
+ * @param sender who sent the message
+ * @param message the actual message
+ * @param atsi performance data for the connection
+ * @return GNUNET_OK to keep the connection open,
+ * GNUNET_SYSERR to close it (signal serious error)
+ */
+static int
+handle_close (struct GNUNET_STREAM_Socket *socket,
+ struct GNUNET_MESH_Tunnel *tunnel,
+ const struct GNUNET_PeerIdentity *sender,
+ const struct GNUNET_STREAM_MessageHeader *message,
+ const struct GNUNET_ATS_Information*atsi)
+{
+ struct GNUNET_STREAM_MessageHeader *close_ack;
+
+ switch (socket->state)
+ {
+ case STATE_INIT:
+ case STATE_LISTEN:
+ case STATE_HELLO_WAIT:
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "%s: Ignoring RECEIVE_CLOSE as it cannot be handled now\n",
+ GNUNET_i2s (&socket->other_peer));
+ return GNUNET_OK;
+ default:
+ break;
+ }
+
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "%s: Received CLOSE from %s\n",
+ GNUNET_i2s (&socket->other_peer),
+ GNUNET_i2s (&socket->other_peer));
+ close_ack = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
+ close_ack->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
+ close_ack->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK);
+ queue_message (socket,
+ close_ack,
+ &set_state_closed,
+ NULL);
+ if (socket->state == STATE_CLOSED)
+ return GNUNET_OK;
+
+ GNUNET_free_non_null (socket->receive_buffer); /* Free the receive buffer */
+ socket->receive_buffer = NULL;
+ socket->receive_buffer_size = 0;
return GNUNET_OK;
}
@@ -1227,7 +1897,11 @@ client_handle_close (void *cls,
{
struct GNUNET_STREAM_Socket *socket = cls;
- return GNUNET_OK;
+ return handle_close (socket,
+ tunnel,
+ sender,
+ (const struct GNUNET_STREAM_MessageHeader *) message,
+ atsi);
}
@@ -1249,11 +1923,17 @@ client_handle_close_ack (void *cls,
void **tunnel_ctx,
const struct GNUNET_PeerIdentity *sender,
const struct GNUNET_MessageHeader *message,
- const struct GNUNET_ATS_Information*atsi)
+ const struct GNUNET_ATS_Information *atsi)
{
struct GNUNET_STREAM_Socket *socket = cls;
- return GNUNET_OK;
+ return handle_generic_close_ack (socket,
+ tunnel,
+ sender,
+ (const struct GNUNET_STREAM_MessageHeader *)
+ message,
+ atsi,
+ SHUT_RDWR);
}
/*****************************/
@@ -1313,31 +1993,39 @@ server_handle_hello (void *cls,
struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
struct GNUNET_STREAM_HelloAckMessage *reply;
+ if (0 != memcmp (sender,
+ &socket->other_peer,
+ sizeof (struct GNUNET_PeerIdentity)))
+ {
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "%s: Received HELLO from non-confirming peer\n",
+ GNUNET_i2s (&socket->other_peer));
+ return GNUNET_YES;
+ }
+
+ GNUNET_assert (GNUNET_MESSAGE_TYPE_STREAM_HELLO ==
+ ntohs (message->type));
GNUNET_assert (socket->tunnel == tunnel);
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "%s: Received HELLO from %s\n",
+ GNUNET_i2s (&socket->other_peer),
+ GNUNET_i2s (&socket->other_peer));
+
if (STATE_INIT == socket->state)
- {
- /* Get the random sequence number */
- socket->write_sequence_number =
- GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, UINT32_MAX);
- reply =
- GNUNET_malloc (sizeof (struct GNUNET_STREAM_HelloAckMessage));
- reply->header.header.size =
- htons (sizeof (struct GNUNET_STREAM_MessageHeader));
- reply->header.header.type =
- htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK);
- reply->sequence_number = htonl (socket->write_sequence_number);
- queue_message (socket,
- &reply->header,
- &set_state_hello_wait,
- NULL);
- }
+ {
+ reply = generate_hello_ack_msg (socket);
+ queue_message (socket,
+ &reply->header,
+ &set_state_hello_wait,
+ NULL);
+ }
else
- {
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Client sent HELLO when in state %d\n", socket->state);
- /* FIXME: Send RESET? */
+ {
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Client sent HELLO when in state %d\n", socket->state);
+ /* FIXME: Send RESET? */
- }
+ }
return GNUNET_OK;
}
@@ -1365,23 +2053,33 @@ server_handle_hello_ack (void *cls,
struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
const struct GNUNET_STREAM_HelloAckMessage *ack_message;
- ack_message = (struct GNUNET_STREAM_HelloAckMessage *) message;
+ GNUNET_assert (GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK ==
+ ntohs (message->type));
GNUNET_assert (socket->tunnel == tunnel);
+ ack_message = (struct GNUNET_STREAM_HelloAckMessage *) message;
if (STATE_HELLO_WAIT == socket->state)
- {
- socket->read_sequence_number = ntohl (ack_message->sequence_number);
- socket->receive_window_available =
- ntohl (ack_message->receive_window_size);
- /* Attain ESTABLISHED state */
- set_state_established (NULL, socket);
- }
+ {
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "%s: Received HELLO_ACK from %s\n",
+ GNUNET_i2s (&socket->other_peer),
+ GNUNET_i2s (&socket->other_peer));
+ socket->read_sequence_number = ntohl (ack_message->sequence_number);
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "%s: Read sequence number %u\n",
+ GNUNET_i2s (&socket->other_peer),
+ (unsigned int) socket->read_sequence_number);
+ socket->receiver_window_available =
+ ntohl (ack_message->receiver_window_size);
+ /* Attain ESTABLISHED state */
+ set_state_established (NULL, socket);
+ }
else
- {
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Client sent HELLO_ACK when in state %d\n", socket->state);
- /* FIXME: Send RESET? */
+ {
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Client sent HELLO_ACK when in state %d\n", socket->state);
+ /* FIXME: Send RESET? */
- }
+ }
return GNUNET_OK;
}
@@ -1406,7 +2104,7 @@ server_handle_reset (void *cls,
const struct GNUNET_MessageHeader *message,
const struct GNUNET_ATS_Information*atsi)
{
- struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
+ // struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
return GNUNET_OK;
}
@@ -1464,7 +2162,13 @@ server_handle_transmit_close_ack (void *cls,
{
struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
- return GNUNET_OK;
+ return handle_generic_close_ack (socket,
+ tunnel,
+ sender,
+ (const struct GNUNET_STREAM_MessageHeader *)
+ message,
+ atsi,
+ SHUT_WR);
}
@@ -1490,7 +2194,12 @@ server_handle_receive_close (void *cls,
{
struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
- return GNUNET_OK;
+ return
+ handle_receive_close (socket,
+ tunnel,
+ sender,
+ (const struct GNUNET_STREAM_MessageHeader *) message,
+ atsi);
}
@@ -1516,14 +2225,21 @@ server_handle_receive_close_ack (void *cls,
{
struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
- return GNUNET_OK;
+ return handle_generic_close_ack (socket,
+ tunnel,
+ sender,
+ (const struct GNUNET_STREAM_MessageHeader *)
+ message,
+ atsi,
+ SHUT_RD);
}
/**
* Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE
*
- * @param cls the closure
+ * @param cls the listen socket (from GNUNET_MESH_connect in
+ * GNUNET_STREAM_listen)
* @param tunnel connection to the other end
* @param tunnel_ctx the socket
* @param sender who sent the message
@@ -1541,8 +2257,12 @@ server_handle_close (void *cls,
const struct GNUNET_ATS_Information*atsi)
{
struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
-
- return GNUNET_OK;
+
+ return handle_close (socket,
+ tunnel,
+ sender,
+ (const struct GNUNET_STREAM_MessageHeader *) message,
+ atsi);
}
@@ -1568,12 +2288,18 @@ server_handle_close_ack (void *cls,
{
struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
- return GNUNET_OK;
+ return handle_generic_close_ack (socket,
+ tunnel,
+ sender,
+ (const struct GNUNET_STREAM_MessageHeader *)
+ message,
+ atsi,
+ SHUT_RDWR);
}
/**
- * Message Handler for mesh
+ * Handler for DATA_ACK messages
*
* @param socket the socket through which the ack was received
* @param tunnel connection to the other end
@@ -1590,30 +2316,132 @@ handle_ack (struct GNUNET_STREAM_Socket *socket,
const struct GNUNET_STREAM_AckMessage *ack,
const struct GNUNET_ATS_Information*atsi)
{
+ unsigned int packet;
+ int need_retransmission;
+
+
+ if (0 != memcmp (sender,
+ &socket->other_peer,
+ sizeof (struct GNUNET_PeerIdentity)))
+ {
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "%s: Received ACK from non-confirming peer\n",
+ GNUNET_i2s (&socket->other_peer));
+ return GNUNET_YES;
+ }
+
switch (socket->state)
+ {
+ case (STATE_ESTABLISHED):
+ case (STATE_RECEIVE_CLOSED):
+ case (STATE_RECEIVE_CLOSE_WAIT):
+ if (NULL == socket->write_handle)
+ {
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "%s: Received DATA_ACK when write_handle is NULL\n",
+ GNUNET_i2s (&socket->other_peer));
+ return GNUNET_OK;
+ }
+ /* FIXME: increment in the base sequence number is breaking current flow
+ */
+ if (!((socket->write_sequence_number
+ - ntohl (ack->base_sequence_number)) < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH))
+ {
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "%s: Received DATA_ACK with unexpected base sequence number\n",
+ GNUNET_i2s (&socket->other_peer));
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "%s: Current write sequence: %u; Ack's base sequence: %u\n",
+ GNUNET_i2s (&socket->other_peer),
+ socket->write_sequence_number,
+ ntohl (ack->base_sequence_number));
+ return GNUNET_OK;
+ }
+ /* FIXME: include the case when write_handle is cancelled - ignore the
+ acks */
+
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "%s: Received DATA_ACK from %s\n",
+ GNUNET_i2s (&socket->other_peer),
+ GNUNET_i2s (&socket->other_peer));
+
+ /* Cancel the retransmission task */
+ if (GNUNET_SCHEDULER_NO_TASK != socket->retransmission_timeout_task_id)
+ {
+ GNUNET_SCHEDULER_cancel (socket->retransmission_timeout_task_id);
+ socket->retransmission_timeout_task_id =
+ GNUNET_SCHEDULER_NO_TASK;
+ }
+
+ for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
+ {
+ if (NULL == socket->write_handle->messages[packet]) break;
+ if (ntohl (ack->base_sequence_number)
+ >= ntohl (socket->write_handle->messages[packet]->sequence_number))
+ ackbitmap_modify_bit (&socket->write_handle->ack_bitmap,
+ packet,
+ GNUNET_YES);
+ else
+ if (GNUNET_YES ==
+ ackbitmap_is_bit_set (&socket->write_handle->ack_bitmap,
+ ntohl (socket->write_handle->messages[packet]->sequence_number)
+ - ntohl (ack->base_sequence_number)))
+ ackbitmap_modify_bit (&socket->write_handle->ack_bitmap,
+ packet,
+ GNUNET_YES);
+ }
+
+ /* Update the receive window remaining
+ FIXME : Should update with the value from a data ack with greater
+ sequence number */
+ socket->receiver_window_available =
+ ntohl (ack->receive_window_remaining);
+
+ /* Check if we have received all acknowledgements */
+ need_retransmission = GNUNET_NO;
+ for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
+ {
+ if (NULL == socket->write_handle->messages[packet]) break;
+ if (GNUNET_YES != ackbitmap_is_bit_set
+ (&socket->write_handle->ack_bitmap,packet))
+ {
+ need_retransmission = GNUNET_YES;
+ break;
+ }
+ }
+ if (GNUNET_YES == need_retransmission)
{
- case (STATE_ESTABLISHED):
- if (NULL == socket->write_handle)
- {
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Received DATA ACK when write_handle is NULL\n");
- return GNUNET_OK;
- }
-
- socket->write_handle->ack_bitmap = GNUNET_ntohll (ack->bitmap);
- socket->receive_window_available =
- ntohl (ack->receive_window_remaining);
write_data (socket);
- break;
- default:
- break;
}
+ else /* We have to call the write continuation callback now */
+ {
+ /* Free the packets */
+ for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
+ {
+ GNUNET_free_non_null (socket->write_handle->messages[packet]);
+ }
+ if (NULL != socket->write_handle->write_cont)
+ socket->write_handle->write_cont
+ (socket->write_handle->write_cont_cls,
+ socket->status,
+ socket->write_handle->size);
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "%s: Write completion callback completed\n",
+ GNUNET_i2s (&socket->other_peer));
+ /* We are done with the write handle - Freeing it */
+ GNUNET_free (socket->write_handle);
+ socket->write_handle = NULL;
+ }
+ break;
+ default:
+ break;
+ }
return GNUNET_OK;
}
/**
- * Message Handler for mesh
+ * Handler for DATA_ACK messages
*
* @param cls the 'struct GNUNET_STREAM_Socket'
* @param tunnel connection to the other end
@@ -1640,7 +2468,7 @@ client_handle_ack (void *cls,
/**
- * Message Handler for mesh
+ * Handler for DATA_ACK messages
*
* @param cls the server's listen socket
* @param tunnel connection to the other end
@@ -1738,19 +2566,21 @@ mesh_peer_connect_callback (void *cls,
{
struct GNUNET_STREAM_Socket *socket = cls;
struct GNUNET_STREAM_MessageHeader *message;
-
- if (0 != memcmp (&socket->other_peer,
- peer,
+
+ if (0 != memcmp (peer,
+ &socket->other_peer,
sizeof (struct GNUNET_PeerIdentity)))
- {
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "A peer (%s) which is not our target has connected to our tunnel",
- GNUNET_i2s (peer));
- return;
- }
+ {
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "%s: A peer which is not our target has connected to our tunnel\n",
+ GNUNET_i2s(peer));
+ return;
+ }
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Target peer %s connected\n", GNUNET_i2s (peer));
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "%s: Target peer %s connected\n",
+ GNUNET_i2s (&socket->other_peer),
+ GNUNET_i2s (&socket->other_peer));
/* Set state to INIT */
socket->state = STATE_INIT;
@@ -1766,14 +2596,10 @@ mesh_peer_connect_callback (void *cls,
/* Call open callback */
if (NULL == socket->open_cb)
- {
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "STREAM_open callback is NULL\n");
- }
- else
- {
- socket->open_cb (socket->open_cls, socket);
- }
+ {
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "STREAM_open callback is NULL\n");
+ }
}
@@ -1787,7 +2613,112 @@ static void
mesh_peer_disconnect_callback (void *cls,
const struct GNUNET_PeerIdentity *peer)
{
+ struct GNUNET_STREAM_Socket *socket=cls;
+
+ /* If the state is SHUTDOWN its ok; else set the state of the socket to SYSERR */
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "%s: Other peer %s disconnected \n",
+ GNUNET_i2s (&socket->other_peer),
+ GNUNET_i2s (&socket->other_peer));
+}
+
+/**
+ * Method called whenever a peer creates a tunnel to us
+ *
+ * @param cls closure
+ * @param tunnel new handle to the tunnel
+ * @param initiator peer that started the tunnel
+ * @param atsi performance information for the tunnel
+ * @return initial tunnel context for the tunnel
+ * (can be NULL -- that's not an error)
+ */
+static void *
+new_tunnel_notify (void *cls,
+ struct GNUNET_MESH_Tunnel *tunnel,
+ const struct GNUNET_PeerIdentity *initiator,
+ const struct GNUNET_ATS_Information *atsi)
+{
+ struct GNUNET_STREAM_ListenSocket *lsocket = cls;
+ struct GNUNET_STREAM_Socket *socket;
+
+ /* FIXME: If a tunnel is already created, we should not accept new tunnels
+ from the same peer again until the socket is closed */
+
+ socket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_Socket));
+ socket->other_peer = *initiator;
+ socket->tunnel = tunnel;
+ socket->session_id = 0; /* FIXME */
+ socket->state = STATE_INIT;
+ socket->lsocket = lsocket;
+
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "%s: Peer %s initiated tunnel to us\n",
+ GNUNET_i2s (&socket->other_peer),
+ GNUNET_i2s (&socket->other_peer));
+
+ /* FIXME: Copy MESH handle from lsocket to socket */
+
+ return socket;
+}
+
+
+/**
+ * Function called whenever an inbound tunnel is destroyed. Should clean up
+ * any associated state. This function is NOT called if the client has
+ * explicitly asked for the tunnel to be destroyed using
+ * GNUNET_MESH_tunnel_destroy. It must NOT call GNUNET_MESH_tunnel_destroy on
+ * the tunnel.
+ *
+ * @param cls closure (set from GNUNET_MESH_connect)
+ * @param tunnel connection to the other end (henceforth invalid)
+ * @param tunnel_ctx place where local state associated
+ * with the tunnel is stored
+ */
+static void
+tunnel_cleaner (void *cls,
+ const struct GNUNET_MESH_Tunnel *tunnel,
+ void *tunnel_ctx)
+{
+ struct GNUNET_STREAM_Socket *socket = tunnel_ctx;
+
+ if (tunnel != socket->tunnel)
+ return;
+
+ GNUNET_break_op(0);
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "%s: Peer %s has terminated connection abruptly\n",
+ GNUNET_i2s (&socket->other_peer),
+ GNUNET_i2s (&socket->other_peer));
+
+ socket->status = GNUNET_STREAM_SHUTDOWN;
+
+ /* Clear Transmit handles */
+ if (NULL != socket->transmit_handle)
+ {
+ GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
+ socket->transmit_handle = NULL;
+ }
+ if (NULL != socket->ack_transmit_handle)
+ {
+ GNUNET_MESH_notify_transmit_ready_cancel (socket->ack_transmit_handle);
+ GNUNET_free (socket->ack_msg);
+ socket->ack_msg = NULL;
+ socket->ack_transmit_handle = NULL;
+ }
+ /* Stop Tasks using socket->tunnel */
+ if (GNUNET_SCHEDULER_NO_TASK != socket->ack_task_id)
+ {
+ GNUNET_SCHEDULER_cancel (socket->ack_task_id);
+ socket->ack_task_id = GNUNET_SCHEDULER_NO_TASK;
+ }
+ if (GNUNET_SCHEDULER_NO_TASK != socket->retransmission_timeout_task_id)
+ {
+ GNUNET_SCHEDULER_cancel (socket->retransmission_timeout_task_id);
+ socket->retransmission_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
+ }
+ /* FIXME: Cancel all other tasks using socket->tunnel */
+ socket->tunnel = NULL;
}
@@ -1819,55 +2750,167 @@ GNUNET_STREAM_open (const struct GNUNET_CONFIGURATION_Handle *cfg,
{
struct GNUNET_STREAM_Socket *socket;
enum GNUNET_STREAM_Option option;
+ GNUNET_MESH_ApplicationType ports[] = {app_port, 0};
va_list vargs; /* Variable arguments */
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "%s\n", __func__);
socket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_Socket));
socket->other_peer = *target;
socket->open_cb = open_cb;
socket->open_cls = open_cb_cls;
-
/* Set defaults */
socket->retransmit_timeout =
GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, default_timeout);
-
va_start (vargs, open_cb_cls); /* Parse variable args */
do {
option = va_arg (vargs, enum GNUNET_STREAM_Option);
switch (option)
- {
- case GNUNET_STREAM_OPTION_INITIAL_RETRANSMIT_TIMEOUT:
- /* Expect struct GNUNET_TIME_Relative */
- socket->retransmit_timeout = va_arg (vargs,
- struct GNUNET_TIME_Relative);
- break;
- case GNUNET_STREAM_OPTION_END:
- break;
- }
+ {
+ case GNUNET_STREAM_OPTION_INITIAL_RETRANSMIT_TIMEOUT:
+ /* Expect struct GNUNET_TIME_Relative */
+ socket->retransmit_timeout = va_arg (vargs,
+ struct GNUNET_TIME_Relative);
+ break;
+ case GNUNET_STREAM_OPTION_END:
+ break;
+ }
} while (GNUNET_STREAM_OPTION_END != option);
va_end (vargs); /* End of variable args parsing */
-
socket->mesh = GNUNET_MESH_connect (cfg, /* the configuration handle */
- 1, /* QUEUE size as parameter? */
+ 10, /* QUEUE size as parameter? */
socket, /* cls */
NULL, /* No inbound tunnel handler */
- NULL, /* No inbound tunnel cleaner */
+ NULL, /* No in-tunnel cleaner */
client_message_handlers,
- NULL); /* We don't get inbound tunnels */
- // FIXME: if (NULL == socket->mesh) ...
+ ports); /* We don't get inbound tunnels */
+ if (NULL == socket->mesh) /* Fail if we cannot connect to mesh */
+ {
+ GNUNET_free (socket);
+ return NULL;
+ }
/* Now create the mesh tunnel to target */
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Creating MESH Tunnel\n");
socket->tunnel = GNUNET_MESH_tunnel_create (socket->mesh,
NULL, /* Tunnel context */
&mesh_peer_connect_callback,
&mesh_peer_disconnect_callback,
socket);
- // FIXME: if (NULL == socket->tunnel) ...
-
+ GNUNET_assert (NULL != socket->tunnel);
+ GNUNET_MESH_peer_request_connect_add (socket->tunnel,
+ &socket->other_peer);
+
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "%s() END\n", __func__);
return socket;
}
/**
+ * Shutdown the stream for reading or writing (similar to man 2 shutdown).
+ *
+ * @param socket the stream socket
+ * @param operation SHUT_RD, SHUT_WR or SHUT_RDWR
+ * @param completion_cb the callback that will be called upon successful
+ * shutdown of given operation
+ * @param completion_cls the closure for the completion callback
+ * @return the shutdown handle
+ */
+struct GNUNET_STREAM_ShutdownHandle *
+GNUNET_STREAM_shutdown (struct GNUNET_STREAM_Socket *socket,
+ int operation,
+ GNUNET_STREAM_ShutdownCompletion completion_cb,
+ void *completion_cls)
+{
+ struct GNUNET_STREAM_ShutdownHandle *handle;
+ struct GNUNET_STREAM_MessageHeader *msg;
+
+ GNUNET_assert (NULL == socket->shutdown_handle);
+
+ handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_ShutdownHandle));
+ handle->socket = socket;
+ handle->completion_cb = completion_cb;
+ handle->completion_cls = completion_cls;
+ socket->shutdown_handle = handle;
+
+ msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
+ msg->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
+ switch (operation)
+ {
+ case SHUT_RD:
+ handle->operation = SHUT_RD;
+ if (NULL != socket->read_handle)
+ LOG (GNUNET_ERROR_TYPE_WARNING,
+ "Existing read handle should be cancelled before shutting"
+ " down reading\n");
+ msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE);
+ queue_message (socket,
+ msg,
+ &set_state_receive_close_wait,
+ NULL);
+ break;
+ case SHUT_WR:
+ handle->operation = SHUT_WR;
+ if (NULL != socket->write_handle)
+ LOG (GNUNET_ERROR_TYPE_WARNING,
+ "Existing write handle should be cancelled before shutting"
+ " down writing\n");
+ msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE);
+ queue_message (socket,
+ msg,
+ &set_state_transmit_close_wait,
+ NULL);
+ break;
+ case SHUT_RDWR:
+ handle->operation = SHUT_RDWR;
+ if (NULL != socket->write_handle)
+ LOG (GNUNET_ERROR_TYPE_WARNING,
+ "Existing write handle should be cancelled before shutting"
+ " down writing\n");
+ if (NULL != socket->read_handle)
+ LOG (GNUNET_ERROR_TYPE_WARNING,
+ "Existing read handle should be cancelled before shutting"
+ " down reading\n");
+ msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_CLOSE);
+ queue_message (socket,
+ msg,
+ &set_state_close_wait,
+ NULL);
+ break;
+ default:
+ LOG (GNUNET_ERROR_TYPE_WARNING,
+ "GNUNET_STREAM_shutdown called with invalid value for "
+ "parameter operation -- Ignoring\n");
+ GNUNET_free (msg);
+ GNUNET_free (handle);
+ return NULL;
+ }
+ handle->close_msg_retransmission_task_id =
+ GNUNET_SCHEDULER_add_delayed (socket->retransmit_timeout,
+ &close_msg_retransmission_task,
+ handle);
+ return handle;
+}
+
+
+/**
+ * Cancels a pending shutdown
+ *
+ * @param handle the shutdown handle returned from GNUNET_STREAM_shutdown
+ */
+void
+GNUNET_STREAM_shutdown_cancel (struct GNUNET_STREAM_ShutdownHandle *handle)
+{
+ if (GNUNET_SCHEDULER_NO_TASK != handle->close_msg_retransmission_task_id)
+ GNUNET_SCHEDULER_cancel (handle->close_msg_retransmission_task_id);
+ GNUNET_free (handle);
+ return;
+}
+
+
+/**
* Closes the stream
*
* @param socket the stream socket
@@ -1877,20 +2920,45 @@ GNUNET_STREAM_close (struct GNUNET_STREAM_Socket *socket)
{
struct MessageQueue *head;
- if (socket->read_task != GNUNET_SCHEDULER_NO_TASK)
+ if (NULL != socket->read_handle)
+ {
+ LOG (GNUNET_ERROR_TYPE_WARNING,
+ "Closing STREAM socket when a read handle is pending\n");
+ }
+ if (NULL != socket->write_handle)
+ {
+ LOG (GNUNET_ERROR_TYPE_WARNING,
+ "Closing STREAM socket when a write handle is pending\n");
+ }
+
+ if (socket->read_task_id != GNUNET_SCHEDULER_NO_TASK)
{
/* socket closed with read task pending!? */
GNUNET_break (0);
- GNUNET_SCHEDULER_cancel (socket->read_task);
- socket->read_task = GNUNET_SCHEDULER_NO_TASK;
+ GNUNET_SCHEDULER_cancel (socket->read_task_id);
+ socket->read_task_id = GNUNET_SCHEDULER_NO_TASK;
+ }
+
+ /* Terminate the ack'ing tasks if they are still present */
+ if (socket->ack_task_id != GNUNET_SCHEDULER_NO_TASK)
+ {
+ GNUNET_SCHEDULER_cancel (socket->ack_task_id);
+ socket->ack_task_id = GNUNET_SCHEDULER_NO_TASK;
}
/* Clear Transmit handles */
if (NULL != socket->transmit_handle)
- {
- GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
- socket->transmit_handle = NULL;
- }
+ {
+ GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
+ socket->transmit_handle = NULL;
+ }
+ if (NULL != socket->ack_transmit_handle)
+ {
+ GNUNET_MESH_notify_transmit_ready_cancel (socket->ack_transmit_handle);
+ GNUNET_free (socket->ack_msg);
+ socket->ack_msg = NULL;
+ socket->ack_transmit_handle = NULL;
+ }
/* Clear existing message queue */
while (NULL != (head = socket->queue_head)) {
@@ -1903,101 +2971,29 @@ GNUNET_STREAM_close (struct GNUNET_STREAM_Socket *socket)
/* Close associated tunnel */
if (NULL != socket->tunnel)
- {
- GNUNET_MESH_tunnel_destroy (socket->tunnel);
- socket->tunnel = NULL;
- }
+ {
+ GNUNET_MESH_tunnel_destroy (socket->tunnel);
+ socket->tunnel = NULL;
+ }
/* Close mesh connection */
- if (NULL != socket->mesh)
- {
- GNUNET_MESH_disconnect (socket->mesh);
- socket->mesh = NULL;
- }
+ if (NULL != socket->mesh && NULL == socket->lsocket)
+ {
+ GNUNET_MESH_disconnect (socket->mesh);
+ socket->mesh = NULL;
+ }
/* Release receive buffer */
if (NULL != socket->receive_buffer)
- {
- GNUNET_free (socket->receive_buffer);
- }
+ {
+ GNUNET_free (socket->receive_buffer);
+ }
GNUNET_free (socket);
}
/**
- * Method called whenever a peer creates a tunnel to us
- *
- * @param cls closure
- * @param tunnel new handle to the tunnel
- * @param initiator peer that started the tunnel
- * @param atsi performance information for the tunnel
- * @return initial tunnel context for the tunnel
- * (can be NULL -- that's not an error)
- */
-static void *
-new_tunnel_notify (void *cls,
- struct GNUNET_MESH_Tunnel *tunnel,
- const struct GNUNET_PeerIdentity *initiator,
- const struct GNUNET_ATS_Information *atsi)
-{
- struct GNUNET_STREAM_ListenSocket *lsocket = cls;
- struct GNUNET_STREAM_Socket *socket;
-
- socket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_Socket));
- socket->tunnel = tunnel;
- socket->session_id = 0; /* FIXME */
- socket->other_peer = *initiator;
- socket->state = STATE_INIT;
-
- if (GNUNET_SYSERR == lsocket->listen_cb (lsocket->listen_cb_cls,
- socket,
- &socket->other_peer))
- {
- socket->state = STATE_CLOSED;
- /* FIXME: Send CLOSE message and then free */
- GNUNET_free (socket);
- GNUNET_MESH_tunnel_destroy (tunnel); /* Destroy the tunnel */
- }
- return socket;
-}
-
-
-/**
- * Function called whenever an inbound tunnel is destroyed. Should clean up
- * any associated state. This function is NOT called if the client has
- * explicitly asked for the tunnel to be destroyed using
- * GNUNET_MESH_tunnel_destroy. It must NOT call GNUNET_MESH_tunnel_destroy on
- * the tunnel.
- *
- * @param cls closure (set from GNUNET_MESH_connect)
- * @param tunnel connection to the other end (henceforth invalid)
- * @param tunnel_ctx place where local state associated
- * with the tunnel is stored
- */
-static void
-tunnel_cleaner (void *cls,
- const struct GNUNET_MESH_Tunnel *tunnel,
- void *tunnel_ctx)
-{
- struct GNUNET_STREAM_Socket *socket = tunnel_ctx;
-
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Peer %s has terminated connection abruptly\n",
- GNUNET_i2s (&socket->other_peer));
-
- socket->status = GNUNET_STREAM_SHUTDOWN;
- /* Clear Transmit handles */
- if (NULL != socket->transmit_handle)
- {
- GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
- socket->transmit_handle = NULL;
- }
- socket->tunnel = NULL;
-}
-
-
-/**
* Listens for stream connections for a specific application ports
*
* @param cfg the configuration to use
@@ -2015,10 +3011,8 @@ GNUNET_STREAM_listen (const struct GNUNET_CONFIGURATION_Handle *cfg,
{
/* FIXME: Add variable args for passing configration options? */
struct GNUNET_STREAM_ListenSocket *lsocket;
- GNUNET_MESH_ApplicationType app_types[2];
+ GNUNET_MESH_ApplicationType ports[] = {app_port, 0};
- app_types[0] = app_port;
- app_types[1] = 0;
lsocket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_ListenSocket));
lsocket->port = app_port;
lsocket->listen_cb = listen_cb;
@@ -2029,7 +3023,8 @@ GNUNET_STREAM_listen (const struct GNUNET_CONFIGURATION_Handle *cfg,
&new_tunnel_notify,
&tunnel_cleaner,
server_message_handlers,
- app_types);
+ ports);
+ GNUNET_assert (NULL != lsocket->mesh);
return lsocket;
}
@@ -2043,6 +3038,7 @@ void
GNUNET_STREAM_listen_close (struct GNUNET_STREAM_ListenSocket *lsocket)
{
/* Close MESH connection */
+ GNUNET_assert (NULL != lsocket->mesh);
GNUNET_MESH_disconnect (lsocket->mesh);
GNUNET_free (lsocket);
@@ -2050,15 +3046,22 @@ GNUNET_STREAM_listen_close (struct GNUNET_STREAM_ListenSocket *lsocket)
/**
- * Tries to write the given data to the stream
+ * Tries to write the given data to the stream. The maximum size of data that
+ * can be written as part of a write operation is (64 * (64000 - sizeof (struct
+ * GNUNET_STREAM_DataMessage))). If size is greater than this it is not an API
+ * violation, however only the said number of maximum bytes will be written.
*
* @param socket the socket representing a stream
* @param data the data buffer from where the data is written into the stream
* @param size the number of bytes to be written from the data buffer
* @param timeout the timeout period
- * @param write_cont the function to call upon writing some bytes into the stream
+ * @param write_cont the function to call upon writing some bytes into the
+ * stream
* @param write_cont_cls the closure
- * @return handle to cancel the operation
+ *
+ * @return handle to cancel the operation; if a previous write is pending or
+ * the stream has been shutdown for this operation then write_cont is
+ * immediately called and NULL is returned.
*/
struct GNUNET_STREAM_IOWriteHandle *
GNUNET_STREAM_write (struct GNUNET_STREAM_Socket *socket,
@@ -2075,6 +3078,10 @@ GNUNET_STREAM_write (struct GNUNET_STREAM_Socket *socket,
uint32_t payload_size;
struct GNUNET_STREAM_DataMessage *data_msg;
const void *sweep;
+ struct GNUNET_TIME_Relative ack_deadline;
+
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "%s\n", __func__);
/* Return NULL if there is already a write request pending */
if (NULL != socket->write_handle)
@@ -2082,70 +3089,101 @@ GNUNET_STREAM_write (struct GNUNET_STREAM_Socket *socket,
GNUNET_break (0);
return NULL;
}
- if (!((STATE_ESTABLISHED == socket->state)
- || (STATE_RECEIVE_CLOSE_WAIT == socket->state)
- || (STATE_RECEIVE_CLOSED == socket->state)))
- {
- GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
- "Attempting to write on a closed (OR) not-yet-established"
- "stream\n");
- return NULL;
- }
+
+ switch (socket->state)
+ {
+ case STATE_TRANSMIT_CLOSED:
+ case STATE_TRANSMIT_CLOSE_WAIT:
+ case STATE_CLOSED:
+ case STATE_CLOSE_WAIT:
+ if (NULL != write_cont)
+ write_cont (write_cont_cls, GNUNET_STREAM_SHUTDOWN, 0);
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "%s() END\n", __func__);
+ return NULL;
+ case STATE_INIT:
+ case STATE_LISTEN:
+ case STATE_HELLO_WAIT:
+ if (NULL != write_cont)
+ /* FIXME: GNUNET_STREAM_SYSERR?? */
+ write_cont (write_cont_cls, GNUNET_STREAM_SYSERR, 0);
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "%s() END\n", __func__);
+ return NULL;
+ case STATE_ESTABLISHED:
+ case STATE_RECEIVE_CLOSED:
+ case STATE_RECEIVE_CLOSE_WAIT:
+ break;
+ }
+
if (GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH * max_payload_size < size)
size = GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH * max_payload_size;
num_needed_packets = (size + (max_payload_size - 1)) / max_payload_size;
io_handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_IOWriteHandle));
+ io_handle->socket = socket;
+ io_handle->write_cont = write_cont;
+ io_handle->write_cont_cls = write_cont_cls;
+ io_handle->size = size;
sweep = data;
+ /* FIXME: Remove the fixed delay for ack deadline; Set it to the value
+ determined from RTT */
+ ack_deadline = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5);
/* Divide the given buffer into packets for sending */
for (packet=0; packet < num_needed_packets; packet++)
+ {
+ if ((packet + 1) * max_payload_size < size)
+ {
+ payload_size = max_payload_size;
+ packet_size = MAX_PACKET_SIZE;
+ }
+ else
{
- if ((packet + 1) * max_payload_size < size)
- {
- payload_size = max_payload_size;
- packet_size = MAX_PACKET_SIZE;
- }
- else
- {
- payload_size = size - packet * max_payload_size;
- packet_size = payload_size + sizeof (struct
- GNUNET_STREAM_DataMessage);
- }
- io_handle->messages[packet] = GNUNET_malloc (packet_size);
- io_handle->messages[packet]->header.header.size = htons (packet_size);
- io_handle->messages[packet]->header.header.type =
- htons (GNUNET_MESSAGE_TYPE_STREAM_DATA);
- io_handle->messages[packet]->sequence_number =
- htons (socket->write_sequence_number++);
- io_handle->messages[packet]->offset = htons (socket->write_offset);
-
- /* FIXME: Remove the fixed delay for ack deadline; Set it to the value
- determined from RTT */
- io_handle->messages[packet]->ack_deadline =
- GNUNET_TIME_relative_hton (GNUNET_TIME_relative_multiply
- (GNUNET_TIME_UNIT_SECONDS, 5));
- data_msg = io_handle->messages[packet];
- /* Copy data from given buffer to the packet */
- memcpy (&data_msg[1],
- sweep,
- payload_size);
- sweep += payload_size;
- socket->write_offset += payload_size;
+ payload_size = size - packet * max_payload_size;
+ packet_size = payload_size + sizeof (struct
+ GNUNET_STREAM_DataMessage);
}
+ io_handle->messages[packet] = GNUNET_malloc (packet_size);
+ io_handle->messages[packet]->header.header.size = htons (packet_size);
+ io_handle->messages[packet]->header.header.type =
+ htons (GNUNET_MESSAGE_TYPE_STREAM_DATA);
+ io_handle->messages[packet]->sequence_number =
+ htonl (socket->write_sequence_number++);
+ io_handle->messages[packet]->offset = htonl (socket->write_offset);
+
+ /* FIXME: Remove the fixed delay for ack deadline; Set it to the value
+ determined from RTT */
+ io_handle->messages[packet]->ack_deadline =
+ GNUNET_TIME_relative_hton (ack_deadline);
+ data_msg = io_handle->messages[packet];
+ /* Copy data from given buffer to the packet */
+ memcpy (&data_msg[1],
+ sweep,
+ payload_size);
+ sweep += payload_size;
+ socket->write_offset += payload_size;
+ }
socket->write_handle = io_handle;
write_data (socket);
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "%s() END\n", __func__);
+
return io_handle;
}
+
/**
- * Tries to read data from the stream
+ * Tries to read data from the stream.
*
* @param socket the socket representing a stream
* @param timeout the timeout period
* @param proc function to call with data (once only)
* @param proc_cls the closure for proc
- * @return handle to cancel the operation
+ *
+ * @return handle to cancel the operation; if the stream has been shutdown for
+ * this type of opeartion then the DataProcessor is immediately
+ * called with GNUNET_STREAM_SHUTDOWN as status and NULL if returned
*/
struct GNUNET_STREAM_IOReadHandle *
GNUNET_STREAM_read (struct GNUNET_STREAM_Socket *socket,
@@ -2155,26 +3193,99 @@ GNUNET_STREAM_read (struct GNUNET_STREAM_Socket *socket,
{
struct GNUNET_STREAM_IOReadHandle *read_handle;
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "%s: %s()\n",
+ GNUNET_i2s (&socket->other_peer),
+ __func__);
+
/* Return NULL if there is already a read handle; the user has to cancel that
- first before continuing or has to wait until it is completed */
+ first before continuing or has to wait until it is completed */
if (NULL != socket->read_handle) return NULL;
+ GNUNET_assert (NULL != proc);
+
+ switch (socket->state)
+ {
+ case STATE_RECEIVE_CLOSED:
+ case STATE_RECEIVE_CLOSE_WAIT:
+ case STATE_CLOSED:
+ case STATE_CLOSE_WAIT:
+ proc (proc_cls, GNUNET_STREAM_SHUTDOWN, NULL, 0);
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "%s: %s() END\n",
+ GNUNET_i2s (&socket->other_peer),
+ __func__);
+ return NULL;
+ default:
+ break;
+ }
+
read_handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_IOReadHandle));
read_handle->proc = proc;
+ read_handle->proc_cls = proc_cls;
socket->read_handle = read_handle;
/* Check if we have a packet at bitmap 0 */
if (GNUNET_YES == ackbitmap_is_bit_set (&socket->ack_bitmap,
0))
- {
- socket->read_task = GNUNET_SCHEDULER_add_now (&call_read_processor,
- socket);
+ {
+ socket->read_task_id = GNUNET_SCHEDULER_add_now (&call_read_processor,
+ socket);
- }
+ }
/* Setup the read timeout task */
- socket->read_io_timeout_task = GNUNET_SCHEDULER_add_delayed (timeout,
- &read_io_timeout,
- socket);
+ socket->read_io_timeout_task_id =
+ GNUNET_SCHEDULER_add_delayed (timeout,
+ &read_io_timeout,
+ socket);
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "%s: %s() END\n",
+ GNUNET_i2s (&socket->other_peer),
+ __func__);
return read_handle;
}
+
+
+/**
+ * Cancel pending write operation.
+ *
+ * @param ioh handle to operation to cancel
+ */
+void
+GNUNET_STREAM_io_write_cancel (struct GNUNET_STREAM_IOWriteHandle *ioh)
+{
+ struct GNUNET_STREAM_Socket *socket = ioh->socket;
+ unsigned int packet;
+
+ GNUNET_assert (NULL != socket->write_handle);
+ GNUNET_assert (socket->write_handle == ioh);
+
+ if (GNUNET_SCHEDULER_NO_TASK != socket->retransmission_timeout_task_id)
+ {
+ GNUNET_SCHEDULER_cancel (socket->retransmission_timeout_task_id);
+ socket->retransmission_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
+ }
+
+ for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
+ {
+ if (NULL == ioh->messages[packet]) break;
+ GNUNET_free (ioh->messages[packet]);
+ }
+
+ GNUNET_free (socket->write_handle);
+ socket->write_handle = NULL;
+ return;
+}
+
+
+/**
+ * Cancel pending read operation.
+ *
+ * @param ioh handle to operation to cancel
+ */
+void
+GNUNET_STREAM_io_read_cancel (struct GNUNET_STREAM_IOReadHandle *ioh)
+{
+ return;
+}
diff --git a/src/stream/stream_protocol.h b/src/stream/stream_protocol.h
index 0c1987e..0c5376f 100644
--- a/src/stream/stream_protocol.h
+++ b/src/stream/stream_protocol.h
@@ -127,10 +127,8 @@ struct GNUNET_STREAM_AckMessage
GNUNET_STREAM_AckBitmap bitmap GNUNET_PACKED;
/**
- * The sequence number of the Data Message upto which the receiver has filled
- * its buffer without any missing packets
- *
- * FIXME: Do we need this?
+ * The sequence number of the next Data Message receiver is
+ * anticipating. Data messages less than this number are received by receiver
*/
uint32_t base_sequence_number GNUNET_PACKED;
@@ -163,7 +161,7 @@ struct GNUNET_STREAM_HelloAckMessage
*
* FIXME: Remove if not needed
*/
- uint32_t receive_window_size;
+ uint32_t receiver_window_size;
};
diff --git a/src/stream/test_stream_2peers.c b/src/stream/test_stream_2peers.c
new file mode 100644
index 0000000..1fdc0ee
--- /dev/null
+++ b/src/stream/test_stream_2peers.c
@@ -0,0 +1,560 @@
+/*
+ This file is part of GNUnet.
+ (C) 2011, 2012 Christian Grothoff (and other contributing authors)
+
+ GNUnet is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published
+ by the Free Software Foundation; either version 3, or (at your
+ option) any later version.
+
+ GNUnet is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with GNUnet; see the file COPYING. If not, write to the
+ Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ Boston, MA 02111-1307, USA.
+*/
+
+/**
+ * @file stream/test_stream_2peers.c
+ * @brief Stream API testing between 2 peers using testing API
+ * @author Sree Harsha Totakura
+ */
+
+#include <string.h>
+
+#include "platform.h"
+#include "gnunet_util_lib.h"
+#include "gnunet_mesh_service.h"
+#include "gnunet_stream_lib.h"
+#include "gnunet_testing_lib.h"
+
+#define VERBOSE 1
+
+/**
+ * Number of peers
+ */
+#define NUM_PEERS 2
+
+/**
+ * Structure for holding peer's sockets and IO Handles
+ */
+struct PeerData
+{
+ /**
+ * Peer's stream socket
+ */
+ struct GNUNET_STREAM_Socket *socket;
+
+ /**
+ * Peer's io write handle
+ */
+ struct GNUNET_STREAM_IOWriteHandle *io_write_handle;
+
+ /**
+ * Peer's io read handle
+ */
+ struct GNUNET_STREAM_IOReadHandle *io_read_handle;
+
+ /**
+ * Peer's shutdown handle
+ */
+ struct GNUNET_STREAM_ShutdownHandle *shutdown_handle;
+
+ /**
+ * Our Peer id
+ */
+ struct GNUNET_PeerIdentity our_id;
+
+ /**
+ * Bytes the peer has written
+ */
+ unsigned int bytes_wrote;
+
+ /**
+ * Byte the peer has read
+ */
+ unsigned int bytes_read;
+};
+
+/**
+ * The current peer group
+ */
+static struct GNUNET_TESTING_PeerGroup *pg;
+
+/**
+ * Peer 1 daemon
+ */
+static struct GNUNET_TESTING_Daemon *d1;
+
+/**
+ * Peer 2 daemon
+ */
+static struct GNUNET_TESTING_Daemon *d2;
+
+static struct PeerData peer1;
+static struct PeerData peer2;
+static struct GNUNET_STREAM_ListenSocket *peer2_listen_socket;
+static struct GNUNET_CONFIGURATION_Handle *config;
+
+static GNUNET_SCHEDULER_TaskIdentifier abort_task;
+
+static char *data = "ABCD";
+static int result;
+
+static int writing_success;
+static int reading_success;
+
+
+/**
+ * Input processor
+ *
+ * @param cls the closure from GNUNET_STREAM_write/read
+ * @param status the status of the stream at the time this function is called
+ * @param data traffic from the other side
+ * @param size the number of bytes available in data read
+ * @return number of bytes of processed from 'data' (any data remaining should be
+ * given to the next time the read processor is called).
+ */
+static size_t
+input_processor (void *cls,
+ enum GNUNET_STREAM_Status status,
+ const void *input_data,
+ size_t size);
+
+/**
+ * Task for calling STREAM_read
+ *
+ * @param cls the peer data entity
+ * @param tc the task context
+ */
+static void
+stream_read_task (void *cls,
+ const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+ struct PeerData *peer = cls;
+
+ peer->io_read_handle = GNUNET_STREAM_read (peer->socket,
+ GNUNET_TIME_relative_multiply
+ (GNUNET_TIME_UNIT_SECONDS, 5),
+ &input_processor,
+ peer);
+ GNUNET_assert (NULL != peer->io_read_handle);
+}
+
+/**
+ * The write completion function; called upon writing some data to stream or
+ * upon error
+ *
+ * @param cls the closure from GNUNET_STREAM_write/read
+ * @param status the status of the stream at the time this function is called
+ * @param size the number of bytes read or written
+ */
+static void
+write_completion (void *cls,
+ enum GNUNET_STREAM_Status status,
+ size_t size);
+
+
+/**
+ * Task for calling STREAM_write
+ *
+ * @param cls the peer data entity
+ * @param tc the task context
+ */
+static void
+stream_write_task (void *cls,
+ const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+ struct PeerData *peer = cls;
+
+ peer->io_write_handle =
+ GNUNET_STREAM_write (peer->socket,
+ (void *) data,
+ strlen(data) - peer->bytes_wrote,
+ GNUNET_TIME_relative_multiply
+ (GNUNET_TIME_UNIT_SECONDS, 5),
+ &write_completion,
+ peer);
+
+ GNUNET_assert (NULL != peer->io_write_handle);
+ }
+
+
+/**
+ * Check whether peers successfully shut down.
+ */
+static void
+peergroup_shutdown_callback (void *cls, const char *emsg)
+{
+ if (emsg != NULL)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "Shutdown of peers failed!\n");
+ }
+ else
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "All peers successfully shut down!\n");
+ }
+ GNUNET_CONFIGURATION_destroy (config);
+}
+
+
+/**
+ * Close sockets and stop testing deamons nicely
+ */
+static void
+do_close (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+ if (NULL != peer1.socket)
+ GNUNET_STREAM_close (peer1.socket);
+ if (NULL != peer2.socket)
+ GNUNET_STREAM_close (peer2.socket);
+ if (NULL != peer2_listen_socket)
+ GNUNET_STREAM_listen_close (peer2_listen_socket);
+
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "test: shutdown\n");
+ if (0 != abort_task)
+ {
+ GNUNET_SCHEDULER_cancel (abort_task);
+ }
+
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "test: Wait\n");
+
+ GNUNET_TESTING_daemons_stop (pg,
+ GNUNET_TIME_relative_multiply
+ (GNUNET_TIME_UNIT_SECONDS, 5),
+ &peergroup_shutdown_callback,
+ NULL);
+}
+
+
+/**
+ * Completion callback for shutdown
+ *
+ * @param cls the closure from GNUNET_STREAM_shutdown call
+ * @param operation the operation that was shutdown (SHUT_RD, SHUT_WR,
+ * SHUT_RDWR)
+ */
+static void
+shutdown_completion (void *cls,
+ int operation)
+{
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "STREAM shutdown successful\n");
+ GNUNET_SCHEDULER_add_now (&do_close,
+ cls);
+}
+
+
+
+/**
+ * Shutdown sockets gracefully
+ */
+static void
+do_shutdown (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+ peer1.shutdown_handle = GNUNET_STREAM_shutdown (peer1.socket,
+ SHUT_RDWR,
+ &shutdown_completion,
+ cls);
+}
+
+
+/**
+ * Something went wrong and timed out. Kill everything and set error flag
+ */
+static void
+do_abort (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "test: ABORT\n");
+ result = GNUNET_SYSERR;
+ abort_task = 0;
+ do_close (cls, tc);
+}
+
+
+/**
+ * The write completion function; called upon writing some data to stream or
+ * upon error
+ *
+ * @param cls the closure from GNUNET_STREAM_write/read
+ * @param status the status of the stream at the time this function is called
+ * @param size the number of bytes read or written
+ */
+static void
+write_completion (void *cls,
+ enum GNUNET_STREAM_Status status,
+ size_t size)
+{
+ struct PeerData *peer=cls;
+
+ GNUNET_assert (GNUNET_STREAM_OK == status);
+ GNUNET_assert (size <= strlen (data));
+ peer->bytes_wrote += size;
+
+ if (peer->bytes_wrote < strlen(data)) /* Have more data to send */
+ {
+ GNUNET_SCHEDULER_add_now (&stream_write_task, peer);
+ }
+ else
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Writing completed\n");
+
+ if (&peer1 == peer) /* Peer1 has finished writing; should read now */
+ {
+ peer->bytes_read = 0;
+ GNUNET_SCHEDULER_add_now (&stream_read_task, peer);
+ }
+ else
+ {
+ writing_success = GNUNET_YES;
+ if (GNUNET_YES == reading_success)
+ GNUNET_SCHEDULER_add_now (&do_shutdown, NULL);
+ }
+ }
+}
+
+
+/**
+ * Function executed after stream has been established
+ *
+ * @param cls the closure from GNUNET_STREAM_open
+ * @param socket socket to use to communicate with the other side (read/write)
+ */
+static void
+stream_open_cb (void *cls,
+ struct GNUNET_STREAM_Socket *socket)
+{
+ struct PeerData *peer=cls;
+
+ GNUNET_assert (&peer1 == peer);
+ GNUNET_assert (socket == peer1.socket);
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "%s: Stream established from peer1\n",
+ GNUNET_i2s (&peer1.our_id));
+ peer->bytes_wrote = 0;
+ GNUNET_SCHEDULER_add_now (&stream_write_task, peer);
+}
+
+
+/**
+ * Input processor
+ *
+ * @param cls the closure from GNUNET_STREAM_write/read
+ * @param status the status of the stream at the time this function is called
+ * @param data traffic from the other side
+ * @param size the number of bytes available in data read
+ * @return number of bytes of processed from 'data' (any data remaining should be
+ * given to the next time the read processor is called).
+ */
+static size_t
+input_processor (void *cls,
+ enum GNUNET_STREAM_Status status,
+ const void *input_data,
+ size_t size)
+{
+ struct PeerData *peer;
+
+ peer = (struct PeerData *) cls;
+
+ if (GNUNET_STREAM_TIMEOUT == status)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Read operation timedout - reading again!\n");
+ GNUNET_assert (0 == size);
+ GNUNET_SCHEDULER_add_now (&stream_read_task, peer);
+ return 0;
+ }
+
+ GNUNET_assert (GNUNET_STREAM_OK == status);
+ GNUNET_assert (size <= strlen (data));
+ GNUNET_assert (0 == strncmp ((const char *) data + peer->bytes_read,
+ (const char *) input_data,
+ size));
+ peer->bytes_read += size;
+
+ if (peer->bytes_read < strlen (data))
+ {
+ GNUNET_SCHEDULER_add_now (&stream_read_task, peer);
+ }
+ else
+ {
+ if (&peer2 == peer) /* Peer2 has completed reading; should write */
+ {
+ peer->bytes_wrote = 0;
+ GNUNET_SCHEDULER_add_now (&stream_write_task, peer);
+ }
+ else /* Peer1 has completed reading. End of tests */
+ {
+ reading_success = GNUNET_YES;
+ if (GNUNET_YES == writing_success)
+ GNUNET_SCHEDULER_add_now (&do_shutdown, NULL);
+ }
+ }
+ return size;
+}
+
+
+/**
+ * Functions of this type are called upon new stream connection from other peers
+ *
+ * @param cls the closure from GNUNET_STREAM_listen
+ * @param socket the socket representing the stream
+ * @param initiator the identity of the peer who wants to establish a stream
+ * with us
+ * @return GNUNET_OK to keep the socket open, GNUNET_SYSERR to close the
+ * stream (the socket will be invalid after the call)
+ */
+static int
+stream_listen_cb (void *cls,
+ struct GNUNET_STREAM_Socket *socket,
+ const struct GNUNET_PeerIdentity *initiator)
+{
+ GNUNET_assert (NULL != socket);
+ GNUNET_assert (NULL != initiator);
+ GNUNET_assert (socket != peer1.socket);
+
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "%s: Peer connected: %s\n",
+ GNUNET_i2s (&peer2.our_id),
+ GNUNET_i2s(initiator));
+
+ peer2.socket = socket;
+ peer2.bytes_read = 0;
+ GNUNET_SCHEDULER_add_now (&stream_read_task, &peer2);
+ return GNUNET_OK;
+}
+
+
+/**
+ * Callback to be called when testing peer group is ready
+ *
+ * @param cls NULL
+ * @param emsg NULL on success
+ */
+void
+peergroup_ready (void *cls, const char *emsg)
+{
+ if (NULL != emsg)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ "Starting peer group failed: %s\n", emsg);
+ return;
+ }
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Peer group is now ready\n");
+
+ GNUNET_assert (2 == GNUNET_TESTING_daemons_running (pg));
+
+ d1 = GNUNET_TESTING_daemon_get (pg, 0);
+ GNUNET_assert (NULL != d1);
+
+ d2 = GNUNET_TESTING_daemon_get (pg, 1);
+ GNUNET_assert (NULL != d2);
+
+ GNUNET_TESTING_get_peer_identity (d1->cfg,
+ &peer1.our_id);
+ GNUNET_TESTING_get_peer_identity (d2->cfg,
+ &peer2.our_id);
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "%s : %s\n",
+ GNUNET_i2s (&peer1.our_id),
+ GNUNET_i2s (&d1->id));
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "%s : %s\n",
+ GNUNET_i2s (&peer2.our_id),
+ GNUNET_i2s (&d2->id));
+
+ peer2_listen_socket = GNUNET_STREAM_listen (d2->cfg,
+ 10, /* App port */
+ &stream_listen_cb,
+ NULL);
+ GNUNET_assert (NULL != peer2_listen_socket);
+
+ /* Connect to stream library */
+ peer1.socket = GNUNET_STREAM_open (d1->cfg,
+ &d2->id, /* Null for local peer? */
+ 10, /* App port */
+ &stream_open_cb,
+ &peer1,
+ GNUNET_STREAM_OPTION_END);
+ GNUNET_assert (NULL != peer1.socket);
+}
+
+
+/**
+ * Initialize framework and start test
+ */
+static void
+run (void *cls, char *const *args, const char *cfgfile,
+ const struct GNUNET_CONFIGURATION_Handle *cfg)
+{
+ struct GNUNET_TESTING_Host *hosts; /* FIXME: free hosts (DLL) */
+
+ GNUNET_log_setup ("test_stream_2peers",
+ "DEBUG",
+ NULL);
+
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Starting test\n");
+ /* Duplicate the configuration */
+ config = GNUNET_CONFIGURATION_dup (cfg);
+
+ hosts = GNUNET_TESTING_hosts_load (config);
+
+ pg = GNUNET_TESTING_peergroup_start (config,
+ 2,
+ GNUNET_TIME_relative_multiply
+ (GNUNET_TIME_UNIT_SECONDS, 3),
+ NULL,
+ &peergroup_ready,
+ NULL,
+ hosts);
+ GNUNET_assert (NULL != pg);
+
+ abort_task =
+ GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply
+ (GNUNET_TIME_UNIT_SECONDS, 40), &do_abort,
+ NULL);
+}
+
+/**
+ * Main function
+ */
+int main (int argc, char **argv)
+{
+ int ret;
+
+ char *argv2[] = { "test-stream-2peers",
+ "-L", "DEBUG",
+ "-c", "test_stream_local.conf",
+ NULL};
+
+ struct GNUNET_GETOPT_CommandLineOption options[] = {
+ GNUNET_GETOPT_OPTION_END
+ };
+
+ ret =
+ GNUNET_PROGRAM_run ((sizeof (argv2) / sizeof (char *)) - 1, argv2,
+ "test-stream-2peers", "nohelp", options, &run, NULL);
+
+ if (GNUNET_OK != ret)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "run failed with error code %d\n",
+ ret);
+ return 1;
+ }
+ if (GNUNET_SYSERR == result)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "test failed\n");
+ return 1;
+ }
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO, "test ok\n");
+ return 0;
+}
diff --git a/src/stream/test_stream_2peers_halfclose.c b/src/stream/test_stream_2peers_halfclose.c
new file mode 100644
index 0000000..7997c20
--- /dev/null
+++ b/src/stream/test_stream_2peers_halfclose.c
@@ -0,0 +1,786 @@
+/*
+ This file is part of GNUnet.
+ (C) 2011, 2012 Christian Grothoff (and other contributing authors)
+
+ GNUnet is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published
+ by the Free Software Foundation; either version 3, or (at your
+ option) any later version.
+
+ GNUnet is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with GNUnet; see the file COPYING. If not, write to the
+ Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ Boston, MA 02111-1307, USA.
+*/
+
+/**
+ * @file stream/test_stream_2peers_halfclose.c
+ * @brief Testcases for Stream API halfclosed connections between 2 peers
+ * @author Sree Harsha Totakura
+ */
+
+#include <string.h>
+
+#include "platform.h"
+#include "gnunet_util_lib.h"
+#include "gnunet_mesh_service.h"
+#include "gnunet_stream_lib.h"
+#include "gnunet_testing_lib.h"
+#include "gnunet_scheduler_lib.h"
+
+#define VERBOSE 1
+
+/**
+ * Number of peers
+ */
+#define NUM_PEERS 2
+
+/**
+ * Structure for holding peer's sockets and IO Handles
+ */
+struct PeerData
+{
+ /**
+ * Peer's stream socket
+ */
+ struct GNUNET_STREAM_Socket *socket;
+
+ /**
+ * Peer's io write handle
+ */
+ struct GNUNET_STREAM_IOWriteHandle *io_write_handle;
+
+ /**
+ * Peer's io read handle
+ */
+ struct GNUNET_STREAM_IOReadHandle *io_read_handle;
+
+ /**
+ * Peer's shutdown handle
+ */
+ struct GNUNET_STREAM_ShutdownHandle *shutdown_handle;
+
+ /**
+ * Our Peer id
+ */
+ struct GNUNET_PeerIdentity our_id;
+
+ /**
+ * Bytes the peer has written
+ */
+ unsigned int bytes_wrote;
+
+ /**
+ * Byte the peer has read
+ */
+ unsigned int bytes_read;
+
+ /**
+ * GNUNET_YES if the peer has successfully completed the current test
+ */
+ unsigned int test_ok;
+
+ /**
+ * The shutdown operation that has to be used by the stream_shutdown_task
+ */
+ int shutdown_operation;
+};
+
+/**
+ * The current peer group
+ */
+static struct GNUNET_TESTING_PeerGroup *pg;
+
+/**
+ * Peer 1 daemon
+ */
+static struct GNUNET_TESTING_Daemon *d1;
+
+/**
+ * Peer 2 daemon
+ */
+static struct GNUNET_TESTING_Daemon *d2;
+
+
+/**
+ * Peer1 writes first and then calls for SHUT_WR
+ * Peer2 reads first and then calls for SHUT_RD
+ * Attempt to write again by Peer1 should be rejected
+ * Attempt to read again by Peer2 should be rejected
+ * Peer1 then reads from Peer2 which writes
+ */
+static struct PeerData peer1;
+static struct PeerData peer2;
+static struct GNUNET_STREAM_ListenSocket *peer2_listen_socket;
+static struct GNUNET_CONFIGURATION_Handle *config;
+
+static GNUNET_SCHEDULER_TaskIdentifier abort_task;
+static GNUNET_SCHEDULER_TaskIdentifier read_task;
+
+static char *data = "ABCD";
+static int result;
+
+/**
+ * Enumeration for various tests that are to be passed in the same order as
+ * below
+ */
+enum Test
+ {
+ /**
+ * Peer1 writing; Peer2 reading
+ */
+ PEER1_WRITE,
+
+ /**
+ * Peer1 write shutdown; Peer2 should get an error when it tries to read;
+ */
+ PEER1_WRITE_SHUTDOWN,
+
+ /**
+ * Peer1 reads; Peer2 writes (connection is halfclosed)
+ */
+ PEER1_HALFCLOSE_READ,
+
+ /**
+ * Peer1 attempts to write; Should fail with stream already shutdown error
+ */
+ PEER1_HALFCLOSE_WRITE_FAIL,
+
+ /**
+ * Peer1 read shutdown; Peer2 should get stream shutdown error during write
+ */
+ PEER1_READ_SHUTDOWN,
+
+ /**
+ * All tests successfully finished
+ */
+ SUCCESS
+ };
+
+/**
+ * Current running test
+ */
+enum Test current_test;
+
+/**
+ * Input processor
+ *
+ * @param cls the closure from GNUNET_STREAM_write/read
+ * @param status the status of the stream at the time this function is called
+ * @param data traffic from the other side
+ * @param size the number of bytes available in data read
+ * @return number of bytes of processed from 'data' (any data remaining should be
+ * given to the next time the read processor is called).
+ */
+static size_t
+input_processor (void *cls,
+ enum GNUNET_STREAM_Status status,
+ const void *input_data,
+ size_t size);
+
+
+/**
+ * The transition function; responsible for the transitions among tests
+ */
+static void
+transition();
+
+
+/**
+ * Task for calling STREAM_read
+ *
+ * @param cls the peer data entity
+ * @param tc the task context
+ */
+static void
+stream_read_task (void *cls,
+ const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+ struct PeerData *peer = cls;
+
+ peer->io_read_handle = GNUNET_STREAM_read (peer->socket,
+ GNUNET_TIME_relative_multiply
+ (GNUNET_TIME_UNIT_SECONDS, 5),
+ &input_processor,
+ cls);
+ switch (current_test)
+ {
+ case PEER1_WRITE_SHUTDOWN:
+ GNUNET_assert (&peer2 == peer);
+ GNUNET_assert (NULL == peer->io_read_handle);
+ transition (); /* to PEER1_HALFCLOSE_READ */
+ break;
+ default:
+ GNUNET_assert (NULL != peer->io_read_handle);
+ }
+}
+
+
+/**
+ * The write completion function; called upon writing some data to stream or
+ * upon error
+ *
+ * @param cls the closure from GNUNET_STREAM_write/read
+ * @param status the status of the stream at the time this function is called
+ * @param size the number of bytes read or written
+ */
+static void
+write_completion (void *cls,
+ enum GNUNET_STREAM_Status status,
+ size_t size);
+
+
+/**
+ * Task for calling STREAM_write
+ *
+ * @param cls the peer data entity
+ * @param tc the task context
+ */
+static void
+stream_write_task (void *cls,
+ const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+ struct PeerData *peer = cls;
+
+ peer->io_write_handle =
+ GNUNET_STREAM_write (peer->socket,
+ (void *) data,
+ strlen(data) - peer->bytes_wrote,
+ GNUNET_TIME_relative_multiply
+ (GNUNET_TIME_UNIT_SECONDS, 5),
+ &write_completion,
+ peer);
+ switch (current_test)
+ {
+ case PEER1_HALFCLOSE_WRITE_FAIL:
+ GNUNET_assert (&peer1 == peer);
+ GNUNET_assert (NULL == peer->io_write_handle);
+ transition(); /* To PEER1_READ_SHUTDOWN */
+ break;
+ case PEER1_READ_SHUTDOWN:
+ GNUNET_assert (&peer2 == peer);
+ GNUNET_assert (NULL == peer->io_write_handle);
+ transition (); /* To SUCCESS */
+ break;
+ default:
+ GNUNET_assert (NULL != peer->io_write_handle);
+ }
+}
+
+
+/**
+ * Check whether peers successfully shut down.
+ */
+static void
+peergroup_shutdown_callback (void *cls, const char *emsg)
+{
+ if (emsg != NULL)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "Shutdown of peers failed!\n");
+ }
+ else
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "All peers successfully shut down!\n");
+ }
+ GNUNET_CONFIGURATION_destroy (config);
+}
+
+
+/**
+ * Close sockets and stop testing deamons nicely
+ */
+static void
+do_close (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+ if (NULL != peer1.socket)
+ GNUNET_STREAM_close (peer1.socket);
+ if (NULL != peer2.socket)
+ GNUNET_STREAM_close (peer2.socket);
+ if (NULL != peer2_listen_socket)
+ GNUNET_STREAM_listen_close (peer2_listen_socket);
+
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "test: shutdown\n");
+ if (0 != abort_task)
+ {
+ GNUNET_SCHEDULER_cancel (abort_task);
+ }
+
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "test: Wait\n");
+
+ GNUNET_TESTING_daemons_stop (pg,
+ GNUNET_TIME_relative_multiply
+ (GNUNET_TIME_UNIT_SECONDS, 5),
+ &peergroup_shutdown_callback,
+ NULL);
+}
+
+
+/**
+ * Completion callback for shutdown
+ *
+ * @param cls the closure from GNUNET_STREAM_shutdown call
+ * @param operation the operation that was shutdown (SHUT_RD, SHUT_WR,
+ * SHUT_RDWR)
+ */
+void
+shutdown_completion (void *cls,
+ int operation)
+{
+ switch (current_test)
+ {
+ case PEER1_WRITE:
+ GNUNET_assert (0);
+ case PEER1_WRITE_SHUTDOWN:
+ GNUNET_assert (cls == &peer1);
+ GNUNET_assert (SHUT_WR == operation);
+ peer1.test_ok = GNUNET_YES;
+ /* Peer2 should read with error */
+ peer2.bytes_read = 0;
+ GNUNET_SCHEDULER_add_now (&stream_read_task, &peer2);
+ break;
+ case PEER1_READ_SHUTDOWN:
+ peer1.test_ok = GNUNET_YES;
+ peer2.bytes_wrote = 0;
+ GNUNET_SCHEDULER_add_now (&stream_write_task, &peer2);
+ break;
+ case PEER1_HALFCLOSE_READ:
+ case PEER1_HALFCLOSE_WRITE_FAIL:
+ case SUCCESS:
+ GNUNET_assert (0); /* We shouldn't reach here */
+ }
+}
+
+
+/**
+ * Task for calling STREAM_shutdown
+ *
+ * @param cls the peer entity
+ * @param tc the TaskContext
+ */
+static void
+stream_shutdown_task (void *cls,
+ const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+ struct PeerData *peer = cls;
+
+ peer->shutdown_handle = GNUNET_STREAM_shutdown (peer->socket,
+ peer->shutdown_operation,
+ &shutdown_completion,
+ peer);
+ GNUNET_assert (NULL != peer->shutdown_handle);
+}
+
+
+/**
+ * Something went wrong and timed out. Kill everything and set error flag
+ */
+static void
+do_abort (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "test: ABORT\n");
+ if (0 != read_task)
+ {
+ GNUNET_SCHEDULER_cancel (read_task);
+ }
+ result = GNUNET_SYSERR;
+ abort_task = 0;
+ do_close (cls, tc);
+}
+
+
+/**
+ * The transition function; responsible for the transitions among tests
+ */
+static void
+transition()
+{
+ if ((GNUNET_YES == peer1.test_ok) && (GNUNET_YES == peer2.test_ok))
+ {
+ peer1.test_ok = GNUNET_NO;
+ peer2.test_ok = GNUNET_NO;
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "TEST %d SUCCESSFULL\n", current_test);
+ switch (current_test)
+ {
+ case PEER1_WRITE:
+ current_test = PEER1_WRITE_SHUTDOWN;
+ /* Peer1 should shutdown writing */
+ peer1.shutdown_operation = SHUT_WR;
+ GNUNET_SCHEDULER_add_now (&stream_shutdown_task, &peer1);
+ break;
+ case PEER1_WRITE_SHUTDOWN:
+ current_test = PEER1_HALFCLOSE_READ;
+ /* Peer2 should be able to write successfully */
+ peer2.bytes_wrote = 0;
+ GNUNET_SCHEDULER_add_now (&stream_write_task, &peer2);
+
+ /* Peer1 should be able to read successfully */
+ peer1.bytes_read = 0;
+ GNUNET_SCHEDULER_add_now (&stream_read_task, &peer1);
+ break;
+ case PEER1_HALFCLOSE_READ:
+ current_test = PEER1_HALFCLOSE_WRITE_FAIL;
+ peer1.bytes_wrote = 0;
+ peer2.bytes_read = 0;
+ peer2.test_ok = GNUNET_YES;
+ GNUNET_SCHEDULER_add_now (&stream_write_task, &peer1);
+ break;
+ case PEER1_HALFCLOSE_WRITE_FAIL:
+ current_test = PEER1_READ_SHUTDOWN;
+ peer1.shutdown_operation = SHUT_RD;
+ GNUNET_SCHEDULER_add_now (&stream_shutdown_task, &peer1);
+ break;
+ case PEER1_READ_SHUTDOWN:
+ current_test = SUCCESS;
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "All tests successful\n");
+ GNUNET_SCHEDULER_add_now (&do_close, NULL);
+ break;
+ case SUCCESS:
+ GNUNET_assert (0); /* We shouldn't reach here */
+
+ }
+ }
+}
+
+/**
+ * The write completion function; called upon writing some data to stream or
+ * upon error
+ *
+ * @param cls the closure from GNUNET_STREAM_write/read
+ * @param status the status of the stream at the time this function is called
+ * @param size the number of bytes read or written
+ */
+static void
+write_completion (void *cls,
+ enum GNUNET_STREAM_Status status,
+ size_t size)
+{
+ struct PeerData *peer = cls;
+
+ switch (current_test)
+ {
+ case PEER1_WRITE:
+ case PEER1_HALFCLOSE_READ:
+
+ GNUNET_assert (GNUNET_STREAM_OK == status);
+ GNUNET_assert (size <= strlen (data));
+ peer->bytes_wrote += size;
+
+ if (peer->bytes_wrote < strlen(data)) /* Have more data to send */
+ {
+ GNUNET_SCHEDULER_add_now (&stream_write_task, peer);
+ }
+ else
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Writing completed\n");
+
+ if (&peer1 == peer)
+ {
+ peer1.test_ok = GNUNET_YES;
+ transition (); /* to PEER1_WRITE_SHUTDOWN */
+ }
+ else /* This will happen during PEER1_HALFCLOSE_READ */
+ {
+ peer2.test_ok = GNUNET_YES;
+ transition (); /* to PEER1_HALFCLOSE_WRITE_FAIL */
+ }
+ }
+ break;
+ case PEER1_HALFCLOSE_WRITE_FAIL:
+ GNUNET_assert (peer == &peer1);
+ GNUNET_assert (GNUNET_STREAM_SHUTDOWN == status);
+ GNUNET_assert (0 == size);
+ peer1.test_ok = GNUNET_YES;
+ break;
+ case PEER1_READ_SHUTDOWN:
+ GNUNET_assert (peer == &peer2);
+ GNUNET_assert (GNUNET_STREAM_SHUTDOWN == status);
+ GNUNET_assert (0 == size);
+ peer2.test_ok = GNUNET_YES;
+ break;
+ case PEER1_WRITE_SHUTDOWN:
+ case SUCCESS:
+ GNUNET_assert (0); /* We shouldn't reach here */
+ }
+}
+
+
+/**
+ * Function executed after stream has been established
+ *
+ * @param cls the closure from GNUNET_STREAM_open
+ * @param socket socket to use to communicate with the other side (read/write)
+ */
+static void
+stream_open_cb (void *cls,
+ struct GNUNET_STREAM_Socket *socket)
+{
+ struct PeerData *peer;
+
+ GNUNET_assert (socket == peer1.socket);
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "%s: Stream established from peer1\n",
+ GNUNET_i2s (&peer1.our_id));
+ peer = (struct PeerData *) cls;
+ peer->bytes_wrote = 0;
+ GNUNET_assert (socket == peer1.socket);
+ GNUNET_assert (socket == peer->socket);
+ peer1.test_ok = GNUNET_NO;
+ peer2.test_ok = GNUNET_NO;
+ current_test = PEER1_WRITE;
+ GNUNET_SCHEDULER_add_now (&stream_write_task, peer);
+}
+
+
+/**
+ * Input processor
+ *
+ * @param cls the closure from GNUNET_STREAM_write/read
+ * @param status the status of the stream at the time this function is called
+ * @param data traffic from the other side
+ * @param size the number of bytes available in data read
+ * @return number of bytes of processed from 'data' (any data remaining should be
+ * given to the next time the read processor is called).
+ */
+static size_t
+input_processor (void *cls,
+ enum GNUNET_STREAM_Status status,
+ const void *input_data,
+ size_t size)
+{
+ struct PeerData *peer;
+
+ peer = (struct PeerData *) cls;
+
+ switch (current_test)
+ {
+ case PEER1_WRITE:
+ case PEER1_HALFCLOSE_READ:
+ if (GNUNET_STREAM_TIMEOUT == status)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Read operation timedout - reading again!\n");
+ GNUNET_assert (0 == size);
+ GNUNET_SCHEDULER_add_now (&stream_read_task, peer);
+ return 0;
+ }
+
+ GNUNET_assert (GNUNET_STREAM_OK == status);
+ GNUNET_assert (size <= strlen (data));
+ GNUNET_assert (0 == strncmp ((const char *) data + peer->bytes_read,
+ (const char *) input_data,
+ size));
+ peer->bytes_read += size;
+
+ if (peer->bytes_read < strlen (data))
+ {
+ GNUNET_SCHEDULER_add_now (&stream_read_task, peer);
+ }
+ else
+ {
+ if (&peer2 == peer) /* Peer2 has completed reading; should write */
+ {
+ peer2.test_ok = GNUNET_YES;
+ transition (); /* Transition to PEER1_WRITE_SHUTDOWN */
+ }
+ else /* Peer1 has completed reading. End of tests */
+ {
+ peer1.test_ok = GNUNET_YES;
+ transition (); /* to PEER1_HALFCLOSE_WRITE_FAIL */
+ }
+ }
+ break;
+ case PEER1_WRITE_SHUTDOWN:
+ GNUNET_assert (GNUNET_STREAM_SHUTDOWN == status);
+ peer2.test_ok = GNUNET_YES;
+ break;
+ case PEER1_HALFCLOSE_WRITE_FAIL:
+ case PEER1_READ_SHUTDOWN:
+ case SUCCESS:
+ GNUNET_assert (0); /* We shouldn't reach here */
+ }
+
+ return size;
+}
+
+
+/**
+ * Scheduler call back; to be executed when a new stream is connected
+ * Called from listen connect for peer2
+ */
+static void
+stream_read (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+ read_task = GNUNET_SCHEDULER_NO_TASK;
+ GNUNET_assert (NULL != cls);
+ peer2.bytes_read = 0;
+ GNUNET_SCHEDULER_add_now (&stream_read_task, &peer2);
+}
+
+
+/**
+ * Functions of this type are called upon new stream connection from other peers
+ *
+ * @param cls the closure from GNUNET_STREAM_listen
+ * @param socket the socket representing the stream
+ * @param initiator the identity of the peer who wants to establish a stream
+ * with us
+ * @return GNUNET_OK to keep the socket open, GNUNET_SYSERR to close the
+ * stream (the socket will be invalid after the call)
+ */
+static int
+stream_listen_cb (void *cls,
+ struct GNUNET_STREAM_Socket *socket,
+ const struct GNUNET_PeerIdentity *initiator)
+{
+ GNUNET_assert (NULL != socket);
+ GNUNET_assert (NULL != initiator);
+ GNUNET_assert (socket != peer1.socket);
+
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "%s: Peer connected: %s\n",
+ GNUNET_i2s (&peer2.our_id),
+ GNUNET_i2s(initiator));
+
+ peer2.socket = socket;
+ /* FIXME: reading should be done right now instead of a scheduled call */
+ read_task = GNUNET_SCHEDULER_add_now (&stream_read, (void *) socket);
+ return GNUNET_OK;
+}
+
+
+/**
+ * Callback to be called when testing peer group is ready
+ *
+ * @param cls NULL
+ * @param emsg NULL on success
+ */
+void
+peergroup_ready (void *cls, const char *emsg)
+{
+ if (NULL != emsg)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ "Starting peer group failed: %s\n", emsg);
+ return;
+ }
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Peer group is now ready\n");
+
+ GNUNET_assert (2 == GNUNET_TESTING_daemons_running (pg));
+
+ d1 = GNUNET_TESTING_daemon_get (pg, 0);
+ GNUNET_assert (NULL != d1);
+
+ d2 = GNUNET_TESTING_daemon_get (pg, 1);
+ GNUNET_assert (NULL != d2);
+
+ GNUNET_TESTING_get_peer_identity (d1->cfg,
+ &peer1.our_id);
+ GNUNET_TESTING_get_peer_identity (d2->cfg,
+ &peer2.our_id);
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "%s : %s\n",
+ GNUNET_i2s (&peer1.our_id),
+ GNUNET_i2s (&d1->id));
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "%s : %s\n",
+ GNUNET_i2s (&peer2.our_id),
+ GNUNET_i2s (&d2->id));
+
+ peer2_listen_socket = GNUNET_STREAM_listen (d2->cfg,
+ 10, /* App port */
+ &stream_listen_cb,
+ NULL);
+ GNUNET_assert (NULL != peer2_listen_socket);
+
+ /* Connect to stream library */
+ peer1.socket = GNUNET_STREAM_open (d1->cfg,
+ &d2->id, /* Null for local peer? */
+ 10, /* App port */
+ &stream_open_cb,
+ &peer1,
+ GNUNET_STREAM_OPTION_END);
+ GNUNET_assert (NULL != peer1.socket);
+}
+
+
+/**
+ * Initialize framework and start test
+ */
+static void
+run (void *cls, char *const *args, const char *cfgfile,
+ const struct GNUNET_CONFIGURATION_Handle *cfg)
+{
+ struct GNUNET_TESTING_Host *hosts; /* FIXME: free hosts (DLL) */
+
+ /* GNUNET_log_setup ("test_stream_local", */
+ /* "DEBUG", */
+ /* NULL); */
+
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Starting test\n");
+ /* Duplicate the configuration */
+ config = GNUNET_CONFIGURATION_dup (cfg);
+
+ hosts = GNUNET_TESTING_hosts_load (config);
+
+ pg = GNUNET_TESTING_peergroup_start (config,
+ 2,
+ GNUNET_TIME_relative_multiply
+ (GNUNET_TIME_UNIT_SECONDS, 3),
+ NULL,
+ &peergroup_ready,
+ NULL,
+ hosts);
+ GNUNET_assert (NULL != pg);
+
+ abort_task =
+ GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply
+ (GNUNET_TIME_UNIT_SECONDS, 40), &do_abort,
+ NULL);
+}
+
+/**
+ * Main function
+ */
+int main (int argc, char **argv)
+{
+ int ret;
+
+ char *argv2[] = { "test-stream-2peers-halfclose",
+ "-L", "DEBUG",
+ "-c", "test_stream_local.conf",
+ NULL};
+
+ struct GNUNET_GETOPT_CommandLineOption options[] = {
+ GNUNET_GETOPT_OPTION_END
+ };
+
+ ret =
+ GNUNET_PROGRAM_run ((sizeof (argv2) / sizeof (char *)) - 1, argv2,
+ "test-stream-2peers-halfclose", "nohelp", options, &run, NULL);
+
+ if (GNUNET_OK != ret)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "run failed with error code %d\n",
+ ret);
+ return 1;
+ }
+ if (GNUNET_SYSERR == result)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "test failed\n");
+ return 1;
+ }
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO, "test ok\n");
+ return 0;
+}
diff --git a/src/stream/test_stream_local.c b/src/stream/test_stream_local.c
index 4da1258..c9fab84 100644
--- a/src/stream/test_stream_local.c
+++ b/src/stream/test_stream_local.c
@@ -30,6 +30,7 @@
#include "gnunet_util_lib.h"
#include "gnunet_mesh_service.h"
#include "gnunet_stream_lib.h"
+#include "gnunet_testing_lib.h"
#define VERBOSE 1
@@ -44,9 +45,14 @@ struct PeerData
struct GNUNET_STREAM_Socket *socket;
/**
- * Peer's io handle
+ * Peer's io write handle
*/
- struct GNUNET_STREAM_IOHandle *io_handle;
+ struct GNUNET_STREAM_IOWriteHandle *io_write_handle;
+
+ /**
+ * Peer's io read handle
+ */
+ struct GNUNET_STREAM_IOReadHandle *io_read_handle;
/**
* Bytes the peer has written
@@ -63,14 +69,92 @@ static struct GNUNET_OS_Process *arm_pid;
static struct PeerData peer1;
static struct PeerData peer2;
static struct GNUNET_STREAM_ListenSocket *peer2_listen_socket;
+static struct GNUNET_CONFIGURATION_Handle *config_peer1;
+static struct GNUNET_CONFIGURATION_Handle *config_peer2;
static GNUNET_SCHEDULER_TaskIdentifier abort_task;
static GNUNET_SCHEDULER_TaskIdentifier test_task;
-static GNUNET_SCHEDULER_TaskIdentifier read_task;
static char *data = "ABCD";
static int result;
+static int writing_success;
+static int reading_success;
+
+/**
+ * Input processor
+ *
+ * @param cls the closure from GNUNET_STREAM_write/read
+ * @param status the status of the stream at the time this function is called
+ * @param data traffic from the other side
+ * @param size the number of bytes available in data read
+ * @return number of bytes of processed from 'data' (any data remaining should be
+ * given to the next time the read processor is called).
+ */
+static size_t
+input_processor (void *cls,
+ enum GNUNET_STREAM_Status status,
+ const void *input_data,
+ size_t size);
+
+/**
+ * Task for calling STREAM_read
+ *
+ * @param cls the peer data entity
+ * @param tc the task context
+ */
+static void
+stream_read_task (void *cls,
+ const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+ struct PeerData *peer = cls;
+
+ peer->io_read_handle = GNUNET_STREAM_read (peer->socket,
+ GNUNET_TIME_relative_multiply
+ (GNUNET_TIME_UNIT_SECONDS, 5),
+ &input_processor,
+ peer);
+ GNUNET_assert (NULL != peer->io_read_handle);
+}
+
+/**
+ * The write completion function; called upon writing some data to stream or
+ * upon error
+ *
+ * @param cls the closure from GNUNET_STREAM_write/read
+ * @param status the status of the stream at the time this function is called
+ * @param size the number of bytes read or written
+ */
+static void
+write_completion (void *cls,
+ enum GNUNET_STREAM_Status status,
+ size_t size);
+
+
+/**
+ * Task for calling STREAM_write
+ *
+ * @param cls the peer data entity
+ * @param tc the task context
+ */
+static void
+stream_write_task (void *cls,
+ const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+ struct PeerData *peer = cls;
+
+ peer->io_write_handle =
+ GNUNET_STREAM_write (peer->socket,
+ (void *) data,
+ strlen(data) - peer->bytes_wrote,
+ GNUNET_TIME_relative_multiply
+ (GNUNET_TIME_UNIT_SECONDS, 5),
+ &write_completion,
+ peer);
+
+ GNUNET_assert (NULL != peer->io_write_handle);
+ }
+
/**
* Shutdown nicely
*/
@@ -78,7 +162,10 @@ static void
do_shutdown (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
{
GNUNET_STREAM_close (peer1.socket);
- GNUNET_STREAM_close (peer2.socket);
+ if (NULL != peer2.socket)
+ GNUNET_STREAM_close (peer2.socket);
+ if (NULL != peer2_listen_socket)
+ GNUNET_STREAM_listen_close (peer2_listen_socket);
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "test: shutdown\n");
if (0 != abort_task)
{
@@ -90,8 +177,11 @@ do_shutdown (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
GNUNET_log_strerror (GNUNET_ERROR_TYPE_WARNING, "kill");
}
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "test: Wait\n");
+ /* Free the duplicated configuration */
+ GNUNET_CONFIGURATION_destroy (config_peer1);
+ GNUNET_CONFIGURATION_destroy (config_peer2);
GNUNET_assert (GNUNET_OK == GNUNET_OS_process_wait (arm_pid));
- GNUNET_OS_process_close (arm_pid);
+ GNUNET_OS_process_destroy (arm_pid);
}
@@ -106,10 +196,7 @@ do_abort (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
{
GNUNET_SCHEDULER_cancel (test_task);
}
- if (0 != read_task)
- {
- GNUNET_SCHEDULER_cancel (read_task);
- }
+
result = GNUNET_SYSERR;
abort_task = 0;
do_shutdown (cls, tc);
@@ -129,35 +216,31 @@ write_completion (void *cls,
enum GNUNET_STREAM_Status status,
size_t size)
{
- struct PeerData *peer;
+ struct PeerData *peer=cls;
- peer = (struct PeerData *) cls;
GNUNET_assert (GNUNET_STREAM_OK == status);
- GNUNET_assert (size < strlen (data));
+ GNUNET_assert (size <= strlen (data));
peer->bytes_wrote += size;
if (peer->bytes_wrote < strlen(data)) /* Have more data to send */
{
- peer->io_handle = GNUNET_STREAM_write (peer->socket,
- (void *) data,
- strlen(data) - peer->bytes_wrote,
- GNUNET_TIME_relative_multiply
- (GNUNET_TIME_UNIT_SECONDS, 5),
- &write_completion,
- cls);
- GNUNET_assert (NULL != peer->io_handle);
+ GNUNET_SCHEDULER_add_now (&stream_write_task, peer);
}
else
{
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Writing completed\n");
+
if (&peer1 == peer) /* Peer1 has finished writing; should read now */
{
- peer->io_handle = GNUNET_STREAM_read ((struct GNUNET_STREAM_Socket *)
- peer->socket,
- GNUNET_TIME_relative_multiply
- (GNUNET_TIME_UNIT_SECONDS, 5),
- &input_processor,
- cls);
- GNUNET_assert (NULL!=peer->io_handle);
+ peer->bytes_read = 0;
+ GNUNET_SCHEDULER_add_now (&stream_read_task, peer);
+ }
+ else
+ {
+ writing_success = GNUNET_YES;
+ if (GNUNET_YES == reading_success)
+ GNUNET_SCHEDULER_add_now (&do_shutdown, NULL);
}
}
}
@@ -173,20 +256,15 @@ static void
stream_open_cb (void *cls,
struct GNUNET_STREAM_Socket *socket)
{
- struct PeerData *peer;
+ struct PeerData *peer=cls;
- peer = (struct PeerData *) cls;
- peer->bytes_wrote = 0;
+ GNUNET_assert (&peer1 == peer);
GNUNET_assert (socket == peer1.socket);
GNUNET_assert (socket == peer->socket);
- peer->io_handle = GNUNET_STREAM_write (peer->socket, /* socket */
- (void *) data, /* data */
- strlen(data),
- GNUNET_TIME_relative_multiply
- (GNUNET_TIME_UNIT_SECONDS, 5),
- &write_completion,
- cls);
- GNUNET_assert (NULL != peer->io_handle);
+
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Stream established from peer1\n");
+ peer->bytes_wrote = 0;
+ GNUNET_SCHEDULER_add_now (&stream_write_task, peer);
}
@@ -206,44 +284,31 @@ input_processor (void *cls,
const void *input_data,
size_t size)
{
- struct PeerData *peer;
+ struct PeerData *peer = cls;
- peer = (struct PeerData *) cls;
-
- GNUNET_assert (GNUNET_STERAM_OK == status);
- GNUNET_assert (size < strlen (data));
- GNUNET_assert (strncmp ((const char *) data + peer->bytes_read,
- (const char *) input_data,
- size));
+ GNUNET_assert (GNUNET_STREAM_OK == status);
+ GNUNET_assert (size <= strlen (data));
+ GNUNET_assert (0 == strncmp ((const char *) data + peer->bytes_read,
+ (const char *) input_data,
+ size));
peer->bytes_read += size;
if (peer->bytes_read < strlen (data))
{
- peer->io_handle = GNUNET_STREAM_read ((struct GNUNET_STREAM_Socket *)
- peer->socket,
- GNUNET_TIME_relative_multiply
- (GNUNET_TIME_UNIT_SECONDS, 5),
- &input_processor,
- cls);
- GNUNET_assert (NULL != peer->io_handle);
+ GNUNET_SCHEDULER_add_now (&stream_read_task, peer);
}
else
{
if (&peer2 == peer) /* Peer2 has completed reading; should write */
{
peer->bytes_wrote = 0;
- peer->io_handle = GNUNET_STREAM_write ((struct GNUNET_STREAM_Socket *)
- peer->socket,
- (void *) data,
- strlen(data),
- GNUNET_TIME_relative_multiply
- (GNUNET_TIME_UNIT_SECONDS, 5),
- &write_completion,
- cls);
+ GNUNET_SCHEDULER_add_now (&stream_write_task, peer);
}
else /* Peer1 has completed reading. End of tests */
{
- GNUNET_SCHEDULER_add_now (&do_shutdown, NULL);
+ reading_success = GNUNET_YES;
+ if (GNUNET_YES == writing_success)
+ GNUNET_SCHEDULER_add_now (&do_shutdown, NULL);
}
}
return size;
@@ -251,29 +316,9 @@ input_processor (void *cls,
/**
- * Scheduler call back; to be executed when a new stream is connected
- * Called from listen connect for peer2
- */
-static void
-stream_read (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
-{
- read_task = GNUNET_SCHEDULER_NO_TASK;
- GNUNET_assert (NULL != cls);
- peer2.bytes_read = 0;
- GNUNET_STREAM_listen_close (peer2_listen_socket); /* Close listen socket */
- peer2.io_handle = GNUNET_STREAM_read ((struct GNUNET_STREAM_Socket *) cls,
- GNUNET_TIME_relative_multiply
- (GNUNET_TIME_UNIT_SECONDS, 5),
- &input_processor,
- (void *) &peer2);
- GNUNET_assert (NULL != peer2.io_handle);
-}
-
-
-/**
* Functions of this type are called upon new stream connection from other peers
*
- * @param cls the closure from GNUNET_STREAM_listen
+ * @param cls the PeerData of peer2
* @param socket the socket representing the stream
* @param initiator the identity of the peer who wants to establish a stream
* with us
@@ -285,35 +330,60 @@ stream_listen_cb (void *cls,
struct GNUNET_STREAM_Socket *socket,
const struct GNUNET_PeerIdentity *initiator)
{
+ struct PeerData *peer=cls;
+ struct GNUNET_PeerIdentity self;
+
GNUNET_assert (NULL != socket);
- GNUNET_assert (NULL == initiator); /* Local peer=NULL? */
GNUNET_assert (socket != peer1.socket);
+ GNUNET_assert (&peer2 == peer);
+
+ /* Get our identity */
+ GNUNET_assert (GNUNET_OK == GNUNET_TESTING_get_peer_identity (config_peer1,
+ &self));
+ GNUNET_assert (0 == memcmp (&self,
+ initiator,
+ sizeof (struct GNUNET_PeerIdentity)));
+
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Peer connected: %s\n", GNUNET_i2s(initiator));
- peer2.socket = socket;
- read_task = GNUNET_SCHEDULER_add_now (&stream_read, (void *) socket);
+ peer->socket = socket;
+ peer->bytes_read = 0;
+ GNUNET_SCHEDULER_add_now (&stream_read_task, &peer2);
return GNUNET_OK;
}
/**
* Testing function
+ *
+ * @param cls NULL
+ * @param tc the task context
*/
static void
test (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
{
+ struct GNUNET_PeerIdentity self;
+
test_task = GNUNET_SCHEDULER_NO_TASK;
+ /* Get our identity */
+ GNUNET_assert (GNUNET_OK == GNUNET_TESTING_get_peer_identity (config_peer1,
+ &self));
+
+ peer2_listen_socket = GNUNET_STREAM_listen (config_peer2,
+ 10, /* App port */
+ &stream_listen_cb,
+ &peer2);
+ GNUNET_assert (NULL != peer2_listen_socket);
/* Connect to stream library */
- peer1.socket = GNUNET_STREAM_open (NULL, /* Null for local peer? */
+ peer1.socket = GNUNET_STREAM_open (config_peer1,
+ &self, /* Null for local peer? */
10, /* App port */
&stream_open_cb,
- (void *) &peer1);
+ &peer1,
+ GNUNET_STREAM_OPTION_END);
GNUNET_assert (NULL != peer1.socket);
- peer2_listen_socket = GNUNET_STREAM_listen (10 /* App port */
- &stream_listen_cb,
- NULL);
- GNUNET_assert (NULL != peer2_listen_socket);
-
}
/**
@@ -330,6 +400,9 @@ run (void *cls, char *const *args, const char *cfgfile,
"WARNING",
#endif
NULL);
+ /* Duplicate the configuration */
+ config_peer1 = GNUNET_CONFIGURATION_dup (cfg);
+ config_peer2 = GNUNET_CONFIGURATION_dup (cfg);
arm_pid =
GNUNET_OS_start_process (GNUNET_YES, NULL, NULL, "gnunet-service-arm",
"gnunet-service-arm",
@@ -340,11 +413,10 @@ run (void *cls, char *const *args, const char *cfgfile,
abort_task =
GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply
- (GNUNET_TIME_UNIT_SECONDS, 20), &do_abort,
+ (GNUNET_TIME_UNIT_SECONDS, 60), &do_abort,
NULL);
-
- test_task = GNUNET_SCHEDULER_add_now (&test, (void *) cfg);
-
+
+ test_task = GNUNET_SCHEDULER_add_now (&test, NULL);
}
/**
@@ -355,7 +427,7 @@ int main (int argc, char **argv)
int ret;
char *const argv2[] = { "test-stream-local",
- "-c", "test_stream.conf",
+ "-c", "test_stream_local.conf",
#if VERBOSE
"-L", "DEBUG",
#endif
@@ -365,7 +437,7 @@ int main (int argc, char **argv)
struct GNUNET_GETOPT_CommandLineOption options[] = {
GNUNET_GETOPT_OPTION_END
};
-
+
ret =
GNUNET_PROGRAM_run ((sizeof (argv2) / sizeof (char *)) - 1, argv2,
"test-stream-local", "nohelp", options, &run, NULL);
@@ -381,6 +453,6 @@ int main (int argc, char **argv)
GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "test failed\n");
return 1;
}
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "test ok\n");
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO, "test ok\n");
return 0;
}
diff --git a/src/stream/test_stream_local.conf b/src/stream/test_stream_local.conf
index 3d955f0..884ecbc 100644
--- a/src/stream/test_stream_local.conf
+++ b/src/stream/test_stream_local.conf
@@ -9,7 +9,7 @@ DEBUG = YES
AUTOSTART = YES
ACCEPT_FROM = 127.0.0.1;
HOSTNAME = localhost
-PORT = 10511
+PORT = 10700
# PREFIX = valgrind --leak-check=full
# PREFIX = xterm -geometry 100x85 -T peer1 -e gdb --args
@@ -19,7 +19,7 @@ AUTOSTART = YES
ACCEPT_FROM6 = ::1;
ACCEPT_FROM = 127.0.0.1;
HOSTNAME = localhost
-PORT = 2100
+PORT = 12100
[block]
plugins = dht test
@@ -53,14 +53,20 @@ TIMEOUT = 300 s
PORT = 12368
[TESTING]
+NUM_PEERS = 5
WEAKRANDOM = YES
+DEBUG = YES
+HOSTKEYSFILE = ../../contrib/testing_hostkeys.dat
+MAX_CONCURRENT_SSH = 10
+USE_PROGRESSBARS = YES
+PEERGROUP_TIMEOUT = 2400 s
[gnunetd]
HOSTKEY = $SERVICEHOME/.hostkey
[PATHS]
-DEFAULTCONFIG = test_mesh.conf
-SERVICEHOME = /tmp/test-mesh/
+DEFAULTCONFIG = test_stream_local.conf
+SERVICEHOME = /tmp/test-stream/
[dns]
AUTOSTART = NO