aboutsummaryrefslogtreecommitdiff
path: root/src/stream
diff options
context:
space:
mode:
Diffstat (limited to 'src/stream')
-rw-r--r--src/stream/Makefile.am58
-rw-r--r--src/stream/Makefile.in245
-rw-r--r--src/stream/perf_stream_api.c1047
-rw-r--r--src/stream/stream.h (renamed from src/stream/stream_protocol.h)31
-rw-r--r--src/stream/stream_api.c1814
-rw-r--r--src/stream/test_stream_2peers.c449
-rw-r--r--src/stream/test_stream_2peers_halfclose.c488
-rw-r--r--src/stream/test_stream_big.c429
-rw-r--r--src/stream/test_stream_local.c225
-rw-r--r--src/stream/test_stream_local.conf37
-rw-r--r--src/stream/test_stream_sequence_wraparound.c425
11 files changed, 3987 insertions, 1261 deletions
diff --git a/src/stream/Makefile.am b/src/stream/Makefile.am
index 8d74417..0994697 100644
--- a/src/stream/Makefile.am
+++ b/src/stream/Makefile.am
@@ -12,22 +12,39 @@ endif
lib_LTLIBRARIES = libgnunetstream.la
libgnunetstream_la_SOURCES = \
- stream_api.c stream_protocol.h
+ stream_api.c stream.h
libgnunetstream_la_LIBADD = \
$(top_builddir)/src/mesh/libgnunetmesh.la \
+ $(top_builddir)/src/lockmanager/libgnunetlockmanager.la \
+ $(top_builddir)/src/statistics/libgnunetstatistics.la \
$(top_builddir)/src/util/libgnunetutil.la $(XLIB)
libgnunetstream_la_LDFLAGS = \
- $(GN_LIB_LDFLAGS)
+ $(GN_LIB_LDFLAGS) \
+ -version-info 1:0:0
+
+if HAVE_BENCHMARKS
+ STREAM_BENCHMARKS = \
+ perf_stream_api
+endif
check_PROGRAMS = \
- test-stream-2peers \
- test-stream-2peers_halfclose \
- test-stream-local
+ test_stream_2peers \
+ test_stream_2peers_halfclose \
+ test_stream_local \
+ test_stream_big \
+ test_stream_sequence_wraparound \
+ $(STREAM_BENCHMARKS)
EXTRA_DIST = test_stream_local.conf
if ENABLE_TEST_RUN
-TESTS = $(check_PROGRAMS)
+TESTS = \
+ test_stream_2peers \
+ test_stream_2peers_halfclose \
+ test_stream_local \
+ test_stream_big \
+ test_stream_sequence_wraparound \
+ $(STREAM_BENCHMARKS)
endif
test_stream_2peers_SOURCES = \
@@ -35,18 +52,41 @@ test_stream_2peers_SOURCES = \
test_stream_2peers_LDADD = \
$(top_builddir)/src/stream/libgnunetstream.la \
$(top_builddir)/src/util/libgnunetutil.la \
- $(top_builddir)/src/testing/libgnunettesting.la
+ $(top_builddir)/src/testbed/libgnunettestbed.la
+
test_stream_2peers_halfclose_SOURCES = \
test_stream_2peers_halfclose.c
test_stream_2peers_halfclose_LDADD = \
$(top_builddir)/src/stream/libgnunetstream.la \
$(top_builddir)/src/util/libgnunetutil.la \
- $(top_builddir)/src/testing/libgnunettesting.la
+ $(top_builddir)/src/testbed/libgnunettestbed.la
test_stream_local_SOURCES = \
test_stream_local.c
test_stream_local_LDADD = \
$(top_builddir)/src/stream/libgnunetstream.la \
$(top_builddir)/src/util/libgnunetutil.la \
- $(top_builddir)/src/testing/libgnunettesting.la \ No newline at end of file
+ $(top_builddir)/src/testing/libgnunettesting.la
+
+test_stream_big_SOURCES = \
+ test_stream_big.c
+test_stream_big_LDADD = \
+ $(top_builddir)/src/stream/libgnunetstream.la \
+ $(top_builddir)/src/util/libgnunetutil.la \
+ $(top_builddir)/src/testing/libgnunettesting.la
+
+test_stream_sequence_wraparound_SOURCES = \
+ test_stream_sequence_wraparound.c
+test_stream_sequence_wraparound_LDADD = \
+ $(top_builddir)/src/stream/libgnunetstream.la \
+ $(top_builddir)/src/util/libgnunetutil.la \
+ $(top_builddir)/src/testing/libgnunettesting.la
+
+perf_stream_api_SOURCES = \
+ perf_stream_api.c
+perf_stream_api_LDADD = \
+ $(top_builddir)/src/stream/libgnunetstream.la \
+ $(top_builddir)/src/util/libgnunetutil.la \
+ $(top_builddir)/src/testing/libgnunettesting.la \
+ $(top_builddir)/src/testbed/libgnunettestbed.la \ No newline at end of file
diff --git a/src/stream/Makefile.in b/src/stream/Makefile.in
index 9f1c278..2422602 100644
--- a/src/stream/Makefile.in
+++ b/src/stream/Makefile.in
@@ -1,9 +1,9 @@
-# Makefile.in generated by automake 1.11.1 from Makefile.am.
+# Makefile.in generated by automake 1.11.6 from Makefile.am.
# @configure_input@
# Copyright (C) 1994, 1995, 1996, 1997, 1998, 1999, 2000, 2001, 2002,
-# 2003, 2004, 2005, 2006, 2007, 2008, 2009 Free Software Foundation,
-# Inc.
+# 2003, 2004, 2005, 2006, 2007, 2008, 2009, 2010, 2011 Free Software
+# Foundation, Inc.
# This Makefile.in is free software; the Free Software Foundation
# gives unlimited permission to copy and/or distribute it,
# with or without modifications, as long as this notice is preserved.
@@ -16,6 +16,23 @@
@SET_MAKE@
VPATH = @srcdir@
+am__make_dryrun = \
+ { \
+ am__dry=no; \
+ case $$MAKEFLAGS in \
+ *\\[\ \ ]*) \
+ echo 'am--echo: ; @echo "AM" OK' | $(MAKE) -f - 2>/dev/null \
+ | grep '^AM OK$$' >/dev/null || am__dry=yes;; \
+ *) \
+ for am__flg in $$MAKEFLAGS; do \
+ case $$am__flg in \
+ *=*|--*) ;; \
+ *n*) am__dry=yes; break;; \
+ esac; \
+ done;; \
+ esac; \
+ test $$am__dry = yes; \
+ }
pkgdatadir = $(datadir)/@PACKAGE@
pkgincludedir = $(includedir)/@PACKAGE@
pkglibdir = $(libdir)/@PACKAGE@
@@ -35,22 +52,30 @@ POST_UNINSTALL = :
build_triplet = @build@
host_triplet = @host@
target_triplet = @target@
-check_PROGRAMS = test-stream-2peers$(EXEEXT) \
- test-stream-2peers_halfclose$(EXEEXT) \
- test-stream-local$(EXEEXT)
+check_PROGRAMS = test_stream_2peers$(EXEEXT) \
+ test_stream_2peers_halfclose$(EXEEXT) \
+ test_stream_local$(EXEEXT) test_stream_big$(EXEEXT) \
+ test_stream_sequence_wraparound$(EXEEXT) $(am__EXEEXT_1)
+@ENABLE_TEST_RUN_TRUE@TESTS = test_stream_2peers$(EXEEXT) \
+@ENABLE_TEST_RUN_TRUE@ test_stream_2peers_halfclose$(EXEEXT) \
+@ENABLE_TEST_RUN_TRUE@ test_stream_local$(EXEEXT) \
+@ENABLE_TEST_RUN_TRUE@ test_stream_big$(EXEEXT) \
+@ENABLE_TEST_RUN_TRUE@ test_stream_sequence_wraparound$(EXEEXT) \
+@ENABLE_TEST_RUN_TRUE@ $(am__EXEEXT_1)
subdir = src/stream
DIST_COMMON = README $(srcdir)/Makefile.am $(srcdir)/Makefile.in
ACLOCAL_M4 = $(top_srcdir)/aclocal.m4
am__aclocal_m4_deps = $(top_srcdir)/m4/absolute-header.m4 \
$(top_srcdir)/m4/align.m4 $(top_srcdir)/m4/argz.m4 \
- $(top_srcdir)/m4/gettext.m4 $(top_srcdir)/m4/iconv.m4 \
- $(top_srcdir)/m4/lib-ld.m4 $(top_srcdir)/m4/lib-link.m4 \
- $(top_srcdir)/m4/lib-prefix.m4 $(top_srcdir)/m4/libcurl.m4 \
- $(top_srcdir)/m4/libgcrypt.m4 $(top_srcdir)/m4/libtool.m4 \
- $(top_srcdir)/m4/libunistring.m4 $(top_srcdir)/m4/ltdl.m4 \
- $(top_srcdir)/m4/ltoptions.m4 $(top_srcdir)/m4/ltsugar.m4 \
- $(top_srcdir)/m4/ltversion.m4 $(top_srcdir)/m4/lt~obsolete.m4 \
- $(top_srcdir)/m4/nls.m4 $(top_srcdir)/m4/po.m4 \
+ $(top_srcdir)/m4/gettext.m4 $(top_srcdir)/m4/glib-2.0.m4 \
+ $(top_srcdir)/m4/iconv.m4 $(top_srcdir)/m4/lib-ld.m4 \
+ $(top_srcdir)/m4/lib-link.m4 $(top_srcdir)/m4/lib-prefix.m4 \
+ $(top_srcdir)/m4/libcurl.m4 $(top_srcdir)/m4/libgcrypt.m4 \
+ $(top_srcdir)/m4/libtool.m4 $(top_srcdir)/m4/libunistring.m4 \
+ $(top_srcdir)/m4/ltdl.m4 $(top_srcdir)/m4/ltoptions.m4 \
+ $(top_srcdir)/m4/ltsugar.m4 $(top_srcdir)/m4/ltversion.m4 \
+ $(top_srcdir)/m4/lt~obsolete.m4 $(top_srcdir)/m4/nls.m4 \
+ $(top_srcdir)/m4/pkg.m4 $(top_srcdir)/m4/po.m4 \
$(top_srcdir)/m4/progtest.m4 $(top_srcdir)/acinclude.m4 \
$(top_srcdir)/configure.ac
am__configure_deps = $(am__aclocal_m4_deps) $(CONFIGURE_DEPENDENCIES) \
@@ -80,28 +105,44 @@ am__nobase_list = $(am__nobase_strip_setup); \
am__base_list = \
sed '$$!N;$$!N;$$!N;$$!N;$$!N;$$!N;$$!N;s/\n/ /g' | \
sed '$$!N;$$!N;$$!N;$$!N;s/\n/ /g'
+am__uninstall_files_from_dir = { \
+ test -z "$$files" \
+ || { test ! -d "$$dir" && test ! -f "$$dir" && test ! -r "$$dir"; } \
+ || { echo " ( cd '$$dir' && rm -f" $$files ")"; \
+ $(am__cd) "$$dir" && rm -f $$files; }; \
+ }
am__installdirs = "$(DESTDIR)$(libdir)"
LTLIBRARIES = $(lib_LTLIBRARIES)
am__DEPENDENCIES_1 =
libgnunetstream_la_DEPENDENCIES = \
$(top_builddir)/src/mesh/libgnunetmesh.la \
+ $(top_builddir)/src/lockmanager/libgnunetlockmanager.la \
+ $(top_builddir)/src/statistics/libgnunetstatistics.la \
$(top_builddir)/src/util/libgnunetutil.la \
$(am__DEPENDENCIES_1)
am_libgnunetstream_la_OBJECTS = stream_api.lo
libgnunetstream_la_OBJECTS = $(am_libgnunetstream_la_OBJECTS)
-AM_V_lt = $(am__v_lt_$(V))
-am__v_lt_ = $(am__v_lt_$(AM_DEFAULT_VERBOSITY))
+AM_V_lt = $(am__v_lt_@AM_V@)
+am__v_lt_ = $(am__v_lt_@AM_DEFAULT_V@)
am__v_lt_0 = --silent
libgnunetstream_la_LINK = $(LIBTOOL) $(AM_V_lt) --tag=CC \
$(AM_LIBTOOLFLAGS) $(LIBTOOLFLAGS) --mode=link $(CCLD) \
$(AM_CFLAGS) $(CFLAGS) $(libgnunetstream_la_LDFLAGS) \
$(LDFLAGS) -o $@
+@HAVE_BENCHMARKS_TRUE@am__EXEEXT_1 = perf_stream_api$(EXEEXT)
+am_perf_stream_api_OBJECTS = perf_stream_api.$(OBJEXT)
+perf_stream_api_OBJECTS = $(am_perf_stream_api_OBJECTS)
+perf_stream_api_DEPENDENCIES = \
+ $(top_builddir)/src/stream/libgnunetstream.la \
+ $(top_builddir)/src/util/libgnunetutil.la \
+ $(top_builddir)/src/testing/libgnunettesting.la \
+ $(top_builddir)/src/testbed/libgnunettestbed.la
am_test_stream_2peers_OBJECTS = test_stream_2peers.$(OBJEXT)
test_stream_2peers_OBJECTS = $(am_test_stream_2peers_OBJECTS)
test_stream_2peers_DEPENDENCIES = \
$(top_builddir)/src/stream/libgnunetstream.la \
$(top_builddir)/src/util/libgnunetutil.la \
- $(top_builddir)/src/testing/libgnunettesting.la
+ $(top_builddir)/src/testbed/libgnunettestbed.la
am_test_stream_2peers_halfclose_OBJECTS = \
test_stream_2peers_halfclose.$(OBJEXT)
test_stream_2peers_halfclose_OBJECTS = \
@@ -109,6 +150,12 @@ test_stream_2peers_halfclose_OBJECTS = \
test_stream_2peers_halfclose_DEPENDENCIES = \
$(top_builddir)/src/stream/libgnunetstream.la \
$(top_builddir)/src/util/libgnunetutil.la \
+ $(top_builddir)/src/testbed/libgnunettestbed.la
+am_test_stream_big_OBJECTS = test_stream_big.$(OBJEXT)
+test_stream_big_OBJECTS = $(am_test_stream_big_OBJECTS)
+test_stream_big_DEPENDENCIES = \
+ $(top_builddir)/src/stream/libgnunetstream.la \
+ $(top_builddir)/src/util/libgnunetutil.la \
$(top_builddir)/src/testing/libgnunettesting.la
am_test_stream_local_OBJECTS = test_stream_local.$(OBJEXT)
test_stream_local_OBJECTS = $(am_test_stream_local_OBJECTS)
@@ -116,6 +163,14 @@ test_stream_local_DEPENDENCIES = \
$(top_builddir)/src/stream/libgnunetstream.la \
$(top_builddir)/src/util/libgnunetutil.la \
$(top_builddir)/src/testing/libgnunettesting.la
+am_test_stream_sequence_wraparound_OBJECTS = \
+ test_stream_sequence_wraparound.$(OBJEXT)
+test_stream_sequence_wraparound_OBJECTS = \
+ $(am_test_stream_sequence_wraparound_OBJECTS)
+test_stream_sequence_wraparound_DEPENDENCIES = \
+ $(top_builddir)/src/stream/libgnunetstream.la \
+ $(top_builddir)/src/util/libgnunetutil.la \
+ $(top_builddir)/src/testing/libgnunettesting.la
DEFAULT_INCLUDES = -I.@am__isrc@ -I$(top_builddir)
depcomp = $(SHELL) $(top_srcdir)/depcomp
am__depfiles_maybe = depfiles
@@ -126,29 +181,37 @@ LTCOMPILE = $(LIBTOOL) $(AM_V_lt) --tag=CC $(AM_LIBTOOLFLAGS) \
$(LIBTOOLFLAGS) --mode=compile $(CC) $(DEFS) \
$(DEFAULT_INCLUDES) $(INCLUDES) $(AM_CPPFLAGS) $(CPPFLAGS) \
$(AM_CFLAGS) $(CFLAGS)
-AM_V_CC = $(am__v_CC_$(V))
-am__v_CC_ = $(am__v_CC_$(AM_DEFAULT_VERBOSITY))
+AM_V_CC = $(am__v_CC_@AM_V@)
+am__v_CC_ = $(am__v_CC_@AM_DEFAULT_V@)
am__v_CC_0 = @echo " CC " $@;
-AM_V_at = $(am__v_at_$(V))
-am__v_at_ = $(am__v_at_$(AM_DEFAULT_VERBOSITY))
+AM_V_at = $(am__v_at_@AM_V@)
+am__v_at_ = $(am__v_at_@AM_DEFAULT_V@)
am__v_at_0 = @
CCLD = $(CC)
LINK = $(LIBTOOL) $(AM_V_lt) --tag=CC $(AM_LIBTOOLFLAGS) \
$(LIBTOOLFLAGS) --mode=link $(CCLD) $(AM_CFLAGS) $(CFLAGS) \
$(AM_LDFLAGS) $(LDFLAGS) -o $@
-AM_V_CCLD = $(am__v_CCLD_$(V))
-am__v_CCLD_ = $(am__v_CCLD_$(AM_DEFAULT_VERBOSITY))
+AM_V_CCLD = $(am__v_CCLD_@AM_V@)
+am__v_CCLD_ = $(am__v_CCLD_@AM_DEFAULT_V@)
am__v_CCLD_0 = @echo " CCLD " $@;
-AM_V_GEN = $(am__v_GEN_$(V))
-am__v_GEN_ = $(am__v_GEN_$(AM_DEFAULT_VERBOSITY))
+AM_V_GEN = $(am__v_GEN_@AM_V@)
+am__v_GEN_ = $(am__v_GEN_@AM_DEFAULT_V@)
am__v_GEN_0 = @echo " GEN " $@;
-SOURCES = $(libgnunetstream_la_SOURCES) $(test_stream_2peers_SOURCES) \
+SOURCES = $(libgnunetstream_la_SOURCES) $(perf_stream_api_SOURCES) \
+ $(test_stream_2peers_SOURCES) \
$(test_stream_2peers_halfclose_SOURCES) \
- $(test_stream_local_SOURCES)
+ $(test_stream_big_SOURCES) $(test_stream_local_SOURCES) \
+ $(test_stream_sequence_wraparound_SOURCES)
DIST_SOURCES = $(libgnunetstream_la_SOURCES) \
- $(test_stream_2peers_SOURCES) \
+ $(perf_stream_api_SOURCES) $(test_stream_2peers_SOURCES) \
$(test_stream_2peers_halfclose_SOURCES) \
- $(test_stream_local_SOURCES)
+ $(test_stream_big_SOURCES) $(test_stream_local_SOURCES) \
+ $(test_stream_sequence_wraparound_SOURCES)
+am__can_run_installinfo = \
+ case $$AM_UPDATE_INFO_DIR in \
+ n|no|NO) false;; \
+ *) (install-info --version) >/dev/null 2>&1;; \
+ esac
ETAGS = etags
CTAGS = ctags
am__tty_colors = \
@@ -189,6 +252,10 @@ EXEEXT = @EXEEXT@
EXT_LIBS = @EXT_LIBS@
EXT_LIB_PATH = @EXT_LIB_PATH@
FGREP = @FGREP@
+GLIB_CFLAGS = @GLIB_CFLAGS@
+GLIB_GENMARSHAL = @GLIB_GENMARSHAL@
+GLIB_LIBS = @GLIB_LIBS@
+GLIB_MKENUMS = @GLIB_MKENUMS@
GMSGFMT = @GMSGFMT@
GMSGFMT_015 = @GMSGFMT_015@
GNUNETDNS_GROUP = @GNUNETDNS_GROUP@
@@ -199,6 +266,7 @@ GN_LIBINTL = @GN_LIBINTL@
GN_LIB_LDFLAGS = @GN_LIB_LDFLAGS@
GN_PLUGIN_LDFLAGS = @GN_PLUGIN_LDFLAGS@
GN_USER_HOME_DIR = @GN_USER_HOME_DIR@
+GOBJECT_QUERY = @GOBJECT_QUERY@
GREP = @GREP@
HAVE_LIBUNISTRING = @HAVE_LIBUNISTRING@
INCLTDL = @INCLTDL@
@@ -221,6 +289,8 @@ LIBCURL_CPPFLAGS = @LIBCURL_CPPFLAGS@
LIBGCRYPT_CFLAGS = @LIBGCRYPT_CFLAGS@
LIBGCRYPT_CONFIG = @LIBGCRYPT_CONFIG@
LIBGCRYPT_LIBS = @LIBGCRYPT_LIBS@
+LIBGTOP_CFLAGS = @LIBGTOP_CFLAGS@
+LIBGTOP_LIBS = @LIBGTOP_LIBS@
LIBICONV = @LIBICONV@
LIBINTL = @LIBINTL@
LIBLTDL = @LIBLTDL@
@@ -242,6 +312,7 @@ LT_CONFIG_H = @LT_CONFIG_H@
LT_DLLOADERS = @LT_DLLOADERS@
LT_DLPREOPEN = @LT_DLPREOPEN@
MAKEINFO = @MAKEINFO@
+MANIFEST_TOOL = @MANIFEST_TOOL@
MKDIR_P = @MKDIR_P@
MONKEYPREFIX = @MONKEYPREFIX@
MSGFMT = @MSGFMT@
@@ -251,6 +322,7 @@ MYSQL_CPPFLAGS = @MYSQL_CPPFLAGS@
MYSQL_LDFLAGS = @MYSQL_LDFLAGS@
NM = @NM@
NMEDIT = @NMEDIT@
+NSS_DIR = @NSS_DIR@
OBJC = @OBJC@
OBJCDEPMODE = @OBJCDEPMODE@
OBJCFLAGS = @OBJCFLAGS@
@@ -266,6 +338,7 @@ PACKAGE_TARNAME = @PACKAGE_TARNAME@
PACKAGE_URL = @PACKAGE_URL@
PACKAGE_VERSION = @PACKAGE_VERSION@
PATH_SEPARATOR = @PATH_SEPARATOR@
+PKG_CONFIG = @PKG_CONFIG@
POSTGRES_CPPFLAGS = @POSTGRES_CPPFLAGS@
POSTGRES_LDFLAGS = @POSTGRES_LDFLAGS@
POSUB = @POSUB@
@@ -297,6 +370,7 @@ abs_builddir = @abs_builddir@
abs_srcdir = @abs_srcdir@
abs_top_builddir = @abs_top_builddir@
abs_top_srcdir = @abs_top_srcdir@
+ac_ct_AR = @ac_ct_AR@
ac_ct_CC = @ac_ct_CC@
ac_ct_CXX = @ac_ct_CXX@
ac_ct_DUMPBIN = @ac_ct_DUMPBIN@
@@ -319,6 +393,7 @@ datarootdir = @datarootdir@
docdir = @docdir@
dvidir = @dvidir@
exec_prefix = @exec_prefix@
+gitcommand = @gitcommand@
host = @host@
host_alias = @host_alias@
host_cpu = @host_cpu@
@@ -332,7 +407,6 @@ libdir = @libdir@
libexecdir = @libexecdir@
localedir = @localedir@
localstatedir = @localstatedir@
-lt_ECHO = @lt_ECHO@
ltdl_LIBOBJS = @ltdl_LIBOBJS@
ltdl_LTLIBOBJS = @ltdl_LTLIBOBJS@
mandir = @mandir@
@@ -350,6 +424,7 @@ sbindir = @sbindir@
sharedstatedir = @sharedstatedir@
srcdir = @srcdir@
subdirs = @subdirs@
+svnversioncommand = @svnversioncommand@
sys_symbol_underscore = @sys_symbol_underscore@
sysconfdir = @sysconfdir@
target = @target@
@@ -366,24 +441,29 @@ INCLUDES = -I$(top_srcdir)/src/include
@USE_COVERAGE_TRUE@XLIB = -lgcov
lib_LTLIBRARIES = libgnunetstream.la
libgnunetstream_la_SOURCES = \
- stream_api.c stream_protocol.h
+ stream_api.c stream.h
libgnunetstream_la_LIBADD = \
$(top_builddir)/src/mesh/libgnunetmesh.la \
+ $(top_builddir)/src/lockmanager/libgnunetlockmanager.la \
+ $(top_builddir)/src/statistics/libgnunetstatistics.la \
$(top_builddir)/src/util/libgnunetutil.la $(XLIB)
libgnunetstream_la_LDFLAGS = \
- $(GN_LIB_LDFLAGS)
+ $(GN_LIB_LDFLAGS) \
+ -version-info 1:0:0
+
+@HAVE_BENCHMARKS_TRUE@STREAM_BENCHMARKS = \
+@HAVE_BENCHMARKS_TRUE@ perf_stream_api
EXTRA_DIST = test_stream_local.conf
-@ENABLE_TEST_RUN_TRUE@TESTS = $(check_PROGRAMS)
test_stream_2peers_SOURCES = \
test_stream_2peers.c
test_stream_2peers_LDADD = \
$(top_builddir)/src/stream/libgnunetstream.la \
$(top_builddir)/src/util/libgnunetutil.la \
- $(top_builddir)/src/testing/libgnunettesting.la
+ $(top_builddir)/src/testbed/libgnunettestbed.la
test_stream_2peers_halfclose_SOURCES = \
test_stream_2peers_halfclose.c
@@ -391,7 +471,7 @@ test_stream_2peers_halfclose_SOURCES = \
test_stream_2peers_halfclose_LDADD = \
$(top_builddir)/src/stream/libgnunetstream.la \
$(top_builddir)/src/util/libgnunetutil.la \
- $(top_builddir)/src/testing/libgnunettesting.la
+ $(top_builddir)/src/testbed/libgnunettestbed.la
test_stream_local_SOURCES = \
test_stream_local.c
@@ -401,6 +481,31 @@ test_stream_local_LDADD = \
$(top_builddir)/src/util/libgnunetutil.la \
$(top_builddir)/src/testing/libgnunettesting.la
+test_stream_big_SOURCES = \
+ test_stream_big.c
+
+test_stream_big_LDADD = \
+ $(top_builddir)/src/stream/libgnunetstream.la \
+ $(top_builddir)/src/util/libgnunetutil.la \
+ $(top_builddir)/src/testing/libgnunettesting.la
+
+test_stream_sequence_wraparound_SOURCES = \
+ test_stream_sequence_wraparound.c
+
+test_stream_sequence_wraparound_LDADD = \
+ $(top_builddir)/src/stream/libgnunetstream.la \
+ $(top_builddir)/src/util/libgnunetutil.la \
+ $(top_builddir)/src/testing/libgnunettesting.la
+
+perf_stream_api_SOURCES = \
+ perf_stream_api.c
+
+perf_stream_api_LDADD = \
+ $(top_builddir)/src/stream/libgnunetstream.la \
+ $(top_builddir)/src/util/libgnunetutil.la \
+ $(top_builddir)/src/testing/libgnunettesting.la \
+ $(top_builddir)/src/testbed/libgnunettestbed.la
+
all: all-am
.SUFFIXES:
@@ -437,7 +542,6 @@ $(ACLOCAL_M4): $(am__aclocal_m4_deps)
$(am__aclocal_m4_deps):
install-libLTLIBRARIES: $(lib_LTLIBRARIES)
@$(NORMAL_INSTALL)
- test -z "$(libdir)" || $(MKDIR_P) "$(DESTDIR)$(libdir)"
@list='$(lib_LTLIBRARIES)'; test -n "$(libdir)" || list=; \
list2=; for p in $$list; do \
if test -f $$p; then \
@@ -445,6 +549,8 @@ install-libLTLIBRARIES: $(lib_LTLIBRARIES)
else :; fi; \
done; \
test -z "$$list2" || { \
+ echo " $(MKDIR_P) '$(DESTDIR)$(libdir)'"; \
+ $(MKDIR_P) "$(DESTDIR)$(libdir)" || exit 1; \
echo " $(LIBTOOL) $(AM_LIBTOOLFLAGS) $(LIBTOOLFLAGS) --mode=install $(INSTALL) $(INSTALL_STRIP_FLAG) $$list2 '$(DESTDIR)$(libdir)'"; \
$(LIBTOOL) $(AM_LIBTOOLFLAGS) $(LIBTOOLFLAGS) --mode=install $(INSTALL) $(INSTALL_STRIP_FLAG) $$list2 "$(DESTDIR)$(libdir)"; \
}
@@ -466,7 +572,7 @@ clean-libLTLIBRARIES:
echo "rm -f \"$${dir}/so_locations\""; \
rm -f "$${dir}/so_locations"; \
done
-libgnunetstream.la: $(libgnunetstream_la_OBJECTS) $(libgnunetstream_la_DEPENDENCIES)
+libgnunetstream.la: $(libgnunetstream_la_OBJECTS) $(libgnunetstream_la_DEPENDENCIES) $(EXTRA_libgnunetstream_la_DEPENDENCIES)
$(AM_V_CCLD)$(libgnunetstream_la_LINK) -rpath $(libdir) $(libgnunetstream_la_OBJECTS) $(libgnunetstream_la_LIBADD) $(LIBS)
clean-checkPROGRAMS:
@@ -477,15 +583,24 @@ clean-checkPROGRAMS:
list=`for p in $$list; do echo "$$p"; done | sed 's/$(EXEEXT)$$//'`; \
echo " rm -f" $$list; \
rm -f $$list
-test-stream-2peers$(EXEEXT): $(test_stream_2peers_OBJECTS) $(test_stream_2peers_DEPENDENCIES)
- @rm -f test-stream-2peers$(EXEEXT)
+perf_stream_api$(EXEEXT): $(perf_stream_api_OBJECTS) $(perf_stream_api_DEPENDENCIES) $(EXTRA_perf_stream_api_DEPENDENCIES)
+ @rm -f perf_stream_api$(EXEEXT)
+ $(AM_V_CCLD)$(LINK) $(perf_stream_api_OBJECTS) $(perf_stream_api_LDADD) $(LIBS)
+test_stream_2peers$(EXEEXT): $(test_stream_2peers_OBJECTS) $(test_stream_2peers_DEPENDENCIES) $(EXTRA_test_stream_2peers_DEPENDENCIES)
+ @rm -f test_stream_2peers$(EXEEXT)
$(AM_V_CCLD)$(LINK) $(test_stream_2peers_OBJECTS) $(test_stream_2peers_LDADD) $(LIBS)
-test-stream-2peers_halfclose$(EXEEXT): $(test_stream_2peers_halfclose_OBJECTS) $(test_stream_2peers_halfclose_DEPENDENCIES)
- @rm -f test-stream-2peers_halfclose$(EXEEXT)
+test_stream_2peers_halfclose$(EXEEXT): $(test_stream_2peers_halfclose_OBJECTS) $(test_stream_2peers_halfclose_DEPENDENCIES) $(EXTRA_test_stream_2peers_halfclose_DEPENDENCIES)
+ @rm -f test_stream_2peers_halfclose$(EXEEXT)
$(AM_V_CCLD)$(LINK) $(test_stream_2peers_halfclose_OBJECTS) $(test_stream_2peers_halfclose_LDADD) $(LIBS)
-test-stream-local$(EXEEXT): $(test_stream_local_OBJECTS) $(test_stream_local_DEPENDENCIES)
- @rm -f test-stream-local$(EXEEXT)
+test_stream_big$(EXEEXT): $(test_stream_big_OBJECTS) $(test_stream_big_DEPENDENCIES) $(EXTRA_test_stream_big_DEPENDENCIES)
+ @rm -f test_stream_big$(EXEEXT)
+ $(AM_V_CCLD)$(LINK) $(test_stream_big_OBJECTS) $(test_stream_big_LDADD) $(LIBS)
+test_stream_local$(EXEEXT): $(test_stream_local_OBJECTS) $(test_stream_local_DEPENDENCIES) $(EXTRA_test_stream_local_DEPENDENCIES)
+ @rm -f test_stream_local$(EXEEXT)
$(AM_V_CCLD)$(LINK) $(test_stream_local_OBJECTS) $(test_stream_local_LDADD) $(LIBS)
+test_stream_sequence_wraparound$(EXEEXT): $(test_stream_sequence_wraparound_OBJECTS) $(test_stream_sequence_wraparound_DEPENDENCIES) $(EXTRA_test_stream_sequence_wraparound_DEPENDENCIES)
+ @rm -f test_stream_sequence_wraparound$(EXEEXT)
+ $(AM_V_CCLD)$(LINK) $(test_stream_sequence_wraparound_OBJECTS) $(test_stream_sequence_wraparound_LDADD) $(LIBS)
mostlyclean-compile:
-rm -f *.$(OBJEXT)
@@ -493,34 +608,34 @@ mostlyclean-compile:
distclean-compile:
-rm -f *.tab.c
+@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/perf_stream_api.Po@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/stream_api.Plo@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/test_stream_2peers.Po@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/test_stream_2peers_halfclose.Po@am__quote@
+@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/test_stream_big.Po@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/test_stream_local.Po@am__quote@
+@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/test_stream_sequence_wraparound.Po@am__quote@
.c.o:
@am__fastdepCC_TRUE@ $(AM_V_CC)$(COMPILE) -MT $@ -MD -MP -MF $(DEPDIR)/$*.Tpo -c -o $@ $<
@am__fastdepCC_TRUE@ $(AM_V_at)$(am__mv) $(DEPDIR)/$*.Tpo $(DEPDIR)/$*.Po
-@am__fastdepCC_FALSE@ $(AM_V_CC) @AM_BACKSLASH@
-@AMDEP_TRUE@@am__fastdepCC_FALSE@ source='$<' object='$@' libtool=no @AMDEPBACKSLASH@
+@AMDEP_TRUE@@am__fastdepCC_FALSE@ $(AM_V_CC)source='$<' object='$@' libtool=no @AMDEPBACKSLASH@
@AMDEP_TRUE@@am__fastdepCC_FALSE@ DEPDIR=$(DEPDIR) $(CCDEPMODE) $(depcomp) @AMDEPBACKSLASH@
-@am__fastdepCC_FALSE@ $(COMPILE) -c $<
+@am__fastdepCC_FALSE@ $(AM_V_CC@am__nodep@)$(COMPILE) -c $<
.c.obj:
@am__fastdepCC_TRUE@ $(AM_V_CC)$(COMPILE) -MT $@ -MD -MP -MF $(DEPDIR)/$*.Tpo -c -o $@ `$(CYGPATH_W) '$<'`
@am__fastdepCC_TRUE@ $(AM_V_at)$(am__mv) $(DEPDIR)/$*.Tpo $(DEPDIR)/$*.Po
-@am__fastdepCC_FALSE@ $(AM_V_CC) @AM_BACKSLASH@
-@AMDEP_TRUE@@am__fastdepCC_FALSE@ source='$<' object='$@' libtool=no @AMDEPBACKSLASH@
+@AMDEP_TRUE@@am__fastdepCC_FALSE@ $(AM_V_CC)source='$<' object='$@' libtool=no @AMDEPBACKSLASH@
@AMDEP_TRUE@@am__fastdepCC_FALSE@ DEPDIR=$(DEPDIR) $(CCDEPMODE) $(depcomp) @AMDEPBACKSLASH@
-@am__fastdepCC_FALSE@ $(COMPILE) -c `$(CYGPATH_W) '$<'`
+@am__fastdepCC_FALSE@ $(AM_V_CC@am__nodep@)$(COMPILE) -c `$(CYGPATH_W) '$<'`
.c.lo:
@am__fastdepCC_TRUE@ $(AM_V_CC)$(LTCOMPILE) -MT $@ -MD -MP -MF $(DEPDIR)/$*.Tpo -c -o $@ $<
@am__fastdepCC_TRUE@ $(AM_V_at)$(am__mv) $(DEPDIR)/$*.Tpo $(DEPDIR)/$*.Plo
-@am__fastdepCC_FALSE@ $(AM_V_CC) @AM_BACKSLASH@
-@AMDEP_TRUE@@am__fastdepCC_FALSE@ source='$<' object='$@' libtool=yes @AMDEPBACKSLASH@
+@AMDEP_TRUE@@am__fastdepCC_FALSE@ $(AM_V_CC)source='$<' object='$@' libtool=yes @AMDEPBACKSLASH@
@AMDEP_TRUE@@am__fastdepCC_FALSE@ DEPDIR=$(DEPDIR) $(CCDEPMODE) $(depcomp) @AMDEPBACKSLASH@
-@am__fastdepCC_FALSE@ $(LTCOMPILE) -c -o $@ $<
+@am__fastdepCC_FALSE@ $(AM_V_CC@am__nodep@)$(LTCOMPILE) -c -o $@ $<
mostlyclean-libtool:
-rm -f *.lo
@@ -661,14 +776,15 @@ check-TESTS: $(TESTS)
fi; \
dashes=`echo "$$dashes" | sed s/./=/g`; \
if test "$$failed" -eq 0; then \
- echo "$$grn$$dashes"; \
+ col="$$grn"; \
else \
- echo "$$red$$dashes"; \
+ col="$$red"; \
fi; \
- echo "$$banner"; \
- test -z "$$skipped" || echo "$$skipped"; \
- test -z "$$report" || echo "$$report"; \
- echo "$$dashes$$std"; \
+ echo "$${col}$$dashes$${std}"; \
+ echo "$${col}$$banner$${std}"; \
+ test -z "$$skipped" || echo "$${col}$$skipped$${std}"; \
+ test -z "$$report" || echo "$${col}$$report$${std}"; \
+ echo "$${col}$$dashes$${std}"; \
test "$$failed" -eq 0; \
else :; fi
@@ -721,10 +837,15 @@ install-am: all-am
installcheck: installcheck-am
install-strip:
- $(MAKE) $(AM_MAKEFLAGS) INSTALL_PROGRAM="$(INSTALL_STRIP_PROGRAM)" \
- install_sh_PROGRAM="$(INSTALL_STRIP_PROGRAM)" INSTALL_STRIP_FLAG=-s \
- `test -z '$(STRIP)' || \
- echo "INSTALL_PROGRAM_ENV=STRIPPROG='$(STRIP)'"` install
+ if test -z '$(STRIP)'; then \
+ $(MAKE) $(AM_MAKEFLAGS) INSTALL_PROGRAM="$(INSTALL_STRIP_PROGRAM)" \
+ install_sh_PROGRAM="$(INSTALL_STRIP_PROGRAM)" INSTALL_STRIP_FLAG=-s \
+ install; \
+ else \
+ $(MAKE) $(AM_MAKEFLAGS) INSTALL_PROGRAM="$(INSTALL_STRIP_PROGRAM)" \
+ install_sh_PROGRAM="$(INSTALL_STRIP_PROGRAM)" INSTALL_STRIP_FLAG=-s \
+ "INSTALL_PROGRAM_ENV=STRIPPROG='$(STRIP)'" install; \
+ fi
mostlyclean-generic:
clean-generic:
diff --git a/src/stream/perf_stream_api.c b/src/stream/perf_stream_api.c
new file mode 100644
index 0000000..aa534ce
--- /dev/null
+++ b/src/stream/perf_stream_api.c
@@ -0,0 +1,1047 @@
+ /*
+ This file is part of GNUnet.
+ (C) 2011, 2012 Christian Grothoff (and other contributing authors)
+
+ GNUnet is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published
+ by the Free Software Foundation; either version 3, or (at your
+ option) any later version.
+
+ GNUnet is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with GNUnet; see the file COPYING. If not, write to the
+ Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ Boston, MA 02111-1307, USA.
+*/
+
+/**
+ * @file stream/perf_stream_api.c
+ * @brief performance benchmarks for stream api
+ * @author Sree Harsha Totakura
+ */
+
+#define LOG(kind, ...) \
+ GNUNET_log (kind, __VA_ARGS__);
+
+/****************************************************************************************/
+/* Test is setup into the following major steps: */
+/* 1. Measurements over loopback (1 hop). i.e. we use only one peer and open */
+/* stream connections over loopback. Messages will go through */
+/* STREAM_API->MESH_API->MESH_SERVICE->MESH_API->STREAM_API. */
+/* 2. Measurements over 2 peers (2 hops). We use testbed to create 2 peers, */
+/* connect them and then create stream connections. Messages will go through */
+/* STREAM_API->MESH_API->MESH_SERVICE->CORE1.....CORE2->MESH_API->STREAM_API */
+/* 3. Measurements over 3 peers (3 hops). We use testbed to create 3 peers, */
+/* connect them in a line topology: peer1->peer2->peer3. Messages will go */
+/* through */
+/* STREAM_API->MESH_API->MESH_SERVICE->CORE1..CORE2..CORE3->MESH_API->STREAM_API. */
+/****************************************************************************************/
+
+#include "platform.h"
+#include "gnunet_common.h"
+#include "gnunet_util_lib.h"
+#include "gnunet_testbed_service.h"
+#include "gnunet_stream_lib.h"
+
+/**
+ * Simple struct to keep track of progress, and print a
+ * nice little percentage meter for long running tasks.
+ */
+struct ProgressMeter
+{
+ unsigned int total;
+
+ unsigned int modnum;
+
+ unsigned int dotnum;
+
+ unsigned int completed;
+
+ int print;
+
+ char *startup_string;
+};
+
+
+/**
+ * Steps in testing
+ */
+enum TestStep
+{
+ /**
+ * Single hop loopback testing
+ */
+ TEST_STEP_1_HOP,
+
+ /**
+ * Testing with 2 peers
+ */
+ TEST_STEP_2_HOP,
+
+ /**
+ * Testing with 3 peers
+ */
+ TEST_STEP_3_HOP
+};
+
+
+/**
+ * Structure for holding peer's sockets and IO Handles
+ */
+struct PeerData
+{
+ /**
+ * Peer's stream socket
+ */
+ struct GNUNET_STREAM_Socket *socket;
+
+ /**
+ * Peer's io write handle
+ */
+ struct GNUNET_STREAM_WriteHandle *io_write_handle;
+
+ /**
+ * Peer's io read handle
+ */
+ struct GNUNET_STREAM_ReadHandle *io_read_handle;
+
+ /**
+ * The peer handle when we use the testbed servie
+ */
+ struct GNUNET_TESTBED_Peer *peer;
+
+ /**
+ * Handle to peer specific opearations while using testbed service
+ */
+ struct GNUNET_TESTBED_Operation *op;
+
+ /**
+ * The identity of this peer
+ */
+ struct GNUNET_PeerIdentity id;
+
+ /**
+ * Peer's shutdown handle
+ */
+ struct GNUNET_STREAM_ShutdownHandle *shutdown_handle;
+
+ /**
+ * Bytes the peer has written
+ */
+ size_t bytes_wrote;
+
+ /**
+ * Byte the peer has read
+ */
+ size_t bytes_read;
+
+ /**
+ * number of packets sent
+ */
+ unsigned int packets_wrote;
+
+ /**
+ * number of packets read
+ */
+ unsigned int packets_read;
+};
+
+
+/**
+ * Enumeration of stages in this testing
+ */
+enum TestStage
+{
+ /**
+ * The initial stage
+ */
+ INIT,
+
+ /**
+ * Uplink testing stage
+ */
+ UPLINK_OK,
+
+ /**
+ * Downlink testing stage
+ */
+ DOWNLINK_OK
+};
+
+
+/**
+ * Maximum size of the data which we will transfer during tests
+ */
+#define DATA_SIZE 5000000 /* 5mB */
+
+/**
+ * Fixed number of packets we send in each direction during each subtest
+ */
+#define MAX_PACKETS 1000
+
+/**
+ * Listen socket of peer2
+ */
+struct GNUNET_STREAM_ListenSocket *peer2_listen_socket;
+
+/**
+ * Handle to configuration during TEST_STEP_1_HOP
+ */
+const struct GNUNET_CONFIGURATION_Handle *config;
+
+/**
+ * Handle for the progress meter
+ */
+static struct ProgressMeter *meter;
+
+/**
+ * Placeholder for peer data
+ */
+static struct PeerData peer_data[3];
+
+/**
+ * Handle to common operations while using testbed
+ */
+static struct GNUNET_TESTBED_Operation *common_op;
+
+/**
+ * Task ID for abort task
+ */
+static GNUNET_SCHEDULER_TaskIdentifier abort_task;
+
+/**
+ * Task ID for write task
+ */
+static GNUNET_SCHEDULER_TaskIdentifier write_task;
+
+/**
+ * Task ID for read task
+ */
+static GNUNET_SCHEDULER_TaskIdentifier read_task;
+
+/**
+ * Absolute time when profiling starts
+ */
+static struct GNUNET_TIME_Absolute prof_start_time;
+
+/**
+ * Test time taken for sending the data
+ */
+static struct GNUNET_TIME_Relative prof_time;
+
+/**
+ * Random data block. Should generate data first
+ */
+static uint32_t data[DATA_SIZE / 4];
+
+/**
+ * Payload sizes to test each major test with
+ */
+static uint16_t payload_size[] =
+{ 20, 500, 2000, 7000, 13000, 25000, 30000};//, 50000, 60000, 63000, 64000 };
+
+/**
+ * Current step of testing
+ */
+static enum TestStep test_step;
+
+/**
+ * Index for choosing payload size
+ */
+static unsigned int payload_size_index;
+
+/**
+ * Number of peers we want to create while using the testbed service
+ */
+static int num_peers;
+
+/**
+ * Flag to indicate that the other peer should reset its data read source index
+ */
+static int reset_read;
+
+/**
+ * Testing result of a major test
+ */
+static enum TestStage result;
+
+/**
+ * Create a meter to keep track of the progress of some task.
+ *
+ * @param total the total number of items to complete
+ * @param start_string a string to prefix the meter with (if printing)
+ * @param print GNUNET_YES to print the meter, GNUNET_NO to count
+ * internally only
+ *
+ * @return the progress meter
+ */
+static struct ProgressMeter *
+create_meter (unsigned int total, char *start_string, int print)
+{
+ struct ProgressMeter *ret;
+
+ ret = GNUNET_malloc (sizeof (struct ProgressMeter));
+ ret->print = print;
+ ret->total = total;
+ ret->modnum = total / 4;
+ if (ret->modnum == 0) /* Divide by zero check */
+ ret->modnum = 1;
+ ret->dotnum = (total / 50) + 1;
+ if (start_string != NULL)
+ ret->startup_string = GNUNET_strdup (start_string);
+ else
+ ret->startup_string = GNUNET_strdup ("");
+
+ return ret;
+}
+
+
+/**
+ * Update progress meter (increment by one).
+ *
+ * @param meter the meter to update and print info for
+ *
+ * @return GNUNET_YES if called the total requested,
+ * GNUNET_NO if more items expected
+ */
+static int
+update_meter (struct ProgressMeter *meter)
+{
+ if (meter->print == GNUNET_YES)
+ {
+ if (meter->completed % meter->modnum == 0)
+ {
+ if (meter->completed == 0)
+ {
+ FPRINTF (stdout, "%sProgress: [0%%", meter->startup_string);
+ }
+ else
+ FPRINTF (stdout, "%d%%",
+ (int) (((float) meter->completed / meter->total) * 100));
+ }
+ else if (meter->completed % meter->dotnum == 0)
+ FPRINTF (stdout, "%s", ".");
+
+ if (meter->completed + 1 == meter->total)
+ FPRINTF (stdout, "%d%%]\n", 100);
+ fflush (stdout);
+ }
+ meter->completed++;
+
+ if (meter->completed == meter->total)
+ return GNUNET_YES;
+ if (meter->completed > meter->total)
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Progress meter overflow!!\n");
+ return GNUNET_NO;
+}
+
+
+/**
+ * Reset progress meter.
+ *
+ * @param meter the meter to reset
+ *
+ * @return GNUNET_YES if meter reset,
+ * GNUNET_SYSERR on error
+ */
+static int
+reset_meter (struct ProgressMeter *meter)
+{
+ if (meter == NULL)
+ return GNUNET_SYSERR;
+
+ meter->completed = 0;
+ return GNUNET_YES;
+}
+
+
+/**
+ * Release resources for meter
+ *
+ * @param meter the meter to free
+ */
+static void
+free_meter (struct ProgressMeter *meter)
+{
+ GNUNET_free_non_null (meter->startup_string);
+ GNUNET_free (meter);
+}
+
+
+/**
+ * Shutdown nicely
+ */
+static void
+do_close (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+ switch (test_step)
+ {
+ case TEST_STEP_1_HOP:
+ if (NULL != peer_data[0].socket)
+ GNUNET_STREAM_close (peer_data[0].socket);
+ if (NULL != peer_data[1].socket)
+ GNUNET_STREAM_close (peer_data[1].socket);
+ if (NULL != peer2_listen_socket)
+ GNUNET_STREAM_listen_close (peer2_listen_socket); /* Close listen socket */
+ break;
+ case TEST_STEP_2_HOP:
+ if (NULL != peer_data[1].socket)
+ GNUNET_STREAM_close (peer_data[1].socket);
+ if (NULL != peer_data[0].op)
+ GNUNET_TESTBED_operation_done (peer_data[0].op);
+ if (NULL != peer_data[1].op)
+ GNUNET_TESTBED_operation_done (peer_data[1].op);
+ break;
+ case TEST_STEP_3_HOP:
+ GNUNET_break (0);
+ }
+ if (GNUNET_SCHEDULER_NO_TASK != abort_task)
+ GNUNET_SCHEDULER_cancel (abort_task);
+ if (GNUNET_SCHEDULER_NO_TASK != write_task)
+ GNUNET_SCHEDULER_cancel (write_task);
+ GNUNET_SCHEDULER_shutdown (); /* Shutdown this testcase */
+ if (NULL != meter)
+ {
+ free_meter (meter);
+ meter = NULL;
+ }
+}
+
+
+/**
+ * Something went wrong and timed out. Kill everything and set error flag
+ */
+static void
+do_abort (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+ abort_task = GNUNET_SCHEDULER_NO_TASK;
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "test: ABORT\n");
+ if (GNUNET_SCHEDULER_NO_TASK != read_task)
+ GNUNET_SCHEDULER_cancel (read_task);
+ result = GNUNET_SYSERR;
+ do_close (cls, tc);
+}
+
+
+/**
+ * Completion callback for shutdown
+ *
+ * @param cls the closure from GNUNET_STREAM_shutdown call
+ * @param operation the operation that was shutdown (SHUT_RD, SHUT_WR,
+ * SHUT_RDWR)
+ */
+static void
+shutdown_completion (void *cls,
+ int operation)
+{
+ static int shutdowns;
+
+ if (++shutdowns == 1)
+ {
+ peer_data[0].shutdown_handle = NULL;
+ peer_data[1].shutdown_handle = GNUNET_STREAM_shutdown (peer_data[1].socket, SHUT_RDWR,
+ &shutdown_completion, cls);
+ return;
+ }
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "STREAM shutdown successful\n");
+ GNUNET_SCHEDULER_add_now (&do_close, cls);
+}
+
+
+/**
+ * Shutdown sockets gracefully
+ */
+static void
+do_shutdown (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+ peer_data[0].shutdown_handle = GNUNET_STREAM_shutdown (peer_data[0].socket, SHUT_RDWR,
+ &shutdown_completion, cls);
+}
+
+
+/**
+ * Scheduler call back; to be executed when a new stream is connected
+ * Called from listen connect for peer2
+ */
+static void
+stream_read_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
+
+
+/**
+ * Task for calling STREAM_write with a chunk of random data
+ *
+ * @param cls the peer data entity
+ * @param tc the task context
+ */
+static void
+stream_write_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
+
+
+/**
+ * The write completion function; called upon writing some data to stream or
+ * upon error
+ *
+ * @param cls the closure from GNUNET_STREAM_write/read
+ * @param status the status of the stream at the time this function is called
+ * @param size the number of bytes written
+ */
+static void
+write_completion (void *cls, enum GNUNET_STREAM_Status status, size_t size)
+{
+ struct PeerData *pdata = cls;
+ double throughput;
+ double prof_time_sec;
+ unsigned int packets_wrote;
+
+ if (GNUNET_STREAM_OK != status)
+ {
+ GNUNET_SCHEDULER_cancel (abort_task);
+ abort_task = GNUNET_SCHEDULER_add_now (&do_abort, NULL);
+ return;
+ }
+ GNUNET_assert (size <= DATA_SIZE);
+ packets_wrote = (size + payload_size[payload_size_index] - 1)
+ / payload_size[payload_size_index];
+ pdata->bytes_wrote += size;
+ for (;packets_wrote > 0; packets_wrote--)
+ {
+ update_meter (meter);
+ pdata->packets_wrote++;
+ }
+ if (pdata->packets_wrote < MAX_PACKETS) /* Have more data to send */
+ {
+ size_t write_amount;
+
+ if (GNUNET_SCHEDULER_NO_TASK != abort_task)
+ {
+ GNUNET_SCHEDULER_cancel (abort_task);
+ abort_task =
+ GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply
+ (GNUNET_TIME_UNIT_SECONDS, 300), &do_abort,
+ NULL);
+ }
+ write_amount = (MAX_PACKETS - pdata->packets_wrote) *
+ payload_size[payload_size_index];
+ if (write_amount > DATA_SIZE)
+ write_amount = DATA_SIZE;
+ reset_read = GNUNET_YES;
+ pdata->io_write_handle = GNUNET_STREAM_write (pdata->socket, data,
+ write_amount,
+ GNUNET_TIME_UNIT_FOREVER_REL,
+ &write_completion, pdata);
+ GNUNET_assert (NULL != pdata->io_write_handle);
+ }
+ else
+ {
+ free_meter (meter);
+ meter = NULL;
+ prof_time = GNUNET_TIME_absolute_get_duration (prof_start_time);
+ prof_time_sec = (((double) prof_time.rel_value)/ ((double) 1000));
+ throughput = ((float) pdata->bytes_wrote) / prof_time_sec;
+ PRINTF ("Throughput %.2f kB/sec\n", throughput / 1000.00);
+ switch (result)
+ {
+ case INIT:
+ result = UPLINK_OK;
+ GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == read_task);
+ GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == write_task);
+ pdata->bytes_read = 0;
+ pdata->packets_read = 0;
+ meter = create_meter (MAX_PACKETS, "Testing Downlink\n", GNUNET_YES);
+ read_task = GNUNET_SCHEDULER_add_now (&stream_read_task, &peer_data[0]);
+ write_task = GNUNET_SCHEDULER_add_now (&stream_write_task, &peer_data[1]);
+ break;
+ case UPLINK_OK:
+ result = DOWNLINK_OK;
+ GNUNET_SCHEDULER_add_now (&do_shutdown, NULL);
+ break;
+ case DOWNLINK_OK:
+ GNUNET_assert (0);
+ }
+ }
+}
+
+
+/**
+ * Task for calling STREAM_write with a chunk of random data
+ *
+ * @param cls the peer data entity
+ * @param tc the task context
+ */
+static void
+stream_write_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+ struct PeerData *pdata = cls;
+ size_t write_amount;
+
+ if (GNUNET_SCHEDULER_NO_TASK != abort_task)
+ {
+ GNUNET_SCHEDULER_cancel (abort_task);
+ abort_task =
+ GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply
+ (GNUNET_TIME_UNIT_SECONDS, 300), &do_abort,
+ NULL);
+ }
+ write_task = GNUNET_SCHEDULER_NO_TASK;
+ prof_start_time = GNUNET_TIME_absolute_get ();
+ pdata->bytes_wrote = 0;
+ pdata->packets_wrote = 0;
+ write_amount = MAX_PACKETS * payload_size[payload_size_index];
+ if (write_amount > DATA_SIZE)
+ write_amount = DATA_SIZE;
+ reset_read = GNUNET_YES;
+ pdata->io_write_handle = GNUNET_STREAM_write (pdata->socket, data,
+ write_amount,
+ GNUNET_TIME_UNIT_FOREVER_REL,
+ &write_completion, pdata);
+ GNUNET_assert (NULL != pdata->io_write_handle);
+}
+
+
+/**
+ * Scheduler call back; to be executed when a new stream is connected
+ * Called from listen connect for peer2
+ */
+static void
+stream_read_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
+
+
+/**
+ * Input processor
+ *
+ * @param cls peer2
+ * @param status the status of the stream at the time this function is called
+ * @param data traffic from the other side
+ * @param size the number of bytes available in data read
+ * @return number of bytes of processed from 'data' (any data remaining should be
+ * given to the next time the read processor is called).
+ */
+static size_t
+input_processor (void *cls, enum GNUNET_STREAM_Status status,
+ const void *input_data, size_t size)
+{
+ struct PeerData *pdata = cls;
+
+ if (GNUNET_STREAM_OK != status)
+ {
+ GNUNET_SCHEDULER_cancel (abort_task);
+ abort_task = GNUNET_SCHEDULER_add_now (&do_abort, NULL);
+ return 0;
+ }
+ GNUNET_assert (size <= DATA_SIZE);
+ if (GNUNET_YES == reset_read)
+ {
+ pdata->bytes_read = 0;
+ reset_read = GNUNET_NO;
+ }
+ GNUNET_assert ((pdata->bytes_read + size) <= DATA_SIZE);
+ GNUNET_assert (0 == memcmp (((void *)data ) + pdata->bytes_read,
+ input_data, size));
+ pdata->bytes_read += size;
+ pdata->packets_read += (size + payload_size[payload_size_index] - 1)
+ / payload_size[payload_size_index];
+ if (pdata->packets_read < MAX_PACKETS)
+ {
+ GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == read_task);
+ read_task = GNUNET_SCHEDULER_add_now (&stream_read_task, pdata);
+ }
+ else
+ {
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "Reading finished successfully\n");
+ }
+ return size;
+}
+
+
+/**
+ * Scheduler call back; to be executed when a new stream is connected
+ * Called from listen connect for peer2
+ */
+static void
+stream_read_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+ struct PeerData *pdata = cls;
+
+ read_task = GNUNET_SCHEDULER_NO_TASK;
+ pdata->io_read_handle =
+ GNUNET_STREAM_read (pdata->socket, GNUNET_TIME_UNIT_FOREVER_REL,
+ &input_processor, pdata);
+ GNUNET_assert (NULL != pdata->io_read_handle);
+}
+
+
+/**
+ * Functions of this type are called upon new stream connection from other peers
+ *
+ * @param cls the closure from GNUNET_STREAM_listen
+ * @param socket the socket representing the stream
+ * @param initiator the identity of the peer who wants to establish a stream
+ * with us
+ * @return GNUNET_OK to keep the socket open, GNUNET_SYSERR to close the
+ * stream (the socket will be invalid after the call)
+ */
+static int
+stream_listen_cb (void *cls, struct GNUNET_STREAM_Socket *socket,
+ const struct GNUNET_PeerIdentity *initiator)
+{
+ struct PeerData *pdata = cls;
+
+
+ if ((NULL == socket) || (NULL == initiator))
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Binding error\n");
+ if (GNUNET_SCHEDULER_NO_TASK != abort_task)
+ GNUNET_SCHEDULER_cancel (abort_task);
+ abort_task = GNUNET_SCHEDULER_add_now (&do_abort, NULL);
+ return GNUNET_OK;
+ }
+ GNUNET_assert (NULL != socket);
+ GNUNET_assert (pdata == &peer_data[1]);
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Peer connected: %s\n",
+ GNUNET_i2s(initiator));
+ pdata->socket = socket;
+ pdata->bytes_read = 0;
+ read_task = GNUNET_SCHEDULER_add_now (&stream_read_task, pdata);
+ return GNUNET_OK;
+}
+
+
+/**
+ * Function executed after stream has been established
+ *
+ * @param cls the closure from GNUNET_STREAM_open
+ * @param socket socket to use to communicate with the other side (read/write)
+ */
+static void
+stream_open_cb (void *cls,
+ struct GNUNET_STREAM_Socket *socket)
+{
+ struct PeerData *pdata = cls;
+
+ GNUNET_assert (socket == pdata->socket);
+ meter = create_meter (MAX_PACKETS, "Testing Uplink\n", GNUNET_YES);
+ write_task = GNUNET_SCHEDULER_add_now (&stream_write_task, pdata);
+}
+
+
+/**
+ * Listen success callback; connects a peer to stream as client
+ */
+static void
+stream_connect (void)
+{
+ peer_data[0].socket =
+ GNUNET_STREAM_open (config, &peer_data[1].id, 10, &stream_open_cb,
+ &peer_data[0],
+ GNUNET_STREAM_OPTION_MAX_PAYLOAD_SIZE,
+ payload_size[payload_size_index],
+ GNUNET_STREAM_OPTION_END);
+ GNUNET_assert (NULL != peer_data[0].socket);
+}
+
+
+/**
+ * Initialize framework and start test
+ *
+ * @param cls closure
+ * @param cfg configuration of the peer that was started
+ * @param peer identity of the peer that was created
+ */
+static void
+run (void *cls,
+ const struct GNUNET_CONFIGURATION_Handle *cfg,
+ struct GNUNET_TESTING_Peer *peer)
+{
+ struct GNUNET_PeerIdentity id;
+
+ GNUNET_TESTING_peer_get_identity (peer, &id);
+ config = cfg;
+ peer2_listen_socket =
+ GNUNET_STREAM_listen (config, 10, &stream_listen_cb, &peer_data[1],
+ GNUNET_STREAM_OPTION_SIGNAL_LISTEN_SUCCESS,
+ &stream_connect,
+ GNUNET_STREAM_OPTION_MAX_PAYLOAD_SIZE,
+ payload_size[payload_size_index],
+ GNUNET_STREAM_OPTION_END);
+ GNUNET_assert (NULL != peer2_listen_socket);
+ peer_data[0].id = id;
+ peer_data[1].id = id;
+ abort_task =
+ GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply
+ (GNUNET_TIME_UNIT_SECONDS, 300), &do_abort,
+ NULL);
+}
+
+
+/**
+ * Adapter function called to establish a connection to
+ * a service.
+ *
+ * @param cls closure
+ * @param cfg configuration of the peer to connect to; will be available until
+ * GNUNET_TESTBED_operation_done() is called on the operation returned
+ * from GNUNET_TESTBED_service_connect()
+ * @return service handle to return in 'op_result', NULL on error
+ */
+static void *
+stream_ca (void *cls, const struct GNUNET_CONFIGURATION_Handle *cfg);
+
+
+/**
+ * Adapter function called to destroy a connection to
+ * a service.
+ *
+ * @param cls closure
+ * @param op_result service handle returned from the connect adapter
+ */
+static void
+stream_da (void *cls, void *op_result)
+{
+ if (&peer_data[1] == cls)
+ {
+ GNUNET_STREAM_listen_close (op_result);
+ return;
+ }
+ else if (&peer_data[0] == cls)
+ {
+ GNUNET_STREAM_close (op_result);
+ return;
+ }
+ GNUNET_assert (0);
+}
+
+
+/**
+ * Listen success callback; connects a peer to stream as client. Called from
+ * testbed stream_ca
+ */
+static void
+stream_connect2 (void)
+{
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Stream listen open successful\n");
+ peer_data[0].op =
+ GNUNET_TESTBED_service_connect (&peer_data[0], peer_data[0].peer,
+ "stream", NULL, NULL, stream_ca,
+ stream_da, &peer_data[0]);
+}
+
+
+/**
+ * Adapter function called to establish a connection to
+ * a service.
+ *
+ * @param cls closure
+ * @param cfg configuration of the peer to connect to; will be available until
+ * GNUNET_TESTBED_operation_done() is called on the operation returned
+ * from GNUNET_TESTBED_service_connect()
+ * @return service handle to return in 'op_result', NULL on error
+ */
+static void *
+stream_ca (void *cls, const struct GNUNET_CONFIGURATION_Handle *cfg)
+{
+ struct PeerData *pdata = cls;
+
+ if (&peer_data[1] == pdata)
+ {
+ peer2_listen_socket = NULL;
+ peer2_listen_socket =
+ GNUNET_STREAM_listen (cfg, 10, &stream_listen_cb, &peer_data[1],
+ GNUNET_STREAM_OPTION_SIGNAL_LISTEN_SUCCESS,
+ &stream_connect2,
+ GNUNET_STREAM_OPTION_MAX_PAYLOAD_SIZE,
+ payload_size[payload_size_index],
+ GNUNET_STREAM_OPTION_END);
+ GNUNET_assert (NULL != peer2_listen_socket);
+ return peer2_listen_socket;
+ }
+ if (&peer_data[0] == pdata)
+ {
+ pdata->socket =
+ GNUNET_STREAM_open (cfg, &peer_data[1].id, 10, &stream_open_cb,
+ &peer_data[0],
+ GNUNET_STREAM_OPTION_MAX_PAYLOAD_SIZE,
+ payload_size[payload_size_index],
+ GNUNET_STREAM_OPTION_END);
+ GNUNET_assert (NULL != pdata->socket);
+ return pdata->socket;
+ }
+ GNUNET_assert (0);
+ return NULL;
+}
+
+
+/**
+ * Callback to be called when the requested peer information is available
+ *
+ * @param cb_cls the closure from GNUNET_TETSBED_peer_get_information()
+ * @param op the operation this callback corresponds to
+ * @param pinfo the result; will be NULL if the operation has failed
+ * @param emsg error message if the operation has failed; will be NULL if the
+ * operation is successfull
+ */
+static void
+peerinfo_cb (void *cb_cls, struct GNUNET_TESTBED_Operation *op,
+ const struct GNUNET_TESTBED_PeerInformation *pinfo,
+ const char *emsg)
+{
+ struct PeerData *pdata = cb_cls;
+
+ GNUNET_assert (NULL == emsg);
+ GNUNET_assert (common_op == op);
+ GNUNET_assert (NULL != pdata);
+ memcpy (&pdata->id, pinfo->result.id, sizeof (struct GNUNET_PeerIdentity));
+ GNUNET_TESTBED_operation_done (op);
+ if (pdata == &peer_data[0])
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Peer 1 id: %s\n",
+ GNUNET_i2s (&pdata->id));
+ common_op = GNUNET_TESTBED_peer_get_information (peer_data[1].peer,
+ GNUNET_TESTBED_PIT_IDENTITY,
+ &peerinfo_cb, &peer_data[1]);
+ }
+ else if (pdata == &peer_data[1])
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Peer 2 id: %s\n",
+ GNUNET_i2s (&pdata->id));
+ if (TEST_STEP_2_HOP == test_step)
+ peer_data[1].op =
+ GNUNET_TESTBED_service_connect (&peer_data[1], peer_data[1].peer,
+ "stream", NULL, NULL, stream_ca,
+ stream_da, &peer_data[1]);
+ else
+ GNUNET_break (0); /* FIXME: 3 hop test case here... */
+ }
+}
+
+
+/**
+ * Controller event callback
+ *
+ * @param cls NULL
+ * @param event the controller event
+ */
+static void
+controller_event_cb (void *cls,
+ const struct GNUNET_TESTBED_EventInformation *event)
+{
+ switch (event->type)
+ {
+ case GNUNET_TESTBED_ET_OPERATION_FINISHED:
+ if (NULL != event->details.operation_finished.emsg)
+ {
+ FPRINTF (stderr, "Error while expecting an operation to succeed:%s \n",
+ event->details.operation_finished.emsg);
+ GNUNET_assert (0);
+ }
+ break;
+ default:
+ GNUNET_assert (0);
+ }
+}
+
+
+/**
+ * Signature of a main function for a testcase.
+ *
+ * @param cls closure
+ * @param num_peers number of peers in 'peers'
+ * @param peers handle to peers run in the testbed
+ */
+static void
+test_master (void *cls, unsigned int num_peers_,
+ struct GNUNET_TESTBED_Peer **peers)
+{
+ GNUNET_assert (NULL != peers);
+ GNUNET_assert (NULL != peers[0]);
+ GNUNET_assert (NULL != peers[1]);
+ GNUNET_assert (num_peers_ == num_peers);
+ peer_data[0].peer = peers[0];
+ peer_data[1].peer = peers[1];
+ if (2 == num_peers)
+ /* Get the peer identity and configuration of peers */
+ common_op =
+ GNUNET_TESTBED_peer_get_information (peer_data[0].peer,
+ GNUNET_TESTBED_PIT_IDENTITY,
+ &peerinfo_cb, &peer_data[0]);
+ else
+ GNUNET_break (0);
+ abort_task =
+ GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply
+ (GNUNET_TIME_UNIT_SECONDS, 120), &do_abort,
+ NULL);
+}
+
+
+/**
+ * Main function
+ */
+int main (int argc, char **argv)
+{
+ char *test_name = "perf_stream_api";
+ char *cfg_file = "test_stream_local.conf";
+ uint64_t event_mask;
+ unsigned int count;
+ int ret;
+
+ meter = create_meter ((sizeof (data) / 4), "Generating random data\n", GNUNET_YES);
+ for (count=0; count < (sizeof (data) / 4); count++)
+ {
+ data[count] = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
+ UINT32_MAX);
+ update_meter (meter);
+ }
+ reset_meter (meter);
+ free_meter (meter);
+ meter = NULL;
+ test_step = TEST_STEP_1_HOP;
+ for (payload_size_index = 0;
+ payload_size_index < (sizeof (payload_size) / sizeof (uint16_t));
+ payload_size_index++)
+ {
+ PRINTF ("\nTesting over loopback with payload size %hu\n",
+ payload_size[payload_size_index]);
+ (void) memset (peer_data, 0, sizeof (peer_data));
+ result = INIT;
+ reset_read = GNUNET_NO;
+ ret = GNUNET_TESTING_peer_run (test_name, cfg_file, &run, NULL);
+ if ((0 != ret) || (DOWNLINK_OK != result))
+ goto return_fail;
+ }
+ test_step = TEST_STEP_2_HOP;
+ num_peers = 2;
+ event_mask = 0;
+ event_mask |= (1LL << GNUNET_TESTBED_ET_OPERATION_FINISHED);
+ for (payload_size_index = 0;
+ payload_size_index < (sizeof (payload_size) / sizeof (uint16_t));
+ payload_size_index++)
+ {
+ PRINTF ("\nTesting over 1 hop with payload size %hu\n",
+ payload_size[payload_size_index]);
+ (void) memset (peer_data, 0, sizeof (peer_data));
+ result = INIT;
+ reset_read = GNUNET_NO;
+ (void) GNUNET_TESTBED_test_run (test_name, cfg_file, num_peers, event_mask,
+ &controller_event_cb, NULL, &test_master,
+ NULL);
+ if (DOWNLINK_OK != result)
+ goto return_fail;
+ }
+ test_step = TEST_STEP_3_HOP;
+ for (payload_size_index = 0;
+ payload_size_index < (sizeof (payload_size) / sizeof (uint16_t));
+ payload_size_index++)
+ {
+ /* Initialize testbed here */
+ }
+ return 0;
+
+ return_fail:
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Test failed\n");
+ return 1;
+}
+
+/* end of perf_stream_api.c */
diff --git a/src/stream/stream_protocol.h b/src/stream/stream.h
index 0c5376f..9b92a28 100644
--- a/src/stream/stream_protocol.h
+++ b/src/stream/stream.h
@@ -19,13 +19,13 @@
*/
/**
- * @file stream/stream_protocol.h
+ * @file stream/stream.h
* @brief P2P protocol for the stream connections
* @author Sree Harsha Totakura
*/
-#ifndef STREAM_PROTOCOL_H
-#define STREAM_PROTOCOL_H
+#ifndef STREAM_H
+#define STREAM_H
#ifdef __cplusplus
extern "C"
@@ -50,12 +50,6 @@ struct GNUNET_STREAM_MessageHeader
* The GNUNET message header, types are from GNUNET_MESSAGE_TYPE_STREAM_*-range.
*/
struct GNUNET_MessageHeader header;
-
- /**
- * A number which identifies a session between the two peers. FIXME: not needed
- */
- uint32_t session_id GNUNET_PACKED;
-
};
@@ -121,18 +115,18 @@ struct GNUNET_STREAM_AckMessage
struct GNUNET_STREAM_MessageHeader header;
/**
- * The Selective Acknowledgement Bitmap. Computed relative to the base_seq
- * (bit n corresponds to the Data message with sequence number base_seq+n)
- */
- GNUNET_STREAM_AckBitmap bitmap GNUNET_PACKED;
-
- /**
* The sequence number of the next Data Message receiver is
* anticipating. Data messages less than this number are received by receiver
*/
uint32_t base_sequence_number GNUNET_PACKED;
/**
+ * The Selective Acknowledgement Bitmap. Computed relative to the base_seq
+ * (bit n corresponds to the Data message with sequence number base_seq+n)
+ */
+ GNUNET_STREAM_AckBitmap bitmap GNUNET_PACKED;
+
+ /**
* Available buffer space past the last acknowledged buffer (for flow control),
* in bytes.
*/
@@ -154,19 +148,20 @@ struct GNUNET_STREAM_HelloAckMessage
* The selected sequence number. Following data tranmissions from the sender
* start with this sequence
*/
- uint32_t sequence_number;
+ uint32_t sequence_number GNUNET_PACKED;
/**
* The size(in bytes) of the receive window on the peer sending this message
*
* FIXME: Remove if not needed
*/
- uint32_t receiver_window_size;
+ uint32_t receiver_window_size GNUNET_PACKED;
};
/**
* The Transmit close message(used to signal transmission is closed)
+ * FIXME: dead struct?
*/
struct GNUNET_STREAM_TransmitCloseMessage
{
@@ -192,4 +187,4 @@ GNUNET_NETWORK_STRUCT_END
}
#endif
-#endif /* STREAM_PROTOCOL_H */
+#endif /* STREAM.H */
diff --git a/src/stream/stream_api.c b/src/stream/stream_api.c
index dadba33..20df4aa 100644
--- a/src/stream/stream_api.c
+++ b/src/stream/stream_api.c
@@ -34,31 +34,41 @@
* @author Sree Harsha Totakura
*/
-
#include "platform.h"
#include "gnunet_common.h"
-#include "gnunet_crypto_lib.h"
+#include "gnunet_util_lib.h"
+#include "gnunet_lockmanager_service.h"
+#include "gnunet_statistics_service.h"
#include "gnunet_stream_lib.h"
-#include "stream_protocol.h"
+#include "stream.h"
+/**
+ * Generic logging shorthand
+ */
#define LOG(kind,...) \
GNUNET_log_from (kind, "stream-api", __VA_ARGS__)
/**
- * The maximum packet size of a stream packet
+ * Debug logging shorthand
*/
-#define MAX_PACKET_SIZE 64000
+#define LOG_DEBUG(...) \
+ LOG (GNUNET_ERROR_TYPE_DEBUG, __VA_ARGS__)
/**
- * Receive buffer
+ * Time in relative seconds shorthand
*/
-#define RECEIVE_BUFFER_SIZE 4096000
+#define TIME_REL_SECS(sec) \
+ GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, sec)
/**
- * The maximum payload a data message packet can carry
+ * The maximum packet size of a stream packet
*/
-static size_t max_payload_size =
- MAX_PACKET_SIZE - sizeof (struct GNUNET_STREAM_DataMessage);
+#define DEFAULT_MAX_PAYLOAD_SIZE 64000
+
+/**
+ * Receive buffer
+ */
+#define RECEIVE_BUFFER_SIZE 4096000
/**
* states in the Protocol
@@ -165,29 +175,14 @@ struct MessageQueue
struct GNUNET_STREAM_Socket
{
/**
- * Retransmission timeout
- */
- struct GNUNET_TIME_Relative retransmit_timeout;
-
- /**
- * The Acknowledgement Bitmap
- */
- GNUNET_STREAM_AckBitmap ack_bitmap;
-
- /**
- * Time when the Acknowledgement was queued
- */
- struct GNUNET_TIME_Absolute ack_time_registered;
-
- /**
- * Queued Acknowledgement deadline
+ * The mesh handle
*/
- struct GNUNET_TIME_Relative ack_time_deadline;
+ struct GNUNET_MESH_Handle *mesh;
/**
- * The mesh handle
+ * Handle to statistics
*/
- struct GNUNET_MESH_Handle *mesh;
+ struct GNUNET_STATISTICS_Handle *stat_handle;
/**
* The mesh tunnel handle
@@ -210,16 +205,6 @@ struct GNUNET_STREAM_Socket
struct GNUNET_MESH_TransmitHandle *transmit_handle;
/**
- * The current act transmit handle (if a pending ack transmit request exists)
- */
- struct GNUNET_MESH_TransmitHandle *ack_transmit_handle;
-
- /**
- * Pointer to the current ack message using in ack_task
- */
- struct GNUNET_STREAM_AckMessage *ack_msg;
-
- /**
* The current message associated with the transmit handle
*/
struct MessageQueue *queue_head;
@@ -232,12 +217,12 @@ struct GNUNET_STREAM_Socket
/**
* The write IO_handle associated with this socket
*/
- struct GNUNET_STREAM_IOWriteHandle *write_handle;
+ struct GNUNET_STREAM_WriteHandle *write_handle;
/**
* The read IO_handle associated with this socket
*/
- struct GNUNET_STREAM_IOReadHandle *read_handle;
+ struct GNUNET_STREAM_ReadHandle *read_handle;
/**
* The shutdown handle associated with this socket
@@ -261,14 +246,19 @@ struct GNUNET_STREAM_Socket
struct GNUNET_PeerIdentity other_peer;
/**
- * Task identifier for the read io timeout task
+ * The Acknowledgement Bitmap
*/
- GNUNET_SCHEDULER_TaskIdentifier read_io_timeout_task_id;
+ GNUNET_STREAM_AckBitmap ack_bitmap;
/**
* Task identifier for retransmission task after timeout
*/
- GNUNET_SCHEDULER_TaskIdentifier retransmission_timeout_task_id;
+ GNUNET_SCHEDULER_TaskIdentifier data_retransmission_task_id;
+
+ /**
+ * Task identifier for retransmission of control messages
+ */
+ GNUNET_SCHEDULER_TaskIdentifier control_retransmission_task_id;
/**
* The task for sending timely Acks
@@ -276,9 +266,29 @@ struct GNUNET_STREAM_Socket
GNUNET_SCHEDULER_TaskIdentifier ack_task_id;
/**
- * Task scheduled to continue a read operation.
+ * Retransmission timeout
*/
- GNUNET_SCHEDULER_TaskIdentifier read_task_id;
+ struct GNUNET_TIME_Relative retransmit_timeout;
+
+ /**
+ * Time when the Acknowledgement was queued
+ */
+ struct GNUNET_TIME_Absolute ack_time_registered;
+
+ /**
+ * Queued Acknowledgement deadline
+ */
+ struct GNUNET_TIME_Relative ack_time_deadline;
+
+ /**
+ * Mesh transmit timeout
+ */
+ struct GNUNET_TIME_Relative mesh_retry_timeout;
+
+ /**
+ * Data retransmission timeout
+ */
+ struct GNUNET_TIME_Relative data_retransmit_timeout;
/**
* The state of the protocol associated with this socket
@@ -286,14 +296,19 @@ struct GNUNET_STREAM_Socket
enum State state;
/**
- * The status of the socket
+ * Whether testing mode is active or not
*/
- enum GNUNET_STREAM_Status status;
+ int testing_active;
/**
- * The number of previous timeouts; FIXME: currently not used
+ * Is receive closed
*/
- unsigned int retries;
+ int receive_closed;
+
+ /**
+ * Is transmission closed
+ */
+ int transmit_closed;
/**
* The application port number (type: uint32_t)
@@ -301,10 +316,9 @@ struct GNUNET_STREAM_Socket
GNUNET_MESH_ApplicationType app_port;
/**
- * The session id associated with this stream connection
- * FIXME: Not used currently, may be removed
+ * The write sequence number to be set incase of testing
*/
- uint32_t session_id;
+ uint32_t testing_set_write_sequence_number_value;
/**
* Write sequence number. Set to random when sending HELLO(client) and
@@ -346,6 +360,11 @@ struct GNUNET_STREAM_Socket
* The offset upto which user has read from the received buffer
*/
uint32_t copy_offset;
+
+ /**
+ * The maximum size of the data message payload this stream handle can send
+ */
+ uint16_t max_payload_size;
};
@@ -360,6 +379,31 @@ struct GNUNET_STREAM_ListenSocket
struct GNUNET_MESH_Handle *mesh;
/**
+ * Handle to statistics
+ */
+ struct GNUNET_STATISTICS_Handle *stat_handle;
+
+ /**
+ * Our configuration
+ */
+ struct GNUNET_CONFIGURATION_Handle *cfg;
+
+ /**
+ * Handle to the lock manager service
+ */
+ struct GNUNET_LOCKMANAGER_Handle *lockmanager;
+
+ /**
+ * The active LockingRequest from lockmanager
+ */
+ struct GNUNET_LOCKMANAGER_LockingRequest *locking_request;
+
+ /**
+ * Callback to call after acquring a lock and listening
+ */
+ GNUNET_STREAM_ListenSuccessCallback listen_ok_cb;
+
+ /**
* The callback function which is called after successful opening socket
*/
GNUNET_STREAM_ListenCallback listen_cb;
@@ -371,16 +415,46 @@ struct GNUNET_STREAM_ListenSocket
/**
* The service port
- * FIXME: Remove if not required!
*/
GNUNET_MESH_ApplicationType port;
+
+ /**
+ * The id of the lockmanager timeout task
+ */
+ GNUNET_SCHEDULER_TaskIdentifier lockmanager_acquire_timeout_task;
+
+ /**
+ * The retransmit timeout
+ */
+ struct GNUNET_TIME_Relative retransmit_timeout;
+
+ /**
+ * Listen enabled?
+ */
+ int listening;
+
+ /**
+ * Whether testing mode is active or not
+ */
+ int testing_active;
+
+ /**
+ * The write sequence number to be set incase of testing
+ */
+ uint32_t testing_set_write_sequence_number_value;
+
+ /**
+ * The maximum size of the data message payload this stream handle can send
+ */
+ uint16_t max_payload_size;
+
};
/**
* The IO Write Handle
*/
-struct GNUNET_STREAM_IOWriteHandle
+struct GNUNET_STREAM_WriteHandle
{
/**
* The socket to which this write handle is associated
@@ -388,11 +462,6 @@ struct GNUNET_STREAM_IOWriteHandle
struct GNUNET_STREAM_Socket *socket;
/**
- * The packet_buffers associated with this Handle
- */
- struct GNUNET_STREAM_DataMessage *messages[GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH];
-
- /**
* The write continuation callback
*/
GNUNET_STREAM_CompletionContinuation write_cont;
@@ -403,6 +472,11 @@ struct GNUNET_STREAM_IOWriteHandle
void *write_cont_cls;
/**
+ * The packet_buffers associated with this Handle
+ */
+ struct GNUNET_STREAM_DataMessage *messages[GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH];
+
+ /**
* The bitmap of this IOHandle; Corresponding bit for a message is set when
* it has been acknowledged by the receiver
*/
@@ -412,15 +486,34 @@ struct GNUNET_STREAM_IOWriteHandle
* Number of bytes in this write handle
*/
size_t size;
+
+ /**
+ * Number of packets already transmitted from this IO handle. Retransmitted
+ * packets are not taken into account here. This is used to determine which
+ * packets account for retransmission and which packets occupy buffer space at
+ * the receiver.
+ */
+ unsigned int packets_sent;
+
+ /**
+ * The maximum of the base numbers of the received acks
+ */
+ uint32_t max_ack_base_num;
+
};
/**
* The IO Read Handle
*/
-struct GNUNET_STREAM_IOReadHandle
+struct GNUNET_STREAM_ReadHandle
{
/**
+ * The socket to which this read handle is associated
+ */
+ struct GNUNET_STREAM_Socket *socket;
+
+ /**
* Callback for the read processor
*/
GNUNET_STREAM_DataProcessor proc;
@@ -429,6 +522,22 @@ struct GNUNET_STREAM_IOReadHandle
* The closure pointer for the read processor callback
*/
void *proc_cls;
+
+ /**
+ * Task identifier for the read io timeout task
+ */
+ GNUNET_SCHEDULER_TaskIdentifier read_io_timeout_task_id;
+
+ /**
+ * Task scheduled to continue a read operation.
+ */
+ GNUNET_SCHEDULER_TaskIdentifier read_task_id;
+
+ /**
+ * Task scheduled from GNUNET_STREAM_read() to lookup the ACK bitmap and call
+ * the read processor task
+ */
+ GNUNET_SCHEDULER_TaskIdentifier probe_data_availability_task_id;
};
@@ -458,6 +567,11 @@ struct GNUNET_STREAM_ShutdownHandle
GNUNET_SCHEDULER_TaskIdentifier close_msg_retransmission_task_id;
/**
+ * Task scheduled to call the shutdown continuation callback
+ */
+ GNUNET_SCHEDULER_TaskIdentifier call_cont_task_id;
+
+ /**
* Which operation to shutdown? SHUT_RD, SHUT_WR or SHUT_RDWR
*/
int operation;
@@ -467,8 +581,12 @@ struct GNUNET_STREAM_ShutdownHandle
/**
* Default value in seconds for various timeouts
*/
-static unsigned int default_timeout = 10;
+static const unsigned int default_timeout = 10;
+/**
+ * The domain name for locks we use here
+ */
+static const char *locking_domain = "GNUNET_STREAM_APPLOCK";
/**
* Callback function for sending queued message
@@ -491,24 +609,23 @@ send_message_notify (void *cls, size_t size, void *buf)
return 0; /* just to be safe */
if (0 == size) /* request timed out */
{
- socket->retries++;
+ socket->mesh_retry_timeout = GNUNET_TIME_STD_BACKOFF
+ (socket->mesh_retry_timeout);
LOG (GNUNET_ERROR_TYPE_DEBUG,
- "%s: Message sending timed out. Retry %d \n",
+ "%s: Message sending to MESH timed out. Retrying in %s \n",
GNUNET_i2s (&socket->other_peer),
- socket->retries);
+ GNUNET_STRINGS_relative_time_to_string (socket->mesh_retry_timeout,
+ GNUNET_YES));
socket->transmit_handle =
- GNUNET_MESH_notify_transmit_ready (socket->tunnel,
- 0, /* Corking */
- 1, /* Priority */
- /* FIXME: exponential backoff */
- socket->retransmit_timeout,
- &socket->other_peer,
- ntohs (head->message->header.size),
- &send_message_notify,
- socket);
+ GNUNET_MESH_notify_transmit_ready (socket->tunnel,
+ GNUNET_NO, /* Corking */
+ socket->mesh_retry_timeout,
+ &socket->other_peer,
+ ntohs (head->message->header.size),
+ &send_message_notify,
+ socket);
return 0;
}
-
ret = ntohs (head->message->header.size);
GNUNET_assert (size >= ret);
memcpy (buf, head->message, ret);
@@ -521,20 +638,20 @@ send_message_notify (void *cls, size_t size, void *buf)
head);
GNUNET_free (head->message);
GNUNET_free (head);
+ if (NULL != socket->transmit_handle)
+ return ret; /* 'finish_cb' might have triggered message already! */
head = socket->queue_head;
if (NULL != head) /* more pending messages to send */
{
- socket->retries = 0;
+ socket->mesh_retry_timeout = GNUNET_TIME_UNIT_ZERO;
socket->transmit_handle =
- GNUNET_MESH_notify_transmit_ready (socket->tunnel,
- 0, /* Corking */
- 1, /* Priority */
- /* FIXME: exponential backoff */
- socket->retransmit_timeout,
- &socket->other_peer,
- ntohs (head->message->header.size),
- &send_message_notify,
- socket);
+ GNUNET_MESH_notify_transmit_ready (socket->tunnel,
+ GNUNET_NO, /* Corking */
+ socket->mesh_retry_timeout,
+ &socket->other_peer,
+ ntohs (head->message->header.size),
+ &send_message_notify,
+ socket);
}
return ret;
}
@@ -547,19 +664,21 @@ send_message_notify (void *cls, size_t size, void *buf)
* @param message the message to be sent
* @param finish_cb the callback to be called when the message is sent
* @param finish_cb_cls the closure for the callback
+ * @param urgent set to GNUNET_YES to add the message to the beginning of the
+ * queue; GNUNET_NO to add at the tail
*/
static void
queue_message (struct GNUNET_STREAM_Socket *socket,
struct GNUNET_STREAM_MessageHeader *message,
SendFinishCallback finish_cb,
- void *finish_cb_cls)
+ void *finish_cb_cls,
+ int urgent)
{
struct MessageQueue *queue_entity;
GNUNET_assert
((ntohs (message->header.type) >= GNUNET_MESSAGE_TYPE_STREAM_DATA)
&& (ntohs (message->header.type) <= GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK));
-
LOG (GNUNET_ERROR_TYPE_DEBUG,
"%s: Queueing message of type %d and size %d\n",
GNUNET_i2s (&socket->other_peer),
@@ -570,21 +689,31 @@ queue_message (struct GNUNET_STREAM_Socket *socket,
queue_entity->message = message;
queue_entity->finish_cb = finish_cb;
queue_entity->finish_cb_cls = finish_cb_cls;
- GNUNET_CONTAINER_DLL_insert_tail (socket->queue_head,
- socket->queue_tail,
- queue_entity);
+ if (GNUNET_YES == urgent)
+ {
+ GNUNET_CONTAINER_DLL_insert (socket->queue_head, socket->queue_tail,
+ queue_entity);
+ if (NULL != socket->transmit_handle)
+ {
+ GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
+ socket->transmit_handle = NULL;
+ }
+ }
+ else
+ GNUNET_CONTAINER_DLL_insert_tail (socket->queue_head,
+ socket->queue_tail,
+ queue_entity);
if (NULL == socket->transmit_handle)
{
- socket->retries = 0;
+ socket->mesh_retry_timeout = GNUNET_TIME_UNIT_ZERO;
socket->transmit_handle =
- GNUNET_MESH_notify_transmit_ready (socket->tunnel,
- 0, /* Corking */
- 1, /* Priority */
- socket->retransmit_timeout,
- &socket->other_peer,
- ntohs (message->header.size),
- &send_message_notify,
- socket);
+ GNUNET_MESH_notify_transmit_ready (socket->tunnel,
+ GNUNET_NO, /* Corking */
+ socket->mesh_retry_timeout,
+ &socket->other_peer,
+ ntohs (message->header.size),
+ &send_message_notify,
+ socket);
}
}
@@ -610,41 +739,11 @@ copy_and_queue_message (struct GNUNET_STREAM_Socket *socket,
size = ntohs (message->header.size);
msg_copy = GNUNET_malloc (size);
memcpy (msg_copy, message, size);
- queue_message (socket, msg_copy, finish_cb, finish_cb_cls);
+ queue_message (socket, msg_copy, finish_cb, finish_cb_cls, GNUNET_NO);
}
/**
- * Callback function for sending ack message
- *
- * @param cls closure the ACK message created in ack_task
- * @param size number of bytes available in buffer
- * @param buf where the callee should write the message
- * @return number of bytes written to buf
- */
-static size_t
-send_ack_notify (void *cls, size_t size, void *buf)
-{
- struct GNUNET_STREAM_Socket *socket = cls;
-
- if (0 == size)
- {
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "%s called with size 0\n", __func__);
- return 0;
- }
- GNUNET_assert (ntohs (socket->ack_msg->header.header.size) <= size);
-
- size = ntohs (socket->ack_msg->header.header.size);
- memcpy (buf, socket->ack_msg, size);
-
- GNUNET_free (socket->ack_msg);
- socket->ack_msg = NULL;
- socket->ack_transmit_handle = NULL;
- return size;
-}
-
-/**
* Writes data using the given socket. The amount of data written is limited by
* the receiver_window_size
*
@@ -653,6 +752,7 @@ send_ack_notify (void *cls, size_t size, void *buf)
static void
write_data (struct GNUNET_STREAM_Socket *socket);
+
/**
* Task for retransmitting data messages if they aren't ACK before their ack
* deadline
@@ -661,17 +761,16 @@ write_data (struct GNUNET_STREAM_Socket *socket);
* @param tc the Task context
*/
static void
-retransmission_timeout_task (void *cls,
- const struct GNUNET_SCHEDULER_TaskContext *tc)
+data_retransmission_task (void *cls,
+ const struct GNUNET_SCHEDULER_TaskContext *tc)
{
struct GNUNET_STREAM_Socket *socket = cls;
- if (GNUNET_SCHEDULER_REASON_SHUTDOWN == tc->reason)
+ socket->data_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
+ if (0 != (GNUNET_SCHEDULER_REASON_SHUTDOWN & tc->reason))
return;
-
LOG (GNUNET_ERROR_TYPE_DEBUG,
"%s: Retransmitting DATA...\n", GNUNET_i2s (&socket->other_peer));
- socket->retransmission_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
write_data (socket);
}
@@ -689,11 +788,9 @@ ack_task (void *cls,
struct GNUNET_STREAM_Socket *socket = cls;
struct GNUNET_STREAM_AckMessage *ack_msg;
- if (GNUNET_SCHEDULER_REASON_SHUTDOWN == tc->reason)
- {
- return;
- }
socket->ack_task_id = GNUNET_SCHEDULER_NO_TASK;
+ if (0 != (GNUNET_SCHEDULER_REASON_SHUTDOWN & tc->reason))
+ return;
/* Create the ACK Message */
ack_msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_AckMessage));
ack_msg->header.header.size = htons (sizeof (struct
@@ -703,17 +800,8 @@ ack_task (void *cls,
ack_msg->base_sequence_number = htonl (socket->read_sequence_number);
ack_msg->receive_window_remaining =
htonl (RECEIVE_BUFFER_SIZE - socket->receive_buffer_size);
- socket->ack_msg = ack_msg;
- /* Request MESH for sending ACK */
- socket->ack_transmit_handle =
- GNUNET_MESH_notify_transmit_ready (socket->tunnel,
- 0, /* Corking */
- 1, /* Priority */
- socket->retransmit_timeout,
- &socket->other_peer,
- ntohs (ack_msg->header.header.size),
- &send_ack_notify,
- socket);
+ /* Queue up ACK for immediate sending */
+ queue_message (socket, &ack_msg->header, NULL, NULL, GNUNET_YES);
}
@@ -731,9 +819,11 @@ close_msg_retransmission_task (void *cls,
struct GNUNET_STREAM_MessageHeader *msg;
struct GNUNET_STREAM_Socket *socket;
+ shutdown_handle->close_msg_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
GNUNET_assert (NULL != shutdown_handle);
+ if (0 != (GNUNET_SCHEDULER_REASON_SHUTDOWN & tc->reason))
+ return;
socket = shutdown_handle->socket;
-
msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
msg->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
switch (shutdown_handle->operation)
@@ -753,7 +843,7 @@ close_msg_retransmission_task (void *cls,
GNUNET_SCHEDULER_NO_TASK;
return;
}
- queue_message (socket, msg, NULL, NULL);
+ queue_message (socket, msg, NULL, NULL, GNUNET_NO);
shutdown_handle->close_msg_retransmission_task_id =
GNUNET_SCHEDULER_add_delayed (socket->retransmit_timeout,
&close_msg_retransmission_task,
@@ -806,41 +896,29 @@ ackbitmap_is_bit_set (const GNUNET_STREAM_AckBitmap *bitmap,
static void
write_data (struct GNUNET_STREAM_Socket *socket)
{
- struct GNUNET_STREAM_IOWriteHandle *io_handle = socket->write_handle;
- int packet; /* Although an int, should never be negative */
- int ack_packet;
-
- ack_packet = -1;
- /* Find the last acknowledged packet */
- for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
- {
- if (GNUNET_YES == ackbitmap_is_bit_set (&io_handle->ack_bitmap,
- packet))
- ack_packet = packet;
- else if (NULL == io_handle->messages[packet])
- break;
- }
- /* Resend packets which weren't ack'ed */
- for (packet=0; packet < ack_packet; packet++)
+ struct GNUNET_STREAM_WriteHandle *io_handle = socket->write_handle;
+ unsigned int packet;
+
+ for (packet=0; packet < io_handle->packets_sent; packet++)
{
if (GNUNET_NO == ackbitmap_is_bit_set (&io_handle->ack_bitmap,
- packet))
+ packet))
{
LOG (GNUNET_ERROR_TYPE_DEBUG,
- "%s: Placing DATA message with sequence %u in send queue\n",
- GNUNET_i2s (&socket->other_peer),
- ntohl (io_handle->messages[packet]->sequence_number));
+ "%s: Retransmitting DATA message with sequence %u\n",
+ GNUNET_i2s (&socket->other_peer),
+ ntohl (io_handle->messages[packet]->sequence_number));
copy_and_queue_message (socket,
- &io_handle->messages[packet]->header,
- NULL,
- NULL);
+ &io_handle->messages[packet]->header,
+ NULL,
+ NULL);
}
}
- packet = ack_packet + 1;
/* Now send new packets if there is enough buffer space */
- while ( (NULL != io_handle->messages[packet]) &&
- (socket->receiver_window_available
- >= ntohs (io_handle->messages[packet]->header.header.size)) )
+ while ((packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH) &&
+ (NULL != io_handle->messages[packet]) &&
+ (socket->receiver_window_available
+ >= ntohs (io_handle->messages[packet]->header.header.size)))
{
socket->receiver_window_available -=
ntohs (io_handle->messages[packet]->header.header.size);
@@ -854,12 +932,42 @@ write_data (struct GNUNET_STREAM_Socket *socket)
NULL);
packet++;
}
- if (GNUNET_SCHEDULER_NO_TASK == socket->retransmission_timeout_task_id)
- socket->retransmission_timeout_task_id =
- GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply
- (GNUNET_TIME_UNIT_SECONDS, 8),
- &retransmission_timeout_task,
- socket);
+ io_handle->packets_sent = packet;
+ if (GNUNET_SCHEDULER_NO_TASK == socket->data_retransmission_task_id)
+ {
+ socket->data_retransmit_timeout = GNUNET_TIME_STD_BACKOFF
+ (socket->data_retransmit_timeout);
+ socket->data_retransmission_task_id =
+ GNUNET_SCHEDULER_add_delayed (socket->data_retransmit_timeout,
+ &data_retransmission_task,
+ socket);
+ }
+}
+
+
+/**
+ * Cleansup the sockets read handle
+ *
+ * @param socket the socket whose read handle has to be cleanedup
+ */
+static void
+cleanup_read_handle (struct GNUNET_STREAM_Socket *socket)
+{
+ struct GNUNET_STREAM_ReadHandle *read_handle;
+
+ read_handle = socket->read_handle;
+ /* Read io time task should be there; if it is already executed then this
+ read handle is not valid; However upon scheduler shutdown the read io task
+ may be executed before */
+ if (GNUNET_SCHEDULER_NO_TASK != read_handle->read_io_timeout_task_id)
+ GNUNET_SCHEDULER_cancel (read_handle->read_io_timeout_task_id);
+ /* reading task may be present; if so we have to stop it */
+ if (GNUNET_SCHEDULER_NO_TASK != read_handle->read_task_id)
+ GNUNET_SCHEDULER_cancel (read_handle->read_task_id);
+ if (GNUNET_SCHEDULER_NO_TASK != read_handle->probe_data_availability_task_id)
+ GNUNET_SCHEDULER_cancel (read_handle->probe_data_availability_task_id);
+ GNUNET_free (read_handle);
+ socket->read_handle = NULL;
}
@@ -874,22 +982,23 @@ call_read_processor (void *cls,
const struct GNUNET_SCHEDULER_TaskContext *tc)
{
struct GNUNET_STREAM_Socket *socket = cls;
+ struct GNUNET_STREAM_ReadHandle *read_handle;
+ GNUNET_STREAM_DataProcessor proc;
+ void *proc_cls;
size_t read_size;
size_t valid_read_size;
unsigned int packet;
uint32_t sequence_increase;
uint32_t offset_increase;
- socket->read_task_id = GNUNET_SCHEDULER_NO_TASK;
+ read_handle = socket->read_handle;
+ GNUNET_assert (NULL != read_handle);
+ read_handle->read_task_id = GNUNET_SCHEDULER_NO_TASK;
if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
return;
-
if (NULL == socket->receive_buffer)
return;
-
- GNUNET_assert (NULL != socket->read_handle);
- GNUNET_assert (NULL != socket->read_handle->proc);
-
+ GNUNET_assert (NULL != read_handle->proc);
/* Check the bitmap for any holes */
for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
{
@@ -902,51 +1011,46 @@ call_read_processor (void *cls,
valid_read_size =
socket->receive_buffer_boundaries[packet-1] - socket->copy_offset;
GNUNET_assert (0 != valid_read_size);
- /* Cancel the read_io_timeout_task */
- GNUNET_SCHEDULER_cancel (socket->read_io_timeout_task_id);
- socket->read_io_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
+ proc = read_handle->proc;
+ proc_cls = read_handle->proc_cls;
+ cleanup_read_handle (socket);
/* Call the data processor */
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "%s: Calling read processor\n",
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Calling read processor\n",
GNUNET_i2s (&socket->other_peer));
- read_size =
- socket->read_handle->proc (socket->read_handle->proc_cls,
- socket->status,
- socket->receive_buffer + socket->copy_offset,
- valid_read_size);
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "%s: Read processor read %d bytes\n",
+ read_size = proc (proc_cls, GNUNET_STREAM_OK,
+ socket->receive_buffer + socket->copy_offset,
+ valid_read_size);
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Read processor read %d bytes\n",
GNUNET_i2s (&socket->other_peer), read_size);
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "%s: Read processor completed successfully\n",
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Read processor completed successfully\n",
GNUNET_i2s (&socket->other_peer));
- /* Free the read handle */
- GNUNET_free (socket->read_handle);
- socket->read_handle = NULL;
GNUNET_assert (read_size <= valid_read_size);
socket->copy_offset += read_size;
/* Determine upto which packet we can remove from the buffer */
for (packet = 0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
{
if (socket->copy_offset == socket->receive_buffer_boundaries[packet])
- { packet++; break; }
+ {
+ packet++;
+ break;
+ }
if (socket->copy_offset < socket->receive_buffer_boundaries[packet])
break;
}
-
/* If no packets can be removed we can't move the buffer */
- if (0 == packet) return;
+ if (0 == packet)
+ return;
sequence_increase = packet;
LOG (GNUNET_ERROR_TYPE_DEBUG,
"%s: Sequence increase after read processor completion: %u\n",
GNUNET_i2s (&socket->other_peer), sequence_increase);
-
/* Shift the data in the receive buffer */
- memmove (socket->receive_buffer,
- socket->receive_buffer
- + socket->receive_buffer_boundaries[sequence_increase-1],
- socket->receive_buffer_size
- - socket->receive_buffer_boundaries[sequence_increase-1]);
+ socket->receive_buffer =
+ memmove (socket->receive_buffer,
+ socket->receive_buffer
+ + socket->receive_buffer_boundaries[sequence_increase-1],
+ socket->receive_buffer_size
+ - socket->receive_buffer_boundaries[sequence_increase-1]);
/* Shift the bitmap */
socket->ack_bitmap = socket->ack_bitmap >> sequence_increase;
/* Set read_sequence_number */
@@ -960,11 +1064,20 @@ call_read_processor (void *cls,
/* Fix relative boundaries */
for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
{
- if (packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH - sequence_increase)
+ if (packet < (GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH - sequence_increase))
{
- socket->receive_buffer_boundaries[packet] =
- socket->receive_buffer_boundaries[packet + sequence_increase]
- - offset_increase;
+ uint32_t ahead_buffer_boundary;
+
+ ahead_buffer_boundary =
+ socket->receive_buffer_boundaries[packet + sequence_increase];
+ if (0 == ahead_buffer_boundary)
+ socket->receive_buffer_boundaries[packet] = 0;
+ else
+ {
+ GNUNET_assert (offset_increase < ahead_buffer_boundary);
+ socket->receive_buffer_boundaries[packet] =
+ ahead_buffer_boundary - offset_increase;
+ }
}
else
socket->receive_buffer_boundaries[packet] = 0;
@@ -979,27 +1092,30 @@ call_read_processor (void *cls,
* @param tc the task context
*/
static void
-read_io_timeout (void *cls,
+read_io_timeout (void *cls,
const struct GNUNET_SCHEDULER_TaskContext *tc)
{
struct GNUNET_STREAM_Socket *socket = cls;
+ struct GNUNET_STREAM_ReadHandle *read_handle;
GNUNET_STREAM_DataProcessor proc;
void *proc_cls;
- socket->read_io_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
- if (socket->read_task_id != GNUNET_SCHEDULER_NO_TASK)
+ read_handle = socket->read_handle;
+ GNUNET_assert (NULL != read_handle);
+ read_handle->read_io_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
+ if (0 != (GNUNET_SCHEDULER_REASON_SHUTDOWN & tc->reason))
+ return;
+ if (read_handle->read_task_id != GNUNET_SCHEDULER_NO_TASK)
{
LOG (GNUNET_ERROR_TYPE_DEBUG,
"%s: Read task timedout - Cancelling it\n",
GNUNET_i2s (&socket->other_peer));
- GNUNET_SCHEDULER_cancel (socket->read_task_id);
- socket->read_task_id = GNUNET_SCHEDULER_NO_TASK;
+ GNUNET_SCHEDULER_cancel (read_handle->read_task_id);
+ read_handle->read_task_id = GNUNET_SCHEDULER_NO_TASK;
}
- GNUNET_assert (NULL != socket->read_handle);
- proc = socket->read_handle->proc;
- proc_cls = socket->read_handle->proc_cls;
-
- GNUNET_free (socket->read_handle);
+ proc = read_handle->proc;
+ proc_cls = read_handle->proc_cls;
+ GNUNET_free (read_handle);
socket->read_handle = NULL;
/* Call the read processor to signal timeout */
proc (proc_cls,
@@ -1028,6 +1144,7 @@ handle_data (struct GNUNET_STREAM_Socket *socket,
const struct GNUNET_ATS_Information*atsi)
{
const void *payload;
+ struct GNUNET_TIME_Relative ack_deadline_rel;
uint32_t bytes_needed;
uint32_t relative_offset;
uint32_t relative_sequence_number;
@@ -1039,28 +1156,24 @@ handle_data (struct GNUNET_STREAM_Socket *socket,
GNUNET_break_op (0);
return GNUNET_SYSERR;
}
-
- if (0 != memcmp (sender,
- &socket->other_peer,
- sizeof (struct GNUNET_PeerIdentity)))
+ if (0 != memcmp (sender, &socket->other_peer,
+ sizeof (struct GNUNET_PeerIdentity)))
{
LOG (GNUNET_ERROR_TYPE_DEBUG,
- "%s: Received DATA from non-confirming peer\n",
- GNUNET_i2s (&socket->other_peer));
+ "%s: Received DATA from non-confirming peer\n",
+ GNUNET_i2s (&socket->other_peer));
return GNUNET_YES;
}
-
switch (socket->state)
{
case STATE_ESTABLISHED:
case STATE_TRANSMIT_CLOSED:
- case STATE_TRANSMIT_CLOSE_WAIT:
-
+ case STATE_TRANSMIT_CLOSE_WAIT:
/* check if the message's sequence number is in the range we are
expecting */
relative_sequence_number =
ntohl (msg->sequence_number) - socket->read_sequence_number;
- if ( relative_sequence_number > GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH)
+ if ( relative_sequence_number >= GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH)
{
LOG (GNUNET_ERROR_TYPE_DEBUG,
"%s: Ignoring received message with sequence number %u\n",
@@ -1076,8 +1189,7 @@ handle_data (struct GNUNET_STREAM_Socket *socket,
socket);
}
return GNUNET_YES;
- }
-
+ }
/* Check if we have already seen this message */
if (GNUNET_YES == ackbitmap_is_bit_set (&socket->ack_bitmap,
relative_sequence_number))
@@ -1091,20 +1203,14 @@ handle_data (struct GNUNET_STREAM_Socket *socket,
{
socket->ack_task_id =
GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh
- (msg->ack_deadline),
- &ack_task,
- socket);
+ (msg->ack_deadline), &ack_task, socket);
}
return GNUNET_YES;
}
-
LOG (GNUNET_ERROR_TYPE_DEBUG,
"%s: Receiving DATA with sequence number: %u and size: %d from %s\n",
- GNUNET_i2s (&socket->other_peer),
- ntohl (msg->sequence_number),
- ntohs (msg->header.header.size),
- GNUNET_i2s (&socket->other_peer));
-
+ GNUNET_i2s (&socket->other_peer), ntohl (msg->sequence_number),
+ ntohs (msg->header.header.size), GNUNET_i2s (&socket->other_peer));
/* Check if we have to allocate the buffer */
size -= sizeof (struct GNUNET_STREAM_DataMessage);
relative_offset = ntohl (msg->offset) - socket->read_offset;
@@ -1121,54 +1227,67 @@ handle_data (struct GNUNET_STREAM_Socket *socket,
{
LOG (GNUNET_ERROR_TYPE_DEBUG,
"%s: Cannot accommodate packet %d as buffer is full\n",
- GNUNET_i2s (&socket->other_peer),
- ntohl (msg->sequence_number));
+ GNUNET_i2s (&socket->other_peer), ntohl (msg->sequence_number));
return GNUNET_YES;
}
}
-
/* Copy Data to buffer */
payload = &msg[1];
GNUNET_assert (relative_offset + size <= socket->receive_buffer_size);
- memcpy (socket->receive_buffer + relative_offset,
- payload,
- size);
+ memcpy (socket->receive_buffer + relative_offset, payload, size);
socket->receive_buffer_boundaries[relative_sequence_number] =
- relative_offset + size;
-
+ relative_offset + size;
/* Modify the ACK bitmap */
- ackbitmap_modify_bit (&socket->ack_bitmap,
- relative_sequence_number,
- GNUNET_YES);
-
+ ackbitmap_modify_bit (&socket->ack_bitmap, relative_sequence_number,
+ GNUNET_YES);
/* Start ACK sending task if one is not already present */
+ ack_deadline_rel = GNUNET_TIME_relative_ntoh (msg->ack_deadline);
if (GNUNET_SCHEDULER_NO_TASK == socket->ack_task_id)
{
+ ack_deadline_rel =
+ GNUNET_TIME_relative_min (ack_deadline_rel,
+ GNUNET_TIME_relative_multiply
+ (GNUNET_TIME_UNIT_SECONDS, 300));
socket->ack_task_id =
- GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh
- (msg->ack_deadline),
- &ack_task,
- socket);
+ GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh
+ (msg->ack_deadline), &ack_task, socket);
+ socket->ack_time_registered = GNUNET_TIME_absolute_get ();
+ socket->ack_time_deadline = ack_deadline_rel;
+ }
+ else
+ {
+ struct GNUNET_TIME_Relative ack_time_past;
+ struct GNUNET_TIME_Relative ack_time_remaining;
+ struct GNUNET_TIME_Relative ack_time_min;
+ ack_time_past =
+ GNUNET_TIME_absolute_get_duration (socket->ack_time_registered);
+ ack_time_remaining = GNUNET_TIME_relative_subtract
+ (socket->ack_time_deadline, ack_time_past);
+ ack_time_min = GNUNET_TIME_relative_min (ack_time_remaining,
+ ack_deadline_rel);
+ if (0 == memcmp(&ack_deadline_rel, &ack_time_min,
+ sizeof (struct GNUNET_TIME_Relative)))
+ {
+ ack_deadline_rel = ack_time_min;
+ GNUNET_SCHEDULER_cancel (socket->ack_task_id);
+ socket->ack_task_id = GNUNET_SCHEDULER_add_delayed (ack_deadline_rel,
+ &ack_task, socket);
+ socket->ack_time_registered = GNUNET_TIME_absolute_get ();
+ socket->ack_time_deadline = ack_deadline_rel;
+ }
}
-
if ((NULL != socket->read_handle) /* A read handle is waiting */
/* There is no current read task */
- && (GNUNET_SCHEDULER_NO_TASK == socket->read_task_id)
+ && (GNUNET_SCHEDULER_NO_TASK == socket->read_handle->read_task_id)
/* We have the first packet */
- && (GNUNET_YES == ackbitmap_is_bit_set(&socket->ack_bitmap,
- 0)))
+ && (GNUNET_YES == ackbitmap_is_bit_set(&socket->ack_bitmap, 0)))
{
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "%s: Scheduling read processor\n",
- GNUNET_i2s (&socket->other_peer));
-
- socket->read_task_id =
- GNUNET_SCHEDULER_add_now (&call_read_processor,
- socket);
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Scheduling read processor\n",
+ GNUNET_i2s (&socket->other_peer));
+ socket->read_handle->read_task_id =
+ GNUNET_SCHEDULER_add_now (&call_read_processor, socket);
}
-
break;
-
default:
LOG (GNUNET_ERROR_TYPE_DEBUG,
"%s: Received data message when it cannot be handled\n",
@@ -1201,18 +1320,15 @@ client_handle_data (void *cls,
{
struct GNUNET_STREAM_Socket *socket = cls;
- return handle_data (socket,
- tunnel,
- sender,
- (const struct GNUNET_STREAM_DataMessage *) message,
- atsi);
+ return handle_data (socket, tunnel, sender,
+ (const struct GNUNET_STREAM_DataMessage *) message, atsi);
}
/**
* Callback to set state to ESTABLISHED
*
- * @param cls the closure from queue_message FIXME: document
+ * @param cls the closure NULL;
* @param socket the socket to requiring state change
*/
static void
@@ -1225,7 +1341,10 @@ set_state_established (void *cls,
socket->write_offset = 0;
socket->read_offset = 0;
socket->state = STATE_ESTABLISHED;
- /* FIXME: What if listen_cb is NULL */
+ GNUNET_assert (GNUNET_SCHEDULER_NO_TASK !=
+ socket->control_retransmission_task_id);
+ GNUNET_SCHEDULER_cancel (socket->control_retransmission_task_id);
+ socket->control_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
if (NULL != socket->lsocket)
{
LOG (GNUNET_ERROR_TYPE_DEBUG,
@@ -1237,12 +1356,12 @@ set_state_established (void *cls,
&socket->other_peer))
{
socket->state = STATE_CLOSED;
- /* FIXME: We should close in a decent way */
+ /* FIXME: We should close in a decent way (send RST) */
GNUNET_MESH_tunnel_destroy (socket->tunnel); /* Destroy the tunnel */
GNUNET_free (socket);
}
}
- else if (socket->open_cb)
+ else
socket->open_cb (socket->open_cls, socket);
}
@@ -1258,7 +1377,7 @@ set_state_hello_wait (void *cls,
struct GNUNET_STREAM_Socket *socket)
{
GNUNET_assert (STATE_INIT == socket->state);
- LOG (GNUNET_ERROR_TYPE_DEBUG,
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
"%s: Attaining HELLO_WAIT state\n",
GNUNET_i2s (&socket->other_peer));
socket->state = STATE_HELLO_WAIT;
@@ -1335,38 +1454,115 @@ set_state_closed (void *cls,
socket->state = STATE_CLOSED;
}
+
+/**
+ * Returns GNUNET_MESSAGE_TYPE_STREAM_HELLO
+ *
+ * @return the generate hello message
+ */
+static struct GNUNET_STREAM_MessageHeader *
+generate_hello (void)
+{
+ struct GNUNET_STREAM_MessageHeader *msg;
+
+ msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
+ msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO);
+ msg->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
+ return msg;
+}
+
+
/**
* Returns a new HelloAckMessage. Also sets the write sequence number for the
* socket
*
* @param socket the socket for which this HelloAckMessage has to be generated
+ * @param generate_seq GNUNET_YES to generate the write sequence number,
+ * GNUNET_NO to use the existing sequence number
* @return the HelloAckMessage
*/
static struct GNUNET_STREAM_HelloAckMessage *
-generate_hello_ack_msg (struct GNUNET_STREAM_Socket *socket)
+generate_hello_ack (struct GNUNET_STREAM_Socket *socket,
+ int generate_seq)
{
struct GNUNET_STREAM_HelloAckMessage *msg;
- /* Get the random sequence number */
- socket->write_sequence_number =
- GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, UINT32_MAX);
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "%s: Generated write sequence number %u\n",
- GNUNET_i2s (&socket->other_peer),
- (unsigned int) socket->write_sequence_number);
-
+ if (GNUNET_YES == generate_seq)
+ {
+ if (GNUNET_YES == socket->testing_active)
+ socket->write_sequence_number =
+ socket->testing_set_write_sequence_number_value;
+ else
+ socket->write_sequence_number =
+ GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, UINT32_MAX);
+ LOG_DEBUG ("%s: write sequence number %u\n",
+ GNUNET_i2s (&socket->other_peer),
+ (unsigned int) socket->write_sequence_number);
+ }
msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_HelloAckMessage));
msg->header.header.size =
htons (sizeof (struct GNUNET_STREAM_HelloAckMessage));
msg->header.header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK);
msg->sequence_number = htonl (socket->write_sequence_number);
msg->receiver_window_size = htonl (RECEIVE_BUFFER_SIZE);
-
return msg;
}
/**
+ * Task for retransmitting control messages if they aren't ACK'ed before a
+ * deadline
+ *
+ * @param cls the socket
+ * @param tc the Task context
+ */
+static void
+control_retransmission_task (void *cls,
+ const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+ struct GNUNET_STREAM_Socket *socket = cls;
+
+ socket->control_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
+ if (GNUNET_SCHEDULER_REASON_SHUTDOWN == tc->reason)
+ return;
+ LOG_DEBUG ("%s: Retransmitting a control message\n",
+ GNUNET_i2s (&socket->other_peer));
+ switch (socket->state)
+ {
+ case STATE_INIT:
+ GNUNET_break (0);
+ break;
+ case STATE_LISTEN:
+ GNUNET_break (0);
+ break;
+ case STATE_HELLO_WAIT:
+ if (NULL == socket->lsocket) /* We are client */
+ queue_message (socket, generate_hello (), NULL, NULL, GNUNET_NO);
+ else
+ queue_message (socket,
+ (struct GNUNET_STREAM_MessageHeader *)
+ generate_hello_ack (socket, GNUNET_NO), NULL, NULL,
+ GNUNET_NO);
+ socket->control_retransmission_task_id =
+ GNUNET_SCHEDULER_add_delayed (socket->retransmit_timeout,
+ &control_retransmission_task, socket);
+ break;
+ case STATE_ESTABLISHED:
+ if (NULL == socket->lsocket)
+ queue_message (socket,
+ (struct GNUNET_STREAM_MessageHeader *)
+ generate_hello_ack (socket, GNUNET_NO), NULL, NULL,
+ GNUNET_NO);
+ else
+ GNUNET_break (0);
+ break;
+ default:
+ GNUNET_break (0);
+ }
+}
+
+
+/**
* Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK
*
* @param cls the socket (set from GNUNET_MESH_connect)
@@ -1390,9 +1586,8 @@ client_handle_hello_ack (void *cls,
const struct GNUNET_STREAM_HelloAckMessage *ack_msg;
struct GNUNET_STREAM_HelloAckMessage *reply;
- if (0 != memcmp (sender,
- &socket->other_peer,
- sizeof (struct GNUNET_PeerIdentity)))
+ if (0 != memcmp (sender, &socket->other_peer,
+ sizeof (struct GNUNET_PeerIdentity)))
{
LOG (GNUNET_ERROR_TYPE_DEBUG,
"%s: Received HELLO_ACK from non-confirming peer\n",
@@ -1400,11 +1595,8 @@ client_handle_hello_ack (void *cls,
return GNUNET_YES;
}
ack_msg = (const struct GNUNET_STREAM_HelloAckMessage *) message;
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "%s: Received HELLO_ACK from %s\n",
- GNUNET_i2s (&socket->other_peer),
- GNUNET_i2s (&socket->other_peer));
-
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Received HELLO_ACK from %s\n",
+ GNUNET_i2s (&socket->other_peer), GNUNET_i2s (&socket->other_peer));
GNUNET_assert (socket->tunnel == tunnel);
switch (socket->state)
{
@@ -1415,27 +1607,24 @@ client_handle_hello_ack (void *cls,
GNUNET_i2s (&socket->other_peer),
(unsigned int) socket->read_sequence_number);
socket->receiver_window_available = ntohl (ack_msg->receiver_window_size);
- reply = generate_hello_ack_msg (socket);
- queue_message (socket,
- &reply->header,
- &set_state_established,
- NULL);
+ reply = generate_hello_ack (socket, GNUNET_YES);
+ queue_message (socket, &reply->header, &set_state_established,
+ NULL, GNUNET_NO);
return GNUNET_OK;
case STATE_ESTABLISHED:
- case STATE_RECEIVE_CLOSE_WAIT:
// call statistics (# ACKs ignored++)
+ GNUNET_assert (GNUNET_SCHEDULER_NO_TASK ==
+ socket->control_retransmission_task_id);
+ socket->control_retransmission_task_id =
+ GNUNET_SCHEDULER_add_now (&control_retransmission_task, socket);
return GNUNET_OK;
- case STATE_INIT:
default:
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "%s: Server %s sent HELLO_ACK when in state %d\n",
- GNUNET_i2s (&socket->other_peer),
- GNUNET_i2s (&socket->other_peer),
- socket->state);
+ LOG_DEBUG ("%s: Server %s sent HELLO_ACK when in state %d\n",
+ GNUNET_i2s (&socket->other_peer),
+ GNUNET_i2s (&socket->other_peer), socket->state);
socket->state = STATE_CLOSED; // introduce STATE_ERROR?
return GNUNET_SYSERR;
}
-
}
@@ -1466,6 +1655,63 @@ client_handle_reset (void *cls,
/**
+ * Frees the socket's receive buffers, marks the socket as receive closed and
+ * calls the DataProcessor with GNUNET_STREAM_SHUTDOWN status if a read handle
+ * is present
+ *
+ * @param socket the socket
+ */
+static void
+do_receive_shutdown (struct GNUNET_STREAM_Socket *socket)
+{
+ socket->receive_closed = GNUNET_YES;
+ GNUNET_free_non_null (socket->receive_buffer); /* Free the receive buffer */
+ socket->receive_buffer = NULL;
+ socket->receive_buffer_size = 0;
+ if (NULL != socket->read_handle)
+ {
+ GNUNET_STREAM_DataProcessor proc;
+ void *proc_cls;
+
+ proc = socket->read_handle->proc;
+ proc_cls = socket->read_handle->proc_cls;
+ GNUNET_STREAM_read_cancel (socket->read_handle);
+ socket->read_handle = NULL;
+ if (NULL != proc)
+ proc (proc_cls, GNUNET_STREAM_SHUTDOWN, NULL, 0);
+ }
+}
+
+
+/**
+ * Marks the socket as transmit closed and calls the CompletionContinuation with
+ * GNUNET_STREAM_SHUTDOWN status if a write handle is present
+ *
+ * @param socket the socket
+ */
+static void
+do_transmit_shutdown (struct GNUNET_STREAM_Socket *socket)
+{
+ socket->transmit_closed = GNUNET_YES;
+ /* If write handle is present call it with GNUNET_STREAM_SHUTDOWN to signal
+ that that stream has been shutdown */
+ if (NULL != socket->write_handle)
+ {
+ GNUNET_STREAM_CompletionContinuation wc;
+ void *wc_cls;
+
+ wc = socket->write_handle->write_cont;
+ wc_cls = socket->write_handle->write_cont_cls;
+ GNUNET_STREAM_write_cancel (socket->write_handle);
+ socket->write_handle = NULL;
+ if (NULL != wc)
+ wc (wc_cls,
+ GNUNET_STREAM_SHUTDOWN, 0);
+ }
+}
+
+
+/**
* Common message handler for handling TRANSMIT_CLOSE messages
*
* @param socket the socket through which the ack was received
@@ -1487,22 +1733,40 @@ handle_transmit_close (struct GNUNET_STREAM_Socket *socket,
switch (socket->state)
{
- case STATE_ESTABLISHED:
- socket->state = STATE_RECEIVE_CLOSED;
-
- /* Send TRANSMIT_CLOSE_ACK */
- reply = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
- reply->header.type =
- htons (GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK);
- reply->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
- queue_message (socket, reply, NULL, NULL);
+ case STATE_INIT:
+ case STATE_LISTEN:
+ case STATE_HELLO_WAIT:
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "%s: Ignoring RECEIVE_CLOSE as it cannot be handled now\n",
+ GNUNET_i2s (&socket->other_peer));
+ return GNUNET_OK;
+ default:
break;
-
+ }
+ /* Send TRANSMIT_CLOSE_ACK */
+ reply = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
+ reply->header.type =
+ htons (GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK);
+ reply->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
+ queue_message (socket, reply, NULL, NULL, GNUNET_NO);
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Received TRANSMIT_CLOSE from %s\n",
+ GNUNET_i2s (&socket->other_peer), GNUNET_i2s (&socket->other_peer));
+ switch(socket->state)
+ {
+ case STATE_RECEIVE_CLOSED:
+ case STATE_RECEIVE_CLOSE_WAIT:
+ case STATE_CLOSE_WAIT:
+ case STATE_CLOSED:
+ return GNUNET_OK;
default:
- /* FIXME: Call statistics? */
break;
}
- return GNUNET_YES;
+ do_receive_shutdown (socket);
+ if (GNUNET_YES == socket->transmit_closed)
+ socket->state = STATE_CLOSED;
+ else
+ socket->state = STATE_RECEIVE_CLOSED;
+ return GNUNET_OK;
}
@@ -1537,6 +1801,28 @@ client_handle_transmit_close (void *cls,
/**
+ * Task for calling the shutdown continuation callback
+ *
+ * @param cls the socket
+ * @param tc the scheduler task context
+ */
+static void
+call_cont_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+ struct GNUNET_STREAM_Socket *socket = cls;
+
+ GNUNET_assert (NULL != socket->shutdown_handle);
+ socket->shutdown_handle->call_cont_task_id = GNUNET_SCHEDULER_NO_TASK;
+ if (NULL != socket->shutdown_handle->completion_cb)
+ socket->shutdown_handle->completion_cb
+ (socket->shutdown_handle->completion_cls,
+ socket->shutdown_handle->operation);
+ GNUNET_free (socket->shutdown_handle);
+ socket->shutdown_handle = NULL;
+}
+
+
+/**
* Generic handler for GNUNET_MESSAGE_TYPE_STREAM_*_CLOSE_ACK messages
*
* @param socket the socket
@@ -1561,12 +1847,12 @@ handle_generic_close_ack (struct GNUNET_STREAM_Socket *socket,
shutdown_handle = socket->shutdown_handle;
if (NULL == shutdown_handle)
{
+ /* This may happen when the shudown handle is cancelled */
LOG (GNUNET_ERROR_TYPE_DEBUG,
"%s: Received CLOSE_ACK when shutdown handle is NULL\n",
GNUNET_i2s (&socket->other_peer));
return GNUNET_OK;
}
-
switch (operation)
{
case SHUT_RDWR:
@@ -1577,25 +1863,20 @@ handle_generic_close_ack (struct GNUNET_STREAM_Socket *socket,
{
LOG (GNUNET_ERROR_TYPE_DEBUG,
"%s: Received CLOSE_ACK when shutdown handle is not for "
- "SHUT_RDWR\n",
- GNUNET_i2s (&socket->other_peer));
+ "SHUT_RDWR\n", GNUNET_i2s (&socket->other_peer));
return GNUNET_OK;
}
-
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "%s: Received CLOSE_ACK from %s\n",
- GNUNET_i2s (&socket->other_peer),
- GNUNET_i2s (&socket->other_peer));
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Received CLOSE_ACK from %s\n",
+ GNUNET_i2s (&socket->other_peer), GNUNET_i2s (&socket->other_peer));
socket->state = STATE_CLOSED;
break;
default:
LOG (GNUNET_ERROR_TYPE_DEBUG,
- "%s: Received CLOSE_ACK when in it not expected\n",
+ "%s: Received CLOSE_ACK when it is not expected\n",
GNUNET_i2s (&socket->other_peer));
return GNUNET_OK;
}
break;
-
case SHUT_RD:
switch (socket->state)
{
@@ -1604,24 +1885,19 @@ handle_generic_close_ack (struct GNUNET_STREAM_Socket *socket,
{
LOG (GNUNET_ERROR_TYPE_DEBUG,
"%s: Received RECEIVE_CLOSE_ACK when shutdown handle "
- "is not for SHUT_RD\n",
- GNUNET_i2s (&socket->other_peer));
+ "is not for SHUT_RD\n", GNUNET_i2s (&socket->other_peer));
return GNUNET_OK;
}
-
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "%s: Received RECEIVE_CLOSE_ACK from %s\n",
- GNUNET_i2s (&socket->other_peer),
- GNUNET_i2s (&socket->other_peer));
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Received RECEIVE_CLOSE_ACK from %s\n",
+ GNUNET_i2s (&socket->other_peer), GNUNET_i2s (&socket->other_peer));
socket->state = STATE_RECEIVE_CLOSED;
break;
default:
LOG (GNUNET_ERROR_TYPE_DEBUG,
- "%s: Received RECEIVE_CLOSE_ACK when in it not expected\n",
+ "%s: Received RECEIVE_CLOSE_ACK when it is not expected\n",
GNUNET_i2s (&socket->other_peer));
return GNUNET_OK;
}
-
break;
case SHUT_WR:
switch (socket->state)
@@ -1635,37 +1911,29 @@ handle_generic_close_ack (struct GNUNET_STREAM_Socket *socket,
GNUNET_i2s (&socket->other_peer));
return GNUNET_OK;
}
-
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "%s: Received TRANSMIT_CLOSE_ACK from %s\n",
- GNUNET_i2s (&socket->other_peer),
- GNUNET_i2s (&socket->other_peer));
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Received TRANSMIT_CLOSE_ACK from %s\n",
+ GNUNET_i2s (&socket->other_peer), GNUNET_i2s (&socket->other_peer));
socket->state = STATE_TRANSMIT_CLOSED;
break;
default:
LOG (GNUNET_ERROR_TYPE_DEBUG,
- "%s: Received TRANSMIT_CLOSE_ACK when in it not expected\n",
- GNUNET_i2s (&socket->other_peer));
-
+ "%s: Received TRANSMIT_CLOSE_ACK when it is not expected\n",
+ GNUNET_i2s (&socket->other_peer));
return GNUNET_OK;
}
break;
default:
GNUNET_assert (0);
}
-
- if (NULL != shutdown_handle->completion_cb) /* Shutdown completion */
- shutdown_handle->completion_cb(shutdown_handle->completion_cls,
- operation);
- GNUNET_free (shutdown_handle); /* Free shutdown handle */
- socket->shutdown_handle = NULL;
+ shutdown_handle->call_cont_task_id = GNUNET_SCHEDULER_add_now
+ (&call_cont_task, socket);
if (GNUNET_SCHEDULER_NO_TASK
!= shutdown_handle->close_msg_retransmission_task_id)
{
GNUNET_SCHEDULER_cancel
(shutdown_handle->close_msg_retransmission_task_id);
shutdown_handle->close_msg_retransmission_task_id =
- GNUNET_SCHEDULER_NO_TASK;
+ GNUNET_SCHEDULER_NO_TASK;
}
return GNUNET_OK;
}
@@ -1734,26 +2002,31 @@ handle_receive_close (struct GNUNET_STREAM_Socket *socket,
return GNUNET_OK;
default:
break;
- }
-
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "%s: Received RECEIVE_CLOSE from %s\n",
- GNUNET_i2s (&socket->other_peer),
- GNUNET_i2s (&socket->other_peer));
+ }
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Received RECEIVE_CLOSE from %s\n",
+ GNUNET_i2s (&socket->other_peer), GNUNET_i2s (&socket->other_peer));
receive_close_ack =
GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
receive_close_ack->header.size =
htons (sizeof (struct GNUNET_STREAM_MessageHeader));
receive_close_ack->header.type =
htons (GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK);
- queue_message (socket,
- receive_close_ack,
- &set_state_closed,
- NULL);
-
- /* FIXME: Handle the case where write handle is present; the write operation
- should be deemed as finised and the write continuation callback
- has to be called with the stream status GNUNET_STREAM_SHUTDOWN */
+ queue_message (socket, receive_close_ack, NULL, NULL, GNUNET_NO);
+ switch (socket->state)
+ {
+ case STATE_TRANSMIT_CLOSED:
+ case STATE_TRANSMIT_CLOSE_WAIT:
+ case STATE_CLOSED:
+ case STATE_CLOSE_WAIT:
+ return GNUNET_OK;
+ default:
+ break;
+ }
+ do_transmit_shutdown (socket);
+ if (GNUNET_YES == socket->receive_closed)
+ socket->state = STATE_CLOSED;
+ else
+ socket->state = STATE_TRANSMIT_CLOSED;
return GNUNET_OK;
}
@@ -1853,24 +2126,18 @@ handle_close (struct GNUNET_STREAM_Socket *socket,
default:
break;
}
-
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "%s: Received CLOSE from %s\n",
- GNUNET_i2s (&socket->other_peer),
- GNUNET_i2s (&socket->other_peer));
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Received CLOSE from %s\n",
+ GNUNET_i2s (&socket->other_peer), GNUNET_i2s (&socket->other_peer));
close_ack = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
close_ack->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
close_ack->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK);
- queue_message (socket,
- close_ack,
- &set_state_closed,
- NULL);
- if (socket->state == STATE_CLOSED)
+ queue_message (socket, close_ack, &set_state_closed, NULL, GNUNET_NO);
+ if ((STATE_CLOSED == socket->state) || (STATE_CLOSE_WAIT == socket->state))
return GNUNET_OK;
-
- GNUNET_free_non_null (socket->receive_buffer); /* Free the receive buffer */
- socket->receive_buffer = NULL;
- socket->receive_buffer_size = 0;
+ if (GNUNET_NO == socket->transmit_closed)
+ do_transmit_shutdown (socket);
+ if (GNUNET_NO == socket->receive_closed)
+ do_receive_shutdown (socket);
return GNUNET_OK;
}
@@ -1997,34 +2264,38 @@ server_handle_hello (void *cls,
&socket->other_peer,
sizeof (struct GNUNET_PeerIdentity)))
{
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "%s: Received HELLO from non-confirming peer\n",
- GNUNET_i2s (&socket->other_peer));
+ LOG_DEBUG ("%s: Received HELLO from non-confirming peer\n",
+ GNUNET_i2s (&socket->other_peer));
return GNUNET_YES;
}
-
- GNUNET_assert (GNUNET_MESSAGE_TYPE_STREAM_HELLO ==
- ntohs (message->type));
+ GNUNET_assert (GNUNET_MESSAGE_TYPE_STREAM_HELLO == ntohs (message->type));
GNUNET_assert (socket->tunnel == tunnel);
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "%s: Received HELLO from %s\n",
- GNUNET_i2s (&socket->other_peer),
- GNUNET_i2s (&socket->other_peer));
-
- if (STATE_INIT == socket->state)
- {
- reply = generate_hello_ack_msg (socket);
- queue_message (socket,
- &reply->header,
- &set_state_hello_wait,
- NULL);
- }
- else
+ LOG_DEBUG ("%s: Received HELLO from %s\n", GNUNET_i2s (&socket->other_peer),
+ GNUNET_i2s (&socket->other_peer));
+ switch (socket->state)
{
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Client sent HELLO when in state %d\n", socket->state);
+ case STATE_INIT:
+ reply = generate_hello_ack (socket, GNUNET_YES);
+ queue_message (socket, &reply->header, &set_state_hello_wait, NULL,
+ GNUNET_NO);
+ GNUNET_assert (GNUNET_SCHEDULER_NO_TASK ==
+ socket->control_retransmission_task_id);
+ socket->control_retransmission_task_id =
+ GNUNET_SCHEDULER_add_delayed (socket->retransmit_timeout,
+ &control_retransmission_task, socket);
+ break;
+ case STATE_HELLO_WAIT:
+ /* Perhaps our HELLO_ACK was lost */
+ GNUNET_assert (GNUNET_SCHEDULER_NO_TASK !=
+ socket->control_retransmission_task_id);
+ GNUNET_SCHEDULER_cancel (socket->control_retransmission_task_id);
+ socket->control_retransmission_task_id =
+ GNUNET_SCHEDULER_add_now (&control_retransmission_task, socket);
+ break;
+ default:
+ LOG_DEBUG( "%s: Client sent HELLO when in state %d\n",
+ GNUNET_i2s (&socket->other_peer), socket->state);
/* FIXME: Send RESET? */
-
}
return GNUNET_OK;
}
@@ -2057,8 +2328,9 @@ server_handle_hello_ack (void *cls,
ntohs (message->type));
GNUNET_assert (socket->tunnel == tunnel);
ack_message = (struct GNUNET_STREAM_HelloAckMessage *) message;
- if (STATE_HELLO_WAIT == socket->state)
+ switch (socket->state)
{
+ case STATE_HELLO_WAIT:
LOG (GNUNET_ERROR_TYPE_DEBUG,
"%s: Received HELLO_ACK from %s\n",
GNUNET_i2s (&socket->other_peer),
@@ -2070,15 +2342,11 @@ server_handle_hello_ack (void *cls,
(unsigned int) socket->read_sequence_number);
socket->receiver_window_available =
ntohl (ack_message->receiver_window_size);
- /* Attain ESTABLISHED state */
set_state_established (NULL, socket);
- }
- else
- {
+ break;
+ default:
LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Client sent HELLO_ACK when in state %d\n", socket->state);
- /* FIXME: Send RESET? */
-
+ "Client sent HELLO_ACK when in state %d\n", socket->state);
}
return GNUNET_OK;
}
@@ -2316,9 +2584,11 @@ handle_ack (struct GNUNET_STREAM_Socket *socket,
const struct GNUNET_STREAM_AckMessage *ack,
const struct GNUNET_ATS_Information*atsi)
{
+ struct GNUNET_STREAM_WriteHandle *write_handle;
+ uint64_t ack_bitmap;
unsigned int packet;
int need_retransmission;
-
+ uint32_t sequence_difference;
if (0 != memcmp (sender,
&socket->other_peer,
@@ -2329,7 +2599,6 @@ handle_ack (struct GNUNET_STREAM_Socket *socket,
GNUNET_i2s (&socket->other_peer));
return GNUNET_YES;
}
-
switch (socket->state)
{
case (STATE_ESTABLISHED):
@@ -2342,10 +2611,9 @@ handle_ack (struct GNUNET_STREAM_Socket *socket,
GNUNET_i2s (&socket->other_peer));
return GNUNET_OK;
}
- /* FIXME: increment in the base sequence number is breaking current flow
- */
- if (!((socket->write_sequence_number
- - ntohl (ack->base_sequence_number)) < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH))
+ sequence_difference =
+ socket->write_sequence_number - ntohl (ack->base_sequence_number);
+ if (!(sequence_difference <= GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH))
{
LOG (GNUNET_ERROR_TYPE_DEBUG,
"%s: Received DATA_ACK with unexpected base sequence number\n",
@@ -2357,46 +2625,59 @@ handle_ack (struct GNUNET_STREAM_Socket *socket,
ntohl (ack->base_sequence_number));
return GNUNET_OK;
}
- /* FIXME: include the case when write_handle is cancelled - ignore the
- acks */
-
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "%s: Received DATA_ACK from %s\n",
- GNUNET_i2s (&socket->other_peer),
- GNUNET_i2s (&socket->other_peer));
-
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: Received DATA_ACK from %s\n",
+ GNUNET_i2s (&socket->other_peer), GNUNET_i2s (&socket->other_peer));
/* Cancel the retransmission task */
- if (GNUNET_SCHEDULER_NO_TASK != socket->retransmission_timeout_task_id)
+ if (GNUNET_SCHEDULER_NO_TASK != socket->data_retransmission_task_id)
{
- GNUNET_SCHEDULER_cancel (socket->retransmission_timeout_task_id);
- socket->retransmission_timeout_task_id =
- GNUNET_SCHEDULER_NO_TASK;
+ GNUNET_SCHEDULER_cancel (socket->data_retransmission_task_id);
+ socket->data_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
+ socket->data_retransmit_timeout = GNUNET_TIME_UNIT_SECONDS;
}
-
for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
{
+ if (NULL == socket->write_handle->messages[packet])
+ break;
+ /* BS: Base sequence from ack; PS: sequence num of current packet */
+ sequence_difference = ntohl (ack->base_sequence_number)
+ - ntohl (socket->write_handle->messages[packet]->sequence_number);
+ if (0 == sequence_difference)
+ break; /* The message in our handle is not yet received */
+ /* case where BS = PS + GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH */
+ /* sequence_difference <= GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH */
+ ackbitmap_modify_bit (&socket->write_handle->ack_bitmap,
+ packet, GNUNET_YES);
+ }
+ if (((ntohl (ack->base_sequence_number)
+ - (socket->write_handle->max_ack_base_num))
+ <= GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH))
+ {
+ socket->write_handle->max_ack_base_num = ntohl (ack->base_sequence_number);
+ socket->receiver_window_available =
+ ntohl (ack->receive_window_remaining);
+ }
+ else
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Ignoring to modify receive window available as base: %u, max_ack_base: %u\n",
+ ntohl (ack->base_sequence_number),
+ socket->write_handle->max_ack_base_num);
+ if ((packet == GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH)
+ || ((packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH)
+ && (NULL == socket->write_handle->messages[packet])))
+ goto call_write_cont_cb;
+ GNUNET_assert (ntohl
+ (socket->write_handle->messages[packet]->sequence_number)
+ == ntohl (ack->base_sequence_number));
+ /* Update our bitmap */
+ ack_bitmap = GNUNET_ntohll (ack->bitmap);
+ for (; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
+ {
if (NULL == socket->write_handle->messages[packet]) break;
- if (ntohl (ack->base_sequence_number)
- >= ntohl (socket->write_handle->messages[packet]->sequence_number))
- ackbitmap_modify_bit (&socket->write_handle->ack_bitmap,
- packet,
- GNUNET_YES);
- else
- if (GNUNET_YES ==
- ackbitmap_is_bit_set (&socket->write_handle->ack_bitmap,
- ntohl (socket->write_handle->messages[packet]->sequence_number)
- - ntohl (ack->base_sequence_number)))
- ackbitmap_modify_bit (&socket->write_handle->ack_bitmap,
- packet,
- GNUNET_YES);
+ if (ackbitmap_is_bit_set (&ack_bitmap, ntohl
+ (socket->write_handle->messages[packet]->sequence_number)
+ - ntohl (ack->base_sequence_number)))
+ ackbitmap_modify_bit (&socket->write_handle->ack_bitmap, packet, GNUNET_YES);
}
-
- /* Update the receive window remaining
- FIXME : Should update with the value from a data ack with greater
- sequence number */
- socket->receiver_window_available =
- ntohl (ack->receive_window_remaining);
-
/* Check if we have received all acknowledgements */
need_retransmission = GNUNET_NO;
for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
@@ -2412,26 +2693,26 @@ handle_ack (struct GNUNET_STREAM_Socket *socket,
if (GNUNET_YES == need_retransmission)
{
write_data (socket);
+ return GNUNET_OK;
}
- else /* We have to call the write continuation callback now */
+
+ call_write_cont_cb:
+ /* Free the packets */
+ for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
{
- /* Free the packets */
- for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
- {
- GNUNET_free_non_null (socket->write_handle->messages[packet]);
- }
- if (NULL != socket->write_handle->write_cont)
- socket->write_handle->write_cont
- (socket->write_handle->write_cont_cls,
- socket->status,
- socket->write_handle->size);
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "%s: Write completion callback completed\n",
- GNUNET_i2s (&socket->other_peer));
- /* We are done with the write handle - Freeing it */
- GNUNET_free (socket->write_handle);
- socket->write_handle = NULL;
+ GNUNET_free_non_null (socket->write_handle->messages[packet]);
}
+ write_handle = socket->write_handle;
+ socket->write_handle = NULL;
+ if (NULL != write_handle->write_cont)
+ write_handle->write_cont (write_handle->write_cont_cls,
+ GNUNET_STREAM_OK,
+ write_handle->size);
+ /* We are done with the write handle - Freeing it */
+ GNUNET_free (write_handle);
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "%s: Write completion callback completed\n",
+ GNUNET_i2s (&socket->other_peer));
break;
default:
break;
@@ -2576,30 +2857,20 @@ mesh_peer_connect_callback (void *cls,
GNUNET_i2s(peer));
return;
}
-
LOG (GNUNET_ERROR_TYPE_DEBUG,
"%s: Target peer %s connected\n",
GNUNET_i2s (&socket->other_peer),
GNUNET_i2s (&socket->other_peer));
-
/* Set state to INIT */
socket->state = STATE_INIT;
-
/* Send HELLO message */
- message = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
- message->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO);
- message->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
- queue_message (socket,
- message,
- &set_state_hello_wait,
- NULL);
-
- /* Call open callback */
- if (NULL == socket->open_cb)
- {
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "STREAM_open callback is NULL\n");
- }
+ message = generate_hello ();
+ queue_message (socket, message, &set_state_hello_wait, NULL, GNUNET_NO);
+ GNUNET_assert (GNUNET_SCHEDULER_NO_TASK ==
+ socket->control_retransmission_task_id);
+ socket->control_retransmission_task_id =
+ GNUNET_SCHEDULER_add_delayed (socket->retransmit_timeout,
+ &control_retransmission_task, socket);
}
@@ -2645,19 +2916,34 @@ new_tunnel_notify (void *cls,
/* FIXME: If a tunnel is already created, we should not accept new tunnels
from the same peer again until the socket is closed */
+ if (GNUNET_NO == lsocket->listening)
+ {
+ GNUNET_MESH_tunnel_destroy (tunnel);
+ return NULL;
+ }
socket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_Socket));
socket->other_peer = *initiator;
socket->tunnel = tunnel;
- socket->session_id = 0; /* FIXME */
socket->state = STATE_INIT;
socket->lsocket = lsocket;
-
+ socket->stat_handle = lsocket->stat_handle;
+ socket->retransmit_timeout = lsocket->retransmit_timeout;
+ socket->testing_active = lsocket->testing_active;
+ socket->testing_set_write_sequence_number_value =
+ lsocket->testing_set_write_sequence_number_value;
+ socket->max_payload_size = lsocket->max_payload_size;
LOG (GNUNET_ERROR_TYPE_DEBUG,
"%s: Peer %s initiated tunnel to us\n",
GNUNET_i2s (&socket->other_peer),
GNUNET_i2s (&socket->other_peer));
-
- /* FIXME: Copy MESH handle from lsocket to socket */
+ if (NULL != socket->stat_handle)
+ {
+ GNUNET_STATISTICS_update (socket->stat_handle,
+ "total inbound connections received",
+ 1, GNUNET_NO);
+ GNUNET_STATISTICS_update (socket->stat_handle,
+ "inbound connections", 1, GNUNET_NO);
+ }
return socket;
}
@@ -2681,47 +2967,123 @@ tunnel_cleaner (void *cls,
void *tunnel_ctx)
{
struct GNUNET_STREAM_Socket *socket = tunnel_ctx;
+ struct MessageQueue *head;
- if (tunnel != socket->tunnel)
- return;
-
+ GNUNET_assert (tunnel == socket->tunnel);
GNUNET_break_op(0);
LOG (GNUNET_ERROR_TYPE_DEBUG,
"%s: Peer %s has terminated connection abruptly\n",
GNUNET_i2s (&socket->other_peer),
GNUNET_i2s (&socket->other_peer));
-
- socket->status = GNUNET_STREAM_SHUTDOWN;
-
+ if (NULL != socket->stat_handle)
+ {
+ GNUNET_STATISTICS_update (socket->stat_handle,
+ "connections terminated abruptly", 1, GNUNET_NO);
+ GNUNET_STATISTICS_update (socket->stat_handle,
+ "inbound connections", -1, GNUNET_NO);
+ }
/* Clear Transmit handles */
if (NULL != socket->transmit_handle)
{
GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
socket->transmit_handle = NULL;
}
- if (NULL != socket->ack_transmit_handle)
- {
- GNUNET_MESH_notify_transmit_ready_cancel (socket->ack_transmit_handle);
- GNUNET_free (socket->ack_msg);
- socket->ack_msg = NULL;
- socket->ack_transmit_handle = NULL;
- }
/* Stop Tasks using socket->tunnel */
if (GNUNET_SCHEDULER_NO_TASK != socket->ack_task_id)
{
GNUNET_SCHEDULER_cancel (socket->ack_task_id);
socket->ack_task_id = GNUNET_SCHEDULER_NO_TASK;
}
- if (GNUNET_SCHEDULER_NO_TASK != socket->retransmission_timeout_task_id)
+ if (GNUNET_SCHEDULER_NO_TASK != socket->data_retransmission_task_id)
+ {
+ GNUNET_SCHEDULER_cancel (socket->data_retransmission_task_id);
+ socket->data_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
+ }
+ /* Terminate the control retransmission tasks */
+ if (GNUNET_SCHEDULER_NO_TASK != socket->control_retransmission_task_id)
{
- GNUNET_SCHEDULER_cancel (socket->retransmission_timeout_task_id);
- socket->retransmission_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
+ GNUNET_SCHEDULER_cancel (socket->control_retransmission_task_id);
+ socket->control_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
+ }
+ /* Clear existing message queue */
+ while (NULL != (head = socket->queue_head)) {
+ GNUNET_CONTAINER_DLL_remove (socket->queue_head,
+ socket->queue_tail,
+ head);
+ GNUNET_free (head->message);
+ GNUNET_free (head);
}
- /* FIXME: Cancel all other tasks using socket->tunnel */
socket->tunnel = NULL;
}
+/**
+ * Callback to signal timeout on lockmanager lock acquire
+ *
+ * @param cls the ListenSocket
+ * @param tc the scheduler task context
+ */
+static void
+lockmanager_acquire_timeout (void *cls,
+ const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+ struct GNUNET_STREAM_ListenSocket *lsocket = cls;
+ GNUNET_STREAM_ListenCallback listen_cb;
+ void *listen_cb_cls;
+
+ lsocket->lockmanager_acquire_timeout_task = GNUNET_SCHEDULER_NO_TASK;
+ listen_cb = lsocket->listen_cb;
+ listen_cb_cls = lsocket->listen_cb_cls;
+ if (NULL != listen_cb)
+ listen_cb (listen_cb_cls, NULL, NULL);
+}
+
+
+/**
+ * Callback to notify us on the status changes on app_port lock
+ *
+ * @param cls the ListenSocket
+ * @param domain the domain name of the lock
+ * @param lock the app_port
+ * @param status the current status of the lock
+ */
+static void
+lock_status_change_cb (void *cls, const char *domain, uint32_t lock,
+ enum GNUNET_LOCKMANAGER_Status status)
+{
+ struct GNUNET_STREAM_ListenSocket *lsocket = cls;
+
+ GNUNET_assert (lock == (uint32_t) lsocket->port);
+ if (GNUNET_LOCKMANAGER_SUCCESS == status)
+ {
+ lsocket->listening = GNUNET_YES;
+ if (GNUNET_SCHEDULER_NO_TASK != lsocket->lockmanager_acquire_timeout_task)
+ {
+ GNUNET_SCHEDULER_cancel (lsocket->lockmanager_acquire_timeout_task);
+ lsocket->lockmanager_acquire_timeout_task = GNUNET_SCHEDULER_NO_TASK;
+ }
+ if (NULL == lsocket->mesh)
+ {
+ GNUNET_MESH_ApplicationType ports[] = {lsocket->port, 0};
+
+ lsocket->mesh = GNUNET_MESH_connect (lsocket->cfg,
+ lsocket, /* Closure */
+ &new_tunnel_notify,
+ &tunnel_cleaner,
+ server_message_handlers,
+ ports);
+ GNUNET_assert (NULL != lsocket->mesh);
+ if (NULL != lsocket->listen_ok_cb)
+ {
+ (void) lsocket->listen_ok_cb ();
+ }
+ }
+ }
+ if (GNUNET_LOCKMANAGER_RELEASE == status)
+ lsocket->listening = GNUNET_NO;
+}
+
+
/*****************/
/* API functions */
/*****************/
@@ -2734,7 +3096,8 @@ tunnel_cleaner (void *cls,
* @param target the target peer to which the stream has to be opened
* @param app_port the application port number which uniquely identifies this
* stream
- * @param open_cb this function will be called after stream has be established
+ * @param open_cb this function will be called after stream has be established;
+ * cannot be NULL
* @param open_cb_cls the closure for open_cb
* @param ... options to the stream, terminated by GNUNET_STREAM_OPTION_END
* @return if successful it returns the stream socket; NULL if stream cannot be
@@ -2751,17 +3114,20 @@ GNUNET_STREAM_open (const struct GNUNET_CONFIGURATION_Handle *cfg,
struct GNUNET_STREAM_Socket *socket;
enum GNUNET_STREAM_Option option;
GNUNET_MESH_ApplicationType ports[] = {app_port, 0};
- va_list vargs; /* Variable arguments */
+ va_list vargs;
+ uint16_t payload_size;
LOG (GNUNET_ERROR_TYPE_DEBUG,
"%s\n", __func__);
+ GNUNET_assert (NULL != open_cb);
socket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_Socket));
socket->other_peer = *target;
socket->open_cb = open_cb;
socket->open_cls = open_cb_cls;
/* Set defaults */
- socket->retransmit_timeout =
- GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, default_timeout);
+ socket->retransmit_timeout = TIME_REL_SECS (default_timeout);
+ socket->testing_active = GNUNET_NO;
+ socket->max_payload_size = DEFAULT_MAX_PAYLOAD_SIZE;
va_start (vargs, open_cb_cls); /* Parse variable args */
do {
option = va_arg (vargs, enum GNUNET_STREAM_Option);
@@ -2772,13 +3138,29 @@ GNUNET_STREAM_open (const struct GNUNET_CONFIGURATION_Handle *cfg,
socket->retransmit_timeout = va_arg (vargs,
struct GNUNET_TIME_Relative);
break;
+ case GNUNET_STREAM_OPTION_TESTING_SET_WRITE_SEQUENCE_NUMBER:
+ socket->testing_active = GNUNET_YES;
+ socket->testing_set_write_sequence_number_value = va_arg (vargs,
+ uint32_t);
+ break;
+ case GNUNET_STREAM_OPTION_LISTEN_TIMEOUT:
+ GNUNET_break (0); /* Option irrelevant in STREAM_open */
+ break;
+ case GNUNET_STREAM_OPTION_SIGNAL_LISTEN_SUCCESS:
+ GNUNET_break (0); /* Option irrelevant in STREAM_open */
+ break;
+ case GNUNET_STREAM_OPTION_MAX_PAYLOAD_SIZE:
+ payload_size = (uint16_t) va_arg (vargs, unsigned int);
+ GNUNET_assert (0 != payload_size);
+ if (payload_size < socket->max_payload_size)
+ socket->max_payload_size = payload_size;
+ break;
case GNUNET_STREAM_OPTION_END:
break;
}
} while (GNUNET_STREAM_OPTION_END != option);
va_end (vargs); /* End of variable args parsing */
socket->mesh = GNUNET_MESH_connect (cfg, /* the configuration handle */
- 10, /* QUEUE size as parameter? */
socket, /* cls */
NULL, /* No inbound tunnel handler */
NULL, /* No in-tunnel cleaner */
@@ -2789,10 +3171,8 @@ GNUNET_STREAM_open (const struct GNUNET_CONFIGURATION_Handle *cfg,
GNUNET_free (socket);
return NULL;
}
-
/* Now create the mesh tunnel to target */
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Creating MESH Tunnel\n");
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "Creating MESH Tunnel\n");
socket->tunnel = GNUNET_MESH_tunnel_create (socket->mesh,
NULL, /* Tunnel context */
&mesh_peer_connect_callback,
@@ -2801,9 +3181,8 @@ GNUNET_STREAM_open (const struct GNUNET_CONFIGURATION_Handle *cfg,
GNUNET_assert (NULL != socket->tunnel);
GNUNET_MESH_peer_request_connect_add (socket->tunnel,
&socket->other_peer);
-
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "%s() END\n", __func__);
+ socket->stat_handle = GNUNET_STATISTICS_create ("stream", cfg);
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "%s() END\n", __func__);
return socket;
}
@@ -2828,13 +3207,22 @@ GNUNET_STREAM_shutdown (struct GNUNET_STREAM_Socket *socket,
struct GNUNET_STREAM_MessageHeader *msg;
GNUNET_assert (NULL == socket->shutdown_handle);
-
handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_ShutdownHandle));
handle->socket = socket;
handle->completion_cb = completion_cb;
handle->completion_cls = completion_cls;
socket->shutdown_handle = handle;
-
+ if ( ((GNUNET_YES == socket->receive_closed) && (SHUT_RD == operation))
+ || ((GNUNET_YES == socket->transmit_closed) && (SHUT_WR == operation))
+ || ((GNUNET_YES == socket->transmit_closed)
+ && (GNUNET_YES == socket->receive_closed)
+ && (SHUT_RDWR == operation)) )
+ {
+ handle->operation = operation;
+ handle->call_cont_task_id = GNUNET_SCHEDULER_add_now (&call_cont_task,
+ socket);
+ return handle;
+ }
msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
msg->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
switch (operation)
@@ -2846,10 +3234,9 @@ GNUNET_STREAM_shutdown (struct GNUNET_STREAM_Socket *socket,
"Existing read handle should be cancelled before shutting"
" down reading\n");
msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE);
- queue_message (socket,
- msg,
- &set_state_receive_close_wait,
- NULL);
+ queue_message (socket, msg, &set_state_receive_close_wait, NULL,
+ GNUNET_NO);
+ socket->receive_closed = GNUNET_YES;
break;
case SHUT_WR:
handle->operation = SHUT_WR;
@@ -2858,10 +3245,9 @@ GNUNET_STREAM_shutdown (struct GNUNET_STREAM_Socket *socket,
"Existing write handle should be cancelled before shutting"
" down writing\n");
msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE);
- queue_message (socket,
- msg,
- &set_state_transmit_close_wait,
- NULL);
+ queue_message (socket, msg, &set_state_transmit_close_wait, NULL,
+ GNUNET_NO);
+ socket->transmit_closed = GNUNET_YES;
break;
case SHUT_RDWR:
handle->operation = SHUT_RDWR;
@@ -2874,10 +3260,9 @@ GNUNET_STREAM_shutdown (struct GNUNET_STREAM_Socket *socket,
"Existing read handle should be cancelled before shutting"
" down reading\n");
msg->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_CLOSE);
- queue_message (socket,
- msg,
- &set_state_close_wait,
- NULL);
+ queue_message (socket, msg, &set_state_close_wait, NULL, GNUNET_NO);
+ socket->transmit_closed = GNUNET_YES;
+ socket->receive_closed = GNUNET_YES;
break;
default:
LOG (GNUNET_ERROR_TYPE_WARNING,
@@ -2896,7 +3281,10 @@ GNUNET_STREAM_shutdown (struct GNUNET_STREAM_Socket *socket,
/**
- * Cancels a pending shutdown
+ * Cancels a pending shutdown. Note that the shutdown messages may already be
+ * sent and the stream is shutdown already for the operation given to
+ * GNUNET_STREAM_shutdown(). This function only clears up any retranmissions of
+ * shutdown messages and frees the shutdown handle.
*
* @param handle the shutdown handle returned from GNUNET_STREAM_shutdown
*/
@@ -2905,8 +3293,10 @@ GNUNET_STREAM_shutdown_cancel (struct GNUNET_STREAM_ShutdownHandle *handle)
{
if (GNUNET_SCHEDULER_NO_TASK != handle->close_msg_retransmission_task_id)
GNUNET_SCHEDULER_cancel (handle->close_msg_retransmission_task_id);
+ if (GNUNET_SCHEDULER_NO_TASK != handle->call_cont_task_id)
+ GNUNET_SCHEDULER_cancel (handle->call_cont_task_id);
+ handle->socket->shutdown_handle = NULL;
GNUNET_free (handle);
- return;
}
@@ -2924,42 +3314,32 @@ GNUNET_STREAM_close (struct GNUNET_STREAM_Socket *socket)
{
LOG (GNUNET_ERROR_TYPE_WARNING,
"Closing STREAM socket when a read handle is pending\n");
+ GNUNET_STREAM_read_cancel (socket->read_handle);
}
if (NULL != socket->write_handle)
{
LOG (GNUNET_ERROR_TYPE_WARNING,
"Closing STREAM socket when a write handle is pending\n");
+ GNUNET_STREAM_write_cancel (socket->write_handle);
+ //socket->write_handle = NULL;
}
-
- if (socket->read_task_id != GNUNET_SCHEDULER_NO_TASK)
- {
- /* socket closed with read task pending!? */
- GNUNET_break (0);
- GNUNET_SCHEDULER_cancel (socket->read_task_id);
- socket->read_task_id = GNUNET_SCHEDULER_NO_TASK;
- }
-
- /* Terminate the ack'ing tasks if they are still present */
+ /* Terminate the ack'ing task if they are still present */
if (socket->ack_task_id != GNUNET_SCHEDULER_NO_TASK)
{
GNUNET_SCHEDULER_cancel (socket->ack_task_id);
socket->ack_task_id = GNUNET_SCHEDULER_NO_TASK;
}
-
+ /* Terminate the control retransmission tasks */
+ if (GNUNET_SCHEDULER_NO_TASK != socket->control_retransmission_task_id)
+ {
+ GNUNET_SCHEDULER_cancel (socket->control_retransmission_task_id);
+ }
/* Clear Transmit handles */
if (NULL != socket->transmit_handle)
{
GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
socket->transmit_handle = NULL;
}
- if (NULL != socket->ack_transmit_handle)
- {
- GNUNET_MESH_notify_transmit_ready_cancel (socket->ack_transmit_handle);
- GNUNET_free (socket->ack_msg);
- socket->ack_msg = NULL;
- socket->ack_transmit_handle = NULL;
- }
-
/* Clear existing message queue */
while (NULL != (head = socket->queue_head)) {
GNUNET_CONTAINER_DLL_remove (socket->queue_head,
@@ -2968,27 +3348,26 @@ GNUNET_STREAM_close (struct GNUNET_STREAM_Socket *socket)
GNUNET_free (head->message);
GNUNET_free (head);
}
-
/* Close associated tunnel */
if (NULL != socket->tunnel)
{
GNUNET_MESH_tunnel_destroy (socket->tunnel);
socket->tunnel = NULL;
}
-
/* Close mesh connection */
- if (NULL != socket->mesh && NULL == socket->lsocket)
+ if ((NULL != socket->mesh) && (NULL == socket->lsocket))
{
GNUNET_MESH_disconnect (socket->mesh);
socket->mesh = NULL;
}
-
+ /* Close statistics connection */
+ if ( (NULL != socket->stat_handle) && (NULL == socket->lsocket) )
+ GNUNET_STATISTICS_destroy (socket->stat_handle, GNUNET_YES);
/* Release receive buffer */
if (NULL != socket->receive_buffer)
{
GNUNET_free (socket->receive_buffer);
}
-
GNUNET_free (socket);
}
@@ -3001,30 +3380,84 @@ GNUNET_STREAM_close (struct GNUNET_STREAM_Socket *socket)
* @param listen_cb this function will be called when a peer tries to establish
* a stream with us
* @param listen_cb_cls closure for listen_cb
+ * @param ... options to the stream, terminated by GNUNET_STREAM_OPTION_END
* @return listen socket, NULL for any error
*/
struct GNUNET_STREAM_ListenSocket *
GNUNET_STREAM_listen (const struct GNUNET_CONFIGURATION_Handle *cfg,
GNUNET_MESH_ApplicationType app_port,
GNUNET_STREAM_ListenCallback listen_cb,
- void *listen_cb_cls)
+ void *listen_cb_cls,
+ ...)
{
- /* FIXME: Add variable args for passing configration options? */
struct GNUNET_STREAM_ListenSocket *lsocket;
- GNUNET_MESH_ApplicationType ports[] = {app_port, 0};
+ struct GNUNET_TIME_Relative listen_timeout;
+ enum GNUNET_STREAM_Option option;
+ va_list vargs;
+ uint16_t payload_size;
+ GNUNET_assert (NULL != listen_cb);
lsocket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_ListenSocket));
+ lsocket->cfg = GNUNET_CONFIGURATION_dup (cfg);
+ lsocket->lockmanager = GNUNET_LOCKMANAGER_connect (lsocket->cfg);
+ if (NULL == lsocket->lockmanager)
+ {
+ GNUNET_CONFIGURATION_destroy (lsocket->cfg);
+ GNUNET_free (lsocket);
+ return NULL;
+ }
+ lsocket->listening = GNUNET_NO;/* We listen when we get a lock on app_port */
+ /* Set defaults */
+ lsocket->retransmit_timeout = TIME_REL_SECS (default_timeout);
+ lsocket->testing_active = GNUNET_NO;
+ lsocket->listen_ok_cb = NULL;
+ lsocket->max_payload_size = DEFAULT_MAX_PAYLOAD_SIZE;
+ listen_timeout = TIME_REL_SECS (60); /* A minute for listen timeout */
+ va_start (vargs, listen_cb_cls);
+ do {
+ option = va_arg (vargs, enum GNUNET_STREAM_Option);
+ switch (option)
+ {
+ case GNUNET_STREAM_OPTION_INITIAL_RETRANSMIT_TIMEOUT:
+ lsocket->retransmit_timeout = va_arg (vargs,
+ struct GNUNET_TIME_Relative);
+ break;
+ case GNUNET_STREAM_OPTION_TESTING_SET_WRITE_SEQUENCE_NUMBER:
+ lsocket->testing_active = GNUNET_YES;
+ lsocket->testing_set_write_sequence_number_value = va_arg (vargs,
+ uint32_t);
+ break;
+ case GNUNET_STREAM_OPTION_LISTEN_TIMEOUT:
+ listen_timeout = GNUNET_TIME_relative_multiply
+ (GNUNET_TIME_UNIT_MILLISECONDS, va_arg (vargs, uint32_t));
+ break;
+ case GNUNET_STREAM_OPTION_SIGNAL_LISTEN_SUCCESS:
+ lsocket->listen_ok_cb = va_arg (vargs,
+ GNUNET_STREAM_ListenSuccessCallback);
+ break;
+ case GNUNET_STREAM_OPTION_MAX_PAYLOAD_SIZE:
+ payload_size = (uint16_t) va_arg (vargs, unsigned int);
+ GNUNET_assert (0 != payload_size);
+ if (payload_size < lsocket->max_payload_size)
+ lsocket->max_payload_size = payload_size;
+ break;
+ case GNUNET_STREAM_OPTION_END:
+ break;
+ }
+ } while (GNUNET_STREAM_OPTION_END != option);
+ va_end (vargs);
lsocket->port = app_port;
lsocket->listen_cb = listen_cb;
lsocket->listen_cb_cls = listen_cb_cls;
- lsocket->mesh = GNUNET_MESH_connect (cfg,
- 10, /* FIXME: QUEUE size as parameter? */
- lsocket, /* Closure */
- &new_tunnel_notify,
- &tunnel_cleaner,
- server_message_handlers,
- ports);
- GNUNET_assert (NULL != lsocket->mesh);
+ lsocket->locking_request =
+ GNUNET_LOCKMANAGER_acquire_lock (lsocket->lockmanager, locking_domain,
+ (uint32_t) lsocket->port,
+ &lock_status_change_cb, lsocket);
+ lsocket->lockmanager_acquire_timeout_task =
+ GNUNET_SCHEDULER_add_delayed (listen_timeout,
+ &lockmanager_acquire_timeout, lsocket);
+ lsocket->stat_handle = GNUNET_STATISTICS_create ("stream",
+ lsocket->cfg);
return lsocket;
}
@@ -3038,16 +3471,24 @@ void
GNUNET_STREAM_listen_close (struct GNUNET_STREAM_ListenSocket *lsocket)
{
/* Close MESH connection */
- GNUNET_assert (NULL != lsocket->mesh);
- GNUNET_MESH_disconnect (lsocket->mesh);
-
+ if (NULL != lsocket->mesh)
+ GNUNET_MESH_disconnect (lsocket->mesh);
+ if (NULL != lsocket->stat_handle)
+ GNUNET_STATISTICS_destroy (lsocket->stat_handle, GNUNET_YES);
+ GNUNET_CONFIGURATION_destroy (lsocket->cfg);
+ if (GNUNET_SCHEDULER_NO_TASK != lsocket->lockmanager_acquire_timeout_task)
+ GNUNET_SCHEDULER_cancel (lsocket->lockmanager_acquire_timeout_task);
+ if (NULL != lsocket->locking_request)
+ GNUNET_LOCKMANAGER_cancel_request (lsocket->locking_request);
+ if (NULL != lsocket->lockmanager)
+ GNUNET_LOCKMANAGER_disconnect (lsocket->lockmanager);
GNUNET_free (lsocket);
}
/**
* Tries to write the given data to the stream. The maximum size of data that
- * can be written as part of a write operation is (64 * (64000 - sizeof (struct
+ * can be written per a write operation is ~ 4MB (64 * (64000 - sizeof (struct
* GNUNET_STREAM_DataMessage))). If size is greater than this it is not an API
* violation, however only the said number of maximum bytes will be written.
*
@@ -3059,11 +3500,12 @@ GNUNET_STREAM_listen_close (struct GNUNET_STREAM_ListenSocket *lsocket)
* stream
* @param write_cont_cls the closure
*
- * @return handle to cancel the operation; if a previous write is pending or
- * the stream has been shutdown for this operation then write_cont is
- * immediately called and NULL is returned.
+ * @return handle to cancel the operation; if a previous write is pending NULL
+ * is returned. If the stream has been shutdown for this operation or
+ * is broken then write_cont is immediately called and NULL is
+ * returned.
*/
-struct GNUNET_STREAM_IOWriteHandle *
+struct GNUNET_STREAM_WriteHandle *
GNUNET_STREAM_write (struct GNUNET_STREAM_Socket *socket,
const void *data,
size_t size,
@@ -3071,25 +3513,29 @@ GNUNET_STREAM_write (struct GNUNET_STREAM_Socket *socket,
GNUNET_STREAM_CompletionContinuation write_cont,
void *write_cont_cls)
{
+ struct GNUNET_STREAM_WriteHandle *io_handle;
+ struct GNUNET_STREAM_DataMessage *data_msg;
+ const void *sweep;
+ struct GNUNET_TIME_Relative ack_deadline;
unsigned int num_needed_packets;
unsigned int packet;
- struct GNUNET_STREAM_IOWriteHandle *io_handle;
uint32_t packet_size;
uint32_t payload_size;
- struct GNUNET_STREAM_DataMessage *data_msg;
- const void *sweep;
- struct GNUNET_TIME_Relative ack_deadline;
+ uint16_t max_data_packet_size;
LOG (GNUNET_ERROR_TYPE_DEBUG,
"%s\n", __func__);
-
- /* Return NULL if there is already a write request pending */
if (NULL != socket->write_handle)
{
GNUNET_break (0);
return NULL;
}
-
+ if (NULL == socket->tunnel)
+ {
+ if (NULL != write_cont)
+ write_cont (write_cont_cls, GNUNET_STREAM_SYSERR, 0);
+ return NULL;
+ }
switch (socket->state)
{
case STATE_TRANSMIT_CLOSED:
@@ -3105,7 +3551,6 @@ GNUNET_STREAM_write (struct GNUNET_STREAM_Socket *socket,
case STATE_LISTEN:
case STATE_HELLO_WAIT:
if (NULL != write_cont)
- /* FIXME: GNUNET_STREAM_SYSERR?? */
write_cont (write_cont_cls, GNUNET_STREAM_SYSERR, 0);
LOG (GNUNET_ERROR_TYPE_DEBUG,
"%s() END\n", __func__);
@@ -3115,32 +3560,36 @@ GNUNET_STREAM_write (struct GNUNET_STREAM_Socket *socket,
case STATE_RECEIVE_CLOSE_WAIT:
break;
}
-
- if (GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH * max_payload_size < size)
- size = GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH * max_payload_size;
- num_needed_packets = (size + (max_payload_size - 1)) / max_payload_size;
- io_handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_IOWriteHandle));
+ if (GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH * socket->max_payload_size < size)
+ size = GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH * socket->max_payload_size;
+ num_needed_packets =
+ (size + (socket->max_payload_size - 1)) / socket->max_payload_size;
+ io_handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_WriteHandle));
io_handle->socket = socket;
io_handle->write_cont = write_cont;
io_handle->write_cont_cls = write_cont_cls;
io_handle->size = size;
+ io_handle->packets_sent = 0;
sweep = data;
/* FIXME: Remove the fixed delay for ack deadline; Set it to the value
determined from RTT */
ack_deadline = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5);
/* Divide the given buffer into packets for sending */
+ max_data_packet_size =
+ socket->max_payload_size + sizeof (struct GNUNET_STREAM_DataMessage);
+ io_handle->max_ack_base_num = socket->write_sequence_number;
for (packet=0; packet < num_needed_packets; packet++)
{
- if ((packet + 1) * max_payload_size < size)
+ if ((packet + 1) * socket->max_payload_size < size)
{
- payload_size = max_payload_size;
- packet_size = MAX_PACKET_SIZE;
+ payload_size = socket->max_payload_size;
+ packet_size = max_data_packet_size;
}
else
{
- payload_size = size - packet * max_payload_size;
- packet_size = payload_size + sizeof (struct
- GNUNET_STREAM_DataMessage);
+ payload_size = size - packet * socket->max_payload_size;
+ packet_size =
+ payload_size + sizeof (struct GNUNET_STREAM_DataMessage);
}
io_handle->messages[packet] = GNUNET_malloc (packet_size);
io_handle->messages[packet]->header.header.size = htons (packet_size);
@@ -3149,143 +3598,170 @@ GNUNET_STREAM_write (struct GNUNET_STREAM_Socket *socket,
io_handle->messages[packet]->sequence_number =
htonl (socket->write_sequence_number++);
io_handle->messages[packet]->offset = htonl (socket->write_offset);
-
/* FIXME: Remove the fixed delay for ack deadline; Set it to the value
determined from RTT */
io_handle->messages[packet]->ack_deadline =
GNUNET_TIME_relative_hton (ack_deadline);
data_msg = io_handle->messages[packet];
/* Copy data from given buffer to the packet */
- memcpy (&data_msg[1],
- sweep,
- payload_size);
+ memcpy (&data_msg[1], sweep, payload_size);
sweep += payload_size;
socket->write_offset += payload_size;
}
+ /* ack the last data message. FIXME: remove when we figure out how to do this
+ using RTT */
+ io_handle->messages[num_needed_packets - 1]->ack_deadline =
+ GNUNET_TIME_relative_hton (GNUNET_TIME_UNIT_ZERO);
+ socket->data_retransmit_timeout = GNUNET_TIME_UNIT_SECONDS;
socket->write_handle = io_handle;
write_data (socket);
-
LOG (GNUNET_ERROR_TYPE_DEBUG,
"%s() END\n", __func__);
-
return io_handle;
}
+/**
+ * Function to check the ACK bitmap for any received messages and call the data processor
+ *
+ * @param cls the socket
+ * @param tc the scheduler task context
+ */
+static void
+probe_data_availability (void *cls,
+ const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+ struct GNUNET_STREAM_Socket *socket = cls;
+
+ GNUNET_assert (NULL != socket->read_handle);
+ socket->read_handle->probe_data_availability_task_id =
+ GNUNET_SCHEDULER_NO_TASK;
+ if (GNUNET_SCHEDULER_NO_TASK != socket->read_handle->read_task_id)
+ return; /* A task to call read processor is present */
+ if (GNUNET_YES == ackbitmap_is_bit_set (&socket->ack_bitmap,
+ 0))
+ socket->read_handle->read_task_id
+ = GNUNET_SCHEDULER_add_now (&call_read_processor, socket);
+}
+
+
/**
- * Tries to read data from the stream.
+ * Tries to read data from the stream. Should not be called when another read
+ * handle is present; the existing read handle should be canceled with
+ * GNUNET_STREAM_read_cancel(). Only one read handle per socket is present at
+ * any time
*
* @param socket the socket representing a stream
* @param timeout the timeout period
* @param proc function to call with data (once only)
* @param proc_cls the closure for proc
- *
- * @return handle to cancel the operation; if the stream has been shutdown for
- * this type of opeartion then the DataProcessor is immediately
- * called with GNUNET_STREAM_SHUTDOWN as status and NULL if returned
+ * @return handle to cancel the operation; NULL is returned if the stream has
+ * been shutdown for this type of opeartion (the DataProcessor is
+ * immediately called with GNUNET_STREAM_SHUTDOWN as status)
*/
-struct GNUNET_STREAM_IOReadHandle *
+struct GNUNET_STREAM_ReadHandle *
GNUNET_STREAM_read (struct GNUNET_STREAM_Socket *socket,
struct GNUNET_TIME_Relative timeout,
GNUNET_STREAM_DataProcessor proc,
void *proc_cls)
{
- struct GNUNET_STREAM_IOReadHandle *read_handle;
+ struct GNUNET_STREAM_ReadHandle *read_handle;
LOG (GNUNET_ERROR_TYPE_DEBUG,
"%s: %s()\n",
GNUNET_i2s (&socket->other_peer),
__func__);
-
- /* Return NULL if there is already a read handle; the user has to cancel that
- first before continuing or has to wait until it is completed */
- if (NULL != socket->read_handle) return NULL;
-
+ /* Only one read handle is permitted at any time; cancel the existing or wait
+ for it to complete */
+ GNUNET_assert (NULL == socket->read_handle);
GNUNET_assert (NULL != proc);
-
+ if (GNUNET_YES == socket->receive_closed)
+ return NULL;
switch (socket->state)
{
case STATE_RECEIVE_CLOSED:
case STATE_RECEIVE_CLOSE_WAIT:
case STATE_CLOSED:
case STATE_CLOSE_WAIT:
- proc (proc_cls, GNUNET_STREAM_SHUTDOWN, NULL, 0);
LOG (GNUNET_ERROR_TYPE_DEBUG,
"%s: %s() END\n",
GNUNET_i2s (&socket->other_peer),
__func__);
+ proc (proc_cls, GNUNET_STREAM_SHUTDOWN, NULL, 0);
return NULL;
default:
break;
}
-
- read_handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_IOReadHandle));
+ read_handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_ReadHandle));
read_handle->proc = proc;
read_handle->proc_cls = proc_cls;
+ read_handle->socket = socket;
socket->read_handle = read_handle;
-
- /* Check if we have a packet at bitmap 0 */
- if (GNUNET_YES == ackbitmap_is_bit_set (&socket->ack_bitmap,
- 0))
- {
- socket->read_task_id = GNUNET_SCHEDULER_add_now (&call_read_processor,
- socket);
-
- }
-
- /* Setup the read timeout task */
- socket->read_io_timeout_task_id =
- GNUNET_SCHEDULER_add_delayed (timeout,
- &read_io_timeout,
- socket);
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "%s: %s() END\n",
- GNUNET_i2s (&socket->other_peer),
- __func__);
+ read_handle->probe_data_availability_task_id =
+ GNUNET_SCHEDULER_add_now (&probe_data_availability, socket);
+ read_handle->read_io_timeout_task_id =
+ GNUNET_SCHEDULER_add_delayed (timeout, &read_io_timeout, socket);
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "%s: %s() END\n",
+ GNUNET_i2s (&socket->other_peer), __func__);
return read_handle;
}
/**
- * Cancel pending write operation.
+ * Cancels pending write operation. Also cancels packet retransmissions which
+ * may have resulted otherwise.
+ *
+ * CAUTION: Normally a write operation is considered successful if the data
+ * given to it is sent and acknowledged by the receiver. As data is divided
+ * into packets, it is possible that not all packets are received by the
+ * receiver. Any missing packets are then retransmitted till the receiver
+ * acknowledges all packets or until a timeout . During this scenario if the
+ * write operation is cancelled all such retransmissions are also
+ * cancelled. This may leave the receiver's receive buffer incompletely filled
+ * as some missing packets are never retransmitted. So this operation should be
+ * used before shutting down transmission from our side or before closing the
+ * socket.
*
- * @param ioh handle to operation to cancel
+ * @param wh write operation handle to cancel
*/
void
-GNUNET_STREAM_io_write_cancel (struct GNUNET_STREAM_IOWriteHandle *ioh)
+GNUNET_STREAM_write_cancel (struct GNUNET_STREAM_WriteHandle *wh)
{
- struct GNUNET_STREAM_Socket *socket = ioh->socket;
+ struct GNUNET_STREAM_Socket *socket = wh->socket;
unsigned int packet;
GNUNET_assert (NULL != socket->write_handle);
- GNUNET_assert (socket->write_handle == ioh);
-
- if (GNUNET_SCHEDULER_NO_TASK != socket->retransmission_timeout_task_id)
+ GNUNET_assert (socket->write_handle == wh);
+ if (GNUNET_SCHEDULER_NO_TASK != socket->data_retransmission_task_id)
{
- GNUNET_SCHEDULER_cancel (socket->retransmission_timeout_task_id);
- socket->retransmission_timeout_task_id = GNUNET_SCHEDULER_NO_TASK;
+ GNUNET_SCHEDULER_cancel (socket->data_retransmission_task_id);
+ socket->data_retransmission_task_id = GNUNET_SCHEDULER_NO_TASK;
}
-
for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
{
- if (NULL == ioh->messages[packet]) break;
- GNUNET_free (ioh->messages[packet]);
- }
-
+ if (NULL == wh->messages[packet]) break;
+ GNUNET_free (wh->messages[packet]);
+ }
GNUNET_free (socket->write_handle);
socket->write_handle = NULL;
- return;
}
/**
* Cancel pending read operation.
*
- * @param ioh handle to operation to cancel
+ * @param rh read operation handle to cancel
*/
void
-GNUNET_STREAM_io_read_cancel (struct GNUNET_STREAM_IOReadHandle *ioh)
+GNUNET_STREAM_read_cancel (struct GNUNET_STREAM_ReadHandle *rh)
{
- return;
+ struct GNUNET_STREAM_Socket *socket;
+
+ socket = rh->socket;
+ GNUNET_assert (NULL != socket->read_handle);
+ GNUNET_assert (rh == socket->read_handle);
+ cleanup_read_handle (socket);
}
+
+/* end of stream_api.c */
diff --git a/src/stream/test_stream_2peers.c b/src/stream/test_stream_2peers.c
index 1fdc0ee..0126439 100644
--- a/src/stream/test_stream_2peers.c
+++ b/src/stream/test_stream_2peers.c
@@ -30,21 +30,30 @@
#include "gnunet_util_lib.h"
#include "gnunet_mesh_service.h"
#include "gnunet_stream_lib.h"
-#include "gnunet_testing_lib.h"
-
-#define VERBOSE 1
+#include "gnunet_testbed_service.h"
/**
- * Number of peers
+ * Number of peers; Do NOT change this
*/
#define NUM_PEERS 2
/**
+ * Shorthand for Relative time in seconds
+ */
+#define TIME_REL_SECS(sec) \
+ GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, sec)
+
+/**
* Structure for holding peer's sockets and IO Handles
*/
struct PeerData
{
/**
+ * Handle to testbed peer
+ */
+ struct GNUNET_TESTBED_Peer *peer;
+
+ /**
* Peer's stream socket
*/
struct GNUNET_STREAM_Socket *socket;
@@ -52,12 +61,12 @@ struct PeerData
/**
* Peer's io write handle
*/
- struct GNUNET_STREAM_IOWriteHandle *io_write_handle;
+ struct GNUNET_STREAM_WriteHandle *io_write_handle;
/**
* Peer's io read handle
*/
- struct GNUNET_STREAM_IOReadHandle *io_read_handle;
+ struct GNUNET_STREAM_ReadHandle *io_read_handle;
/**
* Peer's shutdown handle
@@ -65,6 +74,11 @@ struct PeerData
struct GNUNET_STREAM_ShutdownHandle *shutdown_handle;
/**
+ * The service connect operation to stream
+ */
+ struct GNUNET_TESTBED_Operation *op;
+
+ /**
* Our Peer id
*/
struct GNUNET_PeerIdentity our_id;
@@ -80,25 +94,53 @@ struct PeerData
unsigned int bytes_read;
};
+
/**
- * The current peer group
+ * Different states in test setup
*/
-static struct GNUNET_TESTING_PeerGroup *pg;
+enum SetupState
+{
+ /**
+ * Get the identity of peer 1
+ */
+ PEER1_GET_IDENTITY,
+
+ /**
+ * Get the identity of peer 2
+ */
+ PEER2_GET_IDENTITY,
+
+ /**
+ * Connect to stream service of peer 1
+ */
+ PEER1_STREAM_CONNECT,
+
+ /**
+ * Connect to stream service of peer 2
+ */
+ PEER2_STREAM_CONNECT
+
+};
/**
- * Peer 1 daemon
+ * Various states during test setup
*/
-static struct GNUNET_TESTING_Daemon *d1;
+static enum SetupState setup_state;
/**
- * Peer 2 daemon
+ * Data context for peer 1
*/
-static struct GNUNET_TESTING_Daemon *d2;
-
static struct PeerData peer1;
+
+/**
+ * Data context for peer 2
+ */
static struct PeerData peer2;
-static struct GNUNET_STREAM_ListenSocket *peer2_listen_socket;
-static struct GNUNET_CONFIGURATION_Handle *config;
+
+/**
+ * Testbed operation handle
+ */
+static struct GNUNET_TESTBED_Operation *op;
static GNUNET_SCHEDULER_TaskIdentifier abort_task;
@@ -185,51 +227,19 @@ stream_write_task (void *cls,
/**
- * Check whether peers successfully shut down.
- */
-static void
-peergroup_shutdown_callback (void *cls, const char *emsg)
-{
- if (emsg != NULL)
- {
- GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
- "Shutdown of peers failed!\n");
- }
- else
- {
- GNUNET_log (GNUNET_ERROR_TYPE_INFO,
- "All peers successfully shut down!\n");
- }
- GNUNET_CONFIGURATION_destroy (config);
-}
-
-
-/**
* Close sockets and stop testing deamons nicely
*/
static void
do_close (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
{
+ if (GNUNET_SCHEDULER_NO_TASK != abort_task)
+ GNUNET_SCHEDULER_cancel (abort_task);
if (NULL != peer1.socket)
GNUNET_STREAM_close (peer1.socket);
- if (NULL != peer2.socket)
- GNUNET_STREAM_close (peer2.socket);
- if (NULL != peer2_listen_socket)
- GNUNET_STREAM_listen_close (peer2_listen_socket);
-
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "test: shutdown\n");
- if (0 != abort_task)
- {
- GNUNET_SCHEDULER_cancel (abort_task);
- }
-
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "test: Wait\n");
-
- GNUNET_TESTING_daemons_stop (pg,
- GNUNET_TIME_relative_multiply
- (GNUNET_TIME_UNIT_SECONDS, 5),
- &peergroup_shutdown_callback,
- NULL);
+ if (NULL != peer1.op)
+ GNUNET_TESTBED_operation_done (peer1.op);
+ else
+ GNUNET_SCHEDULER_shutdown (); /* For shutting down testbed */
}
@@ -244,12 +254,18 @@ static void
shutdown_completion (void *cls,
int operation)
{
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "STREAM shutdown successful\n");
- GNUNET_SCHEDULER_add_now (&do_close,
- cls);
-}
+ static int shutdowns;
+ if (++shutdowns == 1)
+ {
+ peer1.shutdown_handle = NULL;
+ peer2.shutdown_handle = GNUNET_STREAM_shutdown (peer2.socket, SHUT_RDWR,
+ &shutdown_completion, cls);
+ return;
+ }
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "STREAM shutdown successful\n");
+ GNUNET_SCHEDULER_add_now (&do_close, cls);
+}
/**
@@ -258,10 +274,9 @@ shutdown_completion (void *cls,
static void
do_shutdown (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
{
- peer1.shutdown_handle = GNUNET_STREAM_shutdown (peer1.socket,
- SHUT_RDWR,
- &shutdown_completion,
- cls);
+ result = GNUNET_OK;
+ peer1.shutdown_handle = GNUNET_STREAM_shutdown (peer1.socket, SHUT_RDWR,
+ &shutdown_completion, cls);
}
@@ -306,7 +321,7 @@ write_completion (void *cls,
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Writing completed\n");
- if (&peer1 == peer) /* Peer1 has finished writing; should read now */
+ if (&peer2 == peer) /* Peer1 has finished writing; should read now */
{
peer->bytes_read = 0;
GNUNET_SCHEDULER_add_now (&stream_read_task, peer);
@@ -333,10 +348,9 @@ stream_open_cb (void *cls,
{
struct PeerData *peer=cls;
- GNUNET_assert (&peer1 == peer);
- GNUNET_assert (socket == peer1.socket);
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "%s: Stream established from peer1\n",
+ GNUNET_assert (&peer2 == peer);
+ GNUNET_assert (socket == peer2.socket);
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%s: Stream established from peer2\n",
GNUNET_i2s (&peer1.our_id));
peer->bytes_wrote = 0;
GNUNET_SCHEDULER_add_now (&stream_write_task, peer);
@@ -385,7 +399,7 @@ input_processor (void *cls,
}
else
{
- if (&peer2 == peer) /* Peer2 has completed reading; should write */
+ if (&peer1 == peer) /* Peer2 has completed reading; should write */
{
peer->bytes_wrote = 0;
GNUNET_SCHEDULER_add_now (&stream_write_task, peer);
@@ -416,145 +430,232 @@ stream_listen_cb (void *cls,
struct GNUNET_STREAM_Socket *socket,
const struct GNUNET_PeerIdentity *initiator)
{
- GNUNET_assert (NULL != socket);
+ if ((NULL == socket) || (NULL == initiator))
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Binding error\n");
+ if (GNUNET_SCHEDULER_NO_TASK != abort_task)
+ GNUNET_SCHEDULER_cancel (abort_task);
+ abort_task = GNUNET_SCHEDULER_add_now (&do_abort, NULL);
+ return GNUNET_OK;
+ }
GNUNET_assert (NULL != initiator);
- GNUNET_assert (socket != peer1.socket);
+ GNUNET_assert (socket != peer2.socket);
+ GNUNET_assert (0 == memcmp (initiator, &peer2.our_id,
+ sizeof (struct GNUNET_PeerIdentity)));
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%s: Peer connected: %s\n",
+ GNUNET_i2s (&peer1.our_id), GNUNET_i2s (initiator));
+ peer1.socket = socket;
+ peer1.bytes_read = 0;
+ GNUNET_SCHEDULER_add_now (&stream_read_task, &peer1);
+ return GNUNET_OK;
+}
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "%s: Peer connected: %s\n",
- GNUNET_i2s (&peer2.our_id),
- GNUNET_i2s(initiator));
- peer2.socket = socket;
- peer2.bytes_read = 0;
- GNUNET_SCHEDULER_add_now (&stream_read_task, &peer2);
- return GNUNET_OK;
+/**
+ * Listen success callback; connects a peer to stream as client
+ */
+static void stream_connect (void);
+
+
+/**
+ * Adapter function called to destroy a connection to
+ * a service.
+ *
+ * @param cls closure
+ * @param op_result service handle returned from the connect adapter
+ */
+static void
+stream_da (void *cls, void *op_result)
+{
+ struct GNUNET_STREAM_ListenSocket *lsocket;
+ struct GNUNET_STREAM_Socket *socket;
+
+ if (&peer1 == cls)
+ {
+ lsocket = op_result;
+ GNUNET_STREAM_listen_close (lsocket);
+ if (NULL != peer2.op)
+ GNUNET_TESTBED_operation_done (peer2.op);
+ else
+ GNUNET_SCHEDULER_shutdown ();
+ return;
+ }
+ if (&peer2 == cls)
+ {
+ socket = op_result;
+ GNUNET_STREAM_close (socket);
+ GNUNET_SCHEDULER_shutdown (); /* Exit point of the test */
+ return;
+ }
+ GNUNET_assert (0);
+}
+
+
+/**
+ * Adapter function called to establish a connection to
+ * a service.
+ *
+ * @param cls closure
+ * @param cfg configuration of the peer to connect to; will be available until
+ * GNUNET_TESTBED_operation_done() is called on the operation returned
+ * from GNUNET_TESTBED_service_connect()
+ * @return service handle to return in 'op_result', NULL on error
+ */
+static void *
+stream_ca (void *cls, const struct GNUNET_CONFIGURATION_Handle *cfg)
+{
+ struct GNUNET_STREAM_ListenSocket *lsocket;
+
+ switch (setup_state)
+ {
+ case PEER1_STREAM_CONNECT:
+ lsocket = GNUNET_STREAM_listen (cfg, 10, &stream_listen_cb, NULL,
+ GNUNET_STREAM_OPTION_SIGNAL_LISTEN_SUCCESS,
+ &stream_connect, GNUNET_STREAM_OPTION_END);
+ return lsocket;
+ case PEER2_STREAM_CONNECT:
+ peer2.socket = GNUNET_STREAM_open (cfg, &peer1.our_id, 10, &stream_open_cb,
+ &peer2, GNUNET_STREAM_OPTION_END);
+ return peer2.socket;
+ default:
+ GNUNET_assert (0);
+ }
+}
+
+
+/**
+ * Listen success callback; connects a peer to stream as client
+ */
+static void
+stream_connect (void)
+{
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Stream listen open successful\n");
+ peer2.op = GNUNET_TESTBED_service_connect (&peer2, peer2.peer, "stream",
+ NULL, NULL,
+ stream_ca, stream_da, &peer2);
+ setup_state = PEER2_STREAM_CONNECT;
}
/**
- * Callback to be called when testing peer group is ready
+ * Callback to be called when the requested peer information is available
*
- * @param cls NULL
- * @param emsg NULL on success
+ * @param cb_cls the closure from GNUNET_TETSBED_peer_get_information()
+ * @param op the operation this callback corresponds to
+ * @param pinfo the result; will be NULL if the operation has failed
+ * @param emsg error message if the operation has failed; will be NULL if the
+ * operation is successfull
*/
-void
-peergroup_ready (void *cls, const char *emsg)
+static void
+peerinfo_cb (void *cb_cls, struct GNUNET_TESTBED_Operation *op_,
+ const struct GNUNET_TESTBED_PeerInformation *pinfo,
+ const char *emsg)
{
- if (NULL != emsg)
+ GNUNET_assert (NULL == emsg);
+ GNUNET_assert (op == op_);
+ switch (setup_state)
{
- GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
- "Starting peer group failed: %s\n", emsg);
- return;
+ case PEER1_GET_IDENTITY:
+ memcpy (&peer1.our_id, pinfo->result.id,
+ sizeof (struct GNUNET_PeerIdentity));
+ GNUNET_TESTBED_operation_done (op);
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Peer 1 id: %s\n", GNUNET_i2s
+ (&peer1.our_id));
+ op = GNUNET_TESTBED_peer_get_information (peer2.peer,
+ GNUNET_TESTBED_PIT_IDENTITY,
+ &peerinfo_cb, NULL);
+ setup_state = PEER2_GET_IDENTITY;
+ break;
+ case PEER2_GET_IDENTITY:
+ memcpy (&peer2.our_id, pinfo->result.id,
+ sizeof (struct GNUNET_PeerIdentity));
+ GNUNET_TESTBED_operation_done (op);
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Peer 2 id: %s\n", GNUNET_i2s
+ (&peer2.our_id));
+ peer1.op = GNUNET_TESTBED_service_connect (&peer1, peer1.peer, "stream",
+ NULL, NULL, stream_ca,
+ stream_da, &peer1);
+ setup_state = PEER1_STREAM_CONNECT;
+ break;
+ default:
+ GNUNET_assert (0);
}
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Peer group is now ready\n");
-
- GNUNET_assert (2 == GNUNET_TESTING_daemons_running (pg));
-
- d1 = GNUNET_TESTING_daemon_get (pg, 0);
- GNUNET_assert (NULL != d1);
-
- d2 = GNUNET_TESTING_daemon_get (pg, 1);
- GNUNET_assert (NULL != d2);
-
- GNUNET_TESTING_get_peer_identity (d1->cfg,
- &peer1.our_id);
- GNUNET_TESTING_get_peer_identity (d2->cfg,
- &peer2.our_id);
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "%s : %s\n",
- GNUNET_i2s (&peer1.our_id),
- GNUNET_i2s (&d1->id));
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "%s : %s\n",
- GNUNET_i2s (&peer2.our_id),
- GNUNET_i2s (&d2->id));
-
- peer2_listen_socket = GNUNET_STREAM_listen (d2->cfg,
- 10, /* App port */
- &stream_listen_cb,
- NULL);
- GNUNET_assert (NULL != peer2_listen_socket);
-
- /* Connect to stream library */
- peer1.socket = GNUNET_STREAM_open (d1->cfg,
- &d2->id, /* Null for local peer? */
- 10, /* App port */
- &stream_open_cb,
- &peer1,
- GNUNET_STREAM_OPTION_END);
- GNUNET_assert (NULL != peer1.socket);
}
/**
- * Initialize framework and start test
+ * Controller event callback
+ *
+ * @param cls NULL
+ * @param event the controller event
*/
static void
-run (void *cls, char *const *args, const char *cfgfile,
- const struct GNUNET_CONFIGURATION_Handle *cfg)
+controller_event_cb (void *cls,
+ const struct GNUNET_TESTBED_EventInformation *event)
{
- struct GNUNET_TESTING_Host *hosts; /* FIXME: free hosts (DLL) */
-
- GNUNET_log_setup ("test_stream_2peers",
- "DEBUG",
- NULL);
+ switch (event->type)
+ {
+ case GNUNET_TESTBED_ET_OPERATION_FINISHED:
+ switch (setup_state)
+ {
+ case PEER1_STREAM_CONNECT:
+ case PEER2_STREAM_CONNECT:
+ GNUNET_assert (NULL == event->details.operation_finished.emsg);
+ break;
+ default:
+ GNUNET_assert (0);
+ }
+ break;
+ default:
+ GNUNET_assert (0);
+ }
+}
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Starting test\n");
- /* Duplicate the configuration */
- config = GNUNET_CONFIGURATION_dup (cfg);
- hosts = GNUNET_TESTING_hosts_load (config);
-
- pg = GNUNET_TESTING_peergroup_start (config,
- 2,
- GNUNET_TIME_relative_multiply
- (GNUNET_TIME_UNIT_SECONDS, 3),
- NULL,
- &peergroup_ready,
- NULL,
- hosts);
- GNUNET_assert (NULL != pg);
-
+/**
+ * Signature of a main function for a testcase.
+ *
+ * @param cls closure
+ * @param num_peers number of peers in 'peers'
+ * @param peers handle to peers run in the testbed
+ */
+static void
+test_master (void *cls, unsigned int num_peers,
+ struct GNUNET_TESTBED_Peer **peers)
+{
+ GNUNET_assert (NULL != peers);
+ GNUNET_assert (NULL != peers[0]);
+ GNUNET_assert (NULL != peers[1]);
+ peer1.peer = peers[0];
+ peer2.peer = peers[1];
+ /* Get the peer identity and configuration of peers */
+ op = GNUNET_TESTBED_peer_get_information (peer1.peer,
+ GNUNET_TESTBED_PIT_IDENTITY,
+ &peerinfo_cb, NULL);
+ setup_state = PEER1_GET_IDENTITY;
abort_task =
GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply
(GNUNET_TIME_UNIT_SECONDS, 40), &do_abort,
NULL);
}
+
/**
* Main function
*/
int main (int argc, char **argv)
{
- int ret;
-
- char *argv2[] = { "test-stream-2peers",
- "-L", "DEBUG",
- "-c", "test_stream_local.conf",
- NULL};
-
- struct GNUNET_GETOPT_CommandLineOption options[] = {
- GNUNET_GETOPT_OPTION_END
- };
-
- ret =
- GNUNET_PROGRAM_run ((sizeof (argv2) / sizeof (char *)) - 1, argv2,
- "test-stream-2peers", "nohelp", options, &run, NULL);
-
- if (GNUNET_OK != ret)
- {
- GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "run failed with error code %d\n",
- ret);
- return 1;
- }
+ uint64_t event_mask;
+
+ result = GNUNET_NO;
+ event_mask = 0;
+ event_mask |= (1LL << GNUNET_TESTBED_ET_OPERATION_FINISHED);
+ (void) GNUNET_TESTBED_test_run ("test_stream_2peers",
+ "test_stream_local.conf",
+ NUM_PEERS, event_mask, &controller_event_cb,
+ NULL,
+ &test_master, NULL);
if (GNUNET_SYSERR == result)
- {
- GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "test failed\n");
return 1;
- }
- GNUNET_log (GNUNET_ERROR_TYPE_INFO, "test ok\n");
return 0;
}
diff --git a/src/stream/test_stream_2peers_halfclose.c b/src/stream/test_stream_2peers_halfclose.c
index 7997c20..95f5aa9 100644
--- a/src/stream/test_stream_2peers_halfclose.c
+++ b/src/stream/test_stream_2peers_halfclose.c
@@ -23,29 +23,31 @@
* @brief Testcases for Stream API halfclosed connections between 2 peers
* @author Sree Harsha Totakura
*/
-
-#include <string.h>
-
#include "platform.h"
#include "gnunet_util_lib.h"
+#include "gnunet_testbed_service.h"
#include "gnunet_mesh_service.h"
#include "gnunet_stream_lib.h"
-#include "gnunet_testing_lib.h"
-#include "gnunet_scheduler_lib.h"
-
-#define VERBOSE 1
/**
* Number of peers
*/
#define NUM_PEERS 2
+#define TIME_REL_SECS(sec) \
+ GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, sec)
+
/**
* Structure for holding peer's sockets and IO Handles
*/
struct PeerData
{
/**
+ * The testbed peer handle corresponding to this peer
+ */
+ struct GNUNET_TESTBED_Peer *peer;
+
+ /**
* Peer's stream socket
*/
struct GNUNET_STREAM_Socket *socket;
@@ -53,12 +55,12 @@ struct PeerData
/**
* Peer's io write handle
*/
- struct GNUNET_STREAM_IOWriteHandle *io_write_handle;
+ struct GNUNET_STREAM_WriteHandle *io_write_handle;
/**
* Peer's io read handle
*/
- struct GNUNET_STREAM_IOReadHandle *io_read_handle;
+ struct GNUNET_STREAM_ReadHandle *io_read_handle;
/**
* Peer's shutdown handle
@@ -66,6 +68,11 @@ struct PeerData
struct GNUNET_STREAM_ShutdownHandle *shutdown_handle;
/**
+ * Testbed operation handle specific for this peer
+ */
+ struct GNUNET_TESTBED_Operation *op;
+
+ /**
* Our Peer id
*/
struct GNUNET_PeerIdentity our_id;
@@ -91,20 +98,71 @@ struct PeerData
int shutdown_operation;
};
-/**
- * The current peer group
- */
-static struct GNUNET_TESTING_PeerGroup *pg;
/**
- * Peer 1 daemon
+ * Enumeration for various tests that are to be passed in the same order as
+ * below
*/
-static struct GNUNET_TESTING_Daemon *d1;
+enum Test
+{
+ /**
+ * Peer1 writing; Peer2 reading
+ */
+ PEER1_WRITE,
+
+ /**
+ * Peer1 write shutdown; Peer2 should get an error when it tries to read;
+ */
+ PEER1_WRITE_SHUTDOWN,
+
+ /**
+ * Peer1 reads; Peer2 writes (connection is halfclosed)
+ */
+ PEER1_HALFCLOSE_READ,
+
+ /**
+ * Peer1 attempts to write; Should fail with stream already shutdown error
+ */
+ PEER1_HALFCLOSE_WRITE_FAIL,
+
+ /**
+ * Peer1 read shutdown; Peer2 should get stream shutdown error during write
+ */
+ PEER1_READ_SHUTDOWN,
+
+ /**
+ * All tests successfully finished
+ */
+ SUCCESS
+};
+
/**
- * Peer 2 daemon
+ * Different states in test setup
*/
-static struct GNUNET_TESTING_Daemon *d2;
+enum SetupState
+{
+ /**
+ * Get the identity of peer 1
+ */
+ PEER1_GET_IDENTITY,
+
+ /**
+ * Get the identity of peer 2
+ */
+ PEER2_GET_IDENTITY,
+
+ /**
+ * Connect to stream service of peer 2
+ */
+ PEER2_STREAM_CONNECT,
+
+ /**
+ * Connect to stream service of peer 1
+ */
+ PEER1_STREAM_CONNECT
+
+};
/**
@@ -116,51 +174,28 @@ static struct GNUNET_TESTING_Daemon *d2;
*/
static struct PeerData peer1;
static struct PeerData peer2;
-static struct GNUNET_STREAM_ListenSocket *peer2_listen_socket;
-static struct GNUNET_CONFIGURATION_Handle *config;
+/**
+ * Task for aborting the test case if it takes too long
+ */
static GNUNET_SCHEDULER_TaskIdentifier abort_task;
+
+/**
+ * Task for reading from stream
+ */
static GNUNET_SCHEDULER_TaskIdentifier read_task;
static char *data = "ABCD";
-static int result;
/**
- * Enumeration for various tests that are to be passed in the same order as
- * below
+ * Handle to testbed operation
*/
-enum Test
- {
- /**
- * Peer1 writing; Peer2 reading
- */
- PEER1_WRITE,
-
- /**
- * Peer1 write shutdown; Peer2 should get an error when it tries to read;
- */
- PEER1_WRITE_SHUTDOWN,
-
- /**
- * Peer1 reads; Peer2 writes (connection is halfclosed)
- */
- PEER1_HALFCLOSE_READ,
-
- /**
- * Peer1 attempts to write; Should fail with stream already shutdown error
- */
- PEER1_HALFCLOSE_WRITE_FAIL,
-
- /**
- * Peer1 read shutdown; Peer2 should get stream shutdown error during write
- */
- PEER1_READ_SHUTDOWN,
-
- /**
- * All tests successfully finished
- */
- SUCCESS
- };
+struct GNUNET_TESTBED_Operation *op;
+
+/**
+ * Final testing result
+ */
+static int result;
/**
* Current running test
@@ -168,6 +203,12 @@ enum Test
enum Test current_test;
/**
+ * State is test setup
+ */
+enum SetupState setup_state;
+
+
+/**
* Input processor
*
* @param cls the closure from GNUNET_STREAM_write/read
@@ -213,6 +254,7 @@ stream_read_task (void *cls,
case PEER1_WRITE_SHUTDOWN:
GNUNET_assert (&peer2 == peer);
GNUNET_assert (NULL == peer->io_read_handle);
+ peer2.test_ok = GNUNET_YES;
transition (); /* to PEER1_HALFCLOSE_READ */
break;
default:
@@ -274,51 +316,19 @@ stream_write_task (void *cls,
/**
- * Check whether peers successfully shut down.
- */
-static void
-peergroup_shutdown_callback (void *cls, const char *emsg)
-{
- if (emsg != NULL)
- {
- GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
- "Shutdown of peers failed!\n");
- }
- else
- {
- GNUNET_log (GNUNET_ERROR_TYPE_INFO,
- "All peers successfully shut down!\n");
- }
- GNUNET_CONFIGURATION_destroy (config);
-}
-
-
-/**
* Close sockets and stop testing deamons nicely
*/
static void
do_close (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
{
- if (NULL != peer1.socket)
- GNUNET_STREAM_close (peer1.socket);
if (NULL != peer2.socket)
GNUNET_STREAM_close (peer2.socket);
- if (NULL != peer2_listen_socket)
- GNUNET_STREAM_listen_close (peer2_listen_socket);
-
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "test: shutdown\n");
- if (0 != abort_task)
- {
+ if (GNUNET_SCHEDULER_NO_TASK != abort_task)
GNUNET_SCHEDULER_cancel (abort_task);
- }
-
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "test: Wait\n");
-
- GNUNET_TESTING_daemons_stop (pg,
- GNUNET_TIME_relative_multiply
- (GNUNET_TIME_UNIT_SECONDS, 5),
- &peergroup_shutdown_callback,
- NULL);
+ if (NULL != peer2.op)
+ GNUNET_TESTBED_operation_done (peer2.op);
+ else
+ GNUNET_SCHEDULER_shutdown (); /* For shutting down testbed */
}
@@ -600,8 +610,8 @@ input_processor (void *cls,
}
break;
case PEER1_WRITE_SHUTDOWN:
- GNUNET_assert (GNUNET_STREAM_SHUTDOWN == status);
- peer2.test_ok = GNUNET_YES;
+ GNUNET_assert (0); /* This callback will not be called when stream
+ is shutdown */
break;
case PEER1_HALFCLOSE_WRITE_FAIL:
case PEER1_READ_SHUTDOWN:
@@ -642,15 +652,19 @@ stream_listen_cb (void *cls,
struct GNUNET_STREAM_Socket *socket,
const struct GNUNET_PeerIdentity *initiator)
{
- GNUNET_assert (NULL != socket);
- GNUNET_assert (NULL != initiator);
+ if ((NULL == socket) || (NULL == initiator))
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Binding error\n");
+ if (GNUNET_SCHEDULER_NO_TASK != abort_task)
+ GNUNET_SCHEDULER_cancel (abort_task);
+ abort_task = GNUNET_SCHEDULER_add_now (&do_abort, NULL);
+ return GNUNET_OK;
+ }
GNUNET_assert (socket != peer1.socket);
-
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"%s: Peer connected: %s\n",
GNUNET_i2s (&peer2.our_id),
GNUNET_i2s(initiator));
-
peer2.socket = socket;
/* FIXME: reading should be done right now instead of a scheduled call */
read_task = GNUNET_SCHEDULER_add_now (&stream_read, (void *) socket);
@@ -659,128 +673,212 @@ stream_listen_cb (void *cls,
/**
- * Callback to be called when testing peer group is ready
- *
- * @param cls NULL
- * @param emsg NULL on success
+ * Listen success callback; connects a peer to stream as client
+ */
+static void
+stream_connect (void);
+
+
+/**
+ * Adapter function called to destroy a connection to
+ * a service.
+ *
+ * @param cls closure
+ * @param op_result service handle returned from the connect adapter
*/
-void
-peergroup_ready (void *cls, const char *emsg)
+static void
+stream_da (void *cls, void *op_result)
{
- if (NULL != emsg)
- {
- GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
- "Starting peer group failed: %s\n", emsg);
- return;
- }
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Peer group is now ready\n");
-
- GNUNET_assert (2 == GNUNET_TESTING_daemons_running (pg));
-
- d1 = GNUNET_TESTING_daemon_get (pg, 0);
- GNUNET_assert (NULL != d1);
-
- d2 = GNUNET_TESTING_daemon_get (pg, 1);
- GNUNET_assert (NULL != d2);
+ struct GNUNET_STREAM_ListenSocket *lsocket;
- GNUNET_TESTING_get_peer_identity (d1->cfg,
- &peer1.our_id);
- GNUNET_TESTING_get_peer_identity (d2->cfg,
- &peer2.our_id);
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "%s : %s\n",
- GNUNET_i2s (&peer1.our_id),
- GNUNET_i2s (&d1->id));
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "%s : %s\n",
- GNUNET_i2s (&peer2.our_id),
- GNUNET_i2s (&d2->id));
-
- peer2_listen_socket = GNUNET_STREAM_listen (d2->cfg,
- 10, /* App port */
- &stream_listen_cb,
- NULL);
- GNUNET_assert (NULL != peer2_listen_socket);
-
- /* Connect to stream library */
- peer1.socket = GNUNET_STREAM_open (d1->cfg,
- &d2->id, /* Null for local peer? */
- 10, /* App port */
- &stream_open_cb,
- &peer1,
- GNUNET_STREAM_OPTION_END);
- GNUNET_assert (NULL != peer1.socket);
+ if (&peer2 == cls)
+ {
+ lsocket = op_result;
+ GNUNET_STREAM_listen_close (lsocket);
+ if (NULL != peer1.op)
+ GNUNET_TESTBED_operation_done (peer1.op);
+ else
+ GNUNET_SCHEDULER_shutdown ();
+ return;
+ }
+ if (&peer1 == cls)
+ {
+ GNUNET_assert (op_result == peer1.socket);
+ GNUNET_STREAM_close (peer1.socket);
+ GNUNET_SCHEDULER_shutdown (); /* Exit point of the test */
+ return;
+ }
+ GNUNET_assert (0);
}
/**
- * Initialize framework and start test
+ * Adapter function called to establish a connection to
+ * a service.
+ *
+ * @param cls closure
+ * @param cfg configuration of the peer to connect to; will be available until
+ * GNUNET_TESTBED_operation_done() is called on the operation returned
+ * from GNUNET_TESTBED_service_connect()
+ * @return service handle to return in 'op_result', NULL on error
+ */
+static void *
+stream_ca (void *cls, const struct GNUNET_CONFIGURATION_Handle *cfg)
+{
+ struct GNUNET_STREAM_ListenSocket *lsocket;
+
+ switch (setup_state)
+ {
+ case PEER2_STREAM_CONNECT:
+ lsocket = GNUNET_STREAM_listen (cfg, 10, &stream_listen_cb, NULL,
+ GNUNET_STREAM_OPTION_SIGNAL_LISTEN_SUCCESS,
+ &stream_connect, GNUNET_STREAM_OPTION_END);
+ GNUNET_assert (NULL != lsocket);
+ return lsocket;
+ case PEER1_STREAM_CONNECT:
+ peer1.socket = GNUNET_STREAM_open (cfg, &peer2.our_id, 10, &stream_open_cb,
+ &peer1, GNUNET_STREAM_OPTION_END);
+ GNUNET_assert (NULL != peer1.socket);
+ return peer1.socket;
+ default:
+ GNUNET_assert (0);
+ }
+}
+
+
+/**
+ * Listen success callback; connects a peer to stream as client
*/
static void
-run (void *cls, char *const *args, const char *cfgfile,
- const struct GNUNET_CONFIGURATION_Handle *cfg)
+stream_connect (void)
+{
+ GNUNET_assert (PEER2_STREAM_CONNECT == setup_state);
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Stream listen open successful\n");
+ peer1.op = GNUNET_TESTBED_service_connect (&peer1, peer1.peer, "stream",
+ NULL, NULL,
+ stream_ca, stream_da, &peer1);
+ setup_state = PEER1_STREAM_CONNECT;
+}
+
+
+/**
+ * Callback to be called when the requested peer information is available
+ *
+ * @param cb_cls the closure from GNUNET_TETSBED_peer_get_information()
+ * @param op the operation this callback corresponds to
+ * @param pinfo the result; will be NULL if the operation has failed
+ * @param emsg error message if the operation has failed; will be NULL if the
+ * operation is successfull
+ */
+static void
+peerinfo_cb (void *cb_cls, struct GNUNET_TESTBED_Operation *op_,
+ const struct GNUNET_TESTBED_PeerInformation *pinfo,
+ const char *emsg)
{
- struct GNUNET_TESTING_Host *hosts; /* FIXME: free hosts (DLL) */
+ GNUNET_assert (NULL == emsg);
+ GNUNET_assert (op == op_);
+ switch (setup_state)
+ {
+ case PEER1_GET_IDENTITY:
+ memcpy (&peer1.our_id, pinfo->result.id,
+ sizeof (struct GNUNET_PeerIdentity));
+ GNUNET_TESTBED_operation_done (op);
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Peer 1 id: %s\n", GNUNET_i2s
+ (&peer1.our_id));
+ op = GNUNET_TESTBED_peer_get_information (peer2.peer,
+ GNUNET_TESTBED_PIT_IDENTITY,
+ &peerinfo_cb, NULL);
+ setup_state = PEER2_GET_IDENTITY;
+ break;
+ case PEER2_GET_IDENTITY:
+ memcpy (&peer2.our_id, pinfo->result.id,
+ sizeof (struct GNUNET_PeerIdentity));
+ GNUNET_TESTBED_operation_done (op);
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Peer 2 id: %s\n", GNUNET_i2s
+ (&peer2.our_id));
+ peer2.op = GNUNET_TESTBED_service_connect (&peer2, peer2.peer, "stream",
+ NULL, NULL,
+ stream_ca, stream_da, &peer2);
+ setup_state = PEER2_STREAM_CONNECT;
+ break;
+ default:
+ GNUNET_assert (0);
+ }
+}
- /* GNUNET_log_setup ("test_stream_local", */
- /* "DEBUG", */
- /* NULL); */
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Starting test\n");
- /* Duplicate the configuration */
- config = GNUNET_CONFIGURATION_dup (cfg);
+/**
+ * Controller event callback
+ *
+ * @param cls NULL
+ * @param event the controller event
+ */
+static void
+controller_event_cb (void *cls,
+ const struct GNUNET_TESTBED_EventInformation *event)
+{
+ switch (event->type)
+ {
+ case GNUNET_TESTBED_ET_OPERATION_FINISHED:
+ switch (setup_state)
+ {
+ case PEER1_STREAM_CONNECT:
+ case PEER2_STREAM_CONNECT:
+ GNUNET_assert (NULL == event->details.operation_finished.emsg);
+ break;
+ default:
+ GNUNET_assert (0);
+ }
+ break;
+ default:
+ GNUNET_assert (0);
+ }
+}
- hosts = GNUNET_TESTING_hosts_load (config);
-
- pg = GNUNET_TESTING_peergroup_start (config,
- 2,
- GNUNET_TIME_relative_multiply
- (GNUNET_TIME_UNIT_SECONDS, 3),
- NULL,
- &peergroup_ready,
- NULL,
- hosts);
- GNUNET_assert (NULL != pg);
-
+
+/**
+ * Signature of a main function for a testcase.
+ *
+ * @param cls closure
+ * @param num_peers number of peers in 'peers'
+ * @param peers handle to peers run in the testbed
+ */
+static void
+test_master (void *cls, unsigned int num_peers,
+ struct GNUNET_TESTBED_Peer **peers)
+{
+ GNUNET_assert (NULL != peers);
+ GNUNET_assert (NULL != peers[0]);
+ GNUNET_assert (NULL != peers[1]);
+ peer1.peer = peers[0];
+ peer2.peer = peers[1];
+ op = GNUNET_TESTBED_peer_get_information (peer1.peer,
+ GNUNET_TESTBED_PIT_IDENTITY,
+ &peerinfo_cb, NULL);
+ setup_state = PEER1_GET_IDENTITY;
abort_task =
GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply
- (GNUNET_TIME_UNIT_SECONDS, 40), &do_abort,
+ (GNUNET_TIME_UNIT_SECONDS, 1000), &do_abort,
NULL);
}
+
/**
* Main function
*/
int main (int argc, char **argv)
{
- int ret;
-
- char *argv2[] = { "test-stream-2peers-halfclose",
- "-L", "DEBUG",
- "-c", "test_stream_local.conf",
- NULL};
-
- struct GNUNET_GETOPT_CommandLineOption options[] = {
- GNUNET_GETOPT_OPTION_END
- };
-
- ret =
- GNUNET_PROGRAM_run ((sizeof (argv2) / sizeof (char *)) - 1, argv2,
- "test-stream-2peers-halfclose", "nohelp", options, &run, NULL);
-
- if (GNUNET_OK != ret)
- {
- GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "run failed with error code %d\n",
- ret);
- return 1;
- }
+ uint64_t event_mask;
+
+ result = GNUNET_NO;
+ event_mask = 0;
+ event_mask |= (1LL << GNUNET_TESTBED_ET_OPERATION_FINISHED);
+ (void) GNUNET_TESTBED_test_run ("test_stream_2peers_halfclose",
+ "test_stream_local.conf", NUM_PEERS,
+ event_mask,
+ &controller_event_cb, NULL, &test_master,
+ NULL);
if (GNUNET_SYSERR == result)
- {
- GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "test failed\n");
return 1;
- }
- GNUNET_log (GNUNET_ERROR_TYPE_INFO, "test ok\n");
return 0;
}
diff --git a/src/stream/test_stream_big.c b/src/stream/test_stream_big.c
new file mode 100644
index 0000000..06d2a49
--- /dev/null
+++ b/src/stream/test_stream_big.c
@@ -0,0 +1,429 @@
+/*
+ This file is part of GNUnet.
+ (C) 2011, 2012 Christian Grothoff (and other contributing authors)
+
+ GNUnet is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published
+ by the Free Software Foundation; either version 3, or (at your
+ option) any later version.
+
+ GNUnet is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with GNUnet; see the file COPYING. If not, write to the
+ Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ Boston, MA 02111-1307, USA.
+*/
+
+/**
+ * @file stream/test_stream_big.c
+ * @brief large data transfer using stream API between local peers
+ * @author Sree Harsha Totakura
+ */
+
+#include <string.h>
+
+#include "platform.h"
+#include "gnunet_util_lib.h"
+#include "gnunet_stream_lib.h"
+#include "gnunet_testing_lib.h"
+
+#define LOG(kind, ...) \
+ GNUNET_log (kind, __VA_ARGS__);
+
+#define TIME_REL_SECS(sec) \
+ GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, sec)
+
+
+/**
+ * Structure for holding peer's sockets and IO Handles
+ */
+struct PeerData
+{
+ /**
+ * Peer's stream socket
+ */
+ struct GNUNET_STREAM_Socket *socket;
+
+ struct GNUNET_PeerIdentity self;
+
+ /**
+ * Peer's io write handle
+ */
+ struct GNUNET_STREAM_WriteHandle *io_write_handle;
+
+ /**
+ * Peer's io read handle
+ */
+ struct GNUNET_STREAM_ReadHandle *io_read_handle;
+
+ /**
+ * Peer's shutdown handle
+ */
+ struct GNUNET_STREAM_ShutdownHandle *shutdown_handle;
+
+ /**
+ * Bytes the peer has written
+ */
+ unsigned int bytes_wrote;
+
+ /**
+ * Byte the peer has read
+ */
+ unsigned int bytes_read;
+};
+
+static struct PeerData peer1;
+static struct PeerData peer2;
+static struct GNUNET_STREAM_ListenSocket *peer2_listen_socket;
+static const struct GNUNET_CONFIGURATION_Handle *config;
+
+static GNUNET_SCHEDULER_TaskIdentifier abort_task;
+static GNUNET_SCHEDULER_TaskIdentifier read_task;
+static GNUNET_SCHEDULER_TaskIdentifier write_task;
+
+#define DATA_SIZE 65536 /* 64KB */
+static uint32_t data[DATA_SIZE / 4]; /* 64KB array */
+static int result;
+
+/**
+ * Shutdown nicely
+ */
+static void
+do_close (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+ if (GNUNET_SCHEDULER_NO_TASK != abort_task)
+ GNUNET_SCHEDULER_cancel (abort_task);
+ if (NULL != peer1.socket)
+ GNUNET_STREAM_close (peer1.socket);
+ if (NULL != peer2.socket)
+ GNUNET_STREAM_close (peer2.socket);
+ if (NULL != peer2_listen_socket)
+ GNUNET_STREAM_listen_close (peer2_listen_socket); /* Close listen socket */
+}
+
+
+/**
+ * Something went wrong and timed out. Kill everything and set error flag
+ */
+static void
+do_abort (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "test: ABORT\n");
+ if (GNUNET_SCHEDULER_NO_TASK != read_task)
+ GNUNET_SCHEDULER_cancel (read_task);
+ result = GNUNET_SYSERR;
+ abort_task = GNUNET_SCHEDULER_NO_TASK;
+ do_close (cls, tc);
+}
+
+
+/**
+ * Completion callback for shutdown
+ *
+ * @param cls the closure from GNUNET_STREAM_shutdown call
+ * @param operation the operation that was shutdown (SHUT_RD, SHUT_WR,
+ * SHUT_RDWR)
+ */
+static void
+shutdown_completion (void *cls,
+ int operation)
+{
+ static int shutdowns;
+
+ if (++shutdowns == 1)
+ {
+ peer1.shutdown_handle = NULL;
+ peer2.shutdown_handle = GNUNET_STREAM_shutdown (peer2.socket, SHUT_RDWR,
+ &shutdown_completion, cls);
+ return;
+ }
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "STREAM shutdown successful\n");
+ GNUNET_SCHEDULER_add_now (&do_close, cls);
+}
+
+
+/**
+ * Shutdown sockets gracefully
+ */
+static void
+do_shutdown (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+ result = GNUNET_OK;
+ peer1.shutdown_handle = GNUNET_STREAM_shutdown (peer1.socket, SHUT_RDWR,
+ &shutdown_completion, cls);
+}
+
+
+/**
+ * The write completion function; called upon writing some data to stream or
+ * upon error
+ *
+ * @param cls the closure from GNUNET_STREAM_write/read
+ * @param status the status of the stream at the time this function is called
+ * @param size the number of bytes read or written
+ */
+static void
+write_completion (void *cls,
+ enum GNUNET_STREAM_Status status,
+ size_t size)
+{
+ struct PeerData *peer;
+
+ peer = (struct PeerData *) cls;
+ GNUNET_assert (GNUNET_STREAM_OK == status);
+ GNUNET_assert (size <= DATA_SIZE);
+ peer->bytes_wrote += size;
+
+ if (peer->bytes_wrote < DATA_SIZE) /* Have more data to send */
+ {
+ peer->io_write_handle =
+ GNUNET_STREAM_write (peer->socket,
+ ((void *) data) + peer->bytes_wrote,
+ sizeof (data) - peer->bytes_wrote,
+ GNUNET_TIME_relative_multiply
+ (GNUNET_TIME_UNIT_SECONDS, 5),
+ &write_completion,
+ cls);
+ GNUNET_assert (NULL != peer->io_write_handle);
+ }
+ else
+ {
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "Writing successfully finished\n");
+ result = GNUNET_OK;
+ GNUNET_SCHEDULER_add_now (&do_shutdown, NULL);
+ }
+}
+
+
+/**
+ * Task for calling STREAM_write with a chunk of random data
+ *
+ * @param cls the peer data entity
+ * @param tc the task context
+ */
+static void
+stream_write_task (void *cls,
+ const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+ struct PeerData *peer=cls;
+ unsigned int count;
+
+ write_task = GNUNET_SCHEDULER_NO_TASK;
+ for (count=0; count < DATA_SIZE / 4; count++)
+ {
+ data[count]=GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
+ UINT32_MAX);
+ }
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "Generation of random data complete\n");
+ peer->io_write_handle = GNUNET_STREAM_write (peer->socket,
+ data,
+ sizeof (data),
+ GNUNET_TIME_relative_multiply
+ (GNUNET_TIME_UNIT_SECONDS, 10),
+ &write_completion,
+ peer);
+ GNUNET_assert (NULL != peer->io_write_handle);
+}
+
+
+/**
+ * Function executed after stream has been established
+ *
+ * @param cls the closure from GNUNET_STREAM_open
+ * @param socket socket to use to communicate with the other side (read/write)
+ */
+static void
+stream_open_cb (void *cls,
+ struct GNUNET_STREAM_Socket *socket)
+{
+ struct PeerData *peer;
+
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Stream established from peer1\n");
+ peer = (struct PeerData *) cls;
+ peer->bytes_wrote = 0;
+ GNUNET_assert (socket == peer1.socket);
+ GNUNET_assert (socket == peer->socket);
+ write_task = GNUNET_SCHEDULER_add_now (&stream_write_task, peer);
+}
+
+
+/**
+ * Scheduler call back; to be executed when a new stream is connected
+ * Called from listen connect for peer2
+ */
+static void
+stream_read_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
+
+
+/**
+ * Input processor
+ *
+ * @param cls peer2
+ * @param status the status of the stream at the time this function is called
+ * @param data traffic from the other side
+ * @param size the number of bytes available in data read
+ * @return number of bytes of processed from 'data' (any data remaining should be
+ * given to the next time the read processor is called).
+ */
+static size_t
+input_processor (void *cls,
+ enum GNUNET_STREAM_Status status,
+ const void *input_data,
+ size_t size)
+{
+ struct PeerData *peer = cls;
+
+ GNUNET_assert (GNUNET_STREAM_OK == status);
+ GNUNET_assert (&peer2 == peer);
+ GNUNET_assert (size < DATA_SIZE);
+ GNUNET_assert (0 == memcmp (((void *)data ) + peer->bytes_read,
+ input_data, size));
+ peer->bytes_read += size;
+
+ if (peer->bytes_read < DATA_SIZE)
+ {
+ GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == read_task);
+ read_task = GNUNET_SCHEDULER_add_now (&stream_read_task, &peer2);
+ /* peer->io_read_handle = GNUNET_STREAM_read ((struct GNUNET_STREAM_Socket *) */
+ /* peer->socket, */
+ /* GNUNET_TIME_relative_multiply */
+ /* (GNUNET_TIME_UNIT_SECONDS, 5), */
+ /* &input_processor, */
+ /* cls); */
+ /* GNUNET_assert (NULL != peer->io_read_handle); */
+ }
+ else
+ {
+ /* Peer2 has completed reading*/
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "Reading finished successfully\n");
+ }
+ return size;
+}
+
+
+/**
+ * Scheduler call back; to be executed when a new stream is connected
+ * Called from listen connect for peer2
+ */
+static void
+stream_read_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+ struct PeerData *peer = cls;
+
+ read_task = GNUNET_SCHEDULER_NO_TASK;
+ GNUNET_assert (&peer2 == peer);
+ peer->io_read_handle =
+ GNUNET_STREAM_read (peer->socket,
+ GNUNET_TIME_relative_multiply
+ (GNUNET_TIME_UNIT_SECONDS, 10),
+ &input_processor,
+ peer);
+ GNUNET_assert (NULL != peer->io_read_handle);
+}
+
+
+/**
+ * Functions of this type are called upon new stream connection from other peers
+ *
+ * @param cls the closure from GNUNET_STREAM_listen
+ * @param socket the socket representing the stream
+ * @param initiator the identity of the peer who wants to establish a stream
+ * with us
+ * @return GNUNET_OK to keep the socket open, GNUNET_SYSERR to close the
+ * stream (the socket will be invalid after the call)
+ */
+static int
+stream_listen_cb (void *cls,
+ struct GNUNET_STREAM_Socket *socket,
+ const struct GNUNET_PeerIdentity *initiator)
+{
+ if ((NULL == socket) || (NULL == initiator))
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Binding error\n");
+ if (GNUNET_SCHEDULER_NO_TASK != abort_task)
+ GNUNET_SCHEDULER_cancel (abort_task);
+ abort_task = GNUNET_SCHEDULER_add_now (&do_abort, NULL);
+ return GNUNET_OK;
+ }
+ GNUNET_assert (NULL != socket);
+ GNUNET_assert (socket != peer1.socket);
+
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Peer connected: %s\n", GNUNET_i2s(initiator));
+
+ peer2.socket = socket;
+ peer2.bytes_read = 0;
+ read_task = GNUNET_SCHEDULER_add_now (&stream_read_task, &peer2);
+ return GNUNET_OK;
+}
+
+
+/**
+ * Listen success callback; connects a peer to stream as client
+ */
+static void
+stream_connect (void)
+{
+ struct PeerData *peer = &peer1;
+
+ /* Connect to stream */
+ peer->socket = GNUNET_STREAM_open (config,
+ &peer2.self, /* Null for local peer? */
+ 10, /* App port */
+ &stream_open_cb, &peer1,
+ GNUNET_STREAM_OPTION_MAX_PAYLOAD_SIZE, 500,
+ GNUNET_STREAM_OPTION_END);
+ GNUNET_assert (NULL != peer->socket);
+}
+
+
+/**
+ * Initialize framework and start test
+ *
+ * @param cls closure
+ * @param cfg configuration of the peer that was started
+ * @param peer identity of the peer that was created
+ */
+static void
+run (void *cls,
+ const struct GNUNET_CONFIGURATION_Handle *cfg,
+ struct GNUNET_TESTING_Peer *peer)
+{
+ struct GNUNET_PeerIdentity self;
+
+ GNUNET_TESTING_peer_get_identity (peer, &self);
+ config = cfg;
+ peer2_listen_socket =
+ GNUNET_STREAM_listen (config,
+ 10, /* App port */
+ &stream_listen_cb,
+ NULL,
+ GNUNET_STREAM_OPTION_SIGNAL_LISTEN_SUCCESS,
+ &stream_connect,
+ GNUNET_STREAM_OPTION_END);
+ GNUNET_assert (NULL != peer2_listen_socket);
+ peer1.self = self;
+ peer2.self = self;
+ abort_task =
+ GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply
+ (GNUNET_TIME_UNIT_SECONDS, 60), &do_abort,
+ NULL);
+}
+
+/**
+ * Main function
+ */
+int main (int argc, char **argv)
+{
+ if (0 != GNUNET_TESTING_peer_run ("test_stream_big",
+ "test_stream_local.conf",
+ &run, NULL))
+ return 1;
+ return (GNUNET_SYSERR == result) ? 1 : 0;
+}
+
+/* end of test_stream_big.c */
diff --git a/src/stream/test_stream_local.c b/src/stream/test_stream_local.c
index c9fab84..f3c8aae 100644
--- a/src/stream/test_stream_local.c
+++ b/src/stream/test_stream_local.c
@@ -32,7 +32,12 @@
#include "gnunet_stream_lib.h"
#include "gnunet_testing_lib.h"
-#define VERBOSE 1
+/**
+ * Relative seconds shorthand
+ */
+#define TIME_REL_SECS(sec) \
+ GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, sec)
+
/**
* Structure for holding peer's sockets and IO Handles
@@ -47,12 +52,17 @@ struct PeerData
/**
* Peer's io write handle
*/
- struct GNUNET_STREAM_IOWriteHandle *io_write_handle;
+ struct GNUNET_STREAM_WriteHandle *io_write_handle;
/**
* Peer's io read handle
*/
- struct GNUNET_STREAM_IOReadHandle *io_read_handle;
+ struct GNUNET_STREAM_ReadHandle *io_read_handle;
+
+ /**
+ * Peer's shutdown handle
+ */
+ struct GNUNET_STREAM_ShutdownHandle *shutdown_handle;
/**
* Bytes the peer has written
@@ -65,15 +75,14 @@ struct PeerData
unsigned int bytes_read;
};
-static struct GNUNET_OS_Process *arm_pid;
static struct PeerData peer1;
static struct PeerData peer2;
static struct GNUNET_STREAM_ListenSocket *peer2_listen_socket;
-static struct GNUNET_CONFIGURATION_Handle *config_peer1;
-static struct GNUNET_CONFIGURATION_Handle *config_peer2;
+static const struct GNUNET_CONFIGURATION_Handle *config;
+static struct GNUNET_TESTING_Peer *self;
+static struct GNUNET_PeerIdentity self_id;
static GNUNET_SCHEDULER_TaskIdentifier abort_task;
-static GNUNET_SCHEDULER_TaskIdentifier test_task;
static char *data = "ABCD";
static int result;
@@ -81,6 +90,7 @@ static int result;
static int writing_success;
static int reading_success;
+
/**
* Input processor
*
@@ -117,6 +127,7 @@ stream_read_task (void *cls,
GNUNET_assert (NULL != peer->io_read_handle);
}
+
/**
* The write completion function; called upon writing some data to stream or
* upon error
@@ -155,33 +166,21 @@ stream_write_task (void *cls,
GNUNET_assert (NULL != peer->io_write_handle);
}
+
/**
* Shutdown nicely
*/
static void
-do_shutdown (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+do_close (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
{
- GNUNET_STREAM_close (peer1.socket);
+ if (GNUNET_SCHEDULER_NO_TASK != abort_task)
+ GNUNET_SCHEDULER_cancel (abort_task);
+ if (NULL != peer1.socket)
+ GNUNET_STREAM_close (peer1.socket);
if (NULL != peer2.socket)
GNUNET_STREAM_close (peer2.socket);
if (NULL != peer2_listen_socket)
GNUNET_STREAM_listen_close (peer2_listen_socket);
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "test: shutdown\n");
- if (0 != abort_task)
- {
- GNUNET_SCHEDULER_cancel (abort_task);
- }
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "test: arm\n");
- if (0 != GNUNET_OS_process_kill (arm_pid, SIGTERM))
- {
- GNUNET_log_strerror (GNUNET_ERROR_TYPE_WARNING, "kill");
- }
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "test: Wait\n");
- /* Free the duplicated configuration */
- GNUNET_CONFIGURATION_destroy (config_peer1);
- GNUNET_CONFIGURATION_destroy (config_peer2);
- GNUNET_assert (GNUNET_OK == GNUNET_OS_process_wait (arm_pid));
- GNUNET_OS_process_destroy (arm_pid);
}
@@ -192,14 +191,46 @@ static void
do_abort (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
{
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "test: ABORT\n");
- if (0 != test_task)
+ result = GNUNET_SYSERR;
+ abort_task = GNUNET_SCHEDULER_NO_TASK;
+ do_close (cls, tc);
+}
+
+
+/**
+ * Completion callback for shutdown
+ *
+ * @param cls the closure from GNUNET_STREAM_shutdown call
+ * @param operation the operation that was shutdown (SHUT_RD, SHUT_WR,
+ * SHUT_RDWR)
+ */
+static void
+shutdown_completion (void *cls,
+ int operation)
+{
+ static int shutdowns;
+
+ if (++shutdowns == 1)
{
- GNUNET_SCHEDULER_cancel (test_task);
- }
+ peer1.shutdown_handle = NULL;
+ peer2.shutdown_handle = GNUNET_STREAM_shutdown (peer2.socket, SHUT_RDWR,
+ &shutdown_completion, cls);
+ return;
+ }
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "STREAM shutdown successful\n");
+ GNUNET_SCHEDULER_add_now (&do_close, cls);
+}
- result = GNUNET_SYSERR;
- abort_task = 0;
- do_shutdown (cls, tc);
+
+/**
+ * Shutdown sockets gracefully
+ */
+static void
+do_shutdown (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+ result = GNUNET_OK;
+ peer1.shutdown_handle = GNUNET_STREAM_shutdown (peer1.socket, SHUT_RDWR,
+ &shutdown_completion, cls);
}
@@ -221,7 +252,6 @@ write_completion (void *cls,
GNUNET_assert (GNUNET_STREAM_OK == status);
GNUNET_assert (size <= strlen (data));
peer->bytes_wrote += size;
-
if (peer->bytes_wrote < strlen(data)) /* Have more data to send */
{
GNUNET_SCHEDULER_add_now (&stream_write_task, peer);
@@ -230,8 +260,7 @@ write_completion (void *cls,
{
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Writing completed\n");
-
- if (&peer1 == peer) /* Peer1 has finished writing; should read now */
+ if (&peer1 == peer) /* Peer1 has finished writing; should read now */
{
peer->bytes_read = 0;
GNUNET_SCHEDULER_add_now (&stream_read_task, peer);
@@ -261,7 +290,6 @@ stream_open_cb (void *cls,
GNUNET_assert (&peer1 == peer);
GNUNET_assert (socket == peer1.socket);
GNUNET_assert (socket == peer->socket);
-
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Stream established from peer1\n");
peer->bytes_wrote = 0;
GNUNET_SCHEDULER_add_now (&stream_write_task, peer);
@@ -291,8 +319,7 @@ input_processor (void *cls,
GNUNET_assert (0 == strncmp ((const char *) data + peer->bytes_read,
(const char *) input_data,
size));
- peer->bytes_read += size;
-
+ peer->bytes_read += size;
if (peer->bytes_read < strlen (data))
{
GNUNET_SCHEDULER_add_now (&stream_read_task, peer);
@@ -331,22 +358,23 @@ stream_listen_cb (void *cls,
const struct GNUNET_PeerIdentity *initiator)
{
struct PeerData *peer=cls;
- struct GNUNET_PeerIdentity self;
+ if ((NULL == socket) || (NULL == initiator))
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Binding error\n");
+ if (GNUNET_SCHEDULER_NO_TASK != abort_task)
+ GNUNET_SCHEDULER_cancel (abort_task);
+ abort_task = GNUNET_SCHEDULER_add_now (&do_abort, NULL);
+ return GNUNET_OK;
+ }
GNUNET_assert (NULL != socket);
GNUNET_assert (socket != peer1.socket);
GNUNET_assert (&peer2 == peer);
-
- /* Get our identity */
- GNUNET_assert (GNUNET_OK == GNUNET_TESTING_get_peer_identity (config_peer1,
- &self));
- GNUNET_assert (0 == memcmp (&self,
+ GNUNET_assert (0 == memcmp (&self_id,
initiator,
sizeof (struct GNUNET_PeerIdentity)));
-
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Peer connected: %s\n", GNUNET_i2s(initiator));
-
peer->socket = socket;
peer->bytes_read = 0;
GNUNET_SCHEDULER_add_now (&stream_read_task, &peer2);
@@ -355,30 +383,13 @@ stream_listen_cb (void *cls,
/**
- * Testing function
- *
- * @param cls NULL
- * @param tc the task context
+ * Listen success callback; connects a peer to stream as client
*/
static void
-test (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+stream_connect (void)
{
- struct GNUNET_PeerIdentity self;
-
- test_task = GNUNET_SCHEDULER_NO_TASK;
- /* Get our identity */
- GNUNET_assert (GNUNET_OK == GNUNET_TESTING_get_peer_identity (config_peer1,
- &self));
-
- peer2_listen_socket = GNUNET_STREAM_listen (config_peer2,
- 10, /* App port */
- &stream_listen_cb,
- &peer2);
- GNUNET_assert (NULL != peer2_listen_socket);
-
- /* Connect to stream library */
- peer1.socket = GNUNET_STREAM_open (config_peer1,
- &self, /* Null for local peer? */
+ peer1.socket = GNUNET_STREAM_open (config,
+ &self_id,
10, /* App port */
&stream_open_cb,
&peer1,
@@ -386,37 +397,31 @@ test (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
GNUNET_assert (NULL != peer1.socket);
}
+
/**
* Initialize framework and start test
*/
static void
-run (void *cls, char *const *args, const char *cfgfile,
- const struct GNUNET_CONFIGURATION_Handle *cfg)
+run (void *cls,
+ const struct GNUNET_CONFIGURATION_Handle *cfg,
+ struct GNUNET_TESTING_Peer *peer)
{
- GNUNET_log_setup ("test_stream_local",
-#if VERBOSE
- "DEBUG",
-#else
- "WARNING",
-#endif
- NULL);
- /* Duplicate the configuration */
- config_peer1 = GNUNET_CONFIGURATION_dup (cfg);
- config_peer2 = GNUNET_CONFIGURATION_dup (cfg);
- arm_pid =
- GNUNET_OS_start_process (GNUNET_YES, NULL, NULL, "gnunet-service-arm",
- "gnunet-service-arm",
-#if VERBOSE_ARM
- "-L", "DEBUG",
-#endif
- "-c", "test_stream_local.conf", NULL);
-
- abort_task =
- GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply
- (GNUNET_TIME_UNIT_SECONDS, 60), &do_abort,
- NULL);
-
- test_task = GNUNET_SCHEDULER_add_now (&test, NULL);
+ config = cfg;
+ self = peer;
+ GNUNET_TESTING_peer_get_identity (peer, &self_id);
+ peer2_listen_socket =
+ GNUNET_STREAM_listen (config,
+ 10, /* App port */
+ &stream_listen_cb,
+ &peer2,
+ GNUNET_STREAM_OPTION_SIGNAL_LISTEN_SUCCESS,
+ &stream_connect,
+ GNUNET_STREAM_OPTION_END);
+ GNUNET_assert (NULL != peer2_listen_socket);
+ abort_task =
+ GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply
+ (GNUNET_TIME_UNIT_SECONDS, 30), &do_abort,
+ NULL);
}
/**
@@ -424,35 +429,11 @@ run (void *cls, char *const *args, const char *cfgfile,
*/
int main (int argc, char **argv)
{
- int ret;
-
- char *const argv2[] = { "test-stream-local",
- "-c", "test_stream_local.conf",
-#if VERBOSE
- "-L", "DEBUG",
-#endif
- NULL
- };
-
- struct GNUNET_GETOPT_CommandLineOption options[] = {
- GNUNET_GETOPT_OPTION_END
- };
-
- ret =
- GNUNET_PROGRAM_run ((sizeof (argv2) / sizeof (char *)) - 1, argv2,
- "test-stream-local", "nohelp", options, &run, NULL);
-
- if (GNUNET_OK != ret)
- {
- GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "run failed with error code %d\n",
- ret);
+ if (0 != GNUNET_TESTING_peer_run ("test_stream_local",
+ "test_stream_local.conf",
+ &run, NULL))
return 1;
- }
- if (GNUNET_SYSERR == result)
- {
- GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "test failed\n");
- return 1;
- }
- GNUNET_log (GNUNET_ERROR_TYPE_INFO, "test ok\n");
- return 0;
+ return (GNUNET_SYSERR == result) ? 1 : 0;
}
+
+/* end of test_stream_local.c */
diff --git a/src/stream/test_stream_local.conf b/src/stream/test_stream_local.conf
index 884ecbc..35da4b4 100644
--- a/src/stream/test_stream_local.conf
+++ b/src/stream/test_stream_local.conf
@@ -1,3 +1,14 @@
+[lockmanager]
+AUTOSTART = NO
+ACCEPT_FROM = 127.0.0.1;
+HOSTNAME = localhost
+PORT = 12101
+
+[statistics]
+AUTOSTART = YES
+ACCEPT_FROM = 127.0.0.1;
+PORT = 12102
+
[fs]
AUTOSTART = NO
@@ -5,7 +16,6 @@ AUTOSTART = NO
AUTOSTART = NO
[mesh]
-DEBUG = YES
AUTOSTART = YES
ACCEPT_FROM = 127.0.0.1;
HOSTNAME = localhost
@@ -14,7 +24,6 @@ PORT = 10700
# PREFIX = xterm -geometry 100x85 -T peer1 -e gdb --args
[dht]
-DEBUG = NO
AUTOSTART = YES
ACCEPT_FROM6 = ::1;
ACCEPT_FROM = 127.0.0.1;
@@ -30,7 +39,6 @@ DATABASE = sqlite
[transport]
PLUGINS = tcp
-DEBUG = NO
ACCEPT_FROM6 = ::1;
ACCEPT_FROM = 127.0.0.1;
NEIGHBOUR_LIMIT = 50
@@ -44,28 +52,23 @@ WAN_QUOTA_IN = 3932160
PORT = 12092
[arm]
-DEFAULTSERVICES = core
+DEFAULTSERVICES = core lockmanager
PORT = 12366
-DEBUG = NO
[transport-tcp]
TIMEOUT = 300 s
PORT = 12368
[TESTING]
-NUM_PEERS = 5
WEAKRANDOM = YES
-DEBUG = YES
-HOSTKEYSFILE = ../../contrib/testing_hostkeys.dat
-MAX_CONCURRENT_SSH = 10
-USE_PROGRESSBARS = YES
-PEERGROUP_TIMEOUT = 2400 s
+
+[testbed]
+OVERLAY_TOPOLOGY = LINE
[gnunetd]
HOSTKEY = $SERVICEHOME/.hostkey
[PATHS]
-DEFAULTCONFIG = test_stream_local.conf
SERVICEHOME = /tmp/test-stream/
[dns]
@@ -73,3 +76,13 @@ AUTOSTART = NO
[nse]
AUTOSTART = NO
+
+[vpn]
+AUTOSTART = NO
+
+[nat]
+RETURN_LOCAL_ADDRESSES = YES
+
+[consensus]
+AUTOSTART = NO
+
diff --git a/src/stream/test_stream_sequence_wraparound.c b/src/stream/test_stream_sequence_wraparound.c
new file mode 100644
index 0000000..bbe9f1a
--- /dev/null
+++ b/src/stream/test_stream_sequence_wraparound.c
@@ -0,0 +1,425 @@
+/*
+ This file is part of GNUnet.
+ (C) 2011, 2012 Christian Grothoff (and other contributing authors)
+
+ GNUnet is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published
+ by the Free Software Foundation; either version 3, or (at your
+ option) any later version.
+
+ GNUnet is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with GNUnet; see the file COPYING. If not, write to the
+ Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ Boston, MA 02111-1307, USA.
+*/
+
+/**
+ * @file stream/test_stream_sequence_wraparound.c
+ * @brief test cases for sequence wrap around situations during data transfer
+ * @author Sree Harsha Totakura
+ */
+
+#include <string.h>
+
+#include "platform.h"
+#include "gnunet_util_lib.h"
+#include "gnunet_stream_lib.h"
+#include "gnunet_testing_lib.h"
+
+/**
+ * Generic logging shorthand
+ */
+#define LOG(kind, ...) \
+ GNUNET_log (kind, __VA_ARGS__);
+
+/**
+ * Relative seconds shorthand
+ */
+#define TIME_REL_SECS(sec) \
+ GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, sec)
+
+/**
+ * Structure for holding peer's sockets and IO Handles
+ */
+struct PeerData
+{
+ /**
+ * Peer's stream socket
+ */
+ struct GNUNET_STREAM_Socket *socket;
+
+ /**
+ * Peer's io write handle
+ */
+ struct GNUNET_STREAM_WriteHandle *io_write_handle;
+
+ /**
+ * Peer's io read handle
+ */
+ struct GNUNET_STREAM_ReadHandle *io_read_handle;
+
+ /**
+ * Peer's shutdown handle
+ */
+ struct GNUNET_STREAM_ShutdownHandle *shutdown_handle;
+
+ /**
+ * Bytes the peer has written
+ */
+ unsigned int bytes_wrote;
+
+ /**
+ * Byte the peer has read
+ */
+ unsigned int bytes_read;
+};
+
+static struct PeerData peer1;
+static struct PeerData peer2;
+static struct GNUNET_STREAM_ListenSocket *peer2_listen_socket;
+static const struct GNUNET_CONFIGURATION_Handle *config;
+static struct GNUNET_TESTING_Peer *self;
+static struct GNUNET_PeerIdentity self_id;
+
+static GNUNET_SCHEDULER_TaskIdentifier abort_task;
+static GNUNET_SCHEDULER_TaskIdentifier read_task;
+static GNUNET_SCHEDULER_TaskIdentifier write_task;
+
+#define DATA_SIZE 65536 /* 64KB */
+static uint32_t data[DATA_SIZE / 4]; /* 64KB array */
+static int result;
+
+/**
+ * Shutdown nicely
+ */
+static void
+do_close (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+ if (GNUNET_SCHEDULER_NO_TASK != abort_task)
+ GNUNET_SCHEDULER_cancel (abort_task);
+ if (NULL != peer1.socket)
+ GNUNET_STREAM_close (peer1.socket);
+ if (NULL != peer2.socket)
+ GNUNET_STREAM_close (peer2.socket);
+ if (NULL != peer2_listen_socket)
+ GNUNET_STREAM_listen_close (peer2_listen_socket); /* Close listen socket */
+}
+
+
+/**
+ * Something went wrong and timed out. Kill everything and set error flag
+ */
+static void
+do_abort (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "test: ABORT\n");
+ if (GNUNET_SCHEDULER_NO_TASK != read_task)
+ {
+ GNUNET_SCHEDULER_cancel (read_task);
+ }
+ result = GNUNET_SYSERR;
+ abort_task = GNUNET_SCHEDULER_NO_TASK;
+ do_close (cls, tc);
+}
+
+
+/**
+ * Completion callback for shutdown
+ *
+ * @param cls the closure from GNUNET_STREAM_shutdown call
+ * @param operation the operation that was shutdown (SHUT_RD, SHUT_WR,
+ * SHUT_RDWR)
+ */
+static void
+shutdown_completion (void *cls,
+ int operation)
+{
+ static int shutdowns;
+
+ if (++shutdowns == 1)
+ {
+ peer1.shutdown_handle = NULL;
+ peer2.shutdown_handle = GNUNET_STREAM_shutdown (peer2.socket, SHUT_RDWR,
+ &shutdown_completion, cls);
+ return;
+ }
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "STREAM shutdown successful\n");
+ GNUNET_SCHEDULER_add_now (&do_close, cls);
+}
+
+
+/**
+ * Shutdown sockets gracefully
+ */
+static void
+do_shutdown (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+ result = GNUNET_OK;
+ peer1.shutdown_handle = GNUNET_STREAM_shutdown (peer1.socket, SHUT_RDWR,
+ &shutdown_completion, cls);
+}
+
+
+/**
+ * The write completion function; called upon writing some data to stream or
+ * upon error
+ *
+ * @param cls the closure from GNUNET_STREAM_write/read
+ * @param status the status of the stream at the time this function is called
+ * @param size the number of bytes read or written
+ */
+static void
+write_completion (void *cls,
+ enum GNUNET_STREAM_Status status,
+ size_t size)
+{
+ struct PeerData *peer;
+
+ peer = (struct PeerData *) cls;
+ GNUNET_assert (GNUNET_STREAM_OK == status);
+ GNUNET_assert (size <= DATA_SIZE);
+ peer->bytes_wrote += size;
+
+ if (peer->bytes_wrote < DATA_SIZE) /* Have more data to send */
+ {
+ peer->io_write_handle =
+ GNUNET_STREAM_write (peer->socket,
+ ((void *) data) + peer->bytes_wrote,
+ DATA_SIZE - peer->bytes_wrote,
+ GNUNET_TIME_relative_multiply
+ (GNUNET_TIME_UNIT_SECONDS, 5),
+ &write_completion,
+ cls);
+ GNUNET_assert (NULL != peer->io_write_handle);
+ }
+ else
+ {
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "Writing successfully finished\n");
+ result = GNUNET_OK;
+ GNUNET_SCHEDULER_add_now (&do_shutdown, NULL);
+ }
+}
+
+
+/**
+ * Task for calling STREAM_write with a chunk of random data
+ *
+ * @param cls the peer data entity
+ * @param tc the task context
+ */
+static void
+stream_write_task (void *cls,
+ const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+ struct PeerData *peer=cls;
+ unsigned int count;
+
+ write_task = GNUNET_SCHEDULER_NO_TASK;
+ for (count=0; count < DATA_SIZE / 4; count++)
+ {
+ data[count]=GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
+ UINT32_MAX);
+ }
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "Generation of random data complete\n");
+ peer->io_write_handle = GNUNET_STREAM_write (peer->socket,
+ data,
+ DATA_SIZE,
+ GNUNET_TIME_relative_multiply
+ (GNUNET_TIME_UNIT_SECONDS, 10),
+ &write_completion,
+ peer);
+ GNUNET_assert (NULL != peer->io_write_handle);
+}
+
+
+/**
+ * Function executed after stream has been established
+ *
+ * @param cls the closure from GNUNET_STREAM_open
+ * @param socket socket to use to communicate with the other side (read/write)
+ */
+static void
+stream_open_cb (void *cls,
+ struct GNUNET_STREAM_Socket *socket)
+{
+ struct PeerData *peer;
+
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Stream established from peer1\n");
+ peer = (struct PeerData *) cls;
+ peer->bytes_wrote = 0;
+ GNUNET_assert (socket == peer1.socket);
+ GNUNET_assert (socket == peer->socket);
+ write_task = GNUNET_SCHEDULER_add_now (&stream_write_task, peer);
+}
+
+
+/**
+ * Scheduler call back; to be executed when a new stream is connected
+ * Called from listen connect for peer2
+ */
+static void
+stream_read_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
+
+
+/**
+ * Input processor
+ *
+ * @param cls peer2
+ * @param status the status of the stream at the time this function is called
+ * @param data traffic from the other side
+ * @param size the number of bytes available in data read
+ * @return number of bytes of processed from 'data' (any data remaining should be
+ * given to the next time the read processor is called).
+ */
+static size_t
+input_processor (void *cls,
+ enum GNUNET_STREAM_Status status,
+ const void *input_data,
+ size_t size)
+{
+ struct PeerData *peer = cls;
+
+ GNUNET_assert (GNUNET_STREAM_OK == status);
+ GNUNET_assert (&peer2 == peer);
+ GNUNET_assert (size < DATA_SIZE);
+ GNUNET_assert (0 == memcmp (((void *)data ) + peer->bytes_read,
+ input_data, size));
+ peer->bytes_read += size;
+
+ if (peer->bytes_read < DATA_SIZE)
+ {
+ GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == read_task);
+ read_task = GNUNET_SCHEDULER_add_now (&stream_read_task, &peer2);
+ }
+ else
+ {
+ /* Peer2 has completed reading*/
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "Reading finished successfully\n");
+ }
+ return size;
+}
+
+
+/**
+ * Scheduler call back; to be executed when a new stream is connected
+ * Called from listen connect for peer2
+ */
+static void
+stream_read_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+ struct PeerData *peer = cls;
+
+ read_task = GNUNET_SCHEDULER_NO_TASK;
+ GNUNET_assert (&peer2 == peer);
+ peer->io_read_handle =
+ GNUNET_STREAM_read (peer->socket,
+ GNUNET_TIME_relative_multiply
+ (GNUNET_TIME_UNIT_SECONDS, 10),
+ &input_processor,
+ peer);
+ GNUNET_assert (NULL != peer->io_read_handle);
+}
+
+
+/**
+ * Functions of this type are called upon new stream connection from other peers
+ *
+ * @param cls the closure from GNUNET_STREAM_listen
+ * @param socket the socket representing the stream
+ * @param initiator the identity of the peer who wants to establish a stream
+ * with us
+ * @return GNUNET_OK to keep the socket open, GNUNET_SYSERR to close the
+ * stream (the socket will be invalid after the call)
+ */
+static int
+stream_listen_cb (void *cls,
+ struct GNUNET_STREAM_Socket *socket,
+ const struct GNUNET_PeerIdentity *initiator)
+{
+ if ((NULL == socket) || (NULL == initiator))
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Binding error\n");
+ if (GNUNET_SCHEDULER_NO_TASK != abort_task)
+ GNUNET_SCHEDULER_cancel (abort_task);
+ abort_task = GNUNET_SCHEDULER_add_now (&do_abort, NULL);
+ return GNUNET_OK;
+ }
+ GNUNET_assert (NULL != socket);
+ GNUNET_assert (socket != peer1.socket);
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Peer connected: %s\n", GNUNET_i2s(initiator));
+ peer2.socket = socket;
+ peer2.bytes_read = 0;
+ read_task = GNUNET_SCHEDULER_add_now (&stream_read_task, &peer2);
+ return GNUNET_OK;
+}
+
+
+/**
+ * Listen success callback; connects a peer to stream as client
+ */
+static void
+stream_connect (void)
+{
+ peer1.socket =
+ GNUNET_STREAM_open (config,
+ &self_id, /* Null for local peer? */
+ 10, /* App port */
+ &stream_open_cb,
+ &peer1,
+ GNUNET_STREAM_OPTION_TESTING_SET_WRITE_SEQUENCE_NUMBER,
+ UINT32_MAX - GNUNET_CRYPTO_random_u32
+ (GNUNET_CRYPTO_QUALITY_WEAK, 64),
+ GNUNET_STREAM_OPTION_MAX_PAYLOAD_SIZE, 500,
+ GNUNET_STREAM_OPTION_END);
+ GNUNET_assert (NULL != peer1.socket);
+}
+
+
+/**
+ * Initialize framework and start test
+ */
+static void
+run (void *cls,
+ const struct GNUNET_CONFIGURATION_Handle *cfg,
+ struct GNUNET_TESTING_Peer *peer)
+{
+ config = cfg;
+ self = peer;
+ (void) GNUNET_TESTING_peer_get_identity (peer, &self_id);
+ peer2_listen_socket =
+ GNUNET_STREAM_listen (config,
+ 10, /* App port */
+ &stream_listen_cb,
+ NULL,
+ GNUNET_STREAM_OPTION_LISTEN_TIMEOUT,
+ 60 * 1000,
+ GNUNET_STREAM_OPTION_SIGNAL_LISTEN_SUCCESS,
+ &stream_connect,
+ GNUNET_STREAM_OPTION_END);
+ GNUNET_assert (NULL != peer2_listen_socket);
+ abort_task =
+ GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply
+ (GNUNET_TIME_UNIT_SECONDS, 100), &do_abort,
+ NULL);
+}
+
+
+/**
+ * Main function
+ */
+int main (int argc, char **argv)
+{
+ if (0 != GNUNET_TESTING_peer_run ("test_stream_sequence_wraparound",
+ "test_stream_local.conf",
+ &run, NULL))
+ return 1;
+ return (GNUNET_SYSERR == result) ? 1 : 0;
+}
+
+/* end of test_stream_sequence_wraparound.c */