aboutsummaryrefslogtreecommitdiff
path: root/src/dht
diff options
context:
space:
mode:
authorBertrand Marc <beberking@gmail.com>2012-06-06 20:47:48 +0200
committerBertrand Marc <beberking@gmail.com>2012-06-06 20:47:48 +0200
commit740b30688bd745a527f96f9116c19acb3480971a (patch)
tree2709a3f4dba11c174aa9e1ba3612e30c578e76a9 /src/dht
parent2b81464a43485fcc8ce079fafdee7b7a171835f4 (diff)
Imported Upstream version 0.9.3upstream/0.9.3
Diffstat (limited to 'src/dht')
-rw-r--r--src/dht/Makefile.am11
-rw-r--r--src/dht/Makefile.in41
-rw-r--r--src/dht/dht.conf.in2
-rw-r--r--src/dht/dht.h179
-rw-r--r--src/dht/dht_api.c605
-rw-r--r--src/dht/gnunet-dht-get.c2
-rw-r--r--src/dht/gnunet-dht-monitor.c325
-rw-r--r--src/dht/gnunet-dht-put.c30
-rw-r--r--src/dht/gnunet-service-dht_clients.c437
-rw-r--r--src/dht/gnunet-service-dht_clients.h86
-rw-r--r--src/dht/gnunet-service-dht_datacache.c2
-rw-r--r--src/dht/gnunet-service-dht_neighbours.c84
-rw-r--r--src/dht/gnunet-service-dht_neighbours.h9
-rw-r--r--src/dht/gnunet-service-dht_routing.c67
-rw-r--r--src/dht/plugin_block_dht.c14
-rw-r--r--src/dht/test_dht_2dtorus.conf4
-rw-r--r--src/dht/test_dht_api.c8
-rw-r--r--src/dht/test_dht_api_data.conf3
-rw-r--r--src/dht/test_dht_api_peer1.conf2
-rw-r--r--src/dht/test_dht_line.conf2
-rw-r--r--src/dht/test_dht_monitor.c177
-rw-r--r--src/dht/test_dht_multipeer.c31
-rw-r--r--src/dht/test_dht_multipeer_data.conf7
-rw-r--r--src/dht/test_dht_topo.c107
-rw-r--r--src/dht/test_dht_twopeer.c21
-rw-r--r--src/dht/test_dht_twopeer_data.conf8
-rw-r--r--src/dht/test_dht_twopeer_get_put.c21
-rw-r--r--src/dht/test_dht_twopeer_path_tracking.c4
-rw-r--r--src/dht/test_dht_twopeer_put_get.c42
29 files changed, 1740 insertions, 591 deletions
diff --git a/src/dht/Makefile.am b/src/dht/Makefile.am
index 93880c2..b2d18d2 100644
--- a/src/dht/Makefile.am
+++ b/src/dht/Makefile.am
@@ -50,6 +50,7 @@ libgnunet_plugin_block_dht_la_DEPENDENCIES = \
bin_PROGRAMS = \
gnunet-service-dht \
+ gnunet-dht-monitor \
gnunet-dht-get \
gnunet-dht-put
@@ -92,6 +93,16 @@ gnunet_dht_put_LDADD = \
gnunet_dht_put_DEPENDENCIES = \
libgnunetdht.la
+gnunet_dht_monitor_SOURCES = \
+ gnunet-dht-monitor.c
+gnunet_dht_monitor_LDADD = \
+ $(top_builddir)/src/dht/libgnunetdht.la \
+ $(top_builddir)/src/core/libgnunetcore.la \
+ $(top_builddir)/src/util/libgnunetutil.la
+gnunet_dht_monitor_DEPENDENCIES = \
+ libgnunetdht.la
+
+
check_PROGRAMS = \
test_dht_api \
test_dht_twopeer \
diff --git a/src/dht/Makefile.in b/src/dht/Makefile.in
index 97e9c59..12ce558 100644
--- a/src/dht/Makefile.in
+++ b/src/dht/Makefile.in
@@ -37,8 +37,8 @@ POST_UNINSTALL = :
build_triplet = @build@
host_triplet = @host@
target_triplet = @target@
-bin_PROGRAMS = gnunet-service-dht$(EXEEXT) gnunet-dht-get$(EXEEXT) \
- gnunet-dht-put$(EXEEXT)
+bin_PROGRAMS = gnunet-service-dht$(EXEEXT) gnunet-dht-monitor$(EXEEXT) \
+ gnunet-dht-get$(EXEEXT) gnunet-dht-put$(EXEEXT)
check_PROGRAMS = test_dht_api$(EXEEXT) test_dht_twopeer$(EXEEXT) \
test_dht_twopeer_put_get$(EXEEXT) \
test_dht_twopeer_get_put$(EXEEXT) \
@@ -123,6 +123,8 @@ libgnunetdht_la_LINK = $(LIBTOOL) $(AM_V_lt) --tag=CC \
PROGRAMS = $(bin_PROGRAMS)
am_gnunet_dht_get_OBJECTS = gnunet-dht-get.$(OBJEXT)
gnunet_dht_get_OBJECTS = $(am_gnunet_dht_get_OBJECTS)
+am_gnunet_dht_monitor_OBJECTS = gnunet-dht-monitor.$(OBJEXT)
+gnunet_dht_monitor_OBJECTS = $(am_gnunet_dht_monitor_OBJECTS)
am_gnunet_dht_put_OBJECTS = gnunet-dht-put.$(OBJEXT)
gnunet_dht_put_OBJECTS = $(am_gnunet_dht_put_OBJECTS)
am_gnunet_service_dht_OBJECTS = gnunet-service-dht.$(OBJEXT) \
@@ -208,19 +210,21 @@ am__v_GEN_ = $(am__v_GEN_$(AM_DEFAULT_VERBOSITY))
am__v_GEN_0 = @echo " GEN " $@;
SOURCES = $(libgnunet_plugin_block_dht_la_SOURCES) \
$(libgnunetdht_la_SOURCES) $(gnunet_dht_get_SOURCES) \
- $(gnunet_dht_put_SOURCES) $(gnunet_service_dht_SOURCES) \
- $(test_dht_2dtorus_SOURCES) $(test_dht_api_SOURCES) \
- $(test_dht_line_SOURCES) $(test_dht_monitor_SOURCES) \
- $(test_dht_multipeer_SOURCES) $(test_dht_twopeer_SOURCES) \
+ $(gnunet_dht_monitor_SOURCES) $(gnunet_dht_put_SOURCES) \
+ $(gnunet_service_dht_SOURCES) $(test_dht_2dtorus_SOURCES) \
+ $(test_dht_api_SOURCES) $(test_dht_line_SOURCES) \
+ $(test_dht_monitor_SOURCES) $(test_dht_multipeer_SOURCES) \
+ $(test_dht_twopeer_SOURCES) \
$(test_dht_twopeer_get_put_SOURCES) \
$(test_dht_twopeer_path_tracking_SOURCES) \
$(test_dht_twopeer_put_get_SOURCES)
DIST_SOURCES = $(libgnunet_plugin_block_dht_la_SOURCES) \
$(libgnunetdht_la_SOURCES) $(gnunet_dht_get_SOURCES) \
- $(gnunet_dht_put_SOURCES) $(gnunet_service_dht_SOURCES) \
- $(test_dht_2dtorus_SOURCES) $(test_dht_api_SOURCES) \
- $(test_dht_line_SOURCES) $(test_dht_monitor_SOURCES) \
- $(test_dht_multipeer_SOURCES) $(test_dht_twopeer_SOURCES) \
+ $(gnunet_dht_monitor_SOURCES) $(gnunet_dht_put_SOURCES) \
+ $(gnunet_service_dht_SOURCES) $(test_dht_2dtorus_SOURCES) \
+ $(test_dht_api_SOURCES) $(test_dht_line_SOURCES) \
+ $(test_dht_monitor_SOURCES) $(test_dht_multipeer_SOURCES) \
+ $(test_dht_twopeer_SOURCES) \
$(test_dht_twopeer_get_put_SOURCES) \
$(test_dht_twopeer_path_tracking_SOURCES) \
$(test_dht_twopeer_put_get_SOURCES)
@@ -285,6 +289,7 @@ INSTALL_SCRIPT = @INSTALL_SCRIPT@
INSTALL_STRIP_PROGRAM = @INSTALL_STRIP_PROGRAM@
INTLLIBS = @INTLLIBS@
INTL_MACOSX_LIBS = @INTL_MACOSX_LIBS@
+JAVAPORT = @JAVAPORT@
LD = @LD@
LDFLAGS = @LDFLAGS@
LIBADD_DL = @LIBADD_DL@
@@ -318,6 +323,7 @@ LT_DLLOADERS = @LT_DLLOADERS@
LT_DLPREOPEN = @LT_DLPREOPEN@
MAKEINFO = @MAKEINFO@
MKDIR_P = @MKDIR_P@
+MONKEYPREFIX = @MONKEYPREFIX@
MSGFMT = @MSGFMT@
MSGFMT_015 = @MSGFMT_015@
MSGMERGE = @MSGMERGE@
@@ -519,6 +525,17 @@ gnunet_dht_put_LDADD = \
gnunet_dht_put_DEPENDENCIES = \
libgnunetdht.la
+gnunet_dht_monitor_SOURCES = \
+ gnunet-dht-monitor.c
+
+gnunet_dht_monitor_LDADD = \
+ $(top_builddir)/src/dht/libgnunetdht.la \
+ $(top_builddir)/src/core/libgnunetcore.la \
+ $(top_builddir)/src/util/libgnunetutil.la
+
+gnunet_dht_monitor_DEPENDENCIES = \
+ libgnunetdht.la
+
test_dht_api_SOURCES = \
test_dht_api.c
@@ -778,6 +795,9 @@ clean-checkPROGRAMS:
gnunet-dht-get$(EXEEXT): $(gnunet_dht_get_OBJECTS) $(gnunet_dht_get_DEPENDENCIES)
@rm -f gnunet-dht-get$(EXEEXT)
$(AM_V_CCLD)$(LINK) $(gnunet_dht_get_OBJECTS) $(gnunet_dht_get_LDADD) $(LIBS)
+gnunet-dht-monitor$(EXEEXT): $(gnunet_dht_monitor_OBJECTS) $(gnunet_dht_monitor_DEPENDENCIES)
+ @rm -f gnunet-dht-monitor$(EXEEXT)
+ $(AM_V_CCLD)$(LINK) $(gnunet_dht_monitor_OBJECTS) $(gnunet_dht_monitor_LDADD) $(LIBS)
gnunet-dht-put$(EXEEXT): $(gnunet_dht_put_OBJECTS) $(gnunet_dht_put_DEPENDENCIES)
@rm -f gnunet-dht-put$(EXEEXT)
$(AM_V_CCLD)$(LINK) $(gnunet_dht_put_OBJECTS) $(gnunet_dht_put_LDADD) $(LIBS)
@@ -820,6 +840,7 @@ distclean-compile:
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/dht_api.Plo@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/gnunet-dht-get.Po@am__quote@
+@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/gnunet-dht-monitor.Po@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/gnunet-dht-put.Po@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/gnunet-service-dht.Po@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/gnunet-service-dht_clients.Po@am__quote@
diff --git a/src/dht/dht.conf.in b/src/dht/dht.conf.in
index 17c13e9..59581dc 100644
--- a/src/dht/dht.conf.in
+++ b/src/dht/dht.conf.in
@@ -1,6 +1,6 @@
[dht]
AUTOSTART = YES
-@UNIXONLY@ PORT = 2095
+@JAVAPORT@PORT = 2095
HOSTNAME = localhost
HOME = $SERVICEHOME
CONFIG = $DEFAULTCONFIG
diff --git a/src/dht/dht.h b/src/dht/dht.h
index 9894be8..8adf49f 100644
--- a/src/dht/dht.h
+++ b/src/dht/dht.h
@@ -181,6 +181,11 @@ struct GNUNET_DHT_ClientPutMessage
uint32_t desired_replication_level GNUNET_PACKED;
/**
+ * Unique ID for the PUT message.
+ */
+ uint64_t unique_id GNUNET_PACKED;
+
+ /**
* How long should this data persist?
*/
struct GNUNET_TIME_AbsoluteNBO expiration;
@@ -196,20 +201,38 @@ struct GNUNET_DHT_ClientPutMessage
/**
- * Message to monitor requests going through peer, clients <--> DHT service.
+ * Message to confirming receipt of PUT, sent from DHT service to clients.
*/
-struct GNUNET_DHT_MonitorMessage
+struct GNUNET_DHT_ClientPutConfirmationMessage
{
/**
- * Type: GNUNET_MESSAGE_TYPE_DHT_MONITOR_{GET, PUT, GET_RESP, PUT_RESP*}
- * (*) not yet implemented, necessary for key randomization
+ * Type: GNUNET_MESSAGE_TYPE_DHT_CLIENT_PUT_OK
*/
struct GNUNET_MessageHeader header;
/**
- * The type of data in the request.
+ * Always zero.
*/
- uint32_t type GNUNET_PACKED;
+ uint32_t reserved GNUNET_PACKED;
+
+ /**
+ * Unique ID from the PUT message that is being confirmed.
+ */
+ uint64_t unique_id GNUNET_PACKED;
+
+};
+
+
+
+/**
+ * Message to monitor put requests going through peer, DHT service -> clients.
+ */
+struct GNUNET_DHT_MonitorPutMessage
+{
+ /**
+ * Type: GNUNET_MESSAGE_TYPE_DHT_MONITOR_PUT
+ */
+ struct GNUNET_MessageHeader header;
/**
* Message options, actually an 'enum GNUNET_DHT_RouteOption' value.
@@ -217,37 +240,165 @@ struct GNUNET_DHT_MonitorMessage
uint32_t options GNUNET_PACKED;
/**
+ * The type of data in the request.
+ */
+ uint32_t type GNUNET_PACKED;
+
+ /**
+ * Hop count so far.
+ */
+ uint32_t hop_count GNUNET_PACKED;
+
+ /**
* Replication level for this message
*/
uint32_t desired_replication_level GNUNET_PACKED;
/**
* Number of peers recorded in the outgoing path from source to the
- * storgage location of this message.
+ * storage location of this message.
*/
uint32_t put_path_length GNUNET_PACKED;
/**
- * The number of peer identities recorded from the storage location
- * to this peer.
+ * How long should this data persist?
*/
- uint32_t get_path_length GNUNET_PACKED;
+ struct GNUNET_TIME_AbsoluteNBO expiration_time;
/**
- * Unique ID for GET / GET responses.
+ * The key to store the value under.
*/
- uint64_t unique_id GNUNET_PACKED;
+ GNUNET_HashCode key;
+
+ /* put path (if tracked) */
+
+ /* Payload */
+};
+
+
+/**
+ * Message to request monitoring messages, clients -> DHT service.
+ */
+struct GNUNET_DHT_MonitorStartStopMessage
+{
/**
- * How long should this data persist?
+ * Type: GNUNET_MESSAGE_TYPE_DHT_MONITOR_(START|STOP)
*/
- struct GNUNET_TIME_AbsoluteNBO expiration;
+ struct GNUNET_MessageHeader header;
+
+ /**
+ * The type of data desired, GNUNET_BLOCK_TYPE_ANY for all.
+ */
+ uint32_t type GNUNET_PACKED;
+
+ /**
+ * Flag whether to notify about GET messages.
+ */
+ int16_t get GNUNET_PACKED;
+
+ /**
+ * Flag whether to notify about GET_REPONSE messages.
+ */
+ int16_t get_resp GNUNET_PACKED;
+
+ /**
+ * Flag whether to notify about PUT messages.
+ */
+ int16_t put GNUNET_PACKED;
+
+ /**
+ * Flag whether to use the provided key to filter messages.
+ */
+ int16_t filter_key GNUNET_PACKED;
+
+ /**
+ * The key to filter messages by.
+ */
+ GNUNET_HashCode key;
+};
+
+
+/**
+ * Message to monitor get requests going through peer, DHT service -> clients.
+ */
+struct GNUNET_DHT_MonitorGetMessage
+{
+ /**
+ * Type: GNUNET_MESSAGE_TYPE_DHT_MONITOR_PUT
+ */
+ struct GNUNET_MessageHeader header;
+
+ /**
+ * Message options, actually an 'enum GNUNET_DHT_RouteOption' value.
+ */
+ uint32_t options GNUNET_PACKED;
+
+ /**
+ * The type of data in the request.
+ */
+ uint32_t type GNUNET_PACKED;
+
+ /**
+ * Hop count
+ */
+ uint32_t hop_count GNUNET_PACKED;
+
+ /**
+ * Replication level for this message
+ */
+ uint32_t desired_replication_level GNUNET_PACKED;
+
+ /**
+ * Number of peers recorded in the outgoing path from source to the
+ * storage location of this message.
+ */
+ uint32_t get_path_length GNUNET_PACKED;
/**
* The key to store the value under.
*/
GNUNET_HashCode key;
+ /* get path (if tracked) */
+
+};
+
+/**
+ * Message to monitor get results going through peer, DHT service -> clients.
+ */
+struct GNUNET_DHT_MonitorGetRespMessage
+{
+ /**
+ * Type: GNUNET_MESSAGE_TYPE_DHT_P2P_RESULT
+ */
+ struct GNUNET_MessageHeader header;
+
+ /**
+ * Content type.
+ */
+ uint32_t type GNUNET_PACKED;
+
+ /**
+ * Length of the PUT path that follows (if tracked).
+ */
+ uint32_t put_path_length GNUNET_PACKED;
+
+ /**
+ * Length of the GET path that follows (if tracked).
+ */
+ uint32_t get_path_length GNUNET_PACKED;
+
+ /**
+ * When does the content expire?
+ */
+ struct GNUNET_TIME_AbsoluteNBO expiration_time;
+
+ /**
+ * The key of the corresponding GET request.
+ */
+ GNUNET_HashCode key;
+
/* put path (if tracked) */
/* get path (if tracked) */
diff --git a/src/dht/dht_api.c b/src/dht/dht_api.c
index 3cb13b4..420eacb 100644
--- a/src/dht/dht_api.c
+++ b/src/dht/dht_api.c
@@ -1,6 +1,6 @@
/*
This file is part of GNUnet.
- (C) 2009, 2010 Christian Grothoff (and other contributing authors)
+ (C) 2009, 2010, 2011, 2012 Christian Grothoff (and other contributing authors)
GNUnet is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published
@@ -74,11 +74,6 @@ struct PendingMessage
void *cont_cls;
/**
- * Timeout task for this message
- */
- GNUNET_SCHEDULER_TaskIdentifier timeout_task;
-
- /**
* Unique ID for this request
*/
uint64_t unique_id;
@@ -100,6 +95,56 @@ struct PendingMessage
/**
+ * Handle to a PUT request.
+ */
+struct GNUNET_DHT_PutHandle
+{
+ /**
+ * Kept in a DLL.
+ */
+ struct GNUNET_DHT_PutHandle *next;
+
+ /**
+ * Kept in a DLL.
+ */
+ struct GNUNET_DHT_PutHandle *prev;
+
+ /**
+ * Continuation to call when done.
+ */
+ GNUNET_DHT_PutContinuation cont;
+
+ /**
+ * Pending message associated with this PUT operation,
+ * NULL after the message has been transmitted to the service.
+ */
+ struct PendingMessage *pending;
+
+ /**
+ * Main handle to this DHT api
+ */
+ struct GNUNET_DHT_Handle *dht_handle;
+
+ /**
+ * Closure for 'cont'.
+ */
+ void *cont_cls;
+
+ /**
+ * Timeout task for this operation.
+ */
+ GNUNET_SCHEDULER_TaskIdentifier timeout_task;
+
+ /**
+ * Unique ID for the PUT operation.
+ */
+ uint64_t unique_id;
+
+};
+
+
+
+/**
* Handle to a GET request
*/
struct GNUNET_DHT_GetHandle
@@ -171,9 +216,19 @@ struct GNUNET_DHT_MonitorHandle
GNUNET_HashCode *key;
/**
- * Callback for each received message of interest.
+ * Callback for each received message of type get.
+ */
+ GNUNET_DHT_MonitorGetCB get_cb;
+
+ /**
+ * Callback for each received message of type get response.
+ */
+ GNUNET_DHT_MonitorGetRespCB get_resp_cb;
+
+ /**
+ * Callback for each received message of type put.
*/
- GNUNET_DHT_MonitorCB cb;
+ GNUNET_DHT_MonitorPutCB put_cb;
/**
* Closure for cb.
@@ -225,8 +280,18 @@ struct GNUNET_DHT_Handle
struct GNUNET_DHT_MonitorHandle *monitor_tail;
/**
- * Hash map containing the current outstanding unique requests
- * (values are of type 'struct GNUNET_DHT_RouteHandle').
+ * Head of active PUT requests.
+ */
+ struct GNUNET_DHT_PutHandle *put_head;
+
+ /**
+ * Tail of active PUT requests.
+ */
+ struct GNUNET_DHT_PutHandle *put_tail;
+
+ /**
+ * Hash map containing the current outstanding unique GET requests
+ * (values are of type 'struct GNUNET_DHT_GetHandle').
*/
struct GNUNET_CONTAINER_MultiHashMap *active_requests;
@@ -257,6 +322,8 @@ struct GNUNET_DHT_Handle
* Handler for messages received from the DHT service
* a demultiplexer which handles numerous message types
*
+ * @param cls the 'struct GNUNET_DHT_Handle'
+ * @param msg the incoming message
*/
static void
service_message_handler (void *cls, const struct GNUNET_MessageHeader *msg);
@@ -265,16 +332,17 @@ service_message_handler (void *cls, const struct GNUNET_MessageHeader *msg);
/**
* Try to (re)connect to the DHT service.
*
+ * @param handle DHT handle to reconnect
* @return GNUNET_YES on success, GNUNET_NO on failure.
*/
static int
try_connect (struct GNUNET_DHT_Handle *handle)
{
- if (handle->client != NULL)
+ if (NULL != handle->client)
return GNUNET_OK;
handle->in_receive = GNUNET_NO;
handle->client = GNUNET_CLIENT_connect ("dht", handle->cfg);
- if (handle->client == NULL)
+ if (NULL == handle->client)
{
LOG (GNUNET_ERROR_TYPE_WARNING,
_("Failed to connect to the DHT service!\n"));
@@ -314,6 +382,7 @@ add_request_to_pending (void *cls, const GNUNET_HashCode * key, void *value)
/**
* Try to send messages from list of messages to send
+ *
* @param handle DHT_Handle
*/
static void
@@ -359,17 +428,34 @@ try_reconnect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
static void
do_disconnect (struct GNUNET_DHT_Handle *handle)
{
- if (handle->client == NULL)
+ struct GNUNET_DHT_PutHandle *ph;
+ struct GNUNET_DHT_PutHandle *next;
+
+ if (NULL == handle->client)
return;
- GNUNET_assert (handle->reconnect_task == GNUNET_SCHEDULER_NO_TASK);
+ GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == handle->reconnect_task);
if (NULL != handle->th)
GNUNET_CLIENT_notify_transmit_ready_cancel (handle->th);
handle->th = NULL;
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Disconnecting from DHT service, will try to reconnect in %llu ms\n",
(unsigned long long) handle->retry_time.rel_value);
- GNUNET_CLIENT_disconnect (handle->client, GNUNET_NO);
+ GNUNET_CLIENT_disconnect (handle->client);
handle->client = NULL;
+
+ /* signal disconnect to all PUT requests that were transmitted but waiting
+ for the put confirmation */
+ next = handle->put_head;
+ while (NULL != (ph = next))
+ {
+ next = ph->next;
+ if (NULL == ph->pending)
+ {
+ if (NULL != ph->cont)
+ ph->cont (ph->cont_cls, GNUNET_SYSERR);
+ GNUNET_DHT_put_cancel (ph);
+ }
+ }
handle->reconnect_task =
GNUNET_SCHEDULER_add_delayed (handle->retry_time, &try_reconnect, handle);
}
@@ -377,6 +463,11 @@ do_disconnect (struct GNUNET_DHT_Handle *handle)
/**
* Transmit the next pending message, called by notify_transmit_ready
+ *
+ * @param cls the DHT handle
+ * @param size number of bytes available in 'buf' for transmission
+ * @param buf where to copy messages for the service
+ * @return number of bytes written to 'buf'
*/
static size_t
transmit_pending (void *cls, size_t size, void *buf);
@@ -384,20 +475,22 @@ transmit_pending (void *cls, size_t size, void *buf);
/**
* Try to send messages from list of messages to send
+ *
+ * @param handle handle to DHT
*/
static void
process_pending_messages (struct GNUNET_DHT_Handle *handle)
{
struct PendingMessage *head;
- if (handle->client == NULL)
+ if (NULL == handle->client)
{
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "process_pending_messages called, but client is null, reconnecting\n");
+ "process_pending_messages called, but client is NULL, reconnecting\n");
do_disconnect (handle);
return;
}
- if (handle->th != NULL)
+ if (NULL != handle->th)
return;
if (NULL == (head = handle->pending_head))
return;
@@ -417,6 +510,11 @@ process_pending_messages (struct GNUNET_DHT_Handle *handle)
/**
* Transmit the next pending message, called by notify_transmit_ready
+ *
+ * @param cls the DHT handle
+ * @param size number of bytes available in 'buf' for transmission
+ * @param buf where to copy messages for the service
+ * @return number of bytes written to 'buf'
*/
static size_t
transmit_pending (void *cls, size_t size, void *buf)
@@ -426,7 +524,7 @@ transmit_pending (void *cls, size_t size, void *buf)
size_t tsize;
handle->th = NULL;
- if (buf == NULL)
+ if (NULL == buf)
{
LOG (GNUNET_ERROR_TYPE_DEBUG,
"Transmission to DHT service failed! Reconnecting!\n");
@@ -446,11 +544,6 @@ transmit_pending (void *cls, size_t size, void *buf)
GNUNET_CONTAINER_DLL_remove (handle->pending_head, handle->pending_tail,
head);
head->in_pending_queue = GNUNET_NO;
- if (head->timeout_task != GNUNET_SCHEDULER_NO_TASK)
- {
- GNUNET_SCHEDULER_cancel (head->timeout_task);
- head->timeout_task = GNUNET_SCHEDULER_NO_TASK;
- }
if (NULL != head->cont)
{
head->cont (head->cont_cls, NULL);
@@ -533,63 +626,178 @@ process_reply (void *cls, const GNUNET_HashCode * key, void *value)
return GNUNET_YES;
}
-
/**
- * Process a monitoring message from the service.
+ * Process a get monitor message from the service.
*
* @param handle The DHT handle.
- * @param msg Message from the service.
+ * @param msg Monitor get message from the service.
*
* @return GNUNET_OK if everything went fine,
* GNUNET_SYSERR if the message is malformed.
*/
static int
-process_monitor_message (struct GNUNET_DHT_Handle *handle,
- const struct GNUNET_MessageHeader *msg)
+process_monitor_get_message (struct GNUNET_DHT_Handle *handle,
+ const struct GNUNET_DHT_MonitorGetMessage *msg)
{
- struct GNUNET_DHT_MonitorMessage *m;
struct GNUNET_DHT_MonitorHandle *h;
+
+ for (h = handle->monitor_head; NULL != h; h = h->next)
+ {
+ int type_ok;
+ int key_ok;
+
+ type_ok = (GNUNET_BLOCK_TYPE_ANY == h->type) || (h->type == ntohl(msg->type));
+ key_ok = (NULL == h->key) || (0 == memcmp (h->key, &msg->key,
+ sizeof (GNUNET_HashCode)));
+ if (type_ok && key_ok && (NULL != h->get_cb))
+ h->get_cb (h->cb_cls,
+ ntohl (msg->options),
+ (enum GNUNET_BLOCK_Type) ntohl(msg->type),
+ ntohl (msg->hop_count),
+ ntohl (msg->desired_replication_level),
+ ntohl (msg->get_path_length),
+ (struct GNUNET_PeerIdentity *) &msg[1],
+ &msg->key);
+ }
+ return GNUNET_OK;
+}
+
+
+/**
+ * Process a get response monitor message from the service.
+ *
+ * @param handle The DHT handle.
+ * @param msg monitor get response message from the service
+ * @return GNUNET_OK if everything went fine,
+ * GNUNET_SYSERR if the message is malformed.
+ */
+static int
+process_monitor_get_resp_message (struct GNUNET_DHT_Handle *handle,
+ const struct GNUNET_DHT_MonitorGetRespMessage
+ *msg)
+{
+ struct GNUNET_DHT_MonitorHandle *h;
+ struct GNUNET_PeerIdentity *path;
+ uint32_t getl;
+ uint32_t putl;
size_t msize;
- if (ntohs (msg->type) < GNUNET_MESSAGE_TYPE_DHT_MONITOR_GET ||
- ntohs (msg->type) > GNUNET_MESSAGE_TYPE_DHT_MONITOR_PUT)
- return GNUNET_SYSERR;
- msize = ntohs (msg->size);
- if (msize < sizeof (struct GNUNET_DHT_MonitorMessage))
+ msize = ntohs (msg->header.size);
+ path = (struct GNUNET_PeerIdentity *) &msg[1];
+ getl = ntohl (msg->get_path_length);
+ putl = ntohl (msg->put_path_length);
+ if ( (getl + putl < getl) ||
+ ( ((msize - sizeof (struct GNUNET_DHT_MonitorGetRespMessage)) / sizeof (struct GNUNET_PeerIdentity)) < getl + putl) )
+ {
+ GNUNET_break (0);
return GNUNET_SYSERR;
+ }
+ for (h = handle->monitor_head; NULL != h; h = h->next)
+ {
+ int type_ok;
+ int key_ok;
+
+ type_ok = (GNUNET_BLOCK_TYPE_ANY == h->type) || (h->type == ntohl(msg->type));
+ key_ok = (NULL == h->key) || (0 == memcmp (h->key, &msg->key,
+ sizeof (GNUNET_HashCode)));
+ if (type_ok && key_ok && (NULL != h->get_resp_cb))
+ h->get_resp_cb (h->cb_cls,
+ (enum GNUNET_BLOCK_Type) ntohl(msg->type),
+ path, getl,
+ &path[getl], putl,
+ GNUNET_TIME_absolute_ntoh(msg->expiration_time),
+ &msg->key,
+ (void *) &path[getl + putl],
+ msize -
+ sizeof (struct GNUNET_DHT_MonitorGetRespMessage) -
+ sizeof (struct GNUNET_PeerIdentity) * (putl + getl));
+ }
+ return GNUNET_OK;
+}
- m = (struct GNUNET_DHT_MonitorMessage *) msg;
- h = handle->monitor_head;
- while (NULL != h)
+
+/**
+ * Process a put monitor message from the service.
+ *
+ * @param handle The DHT handle.
+ * @param msg Monitor put message from the service.
+ *
+ * @return GNUNET_OK if everything went fine,
+ * GNUNET_SYSERR if the message is malformed.
+ */
+static int
+process_monitor_put_message (struct GNUNET_DHT_Handle *handle,
+ const struct GNUNET_DHT_MonitorPutMessage *msg)
+{
+ struct GNUNET_DHT_MonitorHandle *h;
+ size_t msize;
+ struct GNUNET_PeerIdentity *path;
+ uint32_t putl;
+
+ msize = ntohs (msg->header.size);
+ path = (struct GNUNET_PeerIdentity *) &msg[1];
+ putl = ntohl (msg->put_path_length);
+ if (((msize - sizeof (struct GNUNET_DHT_MonitorGetRespMessage)) / sizeof (struct GNUNET_PeerIdentity)) < putl)
{
- if (h->type == ntohl(m->type) &&
- (NULL == h->key ||
- memcmp (h->key, &m->key, sizeof (GNUNET_HashCode)) == 0))
- {
- struct GNUNET_PeerIdentity *path;
- uint32_t getl;
- uint32_t putl;
-
- path = (struct GNUNET_PeerIdentity *) &m[1];
- getl = ntohl (m->get_path_length);
- putl = ntohl (m->put_path_length);
- h->cb (h->cb_cls, ntohs(msg->type),
- GNUNET_TIME_absolute_ntoh(m->expiration),
- &m->key,
- &path[getl], putl, path, getl,
- ntohl (m->desired_replication_level),
- ntohl (m->options), ntohl (m->type),
- (void *) &path[getl + putl],
- ntohs (msg->size) -
- sizeof (struct GNUNET_DHT_MonitorMessage) -
- sizeof (struct GNUNET_PeerIdentity) * (putl + getl));
- }
- h = h->next;
+ GNUNET_break (0);
+ return GNUNET_SYSERR;
+ }
+ for (h = handle->monitor_head; NULL != h; h = h->next)
+ {
+ int type_ok;
+ int key_ok;
+
+ type_ok = (GNUNET_BLOCK_TYPE_ANY == h->type) || (h->type == ntohl(msg->type));
+ key_ok = (NULL == h->key) || (0 == memcmp (h->key, &msg->key,
+ sizeof (GNUNET_HashCode)));
+ if (type_ok && key_ok && (NULL != h->put_cb))
+ h->put_cb (h->cb_cls,
+ ntohl (msg->options),
+ (enum GNUNET_BLOCK_Type) ntohl(msg->type),
+ ntohl (msg->hop_count),
+ ntohl (msg->desired_replication_level),
+ putl, path,
+ GNUNET_TIME_absolute_ntoh(msg->expiration_time),
+ &msg->key,
+ (void *) &path[putl],
+ msize -
+ sizeof (struct GNUNET_DHT_MonitorPutMessage) -
+ sizeof (struct GNUNET_PeerIdentity) * putl);
}
+ return GNUNET_OK;
+}
+
+
+/**
+ * Process a put confirmation message from the service.
+ *
+ * @param handle The DHT handle.
+ * @param msg confirmation message from the service.
+ * @return GNUNET_OK if everything went fine,
+ * GNUNET_SYSERR if the message is malformed.
+ */
+static int
+process_put_confirmation_message (struct GNUNET_DHT_Handle *handle,
+ const struct GNUNET_DHT_ClientPutConfirmationMessage *msg)
+{
+ struct GNUNET_DHT_PutHandle *ph;
+ GNUNET_DHT_PutContinuation cont;
+ void *cont_cls;
+ for (ph = handle->put_head; NULL != ph; ph = ph->next)
+ if (ph->unique_id == msg->unique_id)
+ break;
+ if (NULL == ph)
+ return GNUNET_OK;
+ cont = ph->cont;
+ cont_cls = ph->cont_cls;
+ GNUNET_DHT_put_cancel (ph);
+ if (NULL != cont)
+ cont (cont_cls, GNUNET_OK);
retur