aboutsummaryrefslogtreecommitdiff
path: root/src/stream
diff options
context:
space:
mode:
authorBertrand Marc <beberking@gmail.com>2012-05-02 21:43:37 +0200
committerBertrand Marc <beberking@gmail.com>2012-05-02 21:43:37 +0200
commit2b81464a43485fcc8ce079fafdee7b7a171835f4 (patch)
tree394774c0f735199b57d51a2d3840356317853fe1 /src/stream
Imported Upstream version 0.9.2upstream/0.9.2
Diffstat (limited to 'src/stream')
-rw-r--r--src/stream/Makefile.am43
-rw-r--r--src/stream/Makefile.in789
-rw-r--r--src/stream/README11
-rw-r--r--src/stream/stream_api.c2180
-rw-r--r--src/stream/stream_protocol.h197
-rw-r--r--src/stream/test_stream_local.c386
-rw-r--r--src/stream/test_stream_local.conf69
7 files changed, 3675 insertions, 0 deletions
diff --git a/src/stream/Makefile.am b/src/stream/Makefile.am
new file mode 100644
index 0000000..385c0cf
--- /dev/null
+++ b/src/stream/Makefile.am
@@ -0,0 +1,43 @@
+INCLUDES = -I$(top_srcdir)/src/include
+
+if MINGW
+ WINFLAGS = -Wl,--no-undefined -Wl,--export-all-symbols
+endif
+
+if USE_COVERAGE
+ AM_CFLAGS = --coverage -O0
+ XLIB = -lgcov
+endif
+
+lib_LTLIBRARIES = libgnunetstream.la
+
+libgnunetstream_la_SOURCES = \
+ stream_api.c stream_protocol.h
+libgnunetstream_la_LIBADD = \
+ $(top_builddir)/src/mesh/libgnunetmesh.la \
+ $(top_builddir)/src/util/libgnunetutil.la $(XLIB)
+libgnunetstream_la_LDFLAGS = \
+ $(GN_LIB_LDFLAGS)
+
+check_PROGRAMS = \
+ test_stream_local
+# test_stream_halfclose
+
+EXTRA_DIST = test_stream_local.conf
+
+if ENABLE_TEST_RUN
+TESTS = $(check_PROGRAMS)
+endif
+
+test_stream_local_SOURCES = \
+ test_stream_local.c
+test_stream_local_LDADD = \
+ $(top_builddir)/src/stream/libgnunetstream.la \
+ $(top_builddir)/src/util/libgnunetutil.la
+
+#test_stream_halfclose_SOURCES = \
+# test_stream_halfclose.c
+#test_stream_halfclose_LDADD = \
+# $(top_builddir)/src/stream/libgnunetstream.la \
+# $(top_builddir)/src/util/libgnunetutil.la
+
diff --git a/src/stream/Makefile.in b/src/stream/Makefile.in
new file mode 100644
index 0000000..44b63f7
--- /dev/null
+++ b/src/stream/Makefile.in
@@ -0,0 +1,789 @@
+# Makefile.in generated by automake 1.11.1 from Makefile.am.
+# @configure_input@
+
+# Copyright (C) 1994, 1995, 1996, 1997, 1998, 1999, 2000, 2001, 2002,
+# 2003, 2004, 2005, 2006, 2007, 2008, 2009 Free Software Foundation,
+# Inc.
+# This Makefile.in is free software; the Free Software Foundation
+# gives unlimited permission to copy and/or distribute it,
+# with or without modifications, as long as this notice is preserved.
+
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY, to the extent permitted by law; without
+# even the implied warranty of MERCHANTABILITY or FITNESS FOR A
+# PARTICULAR PURPOSE.
+
+@SET_MAKE@
+
+VPATH = @srcdir@
+pkgdatadir = $(datadir)/@PACKAGE@
+pkgincludedir = $(includedir)/@PACKAGE@
+pkglibdir = $(libdir)/@PACKAGE@
+pkglibexecdir = $(libexecdir)/@PACKAGE@
+am__cd = CDPATH="$${ZSH_VERSION+.}$(PATH_SEPARATOR)" && cd
+install_sh_DATA = $(install_sh) -c -m 644
+install_sh_PROGRAM = $(install_sh) -c
+install_sh_SCRIPT = $(install_sh) -c
+INSTALL_HEADER = $(INSTALL_DATA)
+transform = $(program_transform_name)
+NORMAL_INSTALL = :
+PRE_INSTALL = :
+POST_INSTALL = :
+NORMAL_UNINSTALL = :
+PRE_UNINSTALL = :
+POST_UNINSTALL = :
+build_triplet = @build@
+host_triplet = @host@
+target_triplet = @target@
+check_PROGRAMS = test_stream_local$(EXEEXT)
+subdir = src/stream
+DIST_COMMON = README $(srcdir)/Makefile.am $(srcdir)/Makefile.in
+ACLOCAL_M4 = $(top_srcdir)/aclocal.m4
+am__aclocal_m4_deps = $(top_srcdir)/m4/absolute-header.m4 \
+ $(top_srcdir)/m4/align.m4 $(top_srcdir)/m4/argz.m4 \
+ $(top_srcdir)/m4/gettext.m4 $(top_srcdir)/m4/iconv.m4 \
+ $(top_srcdir)/m4/lib-ld.m4 $(top_srcdir)/m4/lib-link.m4 \
+ $(top_srcdir)/m4/lib-prefix.m4 $(top_srcdir)/m4/libcurl.m4 \
+ $(top_srcdir)/m4/libgcrypt.m4 $(top_srcdir)/m4/libtool.m4 \
+ $(top_srcdir)/m4/libunistring.m4 $(top_srcdir)/m4/ltdl.m4 \
+ $(top_srcdir)/m4/ltoptions.m4 $(top_srcdir)/m4/ltsugar.m4 \
+ $(top_srcdir)/m4/ltversion.m4 $(top_srcdir)/m4/lt~obsolete.m4 \
+ $(top_srcdir)/m4/nls.m4 $(top_srcdir)/m4/po.m4 \
+ $(top_srcdir)/m4/progtest.m4 $(top_srcdir)/acinclude.m4 \
+ $(top_srcdir)/configure.ac
+am__configure_deps = $(am__aclocal_m4_deps) $(CONFIGURE_DEPENDENCIES) \
+ $(ACLOCAL_M4)
+mkinstalldirs = $(install_sh) -d
+CONFIG_HEADER = $(top_builddir)/gnunet_config.h
+CONFIG_CLEAN_FILES =
+CONFIG_CLEAN_VPATH_FILES =
+am__vpath_adj_setup = srcdirstrip=`echo "$(srcdir)" | sed 's|.|.|g'`;
+am__vpath_adj = case $$p in \
+ $(srcdir)/*) f=`echo "$$p" | sed "s|^$$srcdirstrip/||"`;; \
+ *) f=$$p;; \
+ esac;
+am__strip_dir = f=`echo $$p | sed -e 's|^.*/||'`;
+am__install_max = 40
+am__nobase_strip_setup = \
+ srcdirstrip=`echo "$(srcdir)" | sed 's/[].[^$$\\*|]/\\\\&/g'`
+am__nobase_strip = \
+ for p in $$list; do echo "$$p"; done | sed -e "s|$$srcdirstrip/||"
+am__nobase_list = $(am__nobase_strip_setup); \
+ for p in $$list; do echo "$$p $$p"; done | \
+ sed "s| $$srcdirstrip/| |;"' / .*\//!s/ .*/ ./; s,\( .*\)/[^/]*$$,\1,' | \
+ $(AWK) 'BEGIN { files["."] = "" } { files[$$2] = files[$$2] " " $$1; \
+ if (++n[$$2] == $(am__install_max)) \
+ { print $$2, files[$$2]; n[$$2] = 0; files[$$2] = "" } } \
+ END { for (dir in files) print dir, files[dir] }'
+am__base_list = \
+ sed '$$!N;$$!N;$$!N;$$!N;$$!N;$$!N;$$!N;s/\n/ /g' | \
+ sed '$$!N;$$!N;$$!N;$$!N;s/\n/ /g'
+am__installdirs = "$(DESTDIR)$(libdir)"
+LTLIBRARIES = $(lib_LTLIBRARIES)
+am__DEPENDENCIES_1 =
+libgnunetstream_la_DEPENDENCIES = \
+ $(top_builddir)/src/mesh/libgnunetmesh.la \
+ $(top_builddir)/src/util/libgnunetutil.la \
+ $(am__DEPENDENCIES_1)
+am_libgnunetstream_la_OBJECTS = stream_api.lo
+libgnunetstream_la_OBJECTS = $(am_libgnunetstream_la_OBJECTS)
+AM_V_lt = $(am__v_lt_$(V))
+am__v_lt_ = $(am__v_lt_$(AM_DEFAULT_VERBOSITY))
+am__v_lt_0 = --silent
+libgnunetstream_la_LINK = $(LIBTOOL) $(AM_V_lt) --tag=CC \
+ $(AM_LIBTOOLFLAGS) $(LIBTOOLFLAGS) --mode=link $(CCLD) \
+ $(AM_CFLAGS) $(CFLAGS) $(libgnunetstream_la_LDFLAGS) \
+ $(LDFLAGS) -o $@
+am_test_stream_local_OBJECTS = test_stream_local.$(OBJEXT)
+test_stream_local_OBJECTS = $(am_test_stream_local_OBJECTS)
+test_stream_local_DEPENDENCIES = \
+ $(top_builddir)/src/stream/libgnunetstream.la \
+ $(top_builddir)/src/util/libgnunetutil.la
+DEFAULT_INCLUDES = -I.@am__isrc@ -I$(top_builddir)
+depcomp = $(SHELL) $(top_srcdir)/depcomp
+am__depfiles_maybe = depfiles
+am__mv = mv -f
+COMPILE = $(CC) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(AM_CPPFLAGS) \
+ $(CPPFLAGS) $(AM_CFLAGS) $(CFLAGS)
+LTCOMPILE = $(LIBTOOL) $(AM_V_lt) --tag=CC $(AM_LIBTOOLFLAGS) \
+ $(LIBTOOLFLAGS) --mode=compile $(CC) $(DEFS) \
+ $(DEFAULT_INCLUDES) $(INCLUDES) $(AM_CPPFLAGS) $(CPPFLAGS) \
+ $(AM_CFLAGS) $(CFLAGS)
+AM_V_CC = $(am__v_CC_$(V))
+am__v_CC_ = $(am__v_CC_$(AM_DEFAULT_VERBOSITY))
+am__v_CC_0 = @echo " CC " $@;
+AM_V_at = $(am__v_at_$(V))
+am__v_at_ = $(am__v_at_$(AM_DEFAULT_VERBOSITY))
+am__v_at_0 = @
+CCLD = $(CC)
+LINK = $(LIBTOOL) $(AM_V_lt) --tag=CC $(AM_LIBTOOLFLAGS) \
+ $(LIBTOOLFLAGS) --mode=link $(CCLD) $(AM_CFLAGS) $(CFLAGS) \
+ $(AM_LDFLAGS) $(LDFLAGS) -o $@
+AM_V_CCLD = $(am__v_CCLD_$(V))
+am__v_CCLD_ = $(am__v_CCLD_$(AM_DEFAULT_VERBOSITY))
+am__v_CCLD_0 = @echo " CCLD " $@;
+AM_V_GEN = $(am__v_GEN_$(V))
+am__v_GEN_ = $(am__v_GEN_$(AM_DEFAULT_VERBOSITY))
+am__v_GEN_0 = @echo " GEN " $@;
+SOURCES = $(libgnunetstream_la_SOURCES) $(test_stream_local_SOURCES)
+DIST_SOURCES = $(libgnunetstream_la_SOURCES) \
+ $(test_stream_local_SOURCES)
+ETAGS = etags
+CTAGS = ctags
+am__tty_colors = \
+red=; grn=; lgn=; blu=; std=
+DISTFILES = $(DIST_COMMON) $(DIST_SOURCES) $(TEXINFOS) $(EXTRA_DIST)
+ACLOCAL = @ACLOCAL@
+AMTAR = @AMTAR@
+AM_DEFAULT_VERBOSITY = @AM_DEFAULT_VERBOSITY@
+AR = @AR@
+ARGZ_H = @ARGZ_H@
+AS = @AS@
+AUTOCONF = @AUTOCONF@
+AUTOHEADER = @AUTOHEADER@
+AUTOMAKE = @AUTOMAKE@
+AWK = @AWK@
+CC = @CC@
+CCDEPMODE = @CCDEPMODE@
+CFLAGS = @CFLAGS@
+CPP = @CPP@
+CPPFLAGS = @CPPFLAGS@
+CXX = @CXX@
+CXXCPP = @CXXCPP@
+CXXDEPMODE = @CXXDEPMODE@
+CXXFLAGS = @CXXFLAGS@
+CYGPATH_W = @CYGPATH_W@
+DEFAULT_INTERFACE = @DEFAULT_INTERFACE@
+DEFS = @DEFS@
+DEPDIR = @DEPDIR@
+DLLDIR = @DLLDIR@
+DLLTOOL = @DLLTOOL@
+DSYMUTIL = @DSYMUTIL@
+DUMPBIN = @DUMPBIN@
+ECHO_C = @ECHO_C@
+ECHO_N = @ECHO_N@
+ECHO_T = @ECHO_T@
+EGREP = @EGREP@
+EXEEXT = @EXEEXT@
+EXT_LIBS = @EXT_LIBS@
+EXT_LIB_PATH = @EXT_LIB_PATH@
+FGREP = @FGREP@
+GMSGFMT = @GMSGFMT@
+GMSGFMT_015 = @GMSGFMT_015@
+GNUNETDNS_GROUP = @GNUNETDNS_GROUP@
+GN_DAEMON_CONFIG_DIR = @GN_DAEMON_CONFIG_DIR@
+GN_DAEMON_HOME_DIR = @GN_DAEMON_HOME_DIR@
+GN_INTLINCL = @GN_INTLINCL@
+GN_LIBINTL = @GN_LIBINTL@
+GN_LIB_LDFLAGS = @GN_LIB_LDFLAGS@
+GN_PLUGIN_LDFLAGS = @GN_PLUGIN_LDFLAGS@
+GN_USER_HOME_DIR = @GN_USER_HOME_DIR@
+GREP = @GREP@
+HAVE_LIBUNISTRING = @HAVE_LIBUNISTRING@
+INCLTDL = @INCLTDL@
+INSTALL = @INSTALL@
+INSTALL_DATA = @INSTALL_DATA@
+INSTALL_PROGRAM = @INSTALL_PROGRAM@
+INSTALL_SCRIPT = @INSTALL_SCRIPT@
+INSTALL_STRIP_PROGRAM = @INSTALL_STRIP_PROGRAM@
+INTLLIBS = @INTLLIBS@
+INTL_MACOSX_LIBS = @INTL_MACOSX_LIBS@
+LD = @LD@
+LDFLAGS = @LDFLAGS@
+LIBADD_DL = @LIBADD_DL@
+LIBADD_DLD_LINK = @LIBADD_DLD_LINK@
+LIBADD_DLOPEN = @LIBADD_DLOPEN@
+LIBADD_SHL_LOAD = @LIBADD_SHL_LOAD@
+LIBCURL = @LIBCURL@
+LIBCURL_CPPFLAGS = @LIBCURL_CPPFLAGS@
+LIBGCRYPT_CFLAGS = @LIBGCRYPT_CFLAGS@
+LIBGCRYPT_CONFIG = @LIBGCRYPT_CONFIG@
+LIBGCRYPT_LIBS = @LIBGCRYPT_LIBS@
+LIBICONV = @LIBICONV@
+LIBINTL = @LIBINTL@
+LIBLTDL = @LIBLTDL@
+LIBOBJS = @LIBOBJS@
+LIBPREFIX = @LIBPREFIX@
+LIBS = @LIBS@
+LIBTOOL = @LIBTOOL@
+LIBUNISTRING = @LIBUNISTRING@
+LIPO = @LIPO@
+LN_S = @LN_S@
+LTDLDEPS = @LTDLDEPS@
+LTDLINCL = @LTDLINCL@
+LTDLOPEN = @LTDLOPEN@
+LTLIBICONV = @LTLIBICONV@
+LTLIBINTL = @LTLIBINTL@
+LTLIBOBJS = @LTLIBOBJS@
+LTLIBUNISTRING = @LTLIBUNISTRING@
+LT_CONFIG_H = @LT_CONFIG_H@
+LT_DLLOADERS = @LT_DLLOADERS@
+LT_DLPREOPEN = @LT_DLPREOPEN@
+MAKEINFO = @MAKEINFO@
+MKDIR_P = @MKDIR_P@
+MSGFMT = @MSGFMT@
+MSGFMT_015 = @MSGFMT_015@
+MSGMERGE = @MSGMERGE@
+MYSQL_CPPFLAGS = @MYSQL_CPPFLAGS@
+MYSQL_LDFLAGS = @MYSQL_LDFLAGS@
+NM = @NM@
+NMEDIT = @NMEDIT@
+OBJC = @OBJC@
+OBJCDEPMODE = @OBJCDEPMODE@
+OBJCFLAGS = @OBJCFLAGS@
+OBJDUMP = @OBJDUMP@
+OBJEXT = @OBJEXT@
+OTOOL = @OTOOL@
+OTOOL64 = @OTOOL64@
+PACKAGE = @PACKAGE@
+PACKAGE_BUGREPORT = @PACKAGE_BUGREPORT@
+PACKAGE_NAME = @PACKAGE_NAME@
+PACKAGE_STRING = @PACKAGE_STRING@
+PACKAGE_TARNAME = @PACKAGE_TARNAME@
+PACKAGE_URL = @PACKAGE_URL@
+PACKAGE_VERSION = @PACKAGE_VERSION@
+PATH_SEPARATOR = @PATH_SEPARATOR@
+POSTGRES_CPPFLAGS = @POSTGRES_CPPFLAGS@
+POSTGRES_LDFLAGS = @POSTGRES_LDFLAGS@
+POSUB = @POSUB@
+PYTHON = @PYTHON@
+PYTHON_EXEC_PREFIX = @PYTHON_EXEC_PREFIX@
+PYTHON_PLATFORM = @PYTHON_PLATFORM@
+PYTHON_PREFIX = @PYTHON_PREFIX@
+PYTHON_VERSION = @PYTHON_VERSION@
+RANLIB = @RANLIB@
+SED = @SED@
+SET_MAKE = @SET_MAKE@
+SHELL = @SHELL@
+SQLITE_CPPFLAGS = @SQLITE_CPPFLAGS@
+SQLITE_LDFLAGS = @SQLITE_LDFLAGS@
+STRIP = @STRIP@
+SUDO_BINARY = @SUDO_BINARY@
+UNIXONLY = @UNIXONLY@
+USE_NLS = @USE_NLS@
+VERSION = @VERSION@
+XGETTEXT = @XGETTEXT@
+XGETTEXT_015 = @XGETTEXT_015@
+XMKMF = @XMKMF@
+X_CFLAGS = @X_CFLAGS@
+X_EXTRA_LIBS = @X_EXTRA_LIBS@
+X_LIBS = @X_LIBS@
+X_PRE_LIBS = @X_PRE_LIBS@
+_libcurl_config = @_libcurl_config@
+abs_builddir = @abs_builddir@
+abs_srcdir = @abs_srcdir@
+abs_top_builddir = @abs_top_builddir@
+abs_top_srcdir = @abs_top_srcdir@
+ac_ct_CC = @ac_ct_CC@
+ac_ct_CXX = @ac_ct_CXX@
+ac_ct_DUMPBIN = @ac_ct_DUMPBIN@
+ac_ct_OBJC = @ac_ct_OBJC@
+am__include = @am__include@
+am__leading_dot = @am__leading_dot@
+am__quote = @am__quote@
+am__tar = @am__tar@
+am__untar = @am__untar@
+bindir = @bindir@
+build = @build@
+build_alias = @build_alias@
+build_cpu = @build_cpu@
+build_os = @build_os@
+build_target = @build_target@
+build_vendor = @build_vendor@
+builddir = @builddir@
+datadir = @datadir@
+datarootdir = @datarootdir@
+docdir = @docdir@
+dvidir = @dvidir@
+exec_prefix = @exec_prefix@
+host = @host@
+host_alias = @host_alias@
+host_cpu = @host_cpu@
+host_os = @host_os@
+host_vendor = @host_vendor@
+htmldir = @htmldir@
+includedir = @includedir@
+infodir = @infodir@
+install_sh = @install_sh@
+libdir = @libdir@
+libexecdir = @libexecdir@
+localedir = @localedir@
+localstatedir = @localstatedir@
+lt_ECHO = @lt_ECHO@
+ltdl_LIBOBJS = @ltdl_LIBOBJS@
+ltdl_LTLIBOBJS = @ltdl_LTLIBOBJS@
+mandir = @mandir@
+mkdir_p = @mkdir_p@
+oldincludedir = @oldincludedir@
+pdfdir = @pdfdir@
+pkgpyexecdir = @pkgpyexecdir@
+pkgpythondir = @pkgpythondir@
+prefix = @prefix@
+program_transform_name = @program_transform_name@
+psdir = @psdir@
+pyexecdir = @pyexecdir@
+pythondir = @pythondir@
+sbindir = @sbindir@
+sharedstatedir = @sharedstatedir@
+srcdir = @srcdir@
+subdirs = @subdirs@
+sys_symbol_underscore = @sys_symbol_underscore@
+sysconfdir = @sysconfdir@
+target = @target@
+target_alias = @target_alias@
+target_cpu = @target_cpu@
+target_os = @target_os@
+target_vendor = @target_vendor@
+top_build_prefix = @top_build_prefix@
+top_builddir = @top_builddir@
+top_srcdir = @top_srcdir@
+INCLUDES = -I$(top_srcdir)/src/include
+@MINGW_TRUE@WINFLAGS = -Wl,--no-undefined -Wl,--export-all-symbols
+@USE_COVERAGE_TRUE@AM_CFLAGS = --coverage -O0
+@USE_COVERAGE_TRUE@XLIB = -lgcov
+lib_LTLIBRARIES = libgnunetstream.la
+libgnunetstream_la_SOURCES = \
+ stream_api.c stream_protocol.h
+
+libgnunetstream_la_LIBADD = \
+ $(top_builddir)/src/mesh/libgnunetmesh.la \
+ $(top_builddir)/src/util/libgnunetutil.la $(XLIB)
+
+libgnunetstream_la_LDFLAGS = \
+ $(GN_LIB_LDFLAGS)
+
+# test_stream_halfclose
+EXTRA_DIST = test_stream_local.conf
+@ENABLE_TEST_RUN_TRUE@TESTS = $(check_PROGRAMS)
+test_stream_local_SOURCES = \
+ test_stream_local.c
+
+test_stream_local_LDADD = \
+ $(top_builddir)/src/stream/libgnunetstream.la \
+ $(top_builddir)/src/util/libgnunetutil.la
+
+all: all-am
+
+.SUFFIXES:
+.SUFFIXES: .c .lo .o .obj
+$(srcdir)/Makefile.in: $(srcdir)/Makefile.am $(am__configure_deps)
+ @for dep in $?; do \
+ case '$(am__configure_deps)' in \
+ *$$dep*) \
+ ( cd $(top_builddir) && $(MAKE) $(AM_MAKEFLAGS) am--refresh ) \
+ && { if test -f $@; then exit 0; else break; fi; }; \
+ exit 1;; \
+ esac; \
+ done; \
+ echo ' cd $(top_srcdir) && $(AUTOMAKE) --gnu src/stream/Makefile'; \
+ $(am__cd) $(top_srcdir) && \
+ $(AUTOMAKE) --gnu src/stream/Makefile
+.PRECIOUS: Makefile
+Makefile: $(srcdir)/Makefile.in $(top_builddir)/config.status
+ @case '$?' in \
+ *config.status*) \
+ cd $(top_builddir) && $(MAKE) $(AM_MAKEFLAGS) am--refresh;; \
+ *) \
+ echo ' cd $(top_builddir) && $(SHELL) ./config.status $(subdir)/$@ $(am__depfiles_maybe)'; \
+ cd $(top_builddir) && $(SHELL) ./config.status $(subdir)/$@ $(am__depfiles_maybe);; \
+ esac;
+
+$(top_builddir)/config.status: $(top_srcdir)/configure $(CONFIG_STATUS_DEPENDENCIES)
+ cd $(top_builddir) && $(MAKE) $(AM_MAKEFLAGS) am--refresh
+
+$(top_srcdir)/configure: $(am__configure_deps)
+ cd $(top_builddir) && $(MAKE) $(AM_MAKEFLAGS) am--refresh
+$(ACLOCAL_M4): $(am__aclocal_m4_deps)
+ cd $(top_builddir) && $(MAKE) $(AM_MAKEFLAGS) am--refresh
+$(am__aclocal_m4_deps):
+install-libLTLIBRARIES: $(lib_LTLIBRARIES)
+ @$(NORMAL_INSTALL)
+ test -z "$(libdir)" || $(MKDIR_P) "$(DESTDIR)$(libdir)"
+ @list='$(lib_LTLIBRARIES)'; test -n "$(libdir)" || list=; \
+ list2=; for p in $$list; do \
+ if test -f $$p; then \
+ list2="$$list2 $$p"; \
+ else :; fi; \
+ done; \
+ test -z "$$list2" || { \
+ echo " $(LIBTOOL) $(AM_LIBTOOLFLAGS) $(LIBTOOLFLAGS) --mode=install $(INSTALL) $(INSTALL_STRIP_FLAG) $$list2 '$(DESTDIR)$(libdir)'"; \
+ $(LIBTOOL) $(AM_LIBTOOLFLAGS) $(LIBTOOLFLAGS) --mode=install $(INSTALL) $(INSTALL_STRIP_FLAG) $$list2 "$(DESTDIR)$(libdir)"; \
+ }
+
+uninstall-libLTLIBRARIES:
+ @$(NORMAL_UNINSTALL)
+ @list='$(lib_LTLIBRARIES)'; test -n "$(libdir)" || list=; \
+ for p in $$list; do \
+ $(am__strip_dir) \
+ echo " $(LIBTOOL) $(AM_LIBTOOLFLAGS) $(LIBTOOLFLAGS) --mode=uninstall rm -f '$(DESTDIR)$(libdir)/$$f'"; \
+ $(LIBTOOL) $(AM_LIBTOOLFLAGS) $(LIBTOOLFLAGS) --mode=uninstall rm -f "$(DESTDIR)$(libdir)/$$f"; \
+ done
+
+clean-libLTLIBRARIES:
+ -test -z "$(lib_LTLIBRARIES)" || rm -f $(lib_LTLIBRARIES)
+ @list='$(lib_LTLIBRARIES)'; for p in $$list; do \
+ dir="`echo $$p | sed -e 's|/[^/]*$$||'`"; \
+ test "$$dir" != "$$p" || dir=.; \
+ echo "rm -f \"$${dir}/so_locations\""; \
+ rm -f "$${dir}/so_locations"; \
+ done
+libgnunetstream.la: $(libgnunetstream_la_OBJECTS) $(libgnunetstream_la_DEPENDENCIES)
+ $(AM_V_CCLD)$(libgnunetstream_la_LINK) -rpath $(libdir) $(libgnunetstream_la_OBJECTS) $(libgnunetstream_la_LIBADD) $(LIBS)
+
+clean-checkPROGRAMS:
+ @list='$(check_PROGRAMS)'; test -n "$$list" || exit 0; \
+ echo " rm -f" $$list; \
+ rm -f $$list || exit $$?; \
+ test -n "$(EXEEXT)" || exit 0; \
+ list=`for p in $$list; do echo "$$p"; done | sed 's/$(EXEEXT)$$//'`; \
+ echo " rm -f" $$list; \
+ rm -f $$list
+test_stream_local$(EXEEXT): $(test_stream_local_OBJECTS) $(test_stream_local_DEPENDENCIES)
+ @rm -f test_stream_local$(EXEEXT)
+ $(AM_V_CCLD)$(LINK) $(test_stream_local_OBJECTS) $(test_stream_local_LDADD) $(LIBS)
+
+mostlyclean-compile:
+ -rm -f *.$(OBJEXT)
+
+distclean-compile:
+ -rm -f *.tab.c
+
+@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/stream_api.Plo@am__quote@
+@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/test_stream_local.Po@am__quote@
+
+.c.o:
+@am__fastdepCC_TRUE@ $(AM_V_CC)$(COMPILE) -MT $@ -MD -MP -MF $(DEPDIR)/$*.Tpo -c -o $@ $<
+@am__fastdepCC_TRUE@ $(AM_V_at)$(am__mv) $(DEPDIR)/$*.Tpo $(DEPDIR)/$*.Po
+@am__fastdepCC_FALSE@ $(AM_V_CC) @AM_BACKSLASH@
+@AMDEP_TRUE@@am__fastdepCC_FALSE@ source='$<' object='$@' libtool=no @AMDEPBACKSLASH@
+@AMDEP_TRUE@@am__fastdepCC_FALSE@ DEPDIR=$(DEPDIR) $(CCDEPMODE) $(depcomp) @AMDEPBACKSLASH@
+@am__fastdepCC_FALSE@ $(COMPILE) -c $<
+
+.c.obj:
+@am__fastdepCC_TRUE@ $(AM_V_CC)$(COMPILE) -MT $@ -MD -MP -MF $(DEPDIR)/$*.Tpo -c -o $@ `$(CYGPATH_W) '$<'`
+@am__fastdepCC_TRUE@ $(AM_V_at)$(am__mv) $(DEPDIR)/$*.Tpo $(DEPDIR)/$*.Po
+@am__fastdepCC_FALSE@ $(AM_V_CC) @AM_BACKSLASH@
+@AMDEP_TRUE@@am__fastdepCC_FALSE@ source='$<' object='$@' libtool=no @AMDEPBACKSLASH@
+@AMDEP_TRUE@@am__fastdepCC_FALSE@ DEPDIR=$(DEPDIR) $(CCDEPMODE) $(depcomp) @AMDEPBACKSLASH@
+@am__fastdepCC_FALSE@ $(COMPILE) -c `$(CYGPATH_W) '$<'`
+
+.c.lo:
+@am__fastdepCC_TRUE@ $(AM_V_CC)$(LTCOMPILE) -MT $@ -MD -MP -MF $(DEPDIR)/$*.Tpo -c -o $@ $<
+@am__fastdepCC_TRUE@ $(AM_V_at)$(am__mv) $(DEPDIR)/$*.Tpo $(DEPDIR)/$*.Plo
+@am__fastdepCC_FALSE@ $(AM_V_CC) @AM_BACKSLASH@
+@AMDEP_TRUE@@am__fastdepCC_FALSE@ source='$<' object='$@' libtool=yes @AMDEPBACKSLASH@
+@AMDEP_TRUE@@am__fastdepCC_FALSE@ DEPDIR=$(DEPDIR) $(CCDEPMODE) $(depcomp) @AMDEPBACKSLASH@
+@am__fastdepCC_FALSE@ $(LTCOMPILE) -c -o $@ $<
+
+mostlyclean-libtool:
+ -rm -f *.lo
+
+clean-libtool:
+ -rm -rf .libs _libs
+
+ID: $(HEADERS) $(SOURCES) $(LISP) $(TAGS_FILES)
+ list='$(SOURCES) $(HEADERS) $(LISP) $(TAGS_FILES)'; \
+ unique=`for i in $$list; do \
+ if test -f "$$i"; then echo $$i; else echo $(srcdir)/$$i; fi; \
+ done | \
+ $(AWK) '{ files[$$0] = 1; nonempty = 1; } \
+ END { if (nonempty) { for (i in files) print i; }; }'`; \
+ mkid -fID $$unique
+tags: TAGS
+
+TAGS: $(HEADERS) $(SOURCES) $(TAGS_DEPENDENCIES) \
+ $(TAGS_FILES) $(LISP)
+ set x; \
+ here=`pwd`; \
+ list='$(SOURCES) $(HEADERS) $(LISP) $(TAGS_FILES)'; \
+ unique=`for i in $$list; do \
+ if test -f "$$i"; then echo $$i; else echo $(srcdir)/$$i; fi; \
+ done | \
+ $(AWK) '{ files[$$0] = 1; nonempty = 1; } \
+ END { if (nonempty) { for (i in files) print i; }; }'`; \
+ shift; \
+ if test -z "$(ETAGS_ARGS)$$*$$unique"; then :; else \
+ test -n "$$unique" || unique=$$empty_fix; \
+ if test $$# -gt 0; then \
+ $(ETAGS) $(ETAGSFLAGS) $(AM_ETAGSFLAGS) $(ETAGS_ARGS) \
+ "$$@" $$unique; \
+ else \
+ $(ETAGS) $(ETAGSFLAGS) $(AM_ETAGSFLAGS) $(ETAGS_ARGS) \
+ $$unique; \
+ fi; \
+ fi
+ctags: CTAGS
+CTAGS: $(HEADERS) $(SOURCES) $(TAGS_DEPENDENCIES) \
+ $(TAGS_FILES) $(LISP)
+ list='$(SOURCES) $(HEADERS) $(LISP) $(TAGS_FILES)'; \
+ unique=`for i in $$list; do \
+ if test -f "$$i"; then echo $$i; else echo $(srcdir)/$$i; fi; \
+ done | \
+ $(AWK) '{ files[$$0] = 1; nonempty = 1; } \
+ END { if (nonempty) { for (i in files) print i; }; }'`; \
+ test -z "$(CTAGS_ARGS)$$unique" \
+ || $(CTAGS) $(CTAGSFLAGS) $(AM_CTAGSFLAGS) $(CTAGS_ARGS) \
+ $$unique
+
+GTAGS:
+ here=`$(am__cd) $(top_builddir) && pwd` \
+ && $(am__cd) $(top_srcdir) \
+ && gtags -i $(GTAGS_ARGS) "$$here"
+
+distclean-tags:
+ -rm -f TAGS ID GTAGS GRTAGS GSYMS GPATH tags
+
+check-TESTS: $(TESTS)
+ @failed=0; all=0; xfail=0; xpass=0; skip=0; \
+ srcdir=$(srcdir); export srcdir; \
+ list=' $(TESTS) '; \
+ $(am__tty_colors); \
+ if test -n "$$list"; then \
+ for tst in $$list; do \
+ if test -f ./$$tst; then dir=./; \
+ elif test -f $$tst; then dir=; \
+ else dir="$(srcdir)/"; fi; \
+ if $(TESTS_ENVIRONMENT) $${dir}$$tst; then \
+ all=`expr $$all + 1`; \
+ case " $(XFAIL_TESTS) " in \
+ *[\ \ ]$$tst[\ \ ]*) \
+ xpass=`expr $$xpass + 1`; \
+ failed=`expr $$failed + 1`; \
+ col=$$red; res=XPASS; \
+ ;; \
+ *) \
+ col=$$grn; res=PASS; \
+ ;; \
+ esac; \
+ elif test $$? -ne 77; then \
+ all=`expr $$all + 1`; \
+ case " $(XFAIL_TESTS) " in \
+ *[\ \ ]$$tst[\ \ ]*) \
+ xfail=`expr $$xfail + 1`; \
+ col=$$lgn; res=XFAIL; \
+ ;; \
+ *) \
+ failed=`expr $$failed + 1`; \
+ col=$$red; res=FAIL; \
+ ;; \
+ esac; \
+ else \
+ skip=`expr $$skip + 1`; \
+ col=$$blu; res=SKIP; \
+ fi; \
+ echo "$${col}$$res$${std}: $$tst"; \
+ done; \
+ if test "$$all" -eq 1; then \
+ tests="test"; \
+ All=""; \
+ else \
+ tests="tests"; \
+ All="All "; \
+ fi; \
+ if test "$$failed" -eq 0; then \
+ if test "$$xfail" -eq 0; then \
+ banner="$$All$$all $$tests passed"; \
+ else \
+ if test "$$xfail" -eq 1; then failures=failure; else failures=failures; fi; \
+ banner="$$All$$all $$tests behaved as expected ($$xfail expected $$failures)"; \
+ fi; \
+ else \
+ if test "$$xpass" -eq 0; then \
+ banner="$$failed of $$all $$tests failed"; \
+ else \
+ if test "$$xpass" -eq 1; then passes=pass; else passes=passes; fi; \
+ banner="$$failed of $$all $$tests did not behave as expected ($$xpass unexpected $$passes)"; \
+ fi; \
+ fi; \
+ dashes="$$banner"; \
+ skipped=""; \
+ if test "$$skip" -ne 0; then \
+ if test "$$skip" -eq 1; then \
+ skipped="($$skip test was not run)"; \
+ else \
+ skipped="($$skip tests were not run)"; \
+ fi; \
+ test `echo "$$skipped" | wc -c` -le `echo "$$banner" | wc -c` || \
+ dashes="$$skipped"; \
+ fi; \
+ report=""; \
+ if test "$$failed" -ne 0 && test -n "$(PACKAGE_BUGREPORT)"; then \
+ report="Please report to $(PACKAGE_BUGREPORT)"; \
+ test `echo "$$report" | wc -c` -le `echo "$$banner" | wc -c` || \
+ dashes="$$report"; \
+ fi; \
+ dashes=`echo "$$dashes" | sed s/./=/g`; \
+ if test "$$failed" -eq 0; then \
+ echo "$$grn$$dashes"; \
+ else \
+ echo "$$red$$dashes"; \
+ fi; \
+ echo "$$banner"; \
+ test -z "$$skipped" || echo "$$skipped"; \
+ test -z "$$report" || echo "$$report"; \
+ echo "$$dashes$$std"; \
+ test "$$failed" -eq 0; \
+ else :; fi
+
+distdir: $(DISTFILES)
+ @srcdirstrip=`echo "$(srcdir)" | sed 's/[].[^$$\\*]/\\\\&/g'`; \
+ topsrcdirstrip=`echo "$(top_srcdir)" | sed 's/[].[^$$\\*]/\\\\&/g'`; \
+ list='$(DISTFILES)'; \
+ dist_files=`for file in $$list; do echo $$file; done | \
+ sed -e "s|^$$srcdirstrip/||;t" \
+ -e "s|^$$topsrcdirstrip/|$(top_builddir)/|;t"`; \
+ case $$dist_files in \
+ */*) $(MKDIR_P) `echo "$$dist_files" | \
+ sed '/\//!d;s|^|$(distdir)/|;s,/[^/]*$$,,' | \
+ sort -u` ;; \
+ esac; \
+ for file in $$dist_files; do \
+ if test -f $$file || test -d $$file; then d=.; else d=$(srcdir); fi; \
+ if test -d $$d/$$file; then \
+ dir=`echo "/$$file" | sed -e 's,/[^/]*$$,,'`; \
+ if test -d "$(distdir)/$$file"; then \
+ find "$(distdir)/$$file" -type d ! -perm -700 -exec chmod u+rwx {} \;; \
+ fi; \
+ if test -d $(srcdir)/$$file && test $$d != $(srcdir); then \
+ cp -fpR $(srcdir)/$$file "$(distdir)$$dir" || exit 1; \
+ find "$(distdir)/$$file" -type d ! -perm -700 -exec chmod u+rwx {} \;; \
+ fi; \
+ cp -fpR $$d/$$file "$(distdir)$$dir" || exit 1; \
+ else \
+ test -f "$(distdir)/$$file" \
+ || cp -p $$d/$$file "$(distdir)/$$file" \
+ || exit 1; \
+ fi; \
+ done
+check-am: all-am
+ $(MAKE) $(AM_MAKEFLAGS) $(check_PROGRAMS)
+ $(MAKE) $(AM_MAKEFLAGS) check-TESTS
+check: check-am
+all-am: Makefile $(LTLIBRARIES)
+installdirs:
+ for dir in "$(DESTDIR)$(libdir)"; do \
+ test -z "$$dir" || $(MKDIR_P) "$$dir"; \
+ done
+install: install-am
+install-exec: install-exec-am
+install-data: install-data-am
+uninstall: uninstall-am
+
+install-am: all-am
+ @$(MAKE) $(AM_MAKEFLAGS) install-exec-am install-data-am
+
+installcheck: installcheck-am
+install-strip:
+ $(MAKE) $(AM_MAKEFLAGS) INSTALL_PROGRAM="$(INSTALL_STRIP_PROGRAM)" \
+ install_sh_PROGRAM="$(INSTALL_STRIP_PROGRAM)" INSTALL_STRIP_FLAG=-s \
+ `test -z '$(STRIP)' || \
+ echo "INSTALL_PROGRAM_ENV=STRIPPROG='$(STRIP)'"` install
+mostlyclean-generic:
+
+clean-generic:
+
+distclean-generic:
+ -test -z "$(CONFIG_CLEAN_FILES)" || rm -f $(CONFIG_CLEAN_FILES)
+ -test . = "$(srcdir)" || test -z "$(CONFIG_CLEAN_VPATH_FILES)" || rm -f $(CONFIG_CLEAN_VPATH_FILES)
+
+maintainer-clean-generic:
+ @echo "This command is intended for maintainers to use"
+ @echo "it deletes files that may require special tools to rebuild."
+clean: clean-am
+
+clean-am: clean-checkPROGRAMS clean-generic clean-libLTLIBRARIES \
+ clean-libtool mostlyclean-am
+
+distclean: distclean-am
+ -rm -rf ./$(DEPDIR)
+ -rm -f Makefile
+distclean-am: clean-am distclean-compile distclean-generic \
+ distclean-tags
+
+dvi: dvi-am
+
+dvi-am:
+
+html: html-am
+
+html-am:
+
+info: info-am
+
+info-am:
+
+install-data-am:
+
+install-dvi: install-dvi-am
+
+install-dvi-am:
+
+install-exec-am: install-libLTLIBRARIES
+
+install-html: install-html-am
+
+install-html-am:
+
+install-info: install-info-am
+
+install-info-am:
+
+install-man:
+
+install-pdf: install-pdf-am
+
+install-pdf-am:
+
+install-ps: install-ps-am
+
+install-ps-am:
+
+installcheck-am:
+
+maintainer-clean: maintainer-clean-am
+ -rm -rf ./$(DEPDIR)
+ -rm -f Makefile
+maintainer-clean-am: distclean-am maintainer-clean-generic
+
+mostlyclean: mostlyclean-am
+
+mostlyclean-am: mostlyclean-compile mostlyclean-generic \
+ mostlyclean-libtool
+
+pdf: pdf-am
+
+pdf-am:
+
+ps: ps-am
+
+ps-am:
+
+uninstall-am: uninstall-libLTLIBRARIES
+
+.MAKE: check-am install-am install-strip
+
+.PHONY: CTAGS GTAGS all all-am check check-TESTS check-am clean \
+ clean-checkPROGRAMS clean-generic clean-libLTLIBRARIES \
+ clean-libtool ctags distclean distclean-compile \
+ distclean-generic distclean-libtool distclean-tags distdir dvi \
+ dvi-am html html-am info info-am install install-am \
+ install-data install-data-am install-dvi install-dvi-am \
+ install-exec install-exec-am install-html install-html-am \
+ install-info install-info-am install-libLTLIBRARIES \
+ install-man install-pdf install-pdf-am install-ps \
+ install-ps-am install-strip installcheck installcheck-am \
+ installdirs maintainer-clean maintainer-clean-generic \
+ mostlyclean mostlyclean-compile mostlyclean-generic \
+ mostlyclean-libtool pdf pdf-am ps ps-am tags uninstall \
+ uninstall-am uninstall-libLTLIBRARIES
+
+
+#test_stream_halfclose_SOURCES = \
+# test_stream_halfclose.c
+#test_stream_halfclose_LDADD = \
+# $(top_builddir)/src/stream/libgnunetstream.la \
+# $(top_builddir)/src/util/libgnunetutil.la
+
+# Tell versions [3.59,3.63) of GNU make to not export all variables.
+# Otherwise a system limit (for SysV at least) may be exceeded.
+.NOEXPORT:
diff --git a/src/stream/README b/src/stream/README
new file mode 100644
index 0000000..9b550b0
--- /dev/null
+++ b/src/stream/README
@@ -0,0 +1,11 @@
+The aim of the stream library is to provide stream connections between peers in
+GNUnet. This is a convenience library which hides the complexity of dividing
+data stream into packets, transmitting them and retransmitting them in case of
+errors.
+
+This library's API are similar to unix PIPE API. The user is expected to open a
+stream to a listening target peer. Once the stream is established, the user can
+use it as a pipe. Any data written into the stream will be readable by the
+target peer.
+
+This library uses mesh API for establishing streams between peers. \ No newline at end of file
diff --git a/src/stream/stream_api.c b/src/stream/stream_api.c
new file mode 100644
index 0000000..84fcdfd
--- /dev/null
+++ b/src/stream/stream_api.c
@@ -0,0 +1,2180 @@
+/*
+ This file is part of GNUnet.
+ (C) 2012 Christian Grothoff (and other contributing authors)
+
+ GNUnet is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published
+ by the Free Software Foundation; either version 3, or (at your
+ option) any later version.
+
+ GNUnet is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with GNUnet; see the file COPYING. If not, write to the
+ Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ Boston, MA 02111-1307, USA.
+*/
+
+/**
+ * @file stream/stream_api.c
+ * @brief Implementation of the stream library
+ * @author Sree Harsha Totakura
+ */
+#include "platform.h"
+#include "gnunet_common.h"
+#include "gnunet_crypto_lib.h"
+#include "gnunet_stream_lib.h"
+#include "stream_protocol.h"
+
+
+/**
+ * The maximum packet size of a stream packet
+ */
+#define MAX_PACKET_SIZE 64000
+
+/**
+ * The maximum payload a data message packet can carry
+ */
+static size_t max_payload_size =
+ MAX_PACKET_SIZE - sizeof (struct GNUNET_STREAM_DataMessage);
+
+/**
+ * Receive buffer
+ */
+#define RECEIVE_BUFFER_SIZE 4096000
+
+/**
+ * states in the Protocol
+ */
+enum State
+ {
+ /**
+ * Client initialization state
+ */
+ STATE_INIT,
+
+ /**
+ * Listener initialization state
+ */
+ STATE_LISTEN,
+
+ /**
+ * Pre-connection establishment state
+ */
+ STATE_HELLO_WAIT,
+
+ /**
+ * State where a connection has been established
+ */
+ STATE_ESTABLISHED,
+
+ /**
+ * State where the socket is closed on our side and waiting to be ACK'ed
+ */
+ STATE_RECEIVE_CLOSE_WAIT,
+
+ /**
+ * State where the socket is closed for reading
+ */
+ STATE_RECEIVE_CLOSED,
+
+ /**
+ * State where the socket is closed on our side and waiting to be ACK'ed
+ */
+ STATE_TRANSMIT_CLOSE_WAIT,
+
+ /**
+ * State where the socket is closed for writing
+ */
+ STATE_TRANSMIT_CLOSED,
+
+ /**
+ * State where the socket is closed on our side and waiting to be ACK'ed
+ */
+ STATE_CLOSE_WAIT,
+
+ /**
+ * State where the socket is closed
+ */
+ STATE_CLOSED
+ };
+
+
+/**
+ * Functions of this type are called when a message is written
+ *
+ * @param cls the closure from queue_message
+ * @param socket the socket the written message was bound to
+ */
+typedef void (*SendFinishCallback) (void *cls,
+ struct GNUNET_STREAM_Socket *socket);
+
+
+/**
+ * The send message queue
+ */
+struct MessageQueue
+{
+ /**
+ * The message
+ */
+ struct GNUNET_STREAM_MessageHeader *message;
+
+ /**
+ * Callback to be called when the message is sent
+ */
+ SendFinishCallback finish_cb;
+
+ /**
+ * The closure for finish_cb
+ */
+ void *finish_cb_cls;
+
+ /**
+ * The next message in queue. Should be NULL in the last message
+ */
+ struct MessageQueue *next;
+
+ /**
+ * The next message in queue. Should be NULL in the first message
+ */
+ struct MessageQueue *prev;
+};
+
+
+/**
+ * The STREAM Socket Handler
+ */
+struct GNUNET_STREAM_Socket
+{
+
+ /**
+ * The peer identity of the peer at the other end of the stream
+ */
+ struct GNUNET_PeerIdentity other_peer;
+
+ /**
+ * Retransmission timeout
+ */
+ struct GNUNET_TIME_Relative retransmit_timeout;
+
+ /**
+ * The Acknowledgement Bitmap
+ */
+ GNUNET_STREAM_AckBitmap ack_bitmap;
+
+ /**
+ * Time when the Acknowledgement was queued
+ */
+ struct GNUNET_TIME_Absolute ack_time_registered;
+
+ /**
+ * Queued Acknowledgement deadline
+ */
+ struct GNUNET_TIME_Relative ack_time_deadline;
+
+ /**
+ * The task for sending timely Acks
+ */
+ GNUNET_SCHEDULER_TaskIdentifier ack_task_id;
+
+ /**
+ * Task scheduled to continue a read operation.
+ */
+ GNUNET_SCHEDULER_TaskIdentifier read_task;
+
+ /**
+ * The mesh handle
+ */
+ struct GNUNET_MESH_Handle *mesh;
+
+ /**
+ * The mesh tunnel handle
+ */
+ struct GNUNET_MESH_Tunnel *tunnel;
+
+ /**
+ * Stream open closure
+ */
+ void *open_cls;
+
+ /**
+ * Stream open callback
+ */
+ GNUNET_STREAM_OpenCallback open_cb;
+
+ /**
+ * The current transmit handle (if a pending transmit request exists)
+ */
+ struct GNUNET_MESH_TransmitHandle *transmit_handle;
+
+ /**
+ * The current message associated with the transmit handle
+ */
+ struct MessageQueue *queue_head;
+
+ /**
+ * The queue tail, should always point to the last message in queue
+ */
+ struct MessageQueue *queue_tail;
+
+ /**
+ * The write IO_handle associated with this socket
+ */
+ struct GNUNET_STREAM_IOWriteHandle *write_handle;
+
+ /**
+ * The read IO_handle associated with this socket
+ */
+ struct GNUNET_STREAM_IOReadHandle *read_handle;
+
+ /**
+ * Buffer for storing received messages
+ */
+ void *receive_buffer;
+
+ /**
+ * Task identifier for the read io timeout task
+ */
+ GNUNET_SCHEDULER_TaskIdentifier read_io_timeout_task;
+
+ /**
+ * The state of the protocol associated with this socket
+ */
+ enum State state;
+
+ /**
+ * The status of the socket
+ */
+ enum GNUNET_STREAM_Status status;
+
+ /**
+ * The number of previous timeouts; FIXME: currently not used
+ */
+ unsigned int retries;
+
+ /**
+ * The session id associated with this stream connection
+ * FIXME: Not used currently, may be removed
+ */
+ uint32_t session_id;
+
+ /**
+ * Write sequence number. Set to random when sending HELLO(client) and
+ * HELLO_ACK(server)
+ */
+ uint32_t write_sequence_number;
+
+ /**
+ * Read sequence number. This number's value is determined during handshake
+ */
+ uint32_t read_sequence_number;
+
+ /**
+ * The receiver buffer size
+ */
+ uint32_t receive_buffer_size;
+
+ /**
+ * The receiver buffer boundaries
+ */
+ uint32_t receive_buffer_boundaries[GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH];
+
+ /**
+ * receiver's available buffer after the last acknowledged packet
+ */
+ uint32_t receive_window_available;
+
+ /**
+ * The offset pointer used during write operation
+ */
+ uint32_t write_offset;
+
+ /**
+ * The offset after which we are expecting data
+ */
+ uint32_t read_offset;
+
+ /**
+ * The offset upto which user has read from the received buffer
+ */
+ uint32_t copy_offset;
+};
+
+
+/**
+ * A socket for listening
+ */
+struct GNUNET_STREAM_ListenSocket
+{
+
+ /**
+ * The mesh handle
+ */
+ struct GNUNET_MESH_Handle *mesh;
+
+ /**
+ * The callback function which is called after successful opening socket
+ */
+ GNUNET_STREAM_ListenCallback listen_cb;
+
+ /**
+ * The call back closure
+ */
+ void *listen_cb_cls;
+
+ /**
+ * The service port
+ */
+ GNUNET_MESH_ApplicationType port;
+};
+
+
+/**
+ * The IO Write Handle
+ */
+struct GNUNET_STREAM_IOWriteHandle
+{
+ /**
+ * The packet_buffers associated with this Handle
+ */
+ struct GNUNET_STREAM_DataMessage *messages[64];
+
+ /**
+ * The bitmap of this IOHandle; Corresponding bit for a message is set when
+ * it has been acknowledged by the receiver
+ */
+ GNUNET_STREAM_AckBitmap ack_bitmap;
+
+ /**
+ * Number of packets sent before waiting for an ack
+ *
+ * FIXME: Do we need this?
+ */
+ unsigned int sent_packets;
+};
+
+
+/**
+ * The IO Read Handle
+ */
+struct GNUNET_STREAM_IOReadHandle
+{
+ /**
+ * Callback for the read processor
+ */
+ GNUNET_STREAM_DataProcessor proc;
+
+ /**
+ * The closure pointer for the read processor callback
+ */
+ void *proc_cls;
+};
+
+
+/**
+ * Default value in seconds for various timeouts
+ */
+static unsigned int default_timeout = 300;
+
+
+/**
+ * Callback function for sending hello message
+ *
+ * @param cls closure the socket
+ * @param size number of bytes available in buf
+ * @param buf where the callee should write the message
+ * @return number of bytes written to buf
+ */
+static size_t
+send_message_notify (void *cls, size_t size, void *buf)
+{
+ struct GNUNET_STREAM_Socket *socket = cls;
+ struct MessageQueue *head;
+ size_t ret;
+
+ socket->transmit_handle = NULL; /* Remove the transmit handle */
+ head = socket->queue_head;
+ if (NULL == head)
+ return 0; /* just to be safe */
+ if (0 == size) /* request timed out */
+ {
+ socket->retries++;
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Message sending timed out. Retry %d \n",
+ socket->retries);
+ socket->transmit_handle =
+ GNUNET_MESH_notify_transmit_ready (socket->tunnel,
+ 0, /* Corking */
+ 1, /* Priority */
+ /* FIXME: exponential backoff */
+ socket->retransmit_timeout,
+ &socket->other_peer,
+ ntohs (head->message->header.size),
+ &send_message_notify,
+ socket);
+ return 0;
+ }
+
+ ret = ntohs (head->message->header.size);
+ GNUNET_assert (size >= ret);
+ memcpy (buf, head->message, ret);
+ if (NULL != head->finish_cb)
+ {
+ head->finish_cb (socket, head->finish_cb_cls);
+ }
+ GNUNET_CONTAINER_DLL_remove (socket->queue_head,
+ socket->queue_tail,
+ head);
+ GNUNET_free (head->message);
+ GNUNET_free (head);
+ head = socket->queue_head;
+ if (NULL != head) /* more pending messages to send */
+ {
+ socket->retries = 0;
+ socket->transmit_handle =
+ GNUNET_MESH_notify_transmit_ready (socket->tunnel,
+ 0, /* Corking */
+ 1, /* Priority */
+ /* FIXME: exponential backoff */
+ socket->retransmit_timeout,
+ &socket->other_peer,
+ ntohs (head->message->header.size),
+ &send_message_notify,
+ socket);
+ }
+ return ret;
+}
+
+
+/**
+ * Queues a message for sending using the mesh connection of a socket
+ *
+ * @param socket the socket whose mesh connection is used
+ * @param message the message to be sent
+ * @param finish_cb the callback to be called when the message is sent
+ * @param finish_cb_cls the closure for the callback
+ */
+static void
+queue_message (struct GNUNET_STREAM_Socket *socket,
+ struct GNUNET_STREAM_MessageHeader *message,
+ SendFinishCallback finish_cb,
+ void *finish_cb_cls)
+{
+ struct MessageQueue *queue_entity;
+
+ queue_entity = GNUNET_malloc (sizeof (struct MessageQueue));
+ queue_entity->message = message;
+ queue_entity->finish_cb = finish_cb;
+ queue_entity->finish_cb_cls = finish_cb_cls;
+ GNUNET_CONTAINER_DLL_insert_tail (socket->queue_head,
+ socket->queue_tail,
+ queue_entity);
+ if (NULL == socket->transmit_handle)
+ {
+ socket->retries = 0;
+ socket->transmit_handle =
+ GNUNET_MESH_notify_transmit_ready (socket->tunnel,
+ 0, /* Corking */
+ 1, /* Priority */
+ socket->retransmit_timeout,
+ &socket->other_peer,
+ ntohs (message->header.size),
+ &send_message_notify,
+ socket);
+ }
+}
+
+
+/**
+ * Callback function for sending ack message
+ *
+ * @param cls closure the ACK message created in ack_task
+ * @param size number of bytes available in buffer
+ * @param buf where the callee should write the message
+ * @return number of bytes written to buf
+ */
+static size_t
+send_ack_notify (void *cls, size_t size, void *buf)
+{
+ struct GNUNET_STREAM_AckMessage *ack_msg = cls;
+
+ if (0 == size)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "%s called with size 0\n", __func__);
+ return 0;
+ }
+ GNUNET_assert (ack_msg->header.header.size <= size);
+
+ size = ack_msg->header.header.size;
+ memcpy (buf, ack_msg, size);
+ return size;
+}
+
+
+/**
+ * Task for sending ACK message
+ *
+ * @param cls the socket
+ * @param tc the Task context
+ */
+static void
+ack_task (void *cls,
+ const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+ struct GNUNET_STREAM_Socket *socket = cls;
+ struct GNUNET_STREAM_AckMessage *ack_msg;
+
+ if (GNUNET_SCHEDULER_REASON_SHUTDOWN == tc->reason)
+ {
+ return;
+ }
+
+ socket->ack_task_id = 0;
+
+ /* Create the ACK Message */
+ ack_msg = GNUNET_malloc (sizeof (struct GNUNET_STREAM_AckMessage));
+ ack_msg->header.header.size = htons (sizeof (struct
+ GNUNET_STREAM_AckMessage));
+ ack_msg->header.header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_ACK);
+ ack_msg->bitmap = GNUNET_htonll (socket->ack_bitmap);
+ ack_msg->base_sequence_number = htonl (socket->read_sequence_number);
+ ack_msg->receive_window_remaining =
+ htonl (RECEIVE_BUFFER_SIZE - socket->receive_buffer_size);
+
+ /* Request MESH for sending ACK */
+ GNUNET_MESH_notify_transmit_ready (socket->tunnel,
+ 0, /* Corking */
+ 1, /* Priority */
+ socket->retransmit_timeout,
+ &socket->other_peer,
+ ntohs (ack_msg->header.header.size),
+ &send_ack_notify,
+ ack_msg);
+
+
+}
+
+
+/**
+ * Function to modify a bit in GNUNET_STREAM_AckBitmap
+ *
+ * @param bitmap the bitmap to modify
+ * @param bit the bit number to modify
+ * @param value GNUNET_YES to on, GNUNET_NO to off
+ */
+static void
+ackbitmap_modify_bit (GNUNET_STREAM_AckBitmap *bitmap,
+ unsigned int bit,
+ int value)
+{
+ GNUNET_assert (bit < 64);
+ if (GNUNET_YES == value)
+ *bitmap |= (1LL << bit);
+ else
+ *bitmap &= ~(1LL << bit);
+}
+
+
+/**
+ * Function to check if a bit is set in the GNUNET_STREAM_AckBitmap
+ *
+ * @param bitmap address of the bitmap that has to be checked
+ * @param bit the bit number to check
+ * @return GNUNET_YES if the bit is set; GNUNET_NO if not
+ */
+static uint8_t
+ackbitmap_is_bit_set (const GNUNET_STREAM_AckBitmap *bitmap,
+ unsigned int bit)
+{
+ GNUNET_assert (bit < 64);
+ return 0 != (*bitmap & (1LL << bit));
+}
+
+
+
+/**
+ * Function called when Data Message is sent
+ *
+ * @param cls the io_handle corresponding to the Data Message
+ * @param socket the socket which was used
+ */
+static void
+write_data_finish_cb (void *cls,
+ struct GNUNET_STREAM_Socket *socket)
+{
+ struct GNUNET_STREAM_IOWriteHandle *io_handle = cls;
+
+ io_handle->sent_packets++;
+}
+
+
+/**
+ * Writes data using the given socket. The amount of data written is limited by
+ * the receive_window_size
+ *
+ * @param socket the socket to use
+ */
+static void
+write_data (struct GNUNET_STREAM_Socket *socket)
+{
+ struct GNUNET_STREAM_IOWriteHandle *io_handle = socket->write_handle;
+ unsigned int packet;
+ int ack_packet;
+
+ ack_packet = -1;
+ /* Find the last acknowledged packet */
+ for (packet=0; packet < 64; packet++)
+ {
+ if (GNUNET_YES == ackbitmap_is_bit_set (&io_handle->ack_bitmap,
+ packet))
+ ack_packet = packet;
+ else if (NULL == io_handle->messages[packet])
+ break;
+ }
+ /* Resend packets which weren't ack'ed */
+ for (packet=0; packet < ack_packet; packet++)
+ {
+ if (GNUNET_NO == ackbitmap_is_bit_set (&io_handle->ack_bitmap,
+ packet))
+ {
+ queue_message (socket,
+ &io_handle->messages[packet]->header,
+ NULL,
+ NULL);
+ }
+ }
+ packet = ack_packet + 1;
+ /* Now send new packets if there is enough buffer space */
+ while ( (NULL != io_handle->messages[packet]) &&
+ (socket->receive_window_available >= ntohs (io_handle->messages[packet]->header.header.size)) )
+ {
+ socket->receive_window_available -= ntohs (io_handle->messages[packet]->header.header.size);
+ queue_message (socket,
+ &io_handle->messages[packet]->header,
+ &write_data_finish_cb,
+ io_handle);
+ packet++;
+ }
+}
+
+
+/**
+ * Task for calling the read processor
+ *
+ * @param cls the socket
+ * @param tc the task context
+ */
+static void
+call_read_processor (void *cls,
+ const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+ struct GNUNET_STREAM_Socket *socket = cls;
+ size_t read_size;
+ size_t valid_read_size;
+ unsigned int packet;
+ uint32_t sequence_increase;
+ uint32_t offset_increase;
+
+ socket->read_task = GNUNET_SCHEDULER_NO_TASK;
+ if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
+ return;
+
+ GNUNET_assert (NULL != socket->read_handle);
+ GNUNET_assert (NULL != socket->read_handle->proc);
+
+ /* Check the bitmap for any holes */
+ for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
+ {
+ if (GNUNET_NO == ackbitmap_is_bit_set (&socket->ack_bitmap,
+ packet))
+ break;
+ }
+ /* We only call read processor if we have the first packet */
+ GNUNET_assert (0 < packet);
+
+ valid_read_size =
+ socket->receive_buffer_boundaries[packet-1] - socket->copy_offset;
+
+ GNUNET_assert (0 != valid_read_size);
+
+ /* Cancel the read_io_timeout_task */
+ GNUNET_SCHEDULER_cancel (socket->read_io_timeout_task);
+ socket->read_io_timeout_task = GNUNET_SCHEDULER_NO_TASK;
+
+ /* Call the data processor */
+ read_size =
+ socket->read_handle->proc (socket->read_handle->proc_cls,
+ socket->status,
+ socket->receive_buffer + socket->copy_offset,
+ valid_read_size);
+ /* Free the read handle */
+ GNUNET_free (socket->read_handle);
+ socket->read_handle = NULL;
+
+ GNUNET_assert (read_size <= valid_read_size);
+ socket->copy_offset += read_size;
+
+ /* Determine upto which packet we can remove from the buffer */
+ for (packet = 0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
+ if (socket->copy_offset < socket->receive_buffer_boundaries[packet])
+ break;
+
+ /* If no packets can be removed we can't move the buffer */
+ if (0 == packet) return;
+
+ sequence_increase = packet;
+
+ /* Shift the data in the receive buffer */
+ memmove (socket->receive_buffer,
+ socket->receive_buffer
+ + socket->receive_buffer_boundaries[sequence_increase-1],
+ socket->receive_buffer_size - socket->receive_buffer_boundaries[sequence_increase-1]);
+
+ /* Shift the bitmap */
+ socket->ack_bitmap = socket->ack_bitmap >> sequence_increase;
+
+ /* Set read_sequence_number */
+ socket->read_sequence_number += sequence_increase;
+
+ /* Set read_offset */
+ offset_increase = socket->receive_buffer_boundaries[sequence_increase-1];
+ socket->read_offset += offset_increase;
+
+ /* Fix copy_offset */
+ GNUNET_assert (offset_increase <= socket->copy_offset);
+ socket->copy_offset -= offset_increase;
+
+ /* Fix relative boundaries */
+ for (packet=0; packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH; packet++)
+ {
+ if (packet < GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH - sequence_increase)
+ {
+ socket->receive_buffer_boundaries[packet] =
+ socket->receive_buffer_boundaries[packet + sequence_increase]
+ - offset_increase;
+ }
+ else
+ socket->receive_buffer_boundaries[packet] = 0;
+ }
+}
+
+
+/**
+ * Cancels the existing read io handle
+ *
+ * @param cls the closure from the SCHEDULER call
+ * @param tc the task context
+ */
+static void
+read_io_timeout (void *cls,
+ const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+ struct GNUNET_STREAM_Socket *socket = cls;
+
+ socket->read_io_timeout_task = GNUNET_SCHEDULER_NO_TASK;
+ if (socket->read_task != GNUNET_SCHEDULER_NO_TASK)
+ {
+ GNUNET_SCHEDULER_cancel (socket->read_task);
+ socket->read_task = GNUNET_SCHEDULER_NO_TASK;
+ }
+ GNUNET_assert (NULL != socket->read_handle);
+
+ GNUNET_free (socket->read_handle);
+ socket->read_handle = NULL;
+}
+
+
+/**
+ * Handler for DATA messages; Same for both client and server
+ *
+ * @param socket the socket through which the ack was received
+ * @param tunnel connection to the other end
+ * @param sender who sent the message
+ * @param msg the data message
+ * @param atsi performance data for the connection
+ * @return GNUNET_OK to keep the connection open,
+ * GNUNET_SYSERR to close it (signal serious error)
+ */
+static int
+handle_data (struct GNUNET_STREAM_Socket *socket,
+ struct GNUNET_MESH_Tunnel *tunnel,
+ const struct GNUNET_PeerIdentity *sender,
+ const struct GNUNET_STREAM_DataMessage *msg,
+ const struct GNUNET_ATS_Information*atsi)
+{
+ const void *payload;
+ uint32_t bytes_needed;
+ uint32_t relative_offset;
+ uint32_t relative_sequence_number;
+ uint16_t size;
+
+ size = htons (msg->header.header.size);
+ if (size < sizeof (struct GNUNET_STREAM_DataMessage))
+ {
+ GNUNET_break_op (0);
+ return GNUNET_SYSERR;
+ }
+
+ switch (socket->state)
+ {
+ case STATE_ESTABLISHED:
+ case STATE_TRANSMIT_CLOSED:
+ case STATE_TRANSMIT_CLOSE_WAIT:
+
+ /* check if the message's sequence number is in the range we are
+ expecting */
+ relative_sequence_number =
+ ntohl (msg->sequence_number) - socket->read_sequence_number;
+ if ( relative_sequence_number > 64)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Ignoring received message with sequence number %d",
+ ntohl (msg->sequence_number));
+ return GNUNET_YES;
+ }
+
+ /* Check if we have to allocate the buffer */
+ size -= sizeof (struct GNUNET_STREAM_DataMessage);
+ relative_offset = ntohl (msg->offset) - socket->read_offset;
+ bytes_needed = relative_offset + size;
+
+ if (bytes_needed > socket->receive_buffer_size)
+ {
+ if (bytes_needed <= RECEIVE_BUFFER_SIZE)
+ {
+ socket->receive_buffer = GNUNET_realloc (socket->receive_buffer,
+ bytes_needed);
+ socket->receive_buffer_size = bytes_needed;
+ }
+ else
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Cannot accommodate packet %d as buffer is full\n",
+ ntohl (msg->sequence_number));
+ return GNUNET_YES;
+ }
+ }
+
+ /* Copy Data to buffer */
+ payload = &msg[1];
+ GNUNET_assert (relative_offset + size <= socket->receive_buffer_size);
+ memcpy (socket->receive_buffer + relative_offset,
+ payload,
+ size);
+ socket->receive_buffer_boundaries[relative_sequence_number] =
+ relative_offset + size;
+
+ /* Modify the ACK bitmap */
+ ackbitmap_modify_bit (&socket->ack_bitmap,
+ relative_sequence_number,
+ GNUNET_YES);
+
+ /* Start ACK sending task if one is not already present */
+ if (0 == socket->ack_task_id)
+ {
+ socket->ack_task_id =
+ GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_ntoh
+ (msg->ack_deadline),
+ &ack_task,
+ socket);
+ }
+
+ if ((NULL != socket->read_handle) /* A read handle is waiting */
+ /* There is no current read task */
+ && (GNUNET_SCHEDULER_NO_TASK == socket->read_task)
+ /* We have the first packet */
+ && (GNUNET_YES == ackbitmap_is_bit_set(&socket->ack_bitmap,
+ 0)))
+ {
+ socket->read_task =
+ GNUNET_SCHEDULER_add_now (&call_read_processor,
+ socket);
+ }
+
+ break;
+
+ default:
+ /* FIXME: call statistics */
+ break;
+ }
+ return GNUNET_YES;
+}
+
+/**
+ * Client's message Handler for GNUNET_MESSAGE_TYPE_STREAM_DATA
+ *
+ * @param cls the socket (set from GNUNET_MESH_connect)
+ * @param tunnel connection to the other end
+ * @param tunnel_ctx place to store local state associated with the tunnel
+ * @param sender who sent the message
+ * @param message the actual message
+ * @param atsi performance data for the connection
+ * @return GNUNET_OK to keep the connection open,
+ * GNUNET_SYSERR to close it (signal serious error)
+ */
+static int
+client_handle_data (void *cls,
+ struct GNUNET_MESH_Tunnel *tunnel,
+ void **tunnel_ctx,
+ const struct GNUNET_PeerIdentity *sender,
+ const struct GNUNET_MessageHeader *message,
+ const struct GNUNET_ATS_Information*atsi)
+{
+ struct GNUNET_STREAM_Socket *socket = cls;
+
+ return handle_data (socket,
+ tunnel,
+ sender,
+ (const struct GNUNET_STREAM_DataMessage *) message,
+ atsi);
+}
+
+
+/**
+ * Callback to set state to ESTABLISHED
+ *
+ * @param cls the closure from queue_message FIXME: document
+ * @param socket the socket to requiring state change
+ */
+static void
+set_state_established (void *cls,
+ struct GNUNET_STREAM_Socket *socket)
+{
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Attaining ESTABLISHED state\n");
+ socket->write_offset = 0;
+ socket->read_offset = 0;
+ socket->state = STATE_ESTABLISHED;
+}
+
+
+/**
+ * Callback to set state to HELLO_WAIT
+ *
+ * @param cls the closure from queue_message
+ * @param socket the socket to requiring state change
+ */
+static void
+set_state_hello_wait (void *cls,
+ struct GNUNET_STREAM_Socket *socket)
+{
+ GNUNET_assert (STATE_INIT == socket->state);
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Attaining HELLO_WAIT state\n");
+ socket->state = STATE_HELLO_WAIT;
+}
+
+
+/**
+ * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK
+ *
+ * @param cls the socket (set from GNUNET_MESH_connect)
+ * @param tunnel connection to the other end
+ * @param tunnel_ctx this is NULL
+ * @param sender who sent the message
+ * @param message the actual message
+ * @param atsi performance data for the connection
+ * @return GNUNET_OK to keep the connection open,
+ * GNUNET_SYSERR to close it (signal serious error)
+ */
+static int
+client_handle_hello_ack (void *cls,
+ struct GNUNET_MESH_Tunnel *tunnel,
+ void **tunnel_ctx,
+ const struct GNUNET_PeerIdentity *sender,
+ const struct GNUNET_MessageHeader *message,
+ const struct GNUNET_ATS_Information*atsi)
+{
+ struct GNUNET_STREAM_Socket *socket = cls;
+ const struct GNUNET_STREAM_HelloAckMessage *ack_msg;
+ struct GNUNET_STREAM_HelloAckMessage *reply;
+
+ ack_msg = (const struct GNUNET_STREAM_HelloAckMessage *) message;
+ GNUNET_assert (socket->tunnel == tunnel);
+ switch (socket->state)
+ {
+ case STATE_HELLO_WAIT:
+ socket->read_sequence_number = ntohl (ack_msg->sequence_number);
+ socket->receive_window_available = ntohl (ack_msg->receive_window_size);
+ /* Get the random sequence number */
+ socket->write_sequence_number =
+ GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, UINT32_MAX);
+ reply =
+ GNUNET_malloc (sizeof (struct GNUNET_STREAM_HelloAckMessage));
+ reply->header.header.size =
+ htons (sizeof (struct GNUNET_STREAM_MessageHeader));
+ reply->header.header.type =
+ htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK);
+ reply->sequence_number = htonl (socket->write_sequence_number);
+ reply->receive_window_size = htonl (RECEIVE_BUFFER_SIZE);
+ queue_message (socket,
+ &reply->header,
+ &set_state_established,
+ NULL);
+ return GNUNET_OK;
+ case STATE_ESTABLISHED:
+ case STATE_RECEIVE_CLOSE_WAIT:
+ // call statistics (# ACKs ignored++)
+ return GNUNET_OK;
+ case STATE_INIT:
+ default:
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Server sent HELLO_ACK when in state %d\n", socket->state);
+ socket->state = STATE_CLOSED; // introduce STATE_ERROR?
+ return GNUNET_SYSERR;
+ }
+
+}
+
+
+/**
+ * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_RESET
+ *
+ * @param cls the socket (set from GNUNET_MESH_connect)
+ * @param tunnel connection to the other end
+ * @param tunnel_ctx this is NULL
+ * @param sender who sent the message
+ * @param message the actual message
+ * @param atsi performance data for the connection
+ * @return GNUNET_OK to keep the connection open,
+ * GNUNET_SYSERR to close it (signal serious error)
+ */
+static int
+client_handle_reset (void *cls,
+ struct GNUNET_MESH_Tunnel *tunnel,
+ void **tunnel_ctx,
+ const struct GNUNET_PeerIdentity *sender,
+ const struct GNUNET_MessageHeader *message,
+ const struct GNUNET_ATS_Information*atsi)
+{
+ struct GNUNET_STREAM_Socket *socket = cls;
+
+ return GNUNET_OK;
+}
+
+
+/**
+ * Common message handler for handling TRANSMIT_CLOSE messages
+ *
+ * @param socket the socket through which the ack was received
+ * @param tunnel connection to the other end
+ * @param sender who sent the message
+ * @param msg the transmit close message
+ * @param atsi performance data for the connection
+ * @return GNUNET_OK to keep the connection open,
+ * GNUNET_SYSERR to close it (signal serious error)
+ */
+static int
+handle_transmit_close (struct GNUNET_STREAM_Socket *socket,
+ struct GNUNET_MESH_Tunnel *tunnel,
+ const struct GNUNET_PeerIdentity *sender,
+ const struct GNUNET_STREAM_MessageHeader *msg,
+ const struct GNUNET_ATS_Information*atsi)
+{
+ struct GNUNET_STREAM_MessageHeader *reply;
+
+ switch (socket->state)
+ {
+ case STATE_ESTABLISHED:
+ socket->state = STATE_RECEIVE_CLOSED;
+
+ /* Send TRANSMIT_CLOSE_ACK */
+ reply = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
+ reply->header.type =
+ htons (GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK);
+ reply->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
+ queue_message (socket, reply, NULL, NULL);
+ break;
+
+ default:
+ /* FIXME: Call statistics? */
+ break;
+ }
+ return GNUNET_YES;
+}
+
+
+/**
+ * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE
+ *
+ * @param cls the socket (set from GNUNET_MESH_connect)
+ * @param tunnel connection to the other end
+ * @param tunnel_ctx this is NULL
+ * @param sender who sent the message
+ * @param message the actual message
+ * @param atsi performance data for the connection
+ * @return GNUNET_OK to keep the connection open,
+ * GNUNET_SYSERR to close it (signal serious error)
+ */
+static int
+client_handle_transmit_close (void *cls,
+ struct GNUNET_MESH_Tunnel *tunnel,
+ void **tunnel_ctx,
+ const struct GNUNET_PeerIdentity *sender,
+ const struct GNUNET_MessageHeader *message,
+ const struct GNUNET_ATS_Information*atsi)
+{
+ struct GNUNET_STREAM_Socket *socket = cls;
+
+ return handle_transmit_close (socket,
+ tunnel,
+ sender,
+ (struct GNUNET_STREAM_MessageHeader *)message,
+ atsi);
+}
+
+
+/**
+ * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK
+ *
+ * @param cls the socket (set from GNUNET_MESH_connect)
+ * @param tunnel connection to the other end
+ * @param tunnel_ctx this is NULL
+ * @param sender who sent the message
+ * @param message the actual message
+ * @param atsi performance data for the connection
+ * @return GNUNET_OK to keep the connection open,
+ * GNUNET_SYSERR to close it (signal serious error)
+ */
+static int
+client_handle_transmit_close_ack (void *cls,
+ struct GNUNET_MESH_Tunnel *tunnel,
+ void **tunnel_ctx,
+ const struct GNUNET_PeerIdentity *sender,
+ const struct GNUNET_MessageHeader *message,
+ const struct GNUNET_ATS_Information*atsi)
+{
+ struct GNUNET_STREAM_Socket *socket = cls;
+
+ return GNUNET_OK;
+}
+
+
+/**
+ * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE
+ *
+ * @param cls the socket (set from GNUNET_MESH_connect)
+ * @param tunnel connection to the other end
+ * @param tunnel_ctx this is NULL
+ * @param sender who sent the message
+ * @param message the actual message
+ * @param atsi performance data for the connection
+ * @return GNUNET_OK to keep the connection open,
+ * GNUNET_SYSERR to close it (signal serious error)
+ */
+static int
+client_handle_receive_close (void *cls,
+ struct GNUNET_MESH_Tunnel *tunnel,
+ void **tunnel_ctx,
+ const struct GNUNET_PeerIdentity *sender,
+ const struct GNUNET_MessageHeader *message,
+ const struct GNUNET_ATS_Information*atsi)
+{
+ struct GNUNET_STREAM_Socket *socket = cls;
+
+ return GNUNET_OK;
+}
+
+
+/**
+ * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK
+ *
+ * @param cls the socket (set from GNUNET_MESH_connect)
+ * @param tunnel connection to the other end
+ * @param tunnel_ctx this is NULL
+ * @param sender who sent the message
+ * @param message the actual message
+ * @param atsi performance data for the connection
+ * @return GNUNET_OK to keep the connection open,
+ * GNUNET_SYSERR to close it (signal serious error)
+ */
+static int
+client_handle_receive_close_ack (void *cls,
+ struct GNUNET_MESH_Tunnel *tunnel,
+ void **tunnel_ctx,
+ const struct GNUNET_PeerIdentity *sender,
+ const struct GNUNET_MessageHeader *message,
+ const struct GNUNET_ATS_Information*atsi)
+{
+ struct GNUNET_STREAM_Socket *socket = cls;
+
+ return GNUNET_OK;
+}
+
+
+/**
+ * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE
+ *
+ * @param cls the socket (set from GNUNET_MESH_connect)
+ * @param tunnel connection to the other end
+ * @param tunnel_ctx this is NULL
+ * @param sender who sent the message
+ * @param message the actual message
+ * @param atsi performance data for the connection
+ * @return GNUNET_OK to keep the connection open,
+ * GNUNET_SYSERR to close it (signal serious error)
+ */
+static int
+client_handle_close (void *cls,
+ struct GNUNET_MESH_Tunnel *tunnel,
+ void **tunnel_ctx,
+ const struct GNUNET_PeerIdentity *sender,
+ const struct GNUNET_MessageHeader *message,
+ const struct GNUNET_ATS_Information*atsi)
+{
+ struct GNUNET_STREAM_Socket *socket = cls;
+
+ return GNUNET_OK;
+}
+
+
+/**
+ * Client's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK
+ *
+ * @param cls the socket (set from GNUNET_MESH_connect)
+ * @param tunnel connection to the other end
+ * @param tunnel_ctx this is NULL
+ * @param sender who sent the message
+ * @param message the actual message
+ * @param atsi performance data for the connection
+ * @return GNUNET_OK to keep the connection open,
+ * GNUNET_SYSERR to close it (signal serious error)
+ */
+static int
+client_handle_close_ack (void *cls,
+ struct GNUNET_MESH_Tunnel *tunnel,
+ void **tunnel_ctx,
+ const struct GNUNET_PeerIdentity *sender,
+ const struct GNUNET_MessageHeader *message,
+ const struct GNUNET_ATS_Information*atsi)
+{
+ struct GNUNET_STREAM_Socket *socket = cls;
+
+ return GNUNET_OK;
+}
+
+/*****************************/
+/* Server's Message Handlers */
+/*****************************/
+
+/**
+ * Server's message Handler for GNUNET_MESSAGE_TYPE_STREAM_DATA
+ *
+ * @param cls the closure
+ * @param tunnel connection to the other end
+ * @param tunnel_ctx the socket
+ * @param sender who sent the message
+ * @param message the actual message
+ * @param atsi performance data for the connection
+ * @return GNUNET_OK to keep the connection open,
+ * GNUNET_SYSERR to close it (signal serious error)
+ */
+static int
+server_handle_data (void *cls,
+ struct GNUNET_MESH_Tunnel *tunnel,
+ void **tunnel_ctx,
+ const struct GNUNET_PeerIdentity *sender,
+ const struct GNUNET_MessageHeader *message,
+ const struct GNUNET_ATS_Information*atsi)
+{
+ struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
+
+ return handle_data (socket,
+ tunnel,
+ sender,
+ (const struct GNUNET_STREAM_DataMessage *)message,
+ atsi);
+}
+
+
+/**
+ * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO
+ *
+ * @param cls the closure
+ * @param tunnel connection to the other end
+ * @param tunnel_ctx the socket
+ * @param sender who sent the message
+ * @param message the actual message
+ * @param atsi performance data for the connection
+ * @return GNUNET_OK to keep the connection open,
+ * GNUNET_SYSERR to close it (signal serious error)
+ */
+static int
+server_handle_hello (void *cls,
+ struct GNUNET_MESH_Tunnel *tunnel,
+ void **tunnel_ctx,
+ const struct GNUNET_PeerIdentity *sender,
+ const struct GNUNET_MessageHeader *message,
+ const struct GNUNET_ATS_Information*atsi)
+{
+ struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
+ struct GNUNET_STREAM_HelloAckMessage *reply;
+
+ GNUNET_assert (socket->tunnel == tunnel);
+ if (STATE_INIT == socket->state)
+ {
+ /* Get the random sequence number */
+ socket->write_sequence_number =
+ GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_NONCE, UINT32_MAX);
+ reply =
+ GNUNET_malloc (sizeof (struct GNUNET_STREAM_HelloAckMessage));
+ reply->header.header.size =
+ htons (sizeof (struct GNUNET_STREAM_MessageHeader));
+ reply->header.header.type =
+ htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK);
+ reply->sequence_number = htonl (socket->write_sequence_number);
+ queue_message (socket,
+ &reply->header,
+ &set_state_hello_wait,
+ NULL);
+ }
+ else
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Client sent HELLO when in state %d\n", socket->state);
+ /* FIXME: Send RESET? */
+
+ }
+ return GNUNET_OK;
+}
+
+
+/**
+ * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK
+ *
+ * @param cls the closure
+ * @param tunnel connection to the other end
+ * @param tunnel_ctx the socket
+ * @param sender who sent the message
+ * @param message the actual message
+ * @param atsi performance data for the connection
+ * @return GNUNET_OK to keep the connection open,
+ * GNUNET_SYSERR to close it (signal serious error)
+ */
+static int
+server_handle_hello_ack (void *cls,
+ struct GNUNET_MESH_Tunnel *tunnel,
+ void **tunnel_ctx,
+ const struct GNUNET_PeerIdentity *sender,
+ const struct GNUNET_MessageHeader *message,
+ const struct GNUNET_ATS_Information*atsi)
+{
+ struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
+ const struct GNUNET_STREAM_HelloAckMessage *ack_message;
+
+ ack_message = (struct GNUNET_STREAM_HelloAckMessage *) message;
+ GNUNET_assert (socket->tunnel == tunnel);
+ if (STATE_HELLO_WAIT == socket->state)
+ {
+ socket->read_sequence_number = ntohl (ack_message->sequence_number);
+ socket->receive_window_available =
+ ntohl (ack_message->receive_window_size);
+ /* Attain ESTABLISHED state */
+ set_state_established (NULL, socket);
+ }
+ else
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Client sent HELLO_ACK when in state %d\n", socket->state);
+ /* FIXME: Send RESET? */
+
+ }
+ return GNUNET_OK;
+}
+
+
+/**
+ * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_RESET
+ *
+ * @param cls the closure
+ * @param tunnel connection to the other end
+ * @param tunnel_ctx the socket
+ * @param sender who sent the message
+ * @param message the actual message
+ * @param atsi performance data for the connection
+ * @return GNUNET_OK to keep the connection open,
+ * GNUNET_SYSERR to close it (signal serious error)
+ */
+static int
+server_handle_reset (void *cls,
+ struct GNUNET_MESH_Tunnel *tunnel,
+ void **tunnel_ctx,
+ const struct GNUNET_PeerIdentity *sender,
+ const struct GNUNET_MessageHeader *message,
+ const struct GNUNET_ATS_Information*atsi)
+{
+ struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
+
+ return GNUNET_OK;
+}
+
+
+/**
+ * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE
+ *
+ * @param cls the closure
+ * @param tunnel connection to the other end
+ * @param tunnel_ctx the socket
+ * @param sender who sent the message
+ * @param message the actual message
+ * @param atsi performance data for the connection
+ * @return GNUNET_OK to keep the connection open,
+ * GNUNET_SYSERR to close it (signal serious error)
+ */
+static int
+server_handle_transmit_close (void *cls,
+ struct GNUNET_MESH_Tunnel *tunnel,
+ void **tunnel_ctx,
+ const struct GNUNET_PeerIdentity *sender,
+ const struct GNUNET_MessageHeader *message,
+ const struct GNUNET_ATS_Information*atsi)
+{
+ struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
+
+ return handle_transmit_close (socket,
+ tunnel,
+ sender,
+ (struct GNUNET_STREAM_MessageHeader *)message,
+ atsi);
+}
+
+
+/**
+ * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK
+ *
+ * @param cls the closure
+ * @param tunnel connection to the other end
+ * @param tunnel_ctx the socket
+ * @param sender who sent the message
+ * @param message the actual message
+ * @param atsi performance data for the connection
+ * @return GNUNET_OK to keep the connection open,
+ * GNUNET_SYSERR to close it (signal serious error)
+ */
+static int
+server_handle_transmit_close_ack (void *cls,
+ struct GNUNET_MESH_Tunnel *tunnel,
+ void **tunnel_ctx,
+ const struct GNUNET_PeerIdentity *sender,
+ const struct GNUNET_MessageHeader *message,
+ const struct GNUNET_ATS_Information*atsi)
+{
+ struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
+
+ return GNUNET_OK;
+}
+
+
+/**
+ * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE
+ *
+ * @param cls the closure
+ * @param tunnel connection to the other end
+ * @param tunnel_ctx the socket
+ * @param sender who sent the message
+ * @param message the actual message
+ * @param atsi performance data for the connection
+ * @return GNUNET_OK to keep the connection open,
+ * GNUNET_SYSERR to close it (signal serious error)
+ */
+static int
+server_handle_receive_close (void *cls,
+ struct GNUNET_MESH_Tunnel *tunnel,
+ void **tunnel_ctx,
+ const struct GNUNET_PeerIdentity *sender,
+ const struct GNUNET_MessageHeader *message,
+ const struct GNUNET_ATS_Information*atsi)
+{
+ struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
+
+ return GNUNET_OK;
+}
+
+
+/**
+ * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK
+ *
+ * @param cls the closure
+ * @param tunnel connection to the other end
+ * @param tunnel_ctx the socket
+ * @param sender who sent the message
+ * @param message the actual message
+ * @param atsi performance data for the connection
+ * @return GNUNET_OK to keep the connection open,
+ * GNUNET_SYSERR to close it (signal serious error)
+ */
+static int
+server_handle_receive_close_ack (void *cls,
+ struct GNUNET_MESH_Tunnel *tunnel,
+ void **tunnel_ctx,
+ const struct GNUNET_PeerIdentity *sender,
+ const struct GNUNET_MessageHeader *message,
+ const struct GNUNET_ATS_Information*atsi)
+{
+ struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
+
+ return GNUNET_OK;
+}
+
+
+/**
+ * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE
+ *
+ * @param cls the closure
+ * @param tunnel connection to the other end
+ * @param tunnel_ctx the socket
+ * @param sender who sent the message
+ * @param message the actual message
+ * @param atsi performance data for the connection
+ * @return GNUNET_OK to keep the connection open,
+ * GNUNET_SYSERR to close it (signal serious error)
+ */
+static int
+server_handle_close (void *cls,
+ struct GNUNET_MESH_Tunnel *tunnel,
+ void **tunnel_ctx,
+ const struct GNUNET_PeerIdentity *sender,
+ const struct GNUNET_MessageHeader *message,
+ const struct GNUNET_ATS_Information*atsi)
+{
+ struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
+
+ return GNUNET_OK;
+}
+
+
+/**
+ * Server's message handler for GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK
+ *
+ * @param cls the closure
+ * @param tunnel connection to the other end
+ * @param tunnel_ctx the socket
+ * @param sender who sent the message
+ * @param message the actual message
+ * @param atsi performance data for the connection
+ * @return GNUNET_OK to keep the connection open,
+ * GNUNET_SYSERR to close it (signal serious error)
+ */
+static int
+server_handle_close_ack (void *cls,
+ struct GNUNET_MESH_Tunnel *tunnel,
+ void **tunnel_ctx,
+ const struct GNUNET_PeerIdentity *sender,
+ const struct GNUNET_MessageHeader *message,
+ const struct GNUNET_ATS_Information*atsi)
+{
+ struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
+
+ return GNUNET_OK;
+}
+
+
+/**
+ * Message Handler for mesh
+ *
+ * @param socket the socket through which the ack was received
+ * @param tunnel connection to the other end
+ * @param sender who sent the message
+ * @param ack the acknowledgment message
+ * @param atsi performance data for the connection
+ * @return GNUNET_OK to keep the connection open,
+ * GNUNET_SYSERR to close it (signal serious error)
+ */
+static int
+handle_ack (struct GNUNET_STREAM_Socket *socket,
+ struct GNUNET_MESH_Tunnel *tunnel,
+ const struct GNUNET_PeerIdentity *sender,
+ const struct GNUNET_STREAM_AckMessage *ack,
+ const struct GNUNET_ATS_Information*atsi)
+{
+ switch (socket->state)
+ {
+ case (STATE_ESTABLISHED):
+ if (NULL == socket->write_handle)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Received DATA ACK when write_handle is NULL\n");
+ return GNUNET_OK;
+ }
+
+ socket->write_handle->ack_bitmap = GNUNET_ntohll (ack->bitmap);
+ socket->receive_window_available =
+ ntohl (ack->receive_window_remaining);
+ write_data (socket);
+ break;
+ default:
+ break;
+ }
+ return GNUNET_OK;
+}
+
+
+/**
+ * Message Handler for mesh
+ *
+ * @param cls the 'struct GNUNET_STREAM_Socket'
+ * @param tunnel connection to the other end
+ * @param tunnel_ctx unused
+ * @param sender who sent the message
+ * @param message the actual message
+ * @param atsi performance data for the connection
+ * @return GNUNET_OK to keep the connection open,
+ * GNUNET_SYSERR to close it (signal serious error)
+ */
+static int
+client_handle_ack (void *cls,
+ struct GNUNET_MESH_Tunnel *tunnel,
+ void **tunnel_ctx,
+ const struct GNUNET_PeerIdentity *sender,
+ const struct GNUNET_MessageHeader *message,
+ const struct GNUNET_ATS_Information*atsi)
+{
+ struct GNUNET_STREAM_Socket *socket = cls;
+ const struct GNUNET_STREAM_AckMessage *ack = (const struct GNUNET_STREAM_AckMessage *) message;
+
+ return handle_ack (socket, tunnel, sender, ack, atsi);
+}
+
+
+/**
+ * Message Handler for mesh
+ *
+ * @param cls the server's listen socket
+ * @param tunnel connection to the other end
+ * @param tunnel_ctx pointer to the 'struct GNUNET_STREAM_Socket*'
+ * @param sender who sent the message
+ * @param message the actual message
+ * @param atsi performance data for the connection
+ * @return GNUNET_OK to keep the connection open,
+ * GNUNET_SYSERR to close it (signal serious error)
+ */
+static int
+server_handle_ack (void *cls,
+ struct GNUNET_MESH_Tunnel *tunnel,
+ void **tunnel_ctx,
+ const struct GNUNET_PeerIdentity *sender,
+ const struct GNUNET_MessageHeader *message,
+ const struct GNUNET_ATS_Information*atsi)
+{
+ struct GNUNET_STREAM_Socket *socket = *tunnel_ctx;
+ const struct GNUNET_STREAM_AckMessage *ack = (const struct GNUNET_STREAM_AckMessage *) message;
+
+ return handle_ack (socket, tunnel, sender, ack, atsi);
+}
+
+
+/**
+ * For client message handlers, the stream socket is in the
+ * closure argument.
+ */
+static struct GNUNET_MESH_MessageHandler client_message_handlers[] = {
+ {&client_handle_data, GNUNET_MESSAGE_TYPE_STREAM_DATA, 0},
+ {&client_handle_ack, GNUNET_MESSAGE_TYPE_STREAM_ACK,
+ sizeof (struct GNUNET_STREAM_AckMessage) },
+ {&client_handle_hello_ack, GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK,
+ sizeof (struct GNUNET_STREAM_HelloAckMessage)},
+ {&client_handle_reset, GNUNET_MESSAGE_TYPE_STREAM_RESET,
+ sizeof (struct GNUNET_STREAM_MessageHeader)},
+ {&client_handle_transmit_close, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE,
+ sizeof (struct GNUNET_STREAM_MessageHeader)},
+ {&client_handle_transmit_close_ack, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK,
+ sizeof (struct GNUNET_STREAM_MessageHeader)},
+ {&client_handle_receive_close, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE,
+ sizeof (struct GNUNET_STREAM_MessageHeader)},
+ {&client_handle_receive_close_ack, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK,
+ sizeof (struct GNUNET_STREAM_MessageHeader)},
+ {&client_handle_close, GNUNET_MESSAGE_TYPE_STREAM_CLOSE,
+ sizeof (struct GNUNET_STREAM_MessageHeader)},
+ {&client_handle_close_ack, GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK,
+ sizeof (struct GNUNET_STREAM_MessageHeader)},
+ {NULL, 0, 0}
+};
+
+
+/**
+ * For server message handlers, the stream socket is in the
+ * tunnel context, and the listen socket in the closure argument.
+ */
+static struct GNUNET_MESH_MessageHandler server_message_handlers[] = {
+ {&server_handle_data, GNUNET_MESSAGE_TYPE_STREAM_DATA, 0},
+ {&server_handle_ack, GNUNET_MESSAGE_TYPE_STREAM_ACK,
+ sizeof (struct GNUNET_STREAM_AckMessage) },
+ {&server_handle_hello, GNUNET_MESSAGE_TYPE_STREAM_HELLO,
+ sizeof (struct GNUNET_STREAM_MessageHeader)},
+ {&server_handle_hello_ack, GNUNET_MESSAGE_TYPE_STREAM_HELLO_ACK,
+ sizeof (struct GNUNET_STREAM_HelloAckMessage)},
+ {&server_handle_reset, GNUNET_MESSAGE_TYPE_STREAM_RESET,
+ sizeof (struct GNUNET_STREAM_MessageHeader)},
+ {&server_handle_transmit_close, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE,
+ sizeof (struct GNUNET_STREAM_MessageHeader)},
+ {&server_handle_transmit_close_ack, GNUNET_MESSAGE_TYPE_STREAM_TRANSMIT_CLOSE_ACK,
+ sizeof (struct GNUNET_STREAM_MessageHeader)},
+ {&server_handle_receive_close, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE,
+ sizeof (struct GNUNET_STREAM_MessageHeader)},
+ {&server_handle_receive_close_ack, GNUNET_MESSAGE_TYPE_STREAM_RECEIVE_CLOSE_ACK,
+ sizeof (struct GNUNET_STREAM_MessageHeader)},
+ {&server_handle_close, GNUNET_MESSAGE_TYPE_STREAM_CLOSE,
+ sizeof (struct GNUNET_STREAM_MessageHeader)},
+ {&server_handle_close_ack, GNUNET_MESSAGE_TYPE_STREAM_CLOSE_ACK,
+ sizeof (struct GNUNET_STREAM_MessageHeader)},
+ {NULL, 0, 0}
+};
+
+
+/**
+ * Function called when our target peer is connected to our tunnel
+ *
+ * @param cls the socket for which this tunnel is created
+ * @param peer the peer identity of the target
+ * @param atsi performance data for the connection
+ */
+static void
+mesh_peer_connect_callback (void *cls,
+ const struct GNUNET_PeerIdentity *peer,
+ const struct GNUNET_ATS_Information * atsi)
+{
+ struct GNUNET_STREAM_Socket *socket = cls;
+ struct GNUNET_STREAM_MessageHeader *message;
+
+ if (0 != memcmp (&socket->other_peer,
+ peer,
+ sizeof (struct GNUNET_PeerIdentity)))
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "A peer (%s) which is not our target has connected to our tunnel",
+ GNUNET_i2s (peer));
+ return;
+ }
+
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Target peer %s connected\n", GNUNET_i2s (peer));
+
+ /* Set state to INIT */
+ socket->state = STATE_INIT;
+
+ /* Send HELLO message */
+ message = GNUNET_malloc (sizeof (struct GNUNET_STREAM_MessageHeader));
+ message->header.type = htons (GNUNET_MESSAGE_TYPE_STREAM_HELLO);
+ message->header.size = htons (sizeof (struct GNUNET_STREAM_MessageHeader));
+ queue_message (socket,
+ message,
+ &set_state_hello_wait,
+ NULL);
+
+ /* Call open callback */
+ if (NULL == socket->open_cb)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "STREAM_open callback is NULL\n");
+ }
+ else
+ {
+ socket->open_cb (socket->open_cls, socket);
+ }
+}
+
+
+/**
+ * Function called when our target peer is disconnected from our tunnel
+ *
+ * @param cls the socket associated which this tunnel
+ * @param peer the peer identity of the target
+ */
+static void
+mesh_peer_disconnect_callback (void *cls,
+ const struct GNUNET_PeerIdentity *peer)
+{
+
+}
+
+
+/*****************/
+/* API functions */
+/*****************/
+
+
+/**
+ * Tries to open a stream to the target peer
+ *
+ * @param cfg configuration to use
+ * @param target the target peer to which the stream has to be opened
+ * @param app_port the application port number which uniquely identifies this
+ * stream
+ * @param open_cb this function will be called after stream has be established
+ * @param open_cb_cls the closure for open_cb
+ * @param ... options to the stream, terminated by GNUNET_STREAM_OPTION_END
+ * @return if successful it returns the stream socket; NULL if stream cannot be
+ * opened
+ */
+struct GNUNET_STREAM_Socket *
+GNUNET_STREAM_open (const struct GNUNET_CONFIGURATION_Handle *cfg,
+ const struct GNUNET_PeerIdentity *target,
+ GNUNET_MESH_ApplicationType app_port,
+ GNUNET_STREAM_OpenCallback open_cb,
+ void *open_cb_cls,
+ ...)
+{
+ struct GNUNET_STREAM_Socket *socket;
+ enum GNUNET_STREAM_Option option;
+ va_list vargs; /* Variable arguments */
+
+ socket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_Socket));
+ socket->other_peer = *target;
+ socket->open_cb = open_cb;
+ socket->open_cls = open_cb_cls;
+
+ /* Set defaults */
+ socket->retransmit_timeout =
+ GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, default_timeout);
+
+ va_start (vargs, open_cb_cls); /* Parse variable args */
+ do {
+ option = va_arg (vargs, enum GNUNET_STREAM_Option);
+ switch (option)
+ {
+ case GNUNET_STREAM_OPTION_INITIAL_RETRANSMIT_TIMEOUT:
+ /* Expect struct GNUNET_TIME_Relative */
+ socket->retransmit_timeout = va_arg (vargs,
+ struct GNUNET_TIME_Relative);
+ break;
+ case GNUNET_STREAM_OPTION_END:
+ break;
+ }
+ } while (GNUNET_STREAM_OPTION_END != option);
+ va_end (vargs); /* End of variable args parsing */
+
+ socket->mesh = GNUNET_MESH_connect (cfg, /* the configuration handle */
+ 1, /* QUEUE size as parameter? */
+ socket, /* cls */
+ NULL, /* No inbound tunnel handler */
+ NULL, /* No inbound tunnel cleaner */
+ client_message_handlers,
+ NULL); /* We don't get inbound tunnels */
+ // FIXME: if (NULL == socket->mesh) ...
+
+ /* Now create the mesh tunnel to target */
+ socket->tunnel = GNUNET_MESH_tunnel_create (socket->mesh,
+ NULL, /* Tunnel context */
+ &mesh_peer_connect_callback,
+ &mesh_peer_disconnect_callback,
+ socket);
+ // FIXME: if (NULL == socket->tunnel) ...
+
+ return socket;
+}
+
+
+/**
+ * Closes the stream
+ *
+ * @param socket the stream socket
+ */
+void
+GNUNET_STREAM_close (struct GNUNET_STREAM_Socket *socket)
+{
+ struct MessageQueue *head;
+
+ if (socket->read_task != GNUNET_SCHEDULER_NO_TASK)
+ {
+ /* socket closed with read task pending!? */
+ GNUNET_break (0);
+ GNUNET_SCHEDULER_cancel (socket->read_task);
+ socket->read_task = GNUNET_SCHEDULER_NO_TASK;
+ }
+
+ /* Clear Transmit handles */
+ if (NULL != socket->transmit_handle)
+ {
+ GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
+ socket->transmit_handle = NULL;
+ }
+
+ /* Clear existing message queue */
+ while (NULL != (head = socket->queue_head)) {
+ GNUNET_CONTAINER_DLL_remove (socket->queue_head,
+ socket->queue_tail,
+ head);
+ GNUNET_free (head->message);
+ GNUNET_free (head);
+ }
+
+ /* Close associated tunnel */
+ if (NULL != socket->tunnel)
+ {
+ GNUNET_MESH_tunnel_destroy (socket->tunnel);
+ socket->tunnel = NULL;
+ }
+
+ /* Close mesh connection */
+ if (NULL != socket->mesh)
+ {
+ GNUNET_MESH_disconnect (socket->mesh);
+ socket->mesh = NULL;
+ }
+
+ /* Release receive buffer */
+ if (NULL != socket->receive_buffer)
+ {
+ GNUNET_free (socket->receive_buffer);
+ }
+
+ GNUNET_free (socket);
+}
+
+
+/**
+ * Method called whenever a peer creates a tunnel to us
+ *
+ * @param cls closure
+ * @param tunnel new handle to the tunnel
+ * @param initiator peer that started the tunnel
+ * @param atsi performance information for the tunnel
+ * @return initial tunnel context for the tunnel
+ * (can be NULL -- that's not an error)
+ */
+static void *
+new_tunnel_notify (void *cls,
+ struct GNUNET_MESH_Tunnel *tunnel,
+ const struct GNUNET_PeerIdentity *initiator,
+ const struct GNUNET_ATS_Information *atsi)
+{
+ struct GNUNET_STREAM_ListenSocket *lsocket = cls;
+ struct GNUNET_STREAM_Socket *socket;
+
+ socket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_Socket));
+ socket->tunnel = tunnel;
+ socket->session_id = 0; /* FIXME */
+ socket->other_peer = *initiator;
+ socket->state = STATE_INIT;
+
+ if (GNUNET_SYSERR == lsocket->listen_cb (lsocket->listen_cb_cls,
+ socket,
+ &socket->other_peer))
+ {
+ socket->state = STATE_CLOSED;
+ /* FIXME: Send CLOSE message and then free */
+ GNUNET_free (socket);
+ GNUNET_MESH_tunnel_destroy (tunnel); /* Destroy the tunnel */
+ }
+ return socket;
+}
+
+
+/**
+ * Function called whenever an inbound tunnel is destroyed. Should clean up
+ * any associated state. This function is NOT called if the client has
+ * explicitly asked for the tunnel to be destroyed using
+ * GNUNET_MESH_tunnel_destroy. It must NOT call GNUNET_MESH_tunnel_destroy on
+ * the tunnel.
+ *
+ * @param cls closure (set from GNUNET_MESH_connect)
+ * @param tunnel connection to the other end (henceforth invalid)
+ * @param tunnel_ctx place where local state associated
+ * with the tunnel is stored
+ */
+static void
+tunnel_cleaner (void *cls,
+ const struct GNUNET_MESH_Tunnel *tunnel,
+ void *tunnel_ctx)
+{
+ struct GNUNET_STREAM_Socket *socket = tunnel_ctx;
+
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Peer %s has terminated connection abruptly\n",
+ GNUNET_i2s (&socket->other_peer));
+
+ socket->status = GNUNET_STREAM_SHUTDOWN;
+ /* Clear Transmit handles */
+ if (NULL != socket->transmit_handle)
+ {
+ GNUNET_MESH_notify_transmit_ready_cancel (socket->transmit_handle);
+ socket->transmit_handle = NULL;
+ }
+ socket->tunnel = NULL;
+}
+
+
+/**
+ * Listens for stream connections for a specific application ports
+ *
+ * @param cfg the configuration to use
+ * @param app_port the application port for which new streams will be accepted
+ * @param listen_cb this function will be called when a peer tries to establish
+ * a stream with us
+ * @param listen_cb_cls closure for listen_cb
+ * @return listen socket, NULL for any error
+ */
+struct GNUNET_STREAM_ListenSocket *
+GNUNET_STREAM_listen (const struct GNUNET_CONFIGURATION_Handle *cfg,
+ GNUNET_MESH_ApplicationType app_port,
+ GNUNET_STREAM_ListenCallback listen_cb,
+ void *listen_cb_cls)
+{
+ /* FIXME: Add variable args for passing configration options? */
+ struct GNUNET_STREAM_ListenSocket *lsocket;
+ GNUNET_MESH_ApplicationType app_types[2];
+
+ app_types[0] = app_port;
+ app_types[1] = 0;
+ lsocket = GNUNET_malloc (sizeof (struct GNUNET_STREAM_ListenSocket));
+ lsocket->port = app_port;
+ lsocket->listen_cb = listen_cb;
+ lsocket->listen_cb_cls = listen_cb_cls;
+ lsocket->mesh = GNUNET_MESH_connect (cfg,
+ 10, /* FIXME: QUEUE size as parameter? */
+ lsocket, /* Closure */
+ &new_tunnel_notify,
+ &tunnel_cleaner,
+ server_message_handlers,
+ app_types);
+ return lsocket;
+}
+
+
+/**
+ * Closes the listen socket
+ *
+ * @param lsocket the listen socket
+ */
+void
+GNUNET_STREAM_listen_close (struct GNUNET_STREAM_ListenSocket *lsocket)
+{
+ /* Close MESH connection */
+ GNUNET_MESH_disconnect (lsocket->mesh);
+
+ GNUNET_free (lsocket);
+}
+
+
+/**
+ * Tries to write the given data to the stream
+ *
+ * @param socket the socket representing a stream
+ * @param data the data buffer from where the data is written into the stream
+ * @param size the number of bytes to be written from the data buffer
+ * @param timeout the timeout period
+ * @param write_cont the function to call upon writing some bytes into the stream
+ * @param write_cont_cls the closure
+ * @return handle to cancel the operation
+ */
+struct GNUNET_STREAM_IOWriteHandle *
+GNUNET_STREAM_write (struct GNUNET_STREAM_Socket *socket,
+ const void *data,
+ size_t size,
+ struct GNUNET_TIME_Relative timeout,
+ GNUNET_STREAM_CompletionContinuation write_cont,
+ void *write_cont_cls)
+{
+ unsigned int num_needed_packets;
+ unsigned int packet;
+ struct GNUNET_STREAM_IOWriteHandle *io_handle;
+ uint32_t packet_size;
+ uint32_t payload_size;
+ struct GNUNET_STREAM_DataMessage *data_msg;
+ const void *sweep;
+
+ /* Return NULL if there is already a write request pending */
+ if (NULL != socket->write_handle)
+ {
+ GNUNET_break (0);
+ return NULL;
+ }
+ if (!((STATE_ESTABLISHED == socket->state)
+ || (STATE_RECEIVE_CLOSE_WAIT == socket->state)
+ || (STATE_RECEIVE_CLOSED == socket->state)))
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "Attempting to write on a closed (OR) not-yet-established"
+ "stream\n");
+ return NULL;
+ }
+ if (GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH * max_payload_size < size)
+ size = GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH * max_payload_size;
+ num_needed_packets = (size + (max_payload_size - 1)) / max_payload_size;
+ io_handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_IOWriteHandle));
+ sweep = data;
+ /* Divide the given buffer into packets for sending */
+ for (packet=0; packet < num_needed_packets; packet++)
+ {
+ if ((packet + 1) * max_payload_size < size)
+ {
+ payload_size = max_payload_size;
+ packet_size = MAX_PACKET_SIZE;
+ }
+ else
+ {
+ payload_size = size - packet * max_payload_size;
+ packet_size = payload_size + sizeof (struct
+ GNUNET_STREAM_DataMessage);
+ }
+ io_handle->messages[packet] = GNUNET_malloc (packet_size);
+ io_handle->messages[packet]->header.header.size = htons (packet_size);
+ io_handle->messages[packet]->header.header.type =
+ htons (GNUNET_MESSAGE_TYPE_STREAM_DATA);
+ io_handle->messages[packet]->sequence_number =
+ htons (socket->write_sequence_number++);
+ io_handle->messages[packet]->offset = htons (socket->write_offset);
+
+ /* FIXME: Remove the fixed delay for ack deadline; Set it to the value
+ determined from RTT */
+ io_handle->messages[packet]->ack_deadline =
+ GNUNET_TIME_relative_hton (GNUNET_TIME_relative_multiply
+ (GNUNET_TIME_UNIT_SECONDS, 5));
+ data_msg = io_handle->messages[packet];
+ /* Copy data from given buffer to the packet */
+ memcpy (&data_msg[1],
+ sweep,
+ payload_size);
+ sweep += payload_size;
+ socket->write_offset += payload_size;
+ }
+ socket->write_handle = io_handle;
+ write_data (socket);
+
+ return io_handle;
+}
+
+
+/**
+ * Tries to read data from the stream
+ *
+ * @param socket the socket representing a stream
+ * @param timeout the timeout period
+ * @param proc function to call with data (once only)
+ * @param proc_cls the closure for proc
+ * @return handle to cancel the operation
+ */
+struct GNUNET_STREAM_IOReadHandle *
+GNUNET_STREAM_read (struct GNUNET_STREAM_Socket *socket,
+ struct GNUNET_TIME_Relative timeout,
+ GNUNET_STREAM_DataProcessor proc,
+ void *proc_cls)
+{
+ struct GNUNET_STREAM_IOReadHandle *read_handle;
+
+ /* Return NULL if there is already a read handle; the user has to cancel that
+ first before continuing or has to wait until it is completed */
+ if (NULL != socket->read_handle) return NULL;
+
+ read_handle = GNUNET_malloc (sizeof (struct GNUNET_STREAM_IOReadHandle));
+ read_handle->proc = proc;
+ socket->read_handle = read_handle;
+
+ /* Check if we have a packet at bitmap 0 */
+ if (GNUNET_YES == ackbitmap_is_bit_set (&socket->ack_bitmap,
+ 0))
+ {
+ socket->read_task = GNUNET_SCHEDULER_add_now (&call_read_processor,
+ socket);
+
+ }
+
+ /* Setup the read timeout task */
+ socket->read_io_timeout_task = GNUNET_SCHEDULER_add_delayed (timeout,
+ &read_io_timeout,
+ socket);
+ return read_handle;
+}
diff --git a/src/stream/stream_protocol.h b/src/stream/stream_protocol.h
new file mode 100644
index 0000000..0c1987e
--- /dev/null
+++ b/src/stream/stream_protocol.h
@@ -0,0 +1,197 @@
+/*
+ This file is part of GNUnet.
+ (C) 2012 Christian Grothoff (and other contributing authors)
+
+ GNUnet is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published
+ by the Free Software Foundation; either version 3, or (at your
+ option) any later version.
+
+ GNUnet is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with GNUnet; see the file COPYING. If not, write to the
+ Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ Boston, MA 02111-1307, USA.
+*/
+
+/**
+ * @file stream/stream_protocol.h
+ * @brief P2P protocol for the stream connections
+ * @author Sree Harsha Totakura
+ */
+
+#ifndef STREAM_PROTOCOL_H
+#define STREAM_PROTOCOL_H
+
+#ifdef __cplusplus
+extern "C"
+{
+#if 0 /* keep Emacsens' auto-indent happy */
+}
+#endif
+#endif
+
+#include "gnunet_util_lib.h"
+
+GNUNET_NETWORK_STRUCT_BEGIN
+
+
+/**
+ * The stream message header
+ * All messages of STREAM should commonly have this as header
+ */
+struct GNUNET_STREAM_MessageHeader
+{
+ /**
+ * The GNUNET message header, types are from GNUNET_MESSAGE_TYPE_STREAM_*-range.
+ */
+ struct GNUNET_MessageHeader header;
+
+ /**
+ * A number which identifies a session between the two peers. FIXME: not needed
+ */
+ uint32_t session_id GNUNET_PACKED;
+
+};
+
+
+/**
+ * The Data message, should be prefixed with stream header with its type set to
+ * GNUNET_STREAM_Data
+ */
+struct GNUNET_STREAM_DataMessage
+{
+
+ /**
+ * Type is GNUNET_MESSAGE_TYPE_STREAM_DATA
+ */
+ struct GNUNET_STREAM_MessageHeader header;
+
+ /**
+ * Sequence number; starts with a random value. (Just in case
+ * someone breaks mesh and is able to try to do a Sequence
+ * Prediction Attack on us.)
+ */
+ uint32_t sequence_number GNUNET_PACKED;
+
+ /**
+ * number of milliseconds to the soft deadline for sending acknowledgement
+ * measured from the time this message is received. It is optimal for the
+ * communication to send the ack within the soft deadline
+ */
+ struct GNUNET_TIME_RelativeNBO ack_deadline;
+
+ /**
+ * Offset of the packet in the overall stream, modulo 2^32; allows
+ * the receiver to calculate where in the destination buffer the
+ * message should be placed. In network byte order.
+ */
+ uint32_t offset GNUNET_PACKED;
+
+ /**
+ * The data should be appended here
+ */
+};
+
+
+/**
+ * Number of bits in GNUNET_STREAM_AckBitmap
+ */
+#define GNUNET_STREAM_ACK_BITMAP_BIT_LENGTH 64
+
+/**
+ * The Selective Acknowledgement Bitmap
+ */
+typedef uint64_t GNUNET_STREAM_AckBitmap;
+
+
+/**
+ * The Acknowledgment Message to confirm receipt of DATA.
+ */
+struct GNUNET_STREAM_AckMessage
+{
+
+ /**
+ * Type is GNUNET_MESSAGE_TYPE_STREAM_ACK
+ */
+ struct GNUNET_STREAM_MessageHeader header;
+
+ /**
+ * The Selective Acknowledgement Bitmap. Computed relative to the base_seq
+ * (bit n corresponds to the Data message with sequence number base_seq+n)
+ */
+ GNUNET_STREAM_AckBitmap bitmap GNUNET_PACKED;
+
+ /**
+ * The sequence number of the Data Message upto which the receiver has filled
+ * its buffer without any missing packets
+ *
+ * FIXME: Do we need this?
+ */
+ uint32_t base_sequence_number GNUNET_PACKED;
+
+ /**
+ * Available buffer space past the last acknowledged buffer (for flow control),
+ * in bytes.
+ */
+ uint32_t receive_window_remaining GNUNET_PACKED;
+};
+
+
+/**
+ * Message for Acknowledging HELLO
+ */
+struct GNUNET_STREAM_HelloAckMessage
+{
+ /**
+ * The stream message header
+ */
+ struct GNUNET_STREAM_MessageHeader header;
+
+ /**
+ * The selected sequence number. Following data tranmissions from the sender
+ * start with this sequence
+ */
+ uint32_t sequence_number;
+
+ /**
+ * The size(in bytes) of the receive window on the peer sending this message
+ *
+ * FIXME: Remove if not needed
+ */
+ uint32_t receive_window_size;
+};
+
+
+/**
+ * The Transmit close message(used to signal transmission is closed)
+ */
+struct GNUNET_STREAM_TransmitCloseMessage
+{
+ /**
+ * The stream message header
+ */
+ struct GNUNET_STREAM_MessageHeader header;
+
+ /**
+ * The last sequence number of the packet after which the transmission has
+ * ended
+ */
+ uint32_t final_sequence_number GNUNET_PACKED;
+};
+
+GNUNET_NETWORK_STRUCT_END
+
+
+#if 0 /** keep Emacsens' auto-indent happy */
+{
+#endif
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* STREAM_PROTOCOL_H */
diff --git a/src/stream/test_stream_local.c b/src/stream/test_stream_local.c
new file mode 100644
index 0000000..4da1258
--- /dev/null
+++ b/src/stream/test_stream_local.c
@@ -0,0 +1,386 @@
+/*
+ This file is part of GNUnet.
+ (C) 2011, 2012 Christian Grothoff (and other contributing authors)
+
+ GNUnet is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published
+ by the Free Software Foundation; either version 3, or (at your
+ option) any later version.
+
+ GNUnet is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with GNUnet; see the file COPYING. If not, write to the
+ Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ Boston, MA 02111-1307, USA.
+*/
+
+/**
+ * @file stream/test_stream_local.c
+ * @brief Stream API testing between local peers
+ * @author Sree Harsha Totakura
+ */
+
+#include <string.h>
+
+#include "platform.h"
+#include "gnunet_util_lib.h"
+#include "gnunet_mesh_service.h"
+#include "gnunet_stream_lib.h"
+
+#define VERBOSE 1
+
+/**
+ * Structure for holding peer's sockets and IO Handles
+ */
+struct PeerData
+{
+ /**
+ * Peer's stream socket
+ */
+ struct GNUNET_STREAM_Socket *socket;
+
+ /**
+ * Peer's io handle
+ */
+ struct GNUNET_STREAM_IOHandle *io_handle;
+
+ /**
+ * Bytes the peer has written
+ */
+ unsigned int bytes_wrote;
+
+ /**
+ * Byte the peer has read
+ */
+ unsigned int bytes_read;
+};
+
+static struct GNUNET_OS_Process *arm_pid;
+static struct PeerData peer1;
+static struct PeerData peer2;
+static struct GNUNET_STREAM_ListenSocket *peer2_listen_socket;
+
+static GNUNET_SCHEDULER_TaskIdentifier abort_task;
+static GNUNET_SCHEDULER_TaskIdentifier test_task;
+static GNUNET_SCHEDULER_TaskIdentifier read_task;
+
+static char *data = "ABCD";
+static int result;
+
+/**
+ * Shutdown nicely
+ */
+static void
+do_shutdown (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+ GNUNET_STREAM_close (peer1.socket);
+ GNUNET_STREAM_close (peer2.socket);
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "test: shutdown\n");
+ if (0 != abort_task)
+ {
+ GNUNET_SCHEDULER_cancel (abort_task);
+ }
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "test: arm\n");
+ if (0 != GNUNET_OS_process_kill (arm_pid, SIGTERM))
+ {
+ GNUNET_log_strerror (GNUNET_ERROR_TYPE_WARNING, "kill");
+ }
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "test: Wait\n");
+ GNUNET_assert (GNUNET_OK == GNUNET_OS_process_wait (arm_pid));
+ GNUNET_OS_process_close (arm_pid);
+}
+
+
+/**
+ * Something went wrong and timed out. Kill everything and set error flag
+ */
+static void
+do_abort (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "test: ABORT\n");
+ if (0 != test_task)
+ {
+ GNUNET_SCHEDULER_cancel (test_task);
+ }
+ if (0 != read_task)
+ {
+ GNUNET_SCHEDULER_cancel (read_task);
+ }
+ result = GNUNET_SYSERR;
+ abort_task = 0;
+ do_shutdown (cls, tc);
+}
+
+
+/**
+ * The write completion function; called upon writing some data to stream or
+ * upon error
+ *
+ * @param cls the closure from GNUNET_STREAM_write/read
+ * @param status the status of the stream at the time this function is called
+ * @param size the number of bytes read or written
+ */
+static void
+write_completion (void *cls,
+ enum GNUNET_STREAM_Status status,
+ size_t size)
+{
+ struct PeerData *peer;
+
+ peer = (struct PeerData *) cls;
+ GNUNET_assert (GNUNET_STREAM_OK == status);
+ GNUNET_assert (size < strlen (data));
+ peer->bytes_wrote += size;
+
+ if (peer->bytes_wrote < strlen(data)) /* Have more data to send */
+ {
+ peer->io_handle = GNUNET_STREAM_write (peer->socket,
+ (void *) data,
+ strlen(data) - peer->bytes_wrote,
+ GNUNET_TIME_relative_multiply
+ (GNUNET_TIME_UNIT_SECONDS, 5),
+ &write_completion,
+ cls);
+ GNUNET_assert (NULL != peer->io_handle);
+ }
+ else
+ {
+ if (&peer1 == peer) /* Peer1 has finished writing; should read now */
+ {
+ peer->io_handle = GNUNET_STREAM_read ((struct GNUNET_STREAM_Socket *)
+ peer->socket,
+ GNUNET_TIME_relative_multiply
+ (GNUNET_TIME_UNIT_SECONDS, 5),
+ &input_processor,
+ cls);
+ GNUNET_assert (NULL!=peer->io_handle);
+ }
+ }
+}
+
+
+/**
+ * Function executed after stream has been established
+ *
+ * @param cls the closure from GNUNET_STREAM_open
+ * @param socket socket to use to communicate with the other side (read/write)
+ */
+static void
+stream_open_cb (void *cls,
+ struct GNUNET_STREAM_Socket *socket)
+{
+ struct PeerData *peer;
+
+ peer = (struct PeerData *) cls;
+ peer->bytes_wrote = 0;
+ GNUNET_assert (socket == peer1.socket);
+ GNUNET_assert (socket == peer->socket);
+ peer->io_handle = GNUNET_STREAM_write (peer->socket, /* socket */
+ (void *) data, /* data */
+ strlen(data),
+ GNUNET_TIME_relative_multiply
+ (GNUNET_TIME_UNIT_SECONDS, 5),
+ &write_completion,
+ cls);
+ GNUNET_assert (NULL != peer->io_handle);
+}
+
+
+/**
+ * Input processor
+ *
+ * @param cls the closure from GNUNET_STREAM_write/read
+ * @param status the status of the stream at the time this function is called
+ * @param data traffic from the other side
+ * @param size the number of bytes available in data read
+ * @return number of bytes of processed from 'data' (any data remaining should be
+ * given to the next time the read processor is called).
+ */
+static size_t
+input_processor (void *cls,
+ enum GNUNET_STREAM_Status status,
+ const void *input_data,
+ size_t size)
+{
+ struct PeerData *peer;
+
+ peer = (struct PeerData *) cls;
+
+ GNUNET_assert (GNUNET_STERAM_OK == status);
+ GNUNET_assert (size < strlen (data));
+ GNUNET_assert (strncmp ((const char *) data + peer->bytes_read,
+ (const char *) input_data,
+ size));
+ peer->bytes_read += size;
+
+ if (peer->bytes_read < strlen (data))
+ {
+ peer->io_handle = GNUNET_STREAM_read ((struct GNUNET_STREAM_Socket *)
+ peer->socket,
+ GNUNET_TIME_relative_multiply
+ (GNUNET_TIME_UNIT_SECONDS, 5),
+ &input_processor,
+ cls);
+ GNUNET_assert (NULL != peer->io_handle);
+ }
+ else
+ {
+ if (&peer2 == peer) /* Peer2 has completed reading; should write */
+ {
+ peer->bytes_wrote = 0;
+ peer->io_handle = GNUNET_STREAM_write ((struct GNUNET_STREAM_Socket *)
+ peer->socket,
+ (void *) data,
+ strlen(data),
+ GNUNET_TIME_relative_multiply
+ (GNUNET_TIME_UNIT_SECONDS, 5),
+ &write_completion,
+ cls);
+ }
+ else /* Peer1 has completed reading. End of tests */
+ {
+ GNUNET_SCHEDULER_add_now (&do_shutdown, NULL);
+ }
+ }
+ return size;
+}
+
+
+/**
+ * Scheduler call back; to be executed when a new stream is connected
+ * Called from listen connect for peer2
+ */
+static void
+stream_read (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+ read_task = GNUNET_SCHEDULER_NO_TASK;
+ GNUNET_assert (NULL != cls);
+ peer2.bytes_read = 0;
+ GNUNET_STREAM_listen_close (peer2_listen_socket); /* Close listen socket */
+ peer2.io_handle = GNUNET_STREAM_read ((struct GNUNET_STREAM_Socket *) cls,
+ GNUNET_TIME_relative_multiply
+ (GNUNET_TIME_UNIT_SECONDS, 5),
+ &input_processor,
+ (void *) &peer2);
+ GNUNET_assert (NULL != peer2.io_handle);
+}
+
+
+/**
+ * Functions of this type are called upon new stream connection from other peers
+ *
+ * @param cls the closure from GNUNET_STREAM_listen
+ * @param socket the socket representing the stream
+ * @param initiator the identity of the peer who wants to establish a stream
+ * with us
+ * @return GNUNET_OK to keep the socket open, GNUNET_SYSERR to close the
+ * stream (the socket will be invalid after the call)
+ */
+static int
+stream_listen_cb (void *cls,
+ struct GNUNET_STREAM_Socket *socket,
+ const struct GNUNET_PeerIdentity *initiator)
+{
+ GNUNET_assert (NULL != socket);
+ GNUNET_assert (NULL == initiator); /* Local peer=NULL? */
+ GNUNET_assert (socket != peer1.socket);
+
+ peer2.socket = socket;
+ read_task = GNUNET_SCHEDULER_add_now (&stream_read, (void *) socket);
+ return GNUNET_OK;
+}
+
+
+/**
+ * Testing function
+ */
+static void
+test (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+ test_task = GNUNET_SCHEDULER_NO_TASK;
+
+ /* Connect to stream library */
+ peer1.socket = GNUNET_STREAM_open (NULL, /* Null for local peer? */
+ 10, /* App port */
+ &stream_open_cb,
+ (void *) &peer1);
+ GNUNET_assert (NULL != peer1.socket);
+ peer2_listen_socket = GNUNET_STREAM_listen (10 /* App port */
+ &stream_listen_cb,
+ NULL);
+ GNUNET_assert (NULL != peer2_listen_socket);
+
+}
+
+/**
+ * Initialize framework and start test
+ */
+static void
+run (void *cls, char *const *args, const char *cfgfile,
+ const struct GNUNET_CONFIGURATION_Handle *cfg)
+{
+ GNUNET_log_setup ("test_stream_local",
+#if VERBOSE
+ "DEBUG",
+#else
+ "WARNING",
+#endif
+ NULL);
+ arm_pid =
+ GNUNET_OS_start_process (GNUNET_YES, NULL, NULL, "gnunet-service-arm",
+ "gnunet-service-arm",
+#if VERBOSE_ARM
+ "-L", "DEBUG",
+#endif
+ "-c", "test_stream_local.conf", NULL);
+
+ abort_task =
+ GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply
+ (GNUNET_TIME_UNIT_SECONDS, 20), &do_abort,
+ NULL);
+
+ test_task = GNUNET_SCHEDULER_add_now (&test, (void *) cfg);
+
+}
+
+/**
+ * Main function
+ */
+int main (int argc, char **argv)
+{
+ int ret;
+
+ char *const argv2[] = { "test-stream-local",
+ "-c", "test_stream.conf",
+#if VERBOSE
+ "-L", "DEBUG",
+#endif
+ NULL
+ };
+
+ struct GNUNET_GETOPT_CommandLineOption options[] = {
+ GNUNET_GETOPT_OPTION_END
+ };
+
+ ret =
+ GNUNET_PROGRAM_run ((sizeof (argv2) / sizeof (char *)) - 1, argv2,
+ "test-stream-local", "nohelp", options, &run, NULL);
+
+ if (GNUNET_OK != ret)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "run failed with error code %d\n",
+ ret);
+ return 1;
+ }
+ if (GNUNET_SYSERR == result)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "test failed\n");
+ return 1;
+ }
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "test ok\n");
+ return 0;
+}
diff --git a/src/stream/test_stream_local.conf b/src/stream/test_stream_local.conf
new file mode 100644
index 0000000..3d955f0
--- /dev/null
+++ b/src/stream/test_stream_local.conf
@@ -0,0 +1,69 @@
+[fs]
+AUTOSTART = NO
+
+[resolver]
+AUTOSTART = NO
+
+[mesh]
+DEBUG = YES
+AUTOSTART = YES
+ACCEPT_FROM = 127.0.0.1;
+HOSTNAME = localhost
+PORT = 10511
+# PREFIX = valgrind --leak-check=full
+# PREFIX = xterm -geometry 100x85 -T peer1 -e gdb --args
+
+[dht]
+DEBUG = NO
+AUTOSTART = YES
+ACCEPT_FROM6 = ::1;
+ACCEPT_FROM = 127.0.0.1;
+HOSTNAME = localhost
+PORT = 2100
+
+[block]
+plugins = dht test
+
+[dhtcache]
+QUOTA = 1 MB
+DATABASE = sqlite
+
+[transport]
+PLUGINS = tcp
+DEBUG = NO
+ACCEPT_FROM6 = ::1;
+ACCEPT_FROM = 127.0.0.1;
+NEIGHBOUR_LIMIT = 50
+PORT = 12365
+
+[ats]
+WAN_QUOTA_OUT = 3932160
+WAN_QUOTA_IN = 3932160
+
+[core]
+PORT = 12092
+
+[arm]
+DEFAULTSERVICES = core
+PORT = 12366
+DEBUG = NO
+
+[transport-tcp]
+TIMEOUT = 300 s
+PORT = 12368
+
+[TESTING]
+WEAKRANDOM = YES
+
+[gnunetd]
+HOSTKEY = $SERVICEHOME/.hostkey
+
+[PATHS]
+DEFAULTCONFIG = test_mesh.conf
+SERVICEHOME = /tmp/test-mesh/
+
+[dns]
+AUTOSTART = NO
+
+[nse]
+AUTOSTART = NO