aboutsummaryrefslogtreecommitdiff
path: root/src/dht
diff options
context:
space:
mode:
Diffstat (limited to 'src/dht')
-rw-r--r--src/dht/Makefile.am11
-rw-r--r--src/dht/Makefile.in41
-rw-r--r--src/dht/dht.conf.in2
-rw-r--r--src/dht/dht.h179
-rw-r--r--src/dht/dht_api.c605
-rw-r--r--src/dht/gnunet-dht-get.c2
-rw-r--r--src/dht/gnunet-dht-monitor.c325
-rw-r--r--src/dht/gnunet-dht-put.c30
-rw-r--r--src/dht/gnunet-service-dht_clients.c437
-rw-r--r--src/dht/gnunet-service-dht_clients.h86
-rw-r--r--src/dht/gnunet-service-dht_datacache.c2
-rw-r--r--src/dht/gnunet-service-dht_neighbours.c84
-rw-r--r--src/dht/gnunet-service-dht_neighbours.h9
-rw-r--r--src/dht/gnunet-service-dht_routing.c67
-rw-r--r--src/dht/plugin_block_dht.c14
-rw-r--r--src/dht/test_dht_2dtorus.conf4
-rw-r--r--src/dht/test_dht_api.c8
-rw-r--r--src/dht/test_dht_api_data.conf3
-rw-r--r--src/dht/test_dht_api_peer1.conf2
-rw-r--r--src/dht/test_dht_line.conf2
-rw-r--r--src/dht/test_dht_monitor.c177
-rw-r--r--src/dht/test_dht_multipeer.c31
-rw-r--r--src/dht/test_dht_multipeer_data.conf7
-rw-r--r--src/dht/test_dht_topo.c107
-rw-r--r--src/dht/test_dht_twopeer.c21
-rw-r--r--src/dht/test_dht_twopeer_data.conf8
-rw-r--r--src/dht/test_dht_twopeer_get_put.c21
-rw-r--r--src/dht/test_dht_twopeer_path_tracking.c4
-rw-r--r--src/dht/test_dht_twopeer_put_get.c42
29 files changed, 1740 insertions, 591 deletions
diff --git a/src/dht/Makefile.am b/src/dht/Makefile.am
index 93880c2..b2d18d2 100644
--- a/src/dht/Makefile.am
+++ b/src/dht/Makefile.am
@@ -50,6 +50,7 @@ libgnunet_plugin_block_dht_la_DEPENDENCIES = \
bin_PROGRAMS = \
gnunet-service-dht \
+ gnunet-dht-monitor \
gnunet-dht-get \
gnunet-dht-put
@@ -92,6 +93,16 @@ gnunet_dht_put_LDADD = \
gnunet_dht_put_DEPENDENCIES = \
libgnunetdht.la
+gnunet_dht_monitor_SOURCES = \
+ gnunet-dht-monitor.c
+gnunet_dht_monitor_LDADD = \
+ $(top_builddir)/src/dht/libgnunetdht.la \
+ $(top_builddir)/src/core/libgnunetcore.la \
+ $(top_builddir)/src/util/libgnunetutil.la
+gnunet_dht_monitor_DEPENDENCIES = \
+ libgnunetdht.la
+
+
check_PROGRAMS = \
test_dht_api \
test_dht_twopeer \
diff --git a/src/dht/Makefile.in b/src/dht/Makefile.in
index 97e9c59..12ce558 100644
--- a/src/dht/Makefile.in
+++ b/src/dht/Makefile.in
@@ -37,8 +37,8 @@ POST_UNINSTALL = :
build_triplet = @build@
host_triplet = @host@
target_triplet = @target@
-bin_PROGRAMS = gnunet-service-dht$(EXEEXT) gnunet-dht-get$(EXEEXT) \
- gnunet-dht-put$(EXEEXT)
+bin_PROGRAMS = gnunet-service-dht$(EXEEXT) gnunet-dht-monitor$(EXEEXT) \
+ gnunet-dht-get$(EXEEXT) gnunet-dht-put$(EXEEXT)
check_PROGRAMS = test_dht_api$(EXEEXT) test_dht_twopeer$(EXEEXT) \
test_dht_twopeer_put_get$(EXEEXT) \
test_dht_twopeer_get_put$(EXEEXT) \
@@ -123,6 +123,8 @@ libgnunetdht_la_LINK = $(LIBTOOL) $(AM_V_lt) --tag=CC \
PROGRAMS = $(bin_PROGRAMS)
am_gnunet_dht_get_OBJECTS = gnunet-dht-get.$(OBJEXT)
gnunet_dht_get_OBJECTS = $(am_gnunet_dht_get_OBJECTS)
+am_gnunet_dht_monitor_OBJECTS = gnunet-dht-monitor.$(OBJEXT)
+gnunet_dht_monitor_OBJECTS = $(am_gnunet_dht_monitor_OBJECTS)
am_gnunet_dht_put_OBJECTS = gnunet-dht-put.$(OBJEXT)
gnunet_dht_put_OBJECTS = $(am_gnunet_dht_put_OBJECTS)
am_gnunet_service_dht_OBJECTS = gnunet-service-dht.$(OBJEXT) \
@@ -208,19 +210,21 @@ am__v_GEN_ = $(am__v_GEN_$(AM_DEFAULT_VERBOSITY))
am__v_GEN_0 = @echo " GEN " $@;
SOURCES = $(libgnunet_plugin_block_dht_la_SOURCES) \
$(libgnunetdht_la_SOURCES) $(gnunet_dht_get_SOURCES) \
- $(gnunet_dht_put_SOURCES) $(gnunet_service_dht_SOURCES) \
- $(test_dht_2dtorus_SOURCES) $(test_dht_api_SOURCES) \
- $(test_dht_line_SOURCES) $(test_dht_monitor_SOURCES) \
- $(test_dht_multipeer_SOURCES) $(test_dht_twopeer_SOURCES) \
+ $(gnunet_dht_monitor_SOURCES) $(gnunet_dht_put_SOURCES) \
+ $(gnunet_service_dht_SOURCES) $(test_dht_2dtorus_SOURCES) \
+ $(test_dht_api_SOURCES) $(test_dht_line_SOURCES) \
+ $(test_dht_monitor_SOURCES) $(test_dht_multipeer_SOURCES) \
+ $(test_dht_twopeer_SOURCES) \
$(test_dht_twopeer_get_put_SOURCES) \
$(test_dht_twopeer_path_tracking_SOURCES) \
$(test_dht_twopeer_put_get_SOURCES)
DIST_SOURCES = $(libgnunet_plugin_block_dht_la_SOURCES) \
$(libgnunetdht_la_SOURCES) $(gnunet_dht_get_SOURCES) \
- $(gnunet_dht_put_SOURCES) $(gnunet_service_dht_SOURCES) \
- $(test_dht_2dtorus_SOURCES) $(test_dht_api_SOURCES) \
- $(test_dht_line_SOURCES) $(test_dht_monitor_SOURCES) \
- $(test_dht_multipeer_SOURCES) $(test_dht_twopeer_SOURCES) \
+ $(gnunet_dht_monitor_SOURCES) $(gnunet_dht_put_SOURCES) \
+ $(gnunet_service_dht_SOURCES) $(test_dht_2dtorus_SOURCES) \
+ $(test_dht_api_SOURCES) $(test_dht_line_SOURCES) \
+ $(test_dht_monitor_SOURCES) $(test_dht_multipeer_SOURCES) \
+ $(test_dht_twopeer_SOURCES) \
$(test_dht_twopeer_get_put_SOURCES) \
$(test_dht_twopeer_path_tracking_SOURCES) \
$(test_dht_twopeer_put_get_SOURCES)
@@ -285,6 +289,7 @@ INSTALL_SCRIPT = @INSTALL_SCRIPT@
INSTALL_STRIP_PROGRAM = @INSTALL_STRIP_PROGRAM@
INTLLIBS = @INTLLIBS@
INTL_MACOSX_LIBS = @INTL_MACOSX_LIBS@
+JAVAPORT = @JAVAPORT@
LD = @LD@
LDFLAGS = @LDFLAGS@
LIBADD_DL = @LIBADD_DL@
@@ -318,6 +323,7 @@ LT_DLLOADERS = @LT_DLLOADERS@
LT_DLPREOPEN = @LT_DLPREOPEN@
MAKEINFO = @MAKEINFO@
MKDIR_P = @MKDIR_P@
+MONKEYPREFIX = @MONKEYPREFIX@
MSGFMT = @MSGFMT@
MSGFMT_015 = @MSGFMT_015@
MSGMERGE = @MSGMERGE@
@@ -519,6 +525,17 @@ gnunet_dht_put_LDADD = \
gnunet_dht_put_DEPENDENCIES = \
libgnunetdht.la
+gnunet_dht_monitor_SOURCES = \
+ gnunet-dht-monitor.c
+
+gnunet_dht_monitor_LDADD = \
+ $(top_builddir)/src/dht/libgnunetdht.la \
+ $(top_builddir)/src/core/libgnunetcore.la \
+ $(top_builddir)/src/util/libgnunetutil.la
+
+gnunet_dht_monitor_DEPENDENCIES = \
+ libgnunetdht.la
+
test_dht_api_SOURCES = \
test_dht_api.c
@@ -778,6 +795,9 @@ clean-checkPROGRAMS:
gnunet-dht-get$(EXEEXT): $(gnunet_dht_get_OBJECTS) $(gnunet_dht_get_DEPENDENCIES)
@rm -f gnunet-dht-get$(EXEEXT)
$(AM_V_CCLD)$(LINK) $(gnunet_dht_get_OBJECTS) $(gnunet_dht_get_LDADD) $(LIBS)
+gnunet-dht-monitor$(EXEEXT): $(gnunet_dht_monitor_OBJECTS) $(gnunet_dht_monitor_DEPENDENCIES)
+ @rm -f gnunet-dht-monitor$(EXEEXT)
+ $(AM_V_CCLD)$(LINK) $(gnunet_dht_monitor_OBJECTS) $(gnunet_dht_monitor_LDADD) $(LIBS)
gnunet-dht-put$(EXEEXT): $(gnunet_dht_put_OBJECTS) $(gnunet_dht_put_DEPENDENCIES)
@rm -f gnunet-dht-put$(EXEEXT)
$(AM_V_CCLD)$(LINK) $(gnunet_dht_put_OBJECTS) $(gnunet_dht_put_LDADD) $(LIBS)
@@ -820,6 +840,7 @@ distclean-compile:
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/dht_api.Plo@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/gnunet-dht-get.Po@am__quote@
+@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/gnunet-dht-monitor.Po@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/gnunet-dht-put.Po@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/gnunet-service-dht.Po@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/gnunet-service-dht_clients.Po@am__quote@
diff --git a/src/dht/dht.conf.in b/src/dht/dht.conf.in
index 17c13e9..59581dc 100644
--- a/src/dht/dht.conf.in
+++ b/src/dht/dht.conf.in
@@ -1,6 +1,6 @@
[dht]
AUTOSTART = YES
-@UNIXONLY@ PORT = 2095
+@JAVAPORT@PORT = 2095
HOSTNAME = localhost
HOME = $SERVICEHOME
CONFIG = $DEFAULTCONFIG
diff --git a/src/dht/dht.h b/src/dht/dht.h
index 9894be8..8adf49f 100644
--- a/src/dht/dht.h
+++ b/src/dht/dht.h
@@ -181,6 +181,11 @@ struct GNUNET_DHT_ClientPutMessage
uint32_t desired_replication_level GNUNET_PACKED;
/**
+ * Unique ID for the PUT message.
+ */
+ uint64_t unique_id GNUNET_PACKED;
+
+ /**
* How long should this data persist?
*/
struct GNUNET_TIME_AbsoluteNBO expiration;
@@ -196,20 +201,38 @@ struct GNUNET_DHT_ClientPutMessage
/**
- * Message to monitor requests going through peer, clients <--> DHT service.
+ * Message to confirming receipt of PUT, sent from DHT service to clients.
*/
-struct GNUNET_DHT_MonitorMessage
+struct GNUNET_DHT_ClientPutConfirmationMessage
{
/**
- * Type: GNUNET_MESSAGE_TYPE_DHT_MONITOR_{GET, PUT, GET_RESP, PUT_RESP*}
- * (*) not yet implemented, necessary for key randomization
+ * Type: GNUNET_MESSAGE_TYPE_DHT_CLIENT_PUT_OK
*/
struct GNUNET_MessageHeader header;
/**
- * The type of data in the request.
+ * Always zero.
*/
- uint32_t type GNUNET_PACKED;
+ uint32_t reserved GNUNET_PACKED;
+
+ /**
+ * Unique ID from the PUT message that is being confirmed.
+ */
+ uint64_t unique_id GNUNET_PACKED;
+
+};
+
+
+
+/**
+ * Message to monitor put requests going through peer, DHT service -> clients.
+ */
+struct GNUNET_DHT_MonitorPutMessage
+{
+ /**
+ * Type: GNUNET_MESSAGE_TYPE_DHT_MONITOR_PUT
+ */
+ struct GNUNET_MessageHeader header;
/**
* Message options, actually an 'enum GNUNET_DHT_RouteOption' value.
@@ -217,37 +240,165 @@ struct GNUNET_DHT_MonitorMessage
uint32_t options GNUNET_PACKED;
/**
+ * The type of data in the request.
+ */
+ uint32_t type GNUNET_PACKED;
+
+ /**
+ * Hop count so far.
+ */
+ uint32_t hop_count GNUNET_PACKED;
+
+ /**
* Replication level for this message
*/
uint32_t desired_replication_level GNUNET_PACKED;
/**
* Number of peers recorded in the outgoing path from source to the
- * storgage location of this message.
+ * storage location of this message.
*/
uint32_t put_path_length GNUNET_PACKED;
/**
- * The number of peer identities recorded from the storage location
- * to this peer.
+ * How long should this data persist?
*/
- uint32_t get_path_length GNUNET_PACKED;
+ struct GNUNET_TIME_AbsoluteNBO expiration_time;
/**
- * Unique ID for GET / GET responses.
+ * The key to store the value under.
*/
- uint64_t unique_id GNUNET_PACKED;
+ GNUNET_HashCode key;
+
+ /* put path (if tracked) */
+
+ /* Payload */
+};
+
+
+/**
+ * Message to request monitoring messages, clients -> DHT service.
+ */
+struct GNUNET_DHT_MonitorStartStopMessage
+{
/**
- * How long should this data persist?
+ * Type: GNUNET_MESSAGE_TYPE_DHT_MONITOR_(START|STOP)
*/
- struct GNUNET_TIME_AbsoluteNBO expiration;
+ struct GNUNET_MessageHeader header;
+
+ /**
+ * The type of data desired, GNUNET_BLOCK_TYPE_ANY for all.
+ */
+ uint32_t type GNUNET_PACKED;
+
+ /**
+ * Flag whether to notify about GET messages.
+ */
+ int16_t get GNUNET_PACKED;
+
+ /**
+ * Flag whether to notify about GET_REPONSE messages.
+ */
+ int16_t get_resp GNUNET_PACKED;
+
+ /**
+ * Flag whether to notify about PUT messages.
+ */
+ int16_t put GNUNET_PACKED;
+
+ /**
+ * Flag whether to use the provided key to filter messages.
+ */
+ int16_t filter_key GNUNET_PACKED;
+
+ /**
+ * The key to filter messages by.
+ */
+ GNUNET_HashCode key;
+};
+
+
+/**
+ * Message to monitor get requests going through peer, DHT service -> clients.
+ */
+struct GNUNET_DHT_MonitorGetMessage
+{
+ /**
+ * Type: GNUNET_MESSAGE_TYPE_DHT_MONITOR_PUT
+ */
+ struct GNUNET_MessageHeader header;
+
+ /**
+ * Message options, actually an 'enum GNUNET_DHT_RouteOption' value.
+ */
+ uint32_t options GNUNET_PACKED;
+
+ /**
+ * The type of data in the request.
+ */
+ uint32_t type GNUNET_PACKED;
+
+ /**
+ * Hop count
+ */
+ uint32_t hop_count GNUNET_PACKED;
+
+ /**
+ * Replication level for this message
+ */
+ uint32_t desired_replication_level GNUNET_PACKED;
+
+ /**
+ * Number of peers recorded in the outgoing path from source to the
+ * storage location of this message.
+ */
+ uint32_t get_path_length GNUNET_PACKED;
/**
* The key to store the value under.
*/
GNUNET_HashCode key;
+ /* get path (if tracked) */
+
+};
+
+/**
+ * Message to monitor get results going through peer, DHT service -> clients.
+ */
+struct GNUNET_DHT_MonitorGetRespMessage
+{
+ /**
+ * Type: GNUNET_MESSAGE_TYPE_DHT_P2P_RESULT
+ */
+ struct GNUNET_MessageHeader header;
+
+ /**
+ * Content type.
+ */
+ uint32_t type GNUNET_PACKED;
+
+ /**
+ * Length of the PUT path that follows (if tracked).
+ */
+ uint32_t put_path_length GNUNET_PACKED;
+
+ /**
+ * Length of the GET path that follows (if tracked).
+ */
+ uint32_t get_path_length GNUNET_PACKED;
+
+ /**
+ * When does the content expire?
+ */
+ struct GNUNET_TIME_AbsoluteNBO expiration_time;
+
+ /**
+ * The key of the corresponding GET request.
+ */
+ GNUNET_HashCode key;
+
/* put path (if tracked) */
/* get path (if tracked) */
diff --git a/src/dht/dht_api.c b/src/dht/dht_api.c
index 3cb13b4..420eacb 100644
--- a/src/dht/dht_api.c
+++ b/src/dht/dht_api.c
@@ -1,6 +1,6 @@
/*
This file is part of GNUnet.
- (C) 2009, 2010 Christian Grothoff (and other contributing authors)
+ (C) 2009, 2010, 2011, 2012 Christian Grothoff (and other contributing authors)
GNUnet is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published
@@ -74,11 +74,6 @@ struct PendingMessage
void *cont_cls;
/**
- * Timeout task for this message
- */
- GNUNET_SCHEDULER_TaskIdentifier timeout_task;
-
- /**
* Unique ID for this request
*/
uint64_t unique_id;
@@ -100,6 +95,56 @@ struct PendingMessage
/**
+ * Handle to a PUT request.
+ */
+struct GNUNET_DHT_PutHandle
+{
+ /**
+ * Kept in a DLL.
+ */
+ struct GNUNET_DHT_PutHandle *next;
+
+ /**
+ * Kept in a DLL.
+ */
+ struct GNUNET_DHT_PutHandle *prev;
+
+ /**
+ * Continuation to call when done.
+ */
+ GNUNET_DHT_PutContinuation cont;
+
+ /**
+ * Pending message associated with this PUT operation,
+ * NULL after the message has been transmitted to the service.
+ */
+ struct PendingMessage *pending;
+
+ /**
+ * Main handle to this DHT api
+ */
+ struct GNUNET_DHT_Handle *dht_handle;
+
+ /**
+ * Closure for 'cont'.
+ */
+ void *cont_cls;
+
+ /**
+ * Timeout task for this operation.
+ */
+ GNUNET_SCHEDULER_TaskIdentifier timeout_task;
+
+ /**
+ * Unique ID for the PUT operation.
+ */
+ uint64_t unique_id;
+
+};
+
+
+
+/**
* Handle to a GET request
*/
struct GNUNET_DHT_GetHandle
@@ -171,9 +216,19 @@ struct GNUNET_DHT_MonitorHandle
GNUNET_HashCode *key;
/**
- * Callback for each received message of interest.
+ * Callback for each received message of type get.
+ */
+ GNUNET_DHT_MonitorGetCB get_cb;
+
+ /**
+ * Callback for each received message of type get response.
+ */
+ GNUNET_DHT_MonitorGetRespCB get_resp_cb;
+
+ /**
+ * Callback for each received message of type put.
*/
- GNUNET_DHT_MonitorCB cb;
+ GNUNET_DHT_MonitorPutCB put_cb;
/**
* Closure for cb.
@@ -225,8 +280,18 @@ struct GNUNET_DHT_Handle
struct GNUNET_DHT_MonitorHandle *monitor_tail;
/**
- * Hash map containing the current outstanding unique requests
- * (values are of type 'struct GNUNET_DHT_RouteHandle').
+ * Head of active PUT requests.
+ */
+ struct GNUNET_DHT_PutHandle *put_head;
+
+ /**
+ * Tail of active PUT requests.
+ */
+ struct GNUNET_DHT_PutHandle *put_tail;
+
+ /**
+ * Hash map containing the current outstanding unique GET requests
+ * (values are of type 'struct GNUNET_DHT_GetHandle').
*/
struct GNUNET_CONTAINER_MultiHashMap *active_requests;
@@ -257,6 +322,8 @@ struct GNUNET_DHT_Handle
* Handler for messages received from the DHT service
* a demultiplexer which handles numerous message types
*
+ * @param cls the 'struct GNUNET_DHT_Handle'
+ * @param msg the incoming message
*/
static void
service_message_handler (void *cls, const struct GNUNET_MessageHeader *msg);
@@ -265,16 +332,17 @@ service_message_handler (void *cls, const struct GNUNET_MessageHeader *msg);
/**
* Try to (re)connect to the DHT service.
*
+ * @param handle DHT handle to reconnect
* @return GNUNET_YES on success, GNUNET_NO on failure.
*/
static int
try_connect (struct GNUNET_DHT_Handle *handle)
{
- if (handle->client != NULL)
+ if (NULL != handle->client)
return GNUNET_OK;
handle->in_receive = GNUNET_NO;
handle->client = GNUNET_CLIENT_connect ("dht", handle->cfg);
- if (handle->client == NULL)
+ if (NULL == handle->client)
{
LOG (GNUNET_ERROR_TYPE_WARNING,
_("Failed to connect to the DHT service!\n"));
@@ -314,6 +382,7 @@ add_request_to_pending (void *cls, const GNUNET_HashCode * key, void *value)
/**
* Try to send messages from list of messages to send
+ *
* @param handle DHT_Handle
*/
static void
@@ -359,17 +428,34 @@ try_reconnect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
static void
do_disconnect (struct GNUNET_DHT_Handle *handle)
{
- if (handle->client == NULL)
+ struct GNUNET_DHT_PutHandle *ph;
+ struct GNUNET_DHT_PutHandle *next;
+
+ if (NULL == handle->client)
return;
- GNUNET_assert (handle->reconnect_task == GNUNET_SCHEDULER_NO_TASK);
+ GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == handle->reconnect_task);
if (NULL != handle->th)
GNUNET_CLIENT_notify_transmit_ready_cancel (handle->th);
handle->th = NULL;
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Disconnecting from DHT service, will try to reconnect in %llu ms\n",
(unsigned long long) handle->retry_time.rel_value);
- GNUNET_CLIENT_disconnect (handle->client, GNUNET_NO);
+ GNUNET_CLIENT_disconnect (handle->client);
handle->client = NULL;
+
+ /* signal disconnect to all PUT requests that were transmitted but waiting
+ for the put confirmation */
+ next = handle->put_head;
+ while (NULL != (ph = next))
+ {
+ next = ph->next;
+ if (NULL == ph->pending)
+ {
+ if (NULL != ph->cont)
+ ph->cont (ph->cont_cls, GNUNET_SYSERR);
+ GNUNET_DHT_put_cancel (ph);
+ }
+ }
handle->reconnect_task =
GNUNET_SCHEDULER_add_delayed (handle->retry_time, &try_reconnect, handle);
}
@@ -377,6 +463,11 @@ do_disconnect (struct GNUNET_DHT_Handle *handle)
/**
* Transmit the next pending message, called by notify_transmit_ready
+ *
+ * @param cls the DHT handle
+ * @param size number of bytes available in 'buf' for transmission
+ * @param buf where to copy messages for the service
+ * @return number of bytes written to 'buf'
*/
static size_t
transmit_pending (void *cls, size_t size, void *buf);
@@ -384,20 +475,22 @@ transmit_pending (void *cls, size_t size, void *buf);
/**
* Try to send messages from list of messages to send
+ *
+ * @param handle handle to DHT
*/
static void
process_pending_messages (struct GNUNET_DHT_Handle *handle)
{
struct PendingMessage *head;
- if (handle->client == NULL)
+ if (NULL == handle->client)
{
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "process_pending_messages called, but client is null, reconnecting\n");
+ "process_pending_messages called, but client is NULL, reconnecting\n");
do_disconnect (handle);
return;
}
- if (handle->th != NULL)
+ if (NULL != handle->th)
return;
if (NULL == (head = handle->pending_head))
return;
@@ -417,6 +510,11 @@ process_pending_messages (struct GNUNET_DHT_Handle *handle)
/**
* Transmit the next pending message, called by notify_transmit_ready
+ *
+ * @param cls the DHT handle
+ * @param size number of bytes available in 'buf' for transmission
+ * @param buf where to copy messages for the service
+ * @return number of bytes written to 'buf'
*/
static size_t
transmit_pending (void *cls, size_t size, void *buf)
@@ -426,7 +524,7 @@ transmit_pending (void *cls, size_t size, void *buf)
size_t tsize;
handle->th = NULL;
- if (buf == NULL)
+ if (NULL == buf)
{
LOG (GNUNET_ERROR_TYPE_DEBUG,
"Transmission to DHT service failed! Reconnecting!\n");
@@ -446,11 +544,6 @@ transmit_pending (void *cls, size_t size, void *buf)
GNUNET_CONTAINER_DLL_remove (handle->pending_head, handle->pending_tail,
head);
head->in_pending_queue = GNUNET_NO;
- if (head->timeout_task != GNUNET_SCHEDULER_NO_TASK)
- {
- GNUNET_SCHEDULER_cancel (head->timeout_task);
- head->timeout_task = GNUNET_SCHEDULER_NO_TASK;
- }
if (NULL != head->cont)
{
head->cont (head->cont_cls, NULL);
@@ -533,63 +626,178 @@ process_reply (void *cls, const GNUNET_HashCode * key, void *value)
return GNUNET_YES;
}
-
/**
- * Process a monitoring message from the service.
+ * Process a get monitor message from the service.
*
* @param handle The DHT handle.
- * @param msg Message from the service.
+ * @param msg Monitor get message from the service.
*
* @return GNUNET_OK if everything went fine,
* GNUNET_SYSERR if the message is malformed.
*/
static int
-process_monitor_message (struct GNUNET_DHT_Handle *handle,
- const struct GNUNET_MessageHeader *msg)
+process_monitor_get_message (struct GNUNET_DHT_Handle *handle,
+ const struct GNUNET_DHT_MonitorGetMessage *msg)
{
- struct GNUNET_DHT_MonitorMessage *m;
struct GNUNET_DHT_MonitorHandle *h;
+
+ for (h = handle->monitor_head; NULL != h; h = h->next)
+ {
+ int type_ok;
+ int key_ok;
+
+ type_ok = (GNUNET_BLOCK_TYPE_ANY == h->type) || (h->type == ntohl(msg->type));
+ key_ok = (NULL == h->key) || (0 == memcmp (h->key, &msg->key,
+ sizeof (GNUNET_HashCode)));
+ if (type_ok && key_ok && (NULL != h->get_cb))
+ h->get_cb (h->cb_cls,
+ ntohl (msg->options),
+ (enum GNUNET_BLOCK_Type) ntohl(msg->type),
+ ntohl (msg->hop_count),
+ ntohl (msg->desired_replication_level),
+ ntohl (msg->get_path_length),
+ (struct GNUNET_PeerIdentity *) &msg[1],
+ &msg->key);
+ }
+ return GNUNET_OK;
+}
+
+
+/**
+ * Process a get response monitor message from the service.
+ *
+ * @param handle The DHT handle.
+ * @param msg monitor get response message from the service
+ * @return GNUNET_OK if everything went fine,
+ * GNUNET_SYSERR if the message is malformed.
+ */
+static int
+process_monitor_get_resp_message (struct GNUNET_DHT_Handle *handle,
+ const struct GNUNET_DHT_MonitorGetRespMessage
+ *msg)
+{
+ struct GNUNET_DHT_MonitorHandle *h;
+ struct GNUNET_PeerIdentity *path;
+ uint32_t getl;
+ uint32_t putl;
size_t msize;
- if (ntohs (msg->type) < GNUNET_MESSAGE_TYPE_DHT_MONITOR_GET ||
- ntohs (msg->type) > GNUNET_MESSAGE_TYPE_DHT_MONITOR_PUT)
- return GNUNET_SYSERR;
- msize = ntohs (msg->size);
- if (msize < sizeof (struct GNUNET_DHT_MonitorMessage))
+ msize = ntohs (msg->header.size);
+ path = (struct GNUNET_PeerIdentity *) &msg[1];
+ getl = ntohl (msg->get_path_length);
+ putl = ntohl (msg->put_path_length);
+ if ( (getl + putl < getl) ||
+ ( ((msize - sizeof (struct GNUNET_DHT_MonitorGetRespMessage)) / sizeof (struct GNUNET_PeerIdentity)) < getl + putl) )
+ {
+ GNUNET_break (0);
return GNUNET_SYSERR;
+ }
+ for (h = handle->monitor_head; NULL != h; h = h->next)
+ {
+ int type_ok;
+ int key_ok;
+
+ type_ok = (GNUNET_BLOCK_TYPE_ANY == h->type) || (h->type == ntohl(msg->type));
+ key_ok = (NULL == h->key) || (0 == memcmp (h->key, &msg->key,
+ sizeof (GNUNET_HashCode)));
+ if (type_ok && key_ok && (NULL != h->get_resp_cb))
+ h->get_resp_cb (h->cb_cls,
+ (enum GNUNET_BLOCK_Type) ntohl(msg->type),
+ path, getl,
+ &path[getl], putl,
+ GNUNET_TIME_absolute_ntoh(msg->expiration_time),
+ &msg->key,
+ (void *) &path[getl + putl],
+ msize -
+ sizeof (struct GNUNET_DHT_MonitorGetRespMessage) -
+ sizeof (struct GNUNET_PeerIdentity) * (putl + getl));
+ }
+ return GNUNET_OK;
+}
- m = (struct GNUNET_DHT_MonitorMessage *) msg;
- h = handle->monitor_head;
- while (NULL != h)
+
+/**
+ * Process a put monitor message from the service.
+ *
+ * @param handle The DHT handle.
+ * @param msg Monitor put message from the service.
+ *
+ * @return GNUNET_OK if everything went fine,
+ * GNUNET_SYSERR if the message is malformed.
+ */
+static int
+process_monitor_put_message (struct GNUNET_DHT_Handle *handle,
+ const struct GNUNET_DHT_MonitorPutMessage *msg)
+{
+ struct GNUNET_DHT_MonitorHandle *h;
+ size_t msize;
+ struct GNUNET_PeerIdentity *path;
+ uint32_t putl;
+
+ msize = ntohs (msg->header.size);
+ path = (struct GNUNET_PeerIdentity *) &msg[1];
+ putl = ntohl (msg->put_path_length);
+ if (((msize - sizeof (struct GNUNET_DHT_MonitorGetRespMessage)) / sizeof (struct GNUNET_PeerIdentity)) < putl)
{
- if (h->type == ntohl(m->type) &&
- (NULL == h->key ||
- memcmp (h->key, &m->key, sizeof (GNUNET_HashCode)) == 0))
- {
- struct GNUNET_PeerIdentity *path;
- uint32_t getl;
- uint32_t putl;
-
- path = (struct GNUNET_PeerIdentity *) &m[1];
- getl = ntohl (m->get_path_length);
- putl = ntohl (m->put_path_length);
- h->cb (h->cb_cls, ntohs(msg->type),
- GNUNET_TIME_absolute_ntoh(m->expiration),
- &m->key,
- &path[getl], putl, path, getl,
- ntohl (m->desired_replication_level),
- ntohl (m->options), ntohl (m->type),
- (void *) &path[getl + putl],
- ntohs (msg->size) -
- sizeof (struct GNUNET_DHT_MonitorMessage) -
- sizeof (struct GNUNET_PeerIdentity) * (putl + getl));
- }
- h = h->next;
+ GNUNET_break (0);
+ return GNUNET_SYSERR;
+ }
+ for (h = handle->monitor_head; NULL != h; h = h->next)
+ {
+ int type_ok;
+ int key_ok;
+
+ type_ok = (GNUNET_BLOCK_TYPE_ANY == h->type) || (h->type == ntohl(msg->type));
+ key_ok = (NULL == h->key) || (0 == memcmp (h->key, &msg->key,
+ sizeof (GNUNET_HashCode)));
+ if (type_ok && key_ok && (NULL != h->put_cb))
+ h->put_cb (h->cb_cls,
+ ntohl (msg->options),
+ (enum GNUNET_BLOCK_Type) ntohl(msg->type),
+ ntohl (msg->hop_count),
+ ntohl (msg->desired_replication_level),
+ putl, path,
+ GNUNET_TIME_absolute_ntoh(msg->expiration_time),
+ &msg->key,
+ (void *) &path[putl],
+ msize -
+ sizeof (struct GNUNET_DHT_MonitorPutMessage) -
+ sizeof (struct GNUNET_PeerIdentity) * putl);
}
+ return GNUNET_OK;
+}
+
+
+/**
+ * Process a put confirmation message from the service.
+ *
+ * @param handle The DHT handle.
+ * @param msg confirmation message from the service.
+ * @return GNUNET_OK if everything went fine,
+ * GNUNET_SYSERR if the message is malformed.
+ */
+static int
+process_put_confirmation_message (struct GNUNET_DHT_Handle *handle,
+ const struct GNUNET_DHT_ClientPutConfirmationMessage *msg)
+{
+ struct GNUNET_DHT_PutHandle *ph;
+ GNUNET_DHT_PutContinuation cont;
+ void *cont_cls;
+ for (ph = handle->put_head; NULL != ph; ph = ph->next)
+ if (ph->unique_id == msg->unique_id)
+ break;
+ if (NULL == ph)
+ return GNUNET_OK;
+ cont = ph->cont;
+ cont_cls = ph->cont_cls;
+ GNUNET_DHT_put_cancel (ph);
+ if (NULL != cont)
+ cont (cont_cls, GNUNET_OK);
return GNUNET_OK;
}
+
/**
* Handler for messages received from the DHT service
* a demultiplexer which handles numerous message types
@@ -602,38 +810,84 @@ service_message_handler (void *cls, const struct GNUNET_MessageHeader *msg)
{
struct GNUNET_DHT_Handle *handle = cls;
const struct GNUNET_DHT_ClientResultMessage *dht_msg;
+ uint16_t msize;
+ int ret;
- if (msg == NULL)
+ if (NULL == msg)
{
LOG (GNUNET_ERROR_TYPE_DEBUG,
"Error receiving data from DHT service, reconnecting\n");
do_disconnect (handle);
return;
}
- if (ntohs (msg->type) != GNUNET_MESSAGE_TYPE_DHT_CLIENT_RESULT)
+ ret = GNUNET_SYSERR;
+ msize = ntohs (msg->size);
+ switch (ntohs (msg->type))
{
- if (process_monitor_message (handle, msg) == GNUNET_OK)
+ case GNUNET_MESSAGE_TYPE_DHT_MONITOR_GET:
+ if (msize < sizeof (struct GNUNET_DHT_MonitorGetMessage))
{
- GNUNET_CLIENT_receive (handle->client, &service_message_handler, handle,
- GNUNET_TIME_UNIT_FOREVER_REL);
- return;
+ GNUNET_break (0);
+ break;
}
- GNUNET_break (0);
- do_disconnect (handle);
- return;
+ ret = process_monitor_get_message(handle,
+ (const struct GNUNET_DHT_MonitorGetMessage *) msg);
+ break;
+ case GNUNET_MESSAGE_TYPE_DHT_MONITOR_GET_RESP:
+ if (msize < sizeof (struct GNUNET_DHT_MonitorGetRespMessage))
+ {
+ GNUNET_break (0);
+ break;
+ }
+ ret = process_monitor_get_resp_message(handle,
+ (const struct GNUNET_DHT_MonitorGetRespMessage *) msg);
+ break;
+ case GNUNET_MESSAGE_TYPE_DHT_MONITOR_PUT:
+ if (msize < sizeof (struct GNUNET_DHT_MonitorPutMessage))
+ {
+ GNUNET_break (0);
+ break;
+ }
+ ret = process_monitor_put_message(handle,
+ (const struct GNUNET_DHT_MonitorPutMessage *) msg);
+ break;
+ case GNUNET_MESSAGE_TYPE_DHT_MONITOR_PUT_RESP:
+ /* Not implemented yet */
+ GNUNET_break(0);
+ break;
+ case GNUNET_MESSAGE_TYPE_DHT_CLIENT_RESULT:
+ if (ntohs (msg->size) < sizeof (struct GNUNET_DHT_ClientResultMessage))
+ {
+ GNUNET_break (0);
+ break;
+ }
+ ret = GNUNET_OK;
+ dht_msg = (const struct GNUNET_DHT_ClientResultMessage *) msg;
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "Received reply for `%s' from DHT service %p\n",
+ GNUNET_h2s (&dht_msg->key), handle);
+ GNUNET_CONTAINER_multihashmap_get_multiple (handle->active_requests,
+ &dht_msg->key, &process_reply,
+ (void *) dht_msg);
+ break;
+ case GNUNET_MESSAGE_TYPE_DHT_CLIENT_PUT_OK:
+ if (ntohs (msg->size) != sizeof (struct GNUNET_DHT_ClientPutConfirmationMessage))
+ {
+ GNUNET_break (0);
+ break;
+ }
+ ret = process_put_confirmation_message (handle,
+ (const struct GNUNET_DHT_ClientPutConfirmationMessage*) msg);
+ break;
+ default:
+ GNUNET_break(0);
+ break;
}
- if (ntohs (msg->size) < sizeof (struct GNUNET_DHT_ClientResultMessage))
+ if (GNUNET_OK != ret)
{
GNUNET_break (0);
do_disconnect (handle);
return;
}
- dht_msg = (const struct GNUNET_DHT_ClientResultMessage *) msg;
- LOG (GNUNET_ERROR_TYPE_DEBUG, "Received reply for `%s' from DHT service %p\n",
- GNUNET_h2s (&dht_msg->key), handle);
- GNUNET_CONTAINER_multihashmap_get_multiple (handle->active_requests,
- &dht_msg->key, &process_reply,
- (void *) dht_msg);
GNUNET_CLIENT_receive (handle->client, &service_message_handler, handle,
GNUNET_TIME_UNIT_FOREVER_REL);
}
@@ -677,11 +931,12 @@ void
GNUNET_DHT_disconnect (struct GNUNET_DHT_Handle *handle)
{
struct PendingMessage *pm;
+ struct GNUNET_DHT_PutHandle *ph;
- GNUNET_assert (handle != NULL);
+ GNUNET_assert (NULL != handle);
GNUNET_assert (0 ==
GNUNET_CONTAINER_multihashmap_size (handle->active_requests));
- if (handle->th != NULL)
+ if (NULL != handle->th)
{
GNUNET_CLIENT_notify_transmit_ready_cancel (handle->th);
handle->th = NULL;
@@ -693,18 +948,24 @@ GNUNET_DHT_disconnect (struct GNUNET_DHT_Handle *handle)
pm);
pm->in_pending_queue = GNUNET_NO;
GNUNET_assert (GNUNET_YES == pm->free_on_send);
- if (GNUNET_SCHEDULER_NO_TASK != pm->timeout_task)
- GNUNET_SCHEDULER_cancel (pm->timeout_task);
if (NULL != pm->cont)
pm->cont (pm->cont_cls, NULL);
GNUNET_free (pm);
}
- if (handle->client != NULL)
+ while (NULL != (ph = handle->put_head))
{
- GNUNET_CLIENT_disconnect (handle->client, GNUNET_YES);
+ GNUNET_break (NULL == ph->pending);
+ if (NULL != ph->cont)
+ ph->cont (ph->cont_cls, GNUNET_SYSERR);
+ GNUNET_DHT_put_cancel (ph);
+ }
+
+ if (NULL != handle->client)
+ {
+ GNUNET_CLIENT_disconnect (handle->client);
handle->client = NULL;
}
- if (handle->reconnect_task != GNUNET_SCHEDULER_NO_TASK)
+ if (GNUNET_SCHEDULER_NO_TASK != handle->reconnect_task)
GNUNET_SCHEDULER_cancel (handle->reconnect_task);
GNUNET_CONTAINER_multihashmap_destroy (handle->active_requests);
GNUNET_free (handle);
@@ -720,22 +981,49 @@ GNUNET_DHT_disconnect (struct GNUNET_DHT_Handle *handle)
static void
timeout_put_request (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
{
- struct PendingMessage *pending = cls;
- struct GNUNET_DHT_Handle *handle;
+ struct GNUNET_DHT_PutHandle *ph = cls;
+ struct GNUNET_DHT_Handle *handle = ph->dht_handle;
- handle = pending->handle;
- GNUNET_assert (GNUNET_YES == pending->in_pending_queue);
- GNUNET_CONTAINER_DLL_remove (handle->pending_head, handle->pending_tail,
- pending);
- pending->in_pending_queue = GNUNET_NO;
- if (pending->cont != NULL)
- pending->cont (pending->cont_cls, tc);
- GNUNET_free (pending);
+ ph->timeout_task = GNUNET_SCHEDULER_NO_TASK;
+ if (NULL != ph->pending)
+ {
+ GNUNET_CONTAINER_DLL_remove (handle->pending_head, handle->pending_tail,
+ ph->pending);
+ ph->pending->in_pending_queue = GNUNET_NO;
+ GNUNET_free (ph->pending);
+ }
+ if (NULL != ph->cont)
+ ph->cont (ph->cont_cls, GNUNET_NO);
+ GNUNET_CONTAINER_DLL_remove (handle->put_head,
+ handle->put_tail,
+ ph);
+ GNUNET_free (ph);
+}
+
+
+/**
+ * Function called whenever the PUT message leaves the queue. Sets
+ * the message pointer in the put handle to NULL.
+ *
+ * @param cls the 'struct GNUNET_DHT_PutHandle'
+ * @param tc unused
+ */
+static void
+mark_put_message_gone (void *cls,
+ const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+ struct GNUNET_DHT_PutHandle *ph = cls;
+
+ ph->pending = NULL;
}
/**
- * Perform a PUT operation storing data in the DHT.
+ * Perform a PUT operation storing data in the DHT. FIXME: we should
+ * change the protocol to get a confirmation for the PUT from the DHT
+ * and call 'cont' only after getting the confirmation; otherwise, the
+ * client has no good way of telling if the 'PUT' message actually got
+ * to the DHT service!
*
* @param handle handle to DHT service
* @param key the key to store under
@@ -748,51 +1036,97 @@ timeout_put_request (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
* @param exp desired expiration time for the value
* @param timeout how long to wait for transmission of this request
* @param cont continuation to call when done (transmitting request to service)
+ * You must not call GNUNET_DHT_DISCONNECT in this continuation
* @param cont_cls closure for cont
*/
-void
+struct GNUNET_DHT_PutHandle *
GNUNET_DHT_put (struct GNUNET_DHT_Handle *handle, const GNUNET_HashCode * key,
uint32_t desired_replication_level,
enum GNUNET_DHT_RouteOption options,
enum GNUNET_BLOCK_Type type, size_t size, const char *data,
struct GNUNET_TIME_Absolute exp,
- struct GNUNET_TIME_Relative timeout, GNUNET_SCHEDULER_Task cont,
+ struct GNUNET_TIME_Relative timeout, GNUNET_DHT_PutContinuation cont,
void *cont_cls)
{
struct GNUNET_DHT_ClientPutMessage *put_msg;
size_t msize;
struct PendingMessage *pending;
+ struct GNUNET_DHT_PutHandle *ph;
msize = sizeof (struct GNUNET_DHT_ClientPutMessage) + size;
if ((msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE) ||
(size >= GNUNET_SERVER_MAX_MESSAGE_SIZE))
{
GNUNET_break (0);
- if (NULL != cont)
- cont (cont_cls, NULL);
- return;
+ return NULL;
}
+ ph = GNUNET_malloc (sizeof (struct GNUNET_DHT_PutHandle));
+ ph->dht_handle = handle;
+ ph->timeout_task = GNUNET_SCHEDULER_add_delayed (timeout, &timeout_put_request, ph);
+ ph->cont = cont;
+ ph->cont_cls = cont_cls;
+ ph->unique_id = ++handle->uid_gen;
pending = GNUNET_malloc (sizeof (struct PendingMessage) + msize);
+ ph->pending = pending;
put_msg = (struct GNUNET_DHT_ClientPutMessage *) &pending[1];
pending->msg = &put_msg->header;
pending->handle = handle;
- pending->cont = cont;
- pending->cont_cls = cont_cls;
+ pending->cont = &mark_put_message_gone;
+ pending->cont_cls = ph;
pending->free_on_send = GNUNET_YES;
- pending->timeout_task =
- GNUNET_SCHEDULER_add_delayed (timeout, &timeout_put_request, pending);
put_msg->header.size = htons (msize);
put_msg->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_CLIENT_PUT);
put_msg->type = htonl (type);
put_msg->options = htonl ((uint32_t) options);
put_msg->desired_replication_level = htonl (desired_replication_level);
+ put_msg->unique_id = ph->unique_id;
put_msg->expiration = GNUNET_TIME_absolute_hton (exp);
put_msg->key = *key;
memcpy (&put_msg[1], data, size);
GNUNET_CONTAINER_DLL_insert (handle->pending_head, handle->pending_tail,
pending);
pending->in_pending_queue = GNUNET_YES;
+ GNUNET_CONTAINER_DLL_insert_tail (handle->put_head,
+ handle->put_tail,
+ ph);
process_pending_messages (handle);
+ return ph;
+}
+
+
+/**
+ * Cancels a DHT PUT operation. Note that the PUT request may still
+ * go out over the network (we can't stop that); However, if the PUT
+ * has not yet been sent to the service, cancelling the PUT will stop
+ * this from happening (but there is no way for the user of this API
+ * to tell if that is the case). The only use for this API is to
+ * prevent a later call to 'cont' from "GNUNET_DHT_put" (i.e. because
+ * the system is shutting down).
+ *
+ * @param ph put operation to cancel ('cont' will no longer be called)
+ */
+void
+GNUNET_DHT_put_cancel (struct GNUNET_DHT_PutHandle *ph)
+{
+ struct GNUNET_DHT_Handle *handle = ph->dht_handle;
+
+ if (NULL != ph->pending)
+ {
+ GNUNET_CONTAINER_DLL_remove (handle->pending_head,
+ handle->pending_tail,
+ ph->pending);
+ GNUNET_free (ph->pending);
+ ph->pending = NULL;
+ }
+ if (ph->timeout_task != GNUNET_SCHEDULER_NO_TASK)
+ {
+ GNUNET_SCHEDULER_cancel (ph->timeout_task);
+ ph->timeout_task = GNUNET_SCHEDULER_NO_TASK;
+ }
+ GNUNET_CONTAINER_DLL_remove (handle->put_head,
+ handle->put_tail,
+ ph);
+ GNUNET_free (ph);
}
@@ -801,7 +1135,6 @@ GNUNET_DHT_put (struct GNUNET_DHT_Handle *handle, const GNUNET_HashCode * key,
* also "GNUNET_BLOCK_evaluate".
*
* @param handle handle to the DHT service
- * @param timeout how long to wait for transmission of this request to the service
* @param type expected type of the response object
* @param key the key to look up
* @param desired_replication_level estimate of how many
@@ -815,7 +1148,6 @@ GNUNET_DHT_put (struct GNUNET_DHT_Handle *handle, const GNUNET_HashCode * key,
*/
struct GNUNET_DHT_GetHandle *
GNUNET_DHT_get_start (struct GNUNET_DHT_Handle *handle,
- struct GNUNET_TIME_Relative timeout,
enum GNUNET_BLOCK_Type type, const GNUNET_HashCode * key,
uint32_t desired_replication_level,
enum GNUNET_DHT_RouteOption options, const void *xquery,
@@ -847,8 +1179,7 @@ GNUNET_DHT_get_start (struct GNUNET_DHT_Handle *handle,
get_msg->desired_replication_level = htonl (desired_replication_level);
get_msg->type = htonl (type);
get_msg->key = *key;
- handle->uid_gen++;
- get_msg->unique_id = handle->uid_gen;
+ get_msg->unique_id = ++handle->uid_gen;
memcpy (&get_msg[1], xquery, xquery_size);
GNUNET_CONTAINER_DLL_insert (handle->pending_head, handle->pending_tail,
pending);
@@ -925,7 +1256,9 @@ GNUNET_DHT_get_stop (struct GNUNET_DHT_GetHandle *get_handle)
* @param handle Handle to the DHT service.
* @param type Type of blocks that are of interest.
* @param key Key of data of interest, NULL for all.
- * @param cb Callback to process all monitored data.
+ * @param get_cb Callback to process monitored get messages.
+ * @param get_resp_cb Callback to process monitored get response messages.
+ * @param put_cb Callback to process monitored put messages.
* @param cb_cls Closure for cb.
*
* @return Handle to stop monitoring.
@@ -934,18 +1267,21 @@ struct GNUNET_DHT_MonitorHandle *
GNUNET_DHT_monitor_start (struct GNUNET_DHT_Handle *handle,
enum GNUNET_BLOCK_Type type,
const GNUNET_HashCode *key,
- GNUNET_DHT_MonitorCB cb,
+ GNUNET_DHT_MonitorGetCB get_cb,
+ GNUNET_DHT_MonitorGetRespCB get_resp_cb,
+ GNUNET_DHT_MonitorPutCB put_cb,
void *cb_cls)
{
struct GNUNET_DHT_MonitorHandle *h;
- struct GNUNET_DHT_MonitorMessage *m;
+ struct GNUNET_DHT_MonitorStartStopMessage *m;
struct PendingMessage *pending;
h = GNUNET_malloc (sizeof (struct GNUNET_DHT_MonitorHandle));
GNUNET_CONTAINER_DLL_insert(handle->monitor_head, handle->monitor_tail, h);
- GNUNET_assert (NULL != cb);
- h->cb = cb;
+ h->get_cb = get_cb;
+ h->get_resp_cb = get_resp_cb;
+ h->put_cb = put_cb;
h->cb_cls = cb_cls;
h->type = type;
h->dht_handle = handle;
@@ -955,17 +1291,22 @@ GNUNET_DHT_monitor_start (struct GNUNET_DHT_Handle *handle,
memcpy (h->key, key, sizeof(GNUNET_HashCode));
}
- pending = GNUNET_malloc (sizeof (struct GNUNET_DHT_MonitorMessage) +
+ pending = GNUNET_malloc (sizeof (struct GNUNET_DHT_MonitorStartStopMessage) +
sizeof (struct PendingMessage));
- m = (struct GNUNET_DHT_MonitorMessage *) &pending[1];
+ m = (struct GNUNET_DHT_MonitorStartStopMessage *) &pending[1];
pending->msg = &m->header;
pending->handle = handle;
pending->free_on_send = GNUNET_YES;
- m->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_MONITOR_GET);
- m->header.size = htons (sizeof (struct GNUNET_DHT_MonitorMessage));
+ m->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_MONITOR_START);
+ m->header.size = htons (sizeof (struct GNUNET_DHT_MonitorStartStopMessage));
m->type = htonl(type);
- if (NULL != key)
+ m->get = htons(NULL != get_cb);
+ m->get_resp = htons(NULL != get_resp_cb);
+ m->put = htons(NULL != put_cb);
+ if (NULL != key) {
+ m->filter_key = htons(1);
memcpy (&m->key, key, sizeof(GNUNET_HashCode));
+ }
GNUNET_CONTAINER_DLL_insert (handle->pending_head, handle->pending_tail,
pending);
pending->in_pending_queue = GNUNET_YES;
@@ -985,10 +1326,36 @@ GNUNET_DHT_monitor_start (struct GNUNET_DHT_Handle *handle,
void
GNUNET_DHT_monitor_stop (struct GNUNET_DHT_MonitorHandle *handle)
{
- GNUNET_free_non_null (handle->key);
+ struct GNUNET_DHT_MonitorStartStopMessage *m;
+ struct PendingMessage *pending;
+
GNUNET_CONTAINER_DLL_remove (handle->dht_handle->monitor_head,
handle->dht_handle->monitor_tail,
handle);
+
+ pending = GNUNET_malloc (sizeof (struct GNUNET_DHT_MonitorStartStopMessage) +
+ sizeof (struct PendingMessage));
+ m = (struct GNUNET_DHT_MonitorStartStopMessage *) &pending[1];
+ pending->msg = &m->header;
+ pending->handle = handle->dht_handle;
+ pending->free_on_send = GNUNET_YES;
+ m->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_MONITOR_STOP);
+ m->header.size = htons (sizeof (struct GNUNET_DHT_MonitorStartStopMessage));
+ m->type = htonl(handle->type);
+ m->get = htons(NULL != handle->get_cb);
+ m->get_resp = htons(NULL != handle->get_resp_cb);
+ m->put = htons(NULL != handle->put_cb);
+ if (NULL != handle->key) {
+ m->filter_key = htons(1);
+ memcpy (&m->key, handle->key, sizeof(GNUNET_HashCode));
+ }
+ GNUNET_CONTAINER_DLL_insert (handle->dht_handle->pending_head,
+ handle->dht_handle->pending_tail,
+ pending);
+ pending->in_pending_queue = GNUNET_YES;
+ process_pending_messages (handle->dht_handle);
+
+ GNUNET_free_non_null (handle->key);
GNUNET_free (handle);
}
diff --git a/src/dht/gnunet-dht-get.c b/src/dht/gnunet-dht-get.c
index 6ad4b30..fb185c4 100644
--- a/src/dht/gnunet-dht-get.c
+++ b/src/dht/gnunet-dht-get.c
@@ -186,7 +186,7 @@ run (void *cls, char *const *args, const char *cfgfile,
GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_absolute_get_remaining
(absolute_timeout), &cleanup_task, NULL);
get_handle =
- GNUNET_DHT_get_start (dht_handle, timeout, query_type, &key, replication,
+ GNUNET_DHT_get_start (dht_handle, query_type, &key, replication,
GNUNET_DHT_RO_NONE, NULL, 0, &get_result_iterator,
NULL);
diff --git a/src/dht/gnunet-dht-monitor.c b/src/dht/gnunet-dht-monitor.c
new file mode 100644
index 0000000..8ca3beb
--- /dev/null
+++ b/src/dht/gnunet-dht-monitor.c
@@ -0,0 +1,325 @@
+/*
+ This file is part of GNUnet.
+ (C) 2012 Christian Grothoff (and other contributing authors)
+
+ GNUnet is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published
+ by the Free Software Foundation; either version 3, or (at your
+ option) any later version.
+
+ GNUnet is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with GNUnet; see the file COPYING. If not, write to the
+ Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ Boston, MA 02111-1307, USA.
+*/
+/**
+ * @file dht/gnunet-dht-monitor.c
+ * @brief search for data in DHT
+ * @author Christian Grothoff
+ * @author Bartlomiej Polot
+ */
+#include "platform.h"
+#include "gnunet_dht_service.h"
+
+/**
+ * The type of the query
+ */
+static unsigned int block_type;
+
+/**
+ * The key to be monitored
+ */
+static char *query_key;
+
+/**
+ * User supplied timeout value (in seconds)
+ */
+static unsigned long long timeout_request = 5;
+
+/**
+ * Be verbose
+ */
+static int verbose;
+
+/**
+* Handle to the DHT
+ */
+static struct GNUNET_DHT_Handle *dht_handle;
+
+/**
+ * Global handle of the configuration
+ */
+static const struct GNUNET_CONFIGURATION_Handle *cfg;
+
+/**
+ * Handle for the get request
+ */
+static struct GNUNET_DHT_MonitorHandle *monitor_handle;
+
+/**
+ * Count of messages received
+ */
+static unsigned int result_count;
+
+/**
+ * Global status value
+ */
+static int ret;
+
+
+/**
+ * Function called on shutdown, disconnects from DHT if necessary.
+ *
+ * @param cls closure (unused)
+ * @param tc Task Context
+ */
+static void
+shutdown_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+ if (verbose)
+ FPRINTF (stderr, "%s", "Shutting down!\n");
+ if (dht_handle != NULL)
+ {
+ GNUNET_DHT_disconnect (dht_handle);
+ dht_handle = NULL;
+ }
+}
+
+
+/**
+ * Stop monitoring request and start shutdown
+ *
+ * @param cls closure (unused)
+ * @param tc Task Context
+ */
+static void
+cleanup_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+ if (verbose)
+ FPRINTF (stderr, "%s", "Cleaning up!\n");
+ if (monitor_handle != NULL)
+ {
+ GNUNET_DHT_monitor_stop (monitor_handle);
+ monitor_handle = NULL;
+ }
+ GNUNET_SCHEDULER_add_now (&shutdown_task, NULL);
+}
+
+
+/**
+ * Callback called on each GET request going through the DHT.
+ *
+ * @param cls Closure.
+ * @param options Options, for instance RecordRoute, DemultiplexEverywhere.
+ * @param type The type of data in the request.
+ * @param hop_count Hop count so far.
+ * @param path_length number of entries in path (or 0 if not recorded).
+ * @param path peers on the GET path (or NULL if not recorded).
+ * @param desired_replication_level Desired replication level.
+ * @param key Key of the requested data.
+ */
+void
+get_callback (void *cls,
+ enum GNUNET_DHT_RouteOption options,
+ enum GNUNET_BLOCK_Type type,
+ uint32_t hop_count,
+ uint32_t desired_replication_level,
+ unsigned int path_length,
+ const struct GNUNET_PeerIdentity *path,
+ const GNUNET_HashCode * key)
+{
+ FPRINTF (stdout, "Result %d, operation: %s, type %d\n Key: %s",
+ result_count,
+ "GET",
+ type,
+ GNUNET_h2s_full(key));
+ result_count++;
+}
+
+/**
+ * Callback called on each GET reply going through the DHT.
+ *
+ * @param cls Closure.
+ * @param type The type of data in the result.
+ * @param get_path Peers on GET path (or NULL if not recorded).
+ * @param get_path_length number of entries in get_path.
+ * @param put_path peers on the PUT path (or NULL if not recorded).
+ * @param put_path_length number of entries in get_path.
+ * @param exp Expiration time of the data.
+ * @param key Key of the data.
+ * @param data Pointer to the result data.
+ * @param size Number of bytes in data.
+ */
+void
+get_resp_callback (void *cls,
+ enum GNUNET_BLOCK_Type type,
+ const struct GNUNET_PeerIdentity *get_path,
+ unsigned int get_path_length,
+ const struct GNUNET_PeerIdentity *put_path,
+ unsigned int put_path_length,
+ struct GNUNET_TIME_Absolute exp,
+ const GNUNET_HashCode * key,
+ const void *data,
+ size_t size)
+{
+ FPRINTF (stdout, "Result %d, operation: %s, type %d:\n Key: %s\n %.*s\n",
+ result_count,
+ "GET_RESP",
+ type,
+ GNUNET_h2s_full(key),
+ (unsigned int) size,
+ (char *) data);
+ result_count++;
+}
+
+/**
+ * Callback called on each PUT request going through the DHT.
+ *
+ * @param cls Closure.
+ * @param options Options, for instance RecordRoute, DemultiplexEverywhere.
+ * @param type The type of data in the request.
+ * @param hop_count Hop count so far.
+ * @param path_length number of entries in path (or 0 if not recorded).
+ * @param path peers on the PUT path (or NULL if not recorded).
+ * @param desired_replication_level Desired replication level.
+ * @param exp Expiration time of the data.
+ * @param key Key under which data is to be stored.
+ * @param data Pointer to the data carried.
+ * @param size Number of bytes in data.
+ */
+void
+put_callback (void *cls,
+ enum GNUNET_DHT_RouteOption options,
+ enum GNUNET_BLOCK_Type type,
+ uint32_t hop_count,
+ uint32_t desired_replication_level,
+ unsigned int path_length,
+ const struct GNUNET_PeerIdentity *path,
+ struct GNUNET_TIME_Absolute exp,
+ const GNUNET_HashCode * key,
+ const void *data,
+ size_t size)
+{
+ FPRINTF (stdout, "Result %d, operation: %s, type %d:\n Key: %s\n %.*s\n",
+ result_count,
+ "PUT",
+ type,
+ GNUNET_h2s_full(key),
+ (unsigned int) size,
+ (char *) data);
+ result_count++;
+}
+
+/**
+ * Main function that will be run by the scheduler.
+ *
+ * @param cls closure
+ * @param args remaining command-line arguments
+ * @param cfgfile name of the configuration file used (for saving, can be NULL!)
+ * @param c configuration
+ */
+static void
+run (void *cls, char *const *args, const char *cfgfile,
+ const struct GNUNET_CONFIGURATION_Handle *c)
+{
+ struct GNUNET_TIME_Relative timeout;
+ GNUNET_HashCode *key;
+
+ cfg = c;
+
+ dht_handle = GNUNET_DHT_connect (cfg, 1);
+
+ if (dht_handle == NULL)
+ {
+ if (verbose)
+ FPRINTF (stderr, "%s", "Couldn't connect to DHT service!\n");
+ ret = 1;
+ return;
+ }
+ else if (verbose)
+ FPRINTF (stderr, "%s", "Connected to DHT service!\n");
+
+ if (block_type == GNUNET_BLOCK_TYPE_ANY) /* Type of data not set */
+ block_type = GNUNET_BLOCK_TYPE_TEST;
+
+ if (query_key != NULL) {
+ key = GNUNET_malloc (sizeof(GNUNET_HashCode));
+ GNUNET_CRYPTO_hash (query_key, strlen (query_key), key);
+ }
+ else
+ key = NULL;
+
+ if (0 != timeout_request)
+ {
+ timeout = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS,
+ timeout_request);
+ if (verbose)
+ FPRINTF (stderr, "Monitoring for %llus\n", timeout_request);
+ }
+ else
+ {
+ timeout = GNUNET_TIME_UNIT_FOREVER_REL;
+ if (verbose)
+ FPRINTF (stderr, "%s", "Monitoring indefinitely (close with Ctrl+C)\n");
+ }
+
+ GNUNET_SCHEDULER_add_delayed (timeout, &cleanup_task, NULL);
+ if (verbose)
+ FPRINTF (stderr, "Issuing MONITOR request for %s!\n", query_key);
+ monitor_handle = GNUNET_DHT_monitor_start (dht_handle,
+ block_type,
+ key,
+ &get_callback,
+ &get_resp_callback,
+ &put_callback,
+ NULL);
+ if (verbose)
+ FPRINTF (stderr, "%s", "MONITOR started!\n");
+ GNUNET_free_non_null (key);
+
+}
+
+
+/**
+ * gnunet-dht-get command line options
+ */
+static struct GNUNET_GETOPT_CommandLineOption options[] = {
+ {'k', "key", "KEY",
+ gettext_noop ("the query key"),
+ 1, &GNUNET_GETOPT_set_string, &query_key},
+ {'t', "type", "TYPE",
+ gettext_noop ("the type of data to look for"),
+ 1, &GNUNET_GETOPT_set_uint, &block_type},
+ {'T', "timeout", "TIMEOUT",
+ gettext_noop ("how long to execute? 0 = forever"),
+ 1, &GNUNET_GETOPT_set_ulong, &timeout_request},
+ {'V', "verbose", NULL,
+ gettext_noop ("be verbose (print progress information)"),
+ 0, &GNUNET_GETOPT_set_one, &verbose},
+ GNUNET_GETOPT_OPTION_END
+};
+
+
+/**
+ * Entry point for gnunet-dht-monitor
+ *
+ * @param argc number of arguments from the command line
+ * @param argv command line arguments
+ * @return 0 ok, 1 on error
+ */
+int
+main (int argc, char *const *argv)
+{
+ return (GNUNET_OK ==
+ GNUNET_PROGRAM_run (argc, argv, "gnunet-dht-get",
+ gettext_noop
+ ("Prints all packets that go through the DHT."),
+ options, &run, NULL)) ? ret : 1;
+}
+
+/* end of gnunet-dht-monitor.c */
diff --git a/src/dht/gnunet-dht-put.c b/src/dht/gnunet-dht-put.c
index ef5ae5e..59acc79 100644
--- a/src/dht/gnunet-dht-put.c
+++ b/src/dht/gnunet-dht-put.c
@@ -80,7 +80,7 @@ static char *data;
static void
shutdown_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
{
- if (dht_handle != NULL)
+ if (NULL != dht_handle)
{
GNUNET_DHT_disconnect (dht_handle);
dht_handle = NULL;
@@ -91,13 +91,33 @@ shutdown_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
* Signature of the main function of a task.
*
* @param cls closure
- * @param tc context information (why was this task triggered now)
+ * @param success GNUNET_OK if the PUT was transmitted,
+ * GNUNET_NO on timeout,
+ * GNUNET_SYSERR on disconnect from service
+ * after the PUT message was transmitted
+ * (so we don't know if it was received or not)
*/
-void
-message_sent_cont (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+static void
+message_sent_cont (void *cls, int success)
{
if (verbose)
- FPRINTF (stderr, "%s", _("PUT request sent!\n"));
+ {
+ switch (success)
+ {
+ case GNUNET_OK:
+ FPRINTF (stderr, "%s", _("PUT request sent!\n"));
+ break;
+ case GNUNET_NO:
+ FPRINTF (stderr, "%s", _("Timeout sending PUT request!\n"));
+ break;
+ case GNUNET_SYSERR:
+ FPRINTF (stderr, "%s", _("PUT request not confirmed!\n"));
+ break;
+ default:
+ GNUNET_break (0);
+ break;
+ }
+ }
GNUNET_SCHEDULER_add_now (&shutdown_task, NULL);
}
diff --git a/src/dht/gnunet-service-dht_clients.c b/src/dht/gnunet-service-dht_clients.c
index 96fcd34..d897d1f 100644
--- a/src/dht/gnunet-service-dht_clients.c
+++ b/src/dht/gnunet-service-dht_clients.c
@@ -87,7 +87,7 @@ struct ClientList
* Handle to the current transmission request, NULL
* if none pending.
*/
- struct GNUNET_CONNECTION_TransmitHandle *transmit_handle;
+ struct GNUNET_SERVER_TransmitHandle *transmit_handle;
/**
* Linked list of pending messages for this client
@@ -204,6 +204,21 @@ struct ClientMonitorRecord
GNUNET_HashCode *key;
/**
+ * Flag whether to notify about GET messages.
+ */
+ int16_t get;
+
+ /**
+ * Flag whether to notify about GET_REPONSE messages.
+ */
+ int16_t get_resp;
+
+ /**
+ * Flag whether to notify about PUT messages.
+ */
+ uint16_t put;
+
+ /**
* Client to notify of these requests.
*/
struct ClientList *client;
@@ -247,6 +262,31 @@ static GNUNET_SCHEDULER_TaskIdentifier retry_task;
/**
+ * Task run to check for messages that need to be sent to a client.
+ *
+ * @param client a ClientList, containing the client and any messages to be sent to it
+ */
+static void
+process_pending_messages (struct ClientList *client);
+
+
+/**
+ * Add a PendingMessage to the clients list of messages to be sent
+ *
+ * @param client the active client to send the message to
+ * @param pending_message the actual message to send
+ */
+static void
+add_pending_message (struct ClientList *client,
+ struct PendingMessage *pending_message)
+{
+ GNUNET_CONTAINER_DLL_insert_tail (client->pending_head, client->pending_tail,
+ pending_message);
+ process_pending_messages (client);
+}
+
+
+/**
* Find a client if it exists, add it otherwise.
*
* @param client the server handle to the client
@@ -289,11 +329,9 @@ remove_client_records (void *cls, const GNUNET_HashCode * key, void *value)
if (record->client != client)
return GNUNET_YES;
-#if DEBUG_DHT
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Removing client %p's record for key %s\n", client,
GNUNET_h2s (key));
-#endif
GNUNET_assert (GNUNET_YES ==
GNUNET_CONTAINER_multihashmap_remove (forward_map, key,
record));
@@ -320,13 +358,11 @@ handle_client_disconnect (void *cls, struct GNUNET_SERVER_Client *client)
struct PendingMessage *reply;
struct ClientMonitorRecord *monitor;
-#if DEBUG_DHT
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Local client %p disconnects\n", client);
-#endif
pos = find_active_client (client);
GNUNET_CONTAINER_DLL_remove (client_head, client_tail, pos);
if (pos->transmit_handle != NULL)
- GNUNET_CONNECTION_notify_transmit_ready_cancel (pos->transmit_handle);
+ GNUNET_SERVER_notify_transmit_ready_cancel (pos->transmit_handle);
while (NULL != (reply = pos->pending_head))
{
GNUNET_CONTAINER_DLL_remove (pos->pending_head, pos->pending_tail, reply);
@@ -449,6 +485,8 @@ handle_dht_local_put (void *cls, struct GNUNET_SERVER_Client *client,
const struct GNUNET_DHT_ClientPutMessage *dht_msg;
struct GNUNET_CONTAINER_BloomFilter *peer_bf;
uint16_t size;
+ struct PendingMessage *pm;
+ struct GNUNET_DHT_ClientPutConfirmationMessage *conf;
size = ntohs (message->size);
if (size < sizeof (struct GNUNET_DHT_ClientPutMessage))
@@ -463,12 +501,10 @@ handle_dht_local_put (void *cls, struct GNUNET_SERVER_Client *client,
GNUNET_NO);
dht_msg = (const struct GNUNET_DHT_ClientPutMessage *) message;
/* give to local clients */
-#if DEBUG_DHT
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Handling local PUT of %u-bytes for query %s\n",
size - sizeof (struct GNUNET_DHT_ClientPutMessage),
GNUNET_h2s (&dht_msg->key));
-#endif
GDS_CLIENTS_handle_reply (GNUNET_TIME_absolute_ntoh (dht_msg->expiration),
&dht_msg->key, 0, NULL, 0, NULL,
ntohl (dht_msg->type),
@@ -490,7 +526,26 @@ handle_dht_local_put (void *cls, struct GNUNET_SERVER_Client *client,
peer_bf, &dht_msg->key, 0, NULL, &dht_msg[1],
size -
sizeof (struct GNUNET_DHT_ClientPutMessage));
+ GDS_CLIENTS_process_put (ntohl (dht_msg->options),
+ ntohl (dht_msg->type),
+ 0,
+ ntohl (dht_msg->desired_replication_level),
+ 1,
+ GDS_NEIGHBOURS_get_id(),
+ GNUNET_TIME_absolute_ntoh (dht_msg->expiration),
+ &dht_msg->key,
+ &dht_msg[1],
+ size - sizeof (struct GNUNET_DHT_ClientPutMessage));
GNUNET_CONTAINER_bloomfilter_free (peer_bf);
+ pm = GNUNET_malloc (sizeof (struct PendingMessage) +
+ sizeof (struct GNUNET_DHT_ClientPutConfirmationMessage));
+ conf = (struct GNUNET_DHT_ClientPutConfirmationMessage *) &pm[1];
+ conf->header.size = htons (sizeof (struct GNUNET_DHT_ClientPutConfirmationMessage));
+ conf->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_CLIENT_PUT_OK);
+ conf->reserved = htonl (0);
+ conf->unique_id = dht_msg->unique_id;
+ pm->msg = &conf->header;
+ add_pending_message (find_active_client (client), pm);
GNUNET_SERVER_receive_done (client, GNUNET_OK);
}
@@ -528,11 +583,9 @@ handle_dht_local_get (void *cls, struct GNUNET_SERVER_Client *client,
gettext_noop
("# GET requests received from clients"), 1,
GNUNET_NO);
-#if DEBUG_DHT
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Received request for %s from local client %p\n",
GNUNET_h2s (&get->key), client);
-#endif
cqr = GNUNET_malloc (sizeof (struct ClientQueryRecord) + xquery_size);
cqr->key = get->key;
cqr->client = find_active_client (client);
@@ -548,6 +601,13 @@ handle_dht_local_get (void *cls, struct GNUNET_SERVER_Client *client,
cqr->type = ntohl (get->type);
GNUNET_CONTAINER_multihashmap_put (forward_map, &get->key, cqr,
GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
+ GDS_CLIENTS_process_get (ntohl (get->options),
+ ntohl (get->type),
+ 0,
+ ntohl (get->desired_replication_level),
+ 1,
+ GDS_NEIGHBOURS_get_id(),
+ &get->key);
/* start remote requests */
if (GNUNET_SCHEDULER_NO_TASK != retry_task)
GNUNET_SCHEDULER_cancel (retry_task);
@@ -593,11 +653,9 @@ remove_by_unique_id (void *cls, const GNUNET_HashCode * key, void *value)
if (record->unique_id != ctx->unique_id)
return GNUNET_YES;
-#if DEBUG_DHT
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Removing client %p's record for key %s (by unique id)\n",
ctx->client->client_handle, GNUNET_h2s (key));
-#endif
return remove_client_records (ctx->client, key, record);
}
@@ -623,10 +681,8 @@ handle_dht_local_get_stop (void *cls, struct GNUNET_SERVER_Client *client,
gettext_noop
("# GET STOP requests received from clients"), 1,
GNUNET_NO);
-#if DEBUG_DHT
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Client %p stopped request for key %s\n",
client, GNUNET_h2s (&dht_stop_msg->key));
-#endif
ctx.client = find_active_client (client);
ctx.unique_id = dht_stop_msg->unique_id;
GNUNET_CONTAINER_multihashmap_get_multiple (forward_map, &dht_stop_msg->key,
@@ -636,7 +692,7 @@ handle_dht_local_get_stop (void *cls, struct GNUNET_SERVER_Client *client,
/**
- * Handler for monitor messages
+ * Handler for monitor start messages
*
* @param cls closure for the service
* @param client the client we received this message from
@@ -648,37 +704,74 @@ handle_dht_local_monitor (void *cls, struct GNUNET_SERVER_Client *client,
const struct GNUNET_MessageHeader *message)
{
struct ClientMonitorRecord *r;
- const struct GNUNET_DHT_MonitorMessage *msg;
- unsigned int i;
- char *c;
+ const struct GNUNET_DHT_MonitorStartStopMessage *msg;
- msg = (struct GNUNET_DHT_MonitorMessage *) message;
+ msg = (struct GNUNET_DHT_MonitorStartStopMessage *) message;
r = GNUNET_malloc (sizeof(struct ClientMonitorRecord));
r->client = find_active_client(client);
r->type = ntohl(msg->type);
- c = (char *) &msg->key;
- for (i = 0; i < sizeof (GNUNET_HashCode) && c[i] == 0; i++);
- if (sizeof (GNUNET_HashCode) == i)
- r->key = NULL;
+ r->get = ntohs(msg->get);
+ r->get_resp = ntohs(msg->get_resp);
+ r->put = ntohs(msg->put);
+ if (0 == ntohs(msg->filter_key))
+ r->key = NULL;
else
{
r->key = GNUNET_malloc (sizeof (GNUNET_HashCode));
memcpy (r->key, &msg->key, sizeof (GNUNET_HashCode));
}
GNUNET_CONTAINER_DLL_insert (monitor_head, monitor_tail, r);
- // FIXME add remove somewhere
GNUNET_SERVER_receive_done (client, GNUNET_OK);
}
-
/**
- * Task run to check for messages that need to be sent to a client.
+ * Handler for monitor stop messages
+ *
+ * @param cls closure for the service
+ * @param client the client we received this message from
+ * @param message the actual message received
*
- * @param client a ClientList, containing the client and any messages to be sent to it
*/
static void
-process_pending_messages (struct ClientList *client);
+handle_dht_local_monitor_stop (void *cls, struct GNUNET_SERVER_Client *client,
+ const struct GNUNET_MessageHeader *message)
+{
+ struct ClientMonitorRecord *r;
+ const struct GNUNET_DHT_MonitorStartStopMessage *msg;
+ int keys_match;
+
+ msg = (struct GNUNET_DHT_MonitorStartStopMessage *) message;
+ r = monitor_head;
+
+ while (NULL != r)
+ {
+ if (NULL == r->key)
+ keys_match = (0 == ntohs(msg->filter_key));
+ else
+ {
+ keys_match = (0 != ntohs(msg->filter_key)
+ && !memcmp(r->key, &msg->key, sizeof(GNUNET_HashCode)));
+ }
+ if (find_active_client(client) == r->client
+ && ntohl(msg->type) == r->type
+ && r->get == msg->get
+ && r->get_resp == msg->get_resp
+ && r->put == msg->put
+ && keys_match
+ )
+ {
+ GNUNET_CONTAINER_DLL_remove (monitor_head, monitor_tail, r);
+ GNUNET_free_non_null (r->key);
+ GNUNET_free (r);
+ GNUNET_SERVER_receive_done (client, GNUNET_OK);
+ return; /* Delete only ONE entry */
+ }
+ r = r->next;
+ }
+
+ GNUNET_SERVER_receive_done (client, GNUNET_OK);
+}
/**
@@ -706,11 +799,9 @@ send_reply_to_client (void *cls, size_t size, void *buf)
if (buf == NULL)
{
/* client disconnected */
-#if DEBUG_DHT
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Client %p disconnected, pending messages will be discarded\n",
client->client_handle);
-#endif
return 0;
}
off = 0;
@@ -721,17 +812,13 @@ send_reply_to_client (void *cls, size_t size, void *buf)
reply);
memcpy (&cbuf[off], reply->msg, msize);
GNUNET_free (reply);
-#if DEBUG_DHT
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Transmitting %u bytes to client %p\n",
msize, client->client_handle);
-#endif
off += msize;
}
process_pending_messages (client);
-#if DEBUG_DHT
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Transmitted %u/%u bytes to client %p\n",
(unsigned int) off, (unsigned int) size, client->client_handle);
-#endif
return off;
}
@@ -746,20 +833,16 @@ process_pending_messages (struct ClientList *client)
{
if ((client->pending_head == NULL) || (client->transmit_handle != NULL))
{
-#if DEBUG_DHT
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Not asking for transmission to %p now: %s\n",
client->client_handle,
client->pending_head ==
NULL ? "no more messages" : "request already pending");
-#endif
return;
}
-#if DEBUG_DHT
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Asking for transmission of %u bytes to client %p\n",
ntohs (client->pending_head->msg->size), client->client_handle);
-#endif
client->transmit_handle =
GNUNET_SERVER_notify_transmit_ready (client->client_handle,
ntohs (client->pending_head->
@@ -770,22 +853,6 @@ process_pending_messages (struct ClientList *client)
/**
- * Add a PendingMessage to the clients list of messages to be sent
- *
- * @param client the active client to send the message to
- * @param pending_message the actual message to send
- */
-static void
-add_pending_message (struct ClientList *client,
- struct PendingMessage *pending_message)
-{
- GNUNET_CONTAINER_DLL_insert_tail (client->pending_head, client->pending_tail,
- pending_message);
- process_pending_messages (client);
-}
-
-
-/**
* Closure for 'forward_reply'
*/
struct ForwardReplyContext
@@ -844,11 +911,9 @@ forward_reply (void *cls, const GNUNET_HashCode * key, void *value)
if ((record->type != GNUNET_BLOCK_TYPE_ANY) && (record->type != frc->type))
{
-#if DEBUG_DHT
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Record type missmatch, not passing request for key %s to local client\n",
GNUNET_h2s (key));
-#endif
GNUNET_STATISTICS_update (GDS_stats,
gettext_noop
("# Key match, type mismatches in REPLY to CLIENT"),
@@ -859,11 +924,9 @@ forward_reply (void *cls, const GNUNET_HashCode * key, void *value)
for (i = 0; i < record->seen_replies_count; i++)
if (0 == memcmp (&record->seen_replies[i], &ch, sizeof (GNUNET_HashCode)))
{
-#if DEBUG_DHT
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Duplicate reply, not passing request for key %s to local client\n",
GNUNET_h2s (key));
-#endif
GNUNET_STATISTICS_update (GDS_stats,
gettext_noop
("# Duplicate REPLIES to CLIENT request dropped"),
@@ -874,11 +937,9 @@ forward_reply (void *cls, const GNUNET_HashCode * key, void *value)
GNUNET_BLOCK_evaluate (GDS_block_context, record->type, key, NULL, 0,
record->xquery, record->xquery_size, frc->data,
frc->data_size);
-#if DEBUG_DHT
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Evaluation result is %d for key %s for local client's query\n",
(int) eval, GNUNET_h2s (key));
-#endif
switch (eval)
{
case GNUNET_BLOCK_EVALUATION_OK_LAST:
@@ -929,11 +990,9 @@ forward_reply (void *cls, const GNUNET_HashCode * key, void *value)
GNUNET_NO);
reply = (struct GNUNET_DHT_ClientResultMessage *) &pm[1];
reply->unique_id = record->unique_id;
-#if DEBUG_DHT
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Queueing reply to query %s for client %p\n", GNUNET_h2s (key),
record->client->client_handle);
-#endif
add_pending_message (record->client, pm);
if (GNUNET_YES == do_free)
remove_client_records (record->client, key, record);
@@ -1027,33 +1086,101 @@ GDS_CLIENTS_handle_reply (struct GNUNET_TIME_Absolute expiration,
/**
- * Check if some client is monitoring messages of this type and notify
- * him in that case.
+ * Check if some client is monitoring GET messages and notify
+ * them in that case.
*
- * @param mtype Type of the DHT message.
- * @param exp When will this value expire.
- * @param key Key of the result/request.
- * @param putl number of entries in get_path.
- * @param put_path peers on the PUT path (or NULL if not recorded).
- * @param getl number of entries in get_path.
- * @param get_path Peers on reply path (or NULL if not recorded).
+ * @param options Options, for instance RecordRoute, DemultiplexEverywhere.
+ * @param type The type of data in the request.
+ * @param hop_count Hop count so far.
+ * @param path_length number of entries in path (or 0 if not recorded).
+ * @param path peers on the GET path (or NULL if not recorded).
* @param desired_replication_level Desired replication level.
- * @param type Type of the result/request.
+ * @param key Key of the requested data.
+ */
+void
+GDS_CLIENTS_process_get (uint32_t options,
+ enum GNUNET_BLOCK_Type type,
+ uint32_t hop_count,
+ uint32_t desired_replication_level,
+ unsigned int path_length,
+ const struct GNUNET_PeerIdentity *path,
+ const GNUNET_HashCode * key)
+{
+ struct ClientMonitorRecord *m;
+ struct ClientList **cl;
+ unsigned int cl_size;
+
+ cl = NULL;
+ cl_size = 0;
+ for (m = monitor_head; NULL != m; m = m->next)
+ {
+ if ((GNUNET_BLOCK_TYPE_ANY == m->type || m->type == type) &&
+ (NULL == m->key ||
+ memcmp (key, m->key, sizeof(GNUNET_HashCode)) == 0))
+ {
+ struct PendingMessage *pm;
+ struct GNUNET_DHT_MonitorGetMessage *mmsg;
+ struct GNUNET_PeerIdentity *msg_path;
+ size_t msize;
+ unsigned int i;
+
+ /* Don't send duplicates */
+ for (i = 0; i < cl_size; i++)
+ if (cl[i] == m->client)
+ break;
+ if (i < cl_size)
+ continue;
+ GNUNET_array_append (cl, cl_size, m->client);
+
+ msize = path_length * sizeof (struct GNUNET_PeerIdentity);
+ msize += sizeof (struct GNUNET_DHT_MonitorGetMessage);
+ msize += sizeof (struct PendingMessage);
+ pm = (struct PendingMessage *) GNUNET_malloc (msize);
+ mmsg = (struct GNUNET_DHT_MonitorGetMessage *) &pm[1];
+ pm->msg = &mmsg->header;
+ mmsg->header.size = htons (msize - sizeof (struct PendingMessage));
+ mmsg->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_MONITOR_GET);
+ mmsg->options = htonl(options);
+ mmsg->type = htonl(type);
+ mmsg->hop_count = htonl(hop_count);
+ mmsg->desired_replication_level = htonl(desired_replication_level);
+ mmsg->get_path_length = htonl(path_length);
+ memcpy (&mmsg->key, key, sizeof (GNUNET_HashCode));
+ msg_path = (struct GNUNET_PeerIdentity *) &mmsg[1];
+ if (path_length > 0)
+ memcpy (msg_path, path,
+ path_length * sizeof (struct GNUNET_PeerIdentity));
+ add_pending_message (m->client, pm);
+ }
+ }
+ GNUNET_free_non_null (cl);
+}
+
+
+/**
+ * Check if some client is monitoring GET RESP messages and notify
+ * them in that case.
+ *
+ * @param type The type of data in the result.
+ * @param get_path Peers on GET path (or NULL if not recorded).
+ * @param get_path_length number of entries in get_path.
+ * @param put_path peers on the PUT path (or NULL if not recorded).
+ * @param put_path_length number of entries in get_path.
+ * @param exp Expiration time of the data.
+ * @param key Key of the data.
* @param data Pointer to the result data.
* @param size Number of bytes in data.
*/
void
-GDS_CLIENTS_process_monitor (uint16_t mtype,
- const struct GNUNET_TIME_Absolute exp,
- const GNUNET_HashCode *key,
- uint32_t putl,
- const struct GNUNET_PeerIdentity *put_path,
- uint32_t getl,
- const struct GNUNET_PeerIdentity *get_path,
- uint32_t desired_replication_level,
- enum GNUNET_BLOCK_Type type,
- const struct GNUNET_MessageHeader *data,
- uint16_t size)
+GDS_CLIENTS_process_get_resp (enum GNUNET_BLOCK_Type type,
+ const struct GNUNET_PeerIdentity *get_path,
+ unsigned int get_path_length,
+ const struct GNUNET_PeerIdentity *put_path,
+ unsigned int put_path_length,
+ struct GNUNET_TIME_Absolute exp,
+ const GNUNET_HashCode * key,
+ const void *data,
+ size_t size)
{
struct ClientMonitorRecord *m;
struct ClientList **cl;
@@ -1068,7 +1195,7 @@ GDS_CLIENTS_process_monitor (uint16_t mtype,
memcmp (key, m->key, sizeof(GNUNET_HashCode)) == 0))
{
struct PendingMessage *pm;
- struct GNUNET_DHT_MonitorMessage *mmsg;
+ struct GNUNET_DHT_MonitorGetRespMessage *mmsg;
struct GNUNET_PeerIdentity *path;
size_t msize;
unsigned int i;
@@ -1082,29 +1209,116 @@ GDS_CLIENTS_process_monitor (uint16_t mtype,
GNUNET_array_append (cl, cl_size, m->client);
msize = size;
- msize += (getl + putl) * sizeof (struct GNUNET_PeerIdentity);
- msize += sizeof (struct GNUNET_DHT_MonitorMessage);
+ msize += (get_path_length + put_path_length)
+ * sizeof (struct GNUNET_PeerIdentity);
+ msize += sizeof (struct GNUNET_DHT_MonitorGetRespMessage);
msize += sizeof (struct PendingMessage);
pm = (struct PendingMessage *) GNUNET_malloc (msize);
- mmsg = (struct GNUNET_DHT_MonitorMessage *) &pm[1];
+ mmsg = (struct GNUNET_DHT_MonitorGetRespMessage *) &pm[1];
pm->msg = (struct GNUNET_MessageHeader *) mmsg;
mmsg->header.size = htons (msize - sizeof (struct PendingMessage));
- mmsg->header.type = htons (mtype);
- mmsg->expiration = GNUNET_TIME_absolute_hton(exp);
- memcpy (&mmsg->key, key, sizeof (GNUNET_HashCode));
- mmsg->put_path_length = htonl(putl);
- mmsg->get_path_length = htonl(getl);
- mmsg->desired_replication_level = htonl (desired_replication_level);
+ mmsg->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_MONITOR_GET_RESP);
+ mmsg->type = htonl(type);
+ mmsg->put_path_length = htonl(put_path_length);
+ mmsg->get_path_length = htonl(get_path_length);
path = (struct GNUNET_PeerIdentity *) &mmsg[1];
- if (putl > 0)
+ if (put_path_length > 0)
+ {
+ memcpy (path, put_path,
+ put_path_length * sizeof (struct GNUNET_PeerIdentity));
+ path = &path[put_path_length];
+ }
+ if (get_path_length > 0)
+ memcpy (path, get_path,
+ get_path_length * sizeof (struct GNUNET_PeerIdentity));
+ mmsg->expiration_time = GNUNET_TIME_absolute_hton(exp);
+ memcpy (&mmsg->key, key, sizeof (GNUNET_HashCode));
+ if (size > 0)
+ memcpy (&path[get_path_length], data, size);
+ add_pending_message (m->client, pm);
+ }
+ }
+ GNUNET_free_non_null (cl);
+}
+
+
+/**
+ * Check if some client is monitoring PUT messages and notify
+ * them in that case.
+ *
+ * @param options Options, for instance RecordRoute, DemultiplexEverywhere.
+ * @param type The type of data in the request.
+ * @param hop_count Hop count so far.
+ * @param path_length number of entries in path (or 0 if not recorded).
+ * @param path peers on the PUT path (or NULL if not recorded).
+ * @param desired_replication_level Desired replication level.
+ * @param exp Expiration time of the data.
+ * @param key Key under which data is to be stored.
+ * @param data Pointer to the data carried.
+ * @param size Number of bytes in data.
+ */
+void
+GDS_CLIENTS_process_put (uint32_t options,
+ enum GNUNET_BLOCK_Type type,
+ uint32_t hop_count,
+ uint32_t desired_replication_level,
+ unsigned int path_length,
+ const struct GNUNET_PeerIdentity *path,
+ struct GNUNET_TIME_Absolute exp,
+ const GNUNET_HashCode * key,
+ const void *data,
+ size_t size)
+{
+ struct ClientMonitorRecord *m;
+ struct ClientList **cl;
+ unsigned int cl_size;
+
+ cl = NULL;
+ cl_size = 0;
+ for (m = monitor_head; NULL != m; m = m->next)
+ {
+ if ((GNUNET_BLOCK_TYPE_ANY == m->type || m->type == type) &&
+ (NULL == m->key ||
+ memcmp (key, m->key, sizeof(GNUNET_HashCode)) == 0))
+ {
+ struct PendingMessage *pm;
+ struct GNUNET_DHT_MonitorPutMessage *mmsg;
+ struct GNUNET_PeerIdentity *msg_path;
+ size_t msize;
+ unsigned int i;
+
+ /* Don't send duplicates */
+ for (i = 0; i < cl_size; i++)
+ if (cl[i] == m->client)
+ break;
+ if (i < cl_size)
+ continue;
+ GNUNET_array_append (cl, cl_size, m->client);
+
+ msize = size;
+ msize += path_length * sizeof (struct GNUNET_PeerIdentity);
+ msize += sizeof (struct GNUNET_DHT_MonitorPutMessage);
+ msize += sizeof (struct PendingMessage);
+ pm = (struct PendingMessage *) GNUNET_malloc (msize);
+ mmsg = (struct GNUNET_DHT_MonitorPutMessage *) &pm[1];
+ pm->msg = (struct GNUNET_MessageHeader *) mmsg;
+ mmsg->header.size = htons (msize - sizeof (struct PendingMessage));
+ mmsg->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_MONITOR_PUT);
+ mmsg->options = htonl(options);
+ mmsg->type = htonl(type);
+ mmsg->hop_count = htonl(hop_count);
+ mmsg->desired_replication_level = htonl(desired_replication_level);
+ mmsg->put_path_length = htonl(path_length);
+ msg_path = (struct GNUNET_PeerIdentity *) &mmsg[1];
+ if (path_length > 0)
{
- memcpy (path, put_path, putl * sizeof (struct GNUNET_PeerIdentity));
- path = &path[putl];
+ memcpy (msg_path, path,
+ path_length * sizeof (struct GNUNET_PeerIdentity));
}
- if (getl > 0)
- memcpy (path, get_path, getl * sizeof (struct GNUNET_PeerIdentity));
+ mmsg->expiration_time = GNUNET_TIME_absolute_hton(exp);
+ memcpy (&mmsg->key, key, sizeof (GNUNET_HashCode));
if (size > 0)
- memcpy (&path[getl], data, size);
+ memcpy (&msg_path[path_length], data, size);
add_pending_message (m->client, pm);
}
}
@@ -1129,8 +1343,11 @@ GDS_CLIENTS_init (struct GNUNET_SERVER_Handle *server)
GNUNET_MESSAGE_TYPE_DHT_CLIENT_GET_STOP,
sizeof (struct GNUNET_DHT_ClientGetStopMessage)},
{&handle_dht_local_monitor, NULL,
- GNUNET_MESSAGE_TYPE_DHT_MONITOR_GET,
- sizeof (struct GNUNET_DHT_MonitorMessage)},
+ GNUNET_MESSAGE_TYPE_DHT_MONITOR_START,
+ sizeof (struct GNUNET_DHT_MonitorStartStopMessage)},
+ {&handle_dht_local_monitor_stop, NULL,
+ GNUNET_MESSAGE_TYPE_DHT_MONITOR_STOP,
+ sizeof (struct GNUNET_DHT_MonitorStartStopMessage)},
{NULL, NULL, 0, 0}
};
forward_map = GNUNET_CONTAINER_multihashmap_create (1024);
@@ -1153,12 +1370,18 @@ GDS_CLIENTS_done ()
GNUNET_SCHEDULER_cancel (retry_task);
retry_task = GNUNET_SCHEDULER_NO_TASK;
}
- GNUNET_assert (0 == GNUNET_CONTAINER_heap_get_size (retry_heap));
- GNUNET_CONTAINER_heap_destroy (retry_heap);
- retry_heap = NULL;
- GNUNET_assert (0 == GNUNET_CONTAINER_multihashmap_size (forward_map));
- GNUNET_CONTAINER_multihashmap_destroy (forward_map);
- forward_map = NULL;
+ if (NULL != retry_heap)
+ {
+ GNUNET_assert (0 == GNUNET_CONTAINER_heap_get_size (retry_heap));
+ GNUNET_CONTAINER_heap_destroy (retry_heap);
+ retry_heap = NULL;
+ }
+ if (NULL != forward_map)
+ {
+ GNUNET_assert (0 == GNUNET_CONTAINER_multihashmap_size (forward_map));
+ GNUNET_CONTAINER_multihashmap_destroy (forward_map);
+ forward_map = NULL;
+ }
}
/* end of gnunet-service-dht_clients.c */
diff --git a/src/dht/gnunet-service-dht_clients.h b/src/dht/gnunet-service-dht_clients.h
index a477456..9f3d2dd 100644
--- a/src/dht/gnunet-service-dht_clients.h
+++ b/src/dht/gnunet-service-dht_clients.h
@@ -57,33 +57,77 @@ GDS_CLIENTS_handle_reply (struct GNUNET_TIME_Absolute expiration,
/**
- * Check if some client is monitoring messages of this type and notify
- * him in that case.
+ * Check if some client is monitoring GET messages and notify
+ * them in that case.
*
- * @param mtype Type of the DHT message.
- * @param exp When will this value expire.
- * @param key Key of the result/request.
- * @param putl number of entries in get_path.
- * @param put_path peers on the PUT path (or NULL if not recorded).
- * @param getl number of entries in get_path.
- * @param get_path Peers on reply path (or NULL if not recorded).
+ * @param options Options, for instance RecordRoute, DemultiplexEverywhere.
+ * @param type The type of data in the request.
+ * @param hop_count Hop count so far.
+ * @param path_length number of entries in path (or 0 if not recorded).
+ * @param path peers on the GET path (or NULL if not recorded).
* @param desired_replication_level Desired replication level.
- * @param type Type of the result/request.
+ * @param key Key of the requested data.
+ */
+void
+GDS_CLIENTS_process_get (uint32_t options,
+ enum GNUNET_BLOCK_Type type,
+ uint32_t hop_count,
+ uint32_t desired_replication_level,
+ unsigned int path_length,
+ const struct GNUNET_PeerIdentity *path,
+ const GNUNET_HashCode * key);
+
+/**
+ * Check if some client is monitoring GET RESP messages and notify
+ * them in that case.
+ *
+ * @param type The type of data in the result.
+ * @param get_path Peers on GET path (or NULL if not recorded).
+ * @param get_path_length number of entries in get_path.
+ * @param put_path peers on the PUT path (or NULL if not recorded).
+ * @param put_path_length number of entries in get_path.
+ * @param exp Expiration time of the data.
+ * @param key Key of the data.
* @param data Pointer to the result data.
* @param size Number of bytes in data.
*/
void
-GDS_CLIENTS_process_monitor (uint16_t mtype,
- const struct GNUNET_TIME_Absolute exp,
- const GNUNET_HashCode *key,
- uint32_t putl,
- const struct GNUNET_PeerIdentity *put_path,
- uint32_t getl,
- const struct GNUNET_PeerIdentity *get_path,
- uint32_t desired_replication_level,
- enum GNUNET_BLOCK_Type type,
- const struct GNUNET_MessageHeader *data,
- uint16_t size);
+GDS_CLIENTS_process_get_resp (enum GNUNET_BLOCK_Type type,
+ const struct GNUNET_PeerIdentity *get_path,
+ unsigned int get_path_length,
+ const struct GNUNET_PeerIdentity *put_path,
+ unsigned int put_path_length,
+ struct GNUNET_TIME_Absolute exp,
+ const GNUNET_HashCode * key,
+ const void *data,
+ size_t size);
+
+/**
+ * Check if some client is monitoring PUT messages and notify
+ * them in that case.
+ *
+ * @param options Options, for instance RecordRoute, DemultiplexEverywhere.
+ * @param type The type of data in the request.
+ * @param hop_count Hop count so far.
+ * @param path_length number of entries in path (or 0 if not recorded).
+ * @param path peers on the PUT path (or NULL if not recorded).
+ * @param desired_replication_level Desired replication level.
+ * @param exp Expiration time of the data.
+ * @param key Key under which data is to be stored.
+ * @param data Pointer to the data carried.
+ * @param size Number of bytes in data.
+ */
+void
+GDS_CLIENTS_process_put (uint32_t options,
+ enum GNUNET_BLOCK_Type type,
+ uint32_t hop_count,
+ uint32_t desired_replication_level,
+ unsigned int path_length,
+ const struct GNUNET_PeerIdentity *path,
+ struct GNUNET_TIME_Absolute exp,
+ const GNUNET_HashCode * key,
+ const void *data,
+ size_t size);
/**
* Initialize client subsystem.
diff --git a/src/dht/gnunet-service-dht_datacache.c b/src/dht/gnunet-service-dht_datacache.c
index 82cd067..4d1dd6f 100644
--- a/src/dht/gnunet-service-dht_datacache.c
+++ b/src/dht/gnunet-service-dht_datacache.c
@@ -193,11 +193,9 @@ datacache_get_iterator (void *cls, struct GNUNET_TIME_Absolute exp,
GNUNET_BLOCK_evaluate (GDS_block_context, type, key, ctx->reply_bf,
ctx->reply_bf_mutator, ctx->xquery,
ctx->xquery_size, rdata, rdata_size);
-#if DEBUG_DHT
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Found reply for query %s in datacache, evaluation result is %d\n",
GNUNET_h2s (key), (int) eval);
-#endif
ctx->eval = eval;
switch (eval)
{
diff --git a/src/dht/gnunet-service-dht_neighbours.c b/src/dht/gnunet-service-dht_neighbours.c
index 4ea5dd6..083b499 100644
--- a/src/dht/gnunet-service-dht_neighbours.c
+++ b/src/dht/gnunet-service-dht_neighbours.c
@@ -524,11 +524,9 @@ add_known_to_bloom (void *cls, const GNUNET_HashCode * key, void *value)
GNUNET_HashCode mh;
GNUNET_BLOCK_mingle_hash (key, ctx->bf_mutator, &mh);
-#if DEBUG_DHT
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Adding known peer (%s) to bloomfilter for FIND PEER with mutation %u\n",
GNUNET_h2s (key), ctx->bf_mutator);
-#endif
GNUNET_CONTAINER_bloomfilter_add (ctx->bloom, &mh);
return GNUNET_YES;
}
@@ -615,10 +613,8 @@ handle_core_connect (void *cls, const struct GNUNET_PeerIdentity *peer,
/* Check for connect to self message */
if (0 == memcmp (&my_identity, peer, sizeof (struct GNUNET_PeerIdentity)))
return;
-#if DEBUG_DHT
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Connected %s to %s\n",
GNUNET_i2s (&my_identity), GNUNET_h2s (&peer->hashPubKey));
-#endif
if (GNUNET_YES ==
GNUNET_CONTAINER_multihashmap_contains (all_known_peers,
&peer->hashPubKey))
@@ -626,7 +622,7 @@ handle_core_connect (void *cls, const struct GNUNET_PeerIdentity *peer,
GNUNET_break (0);
return;
}
- GNUNET_STATISTICS_update (GDS_stats, gettext_noop ("# Peers connected"), 1,
+ GNUNET_STATISTICS_update (GDS_stats, gettext_noop ("# peers connected"), 1,
GNUNET_NO);
peer_bucket = find_bucket (&peer->hashPubKey);
GNUNET_assert ((peer_bucket >= 0) && (peer_bucket < MAX_BUCKETS));
@@ -675,10 +671,8 @@ handle_core_disconnect (void *cls, const struct GNUNET_PeerIdentity *peer)
/* Check for disconnect from self message */
if (0 == memcmp (&my_identity, peer, sizeof (struct GNUNET_PeerIdentity)))
return;
-#if DEBUG_DHT
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Disconnected %s from %s\n",
GNUNET_i2s (&my_identity), GNUNET_h2s (&peer->hashPubKey));
-#endif
to_remove =
GNUNET_CONTAINER_multihashmap_get (all_known_peers, &peer->hashPubKey);
if (NULL == to_remove)
@@ -686,7 +680,7 @@ handle_core_disconnect (void *cls, const struct GNUNET_PeerIdentity *peer)
GNUNET_break (0);
return;
}
- GNUNET_STATISTICS_update (GDS_stats, gettext_noop ("# Peers connected"), -1,
+ GNUNET_STATISTICS_update (GDS_stats, gettext_noop ("# peers connected"), -1,
GNUNET_NO);
GNUNET_assert (GNUNET_YES ==
GNUNET_CONTAINER_multihashmap_remove (all_known_peers,
@@ -1030,11 +1024,9 @@ select_peer (const GNUNET_HashCode * key,
}
else
{
-#if DEBUG_DHT
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Excluded peer `%s' due to BF match in greedy routing for %s\n",
GNUNET_i2s (&pos->id), GNUNET_h2s (key));
-#endif
GNUNET_STATISTICS_update (GDS_stats,
gettext_noop
("# Peers excluded from routing due to Bloomfilter"),
@@ -1067,11 +1059,9 @@ select_peer (const GNUNET_HashCode * key,
gettext_noop
("# Peers excluded from routing due to Bloomfilter"),
1, GNUNET_NO);
-#if DEBUG_DHT
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Excluded peer `%s' due to BF match in random routing for %s\n",
GNUNET_i2s (&pos->id), GNUNET_h2s (key));
-#endif
pos = pos->next;
continue; /* Ignore bloomfiltered peers */
}
@@ -1154,12 +1144,10 @@ get_target_peers (const GNUNET_HashCode * key,
&nxt->id.hashPubKey));
GNUNET_CONTAINER_bloomfilter_add (bloom, &rtargets[off]->id.hashPubKey);
}
-#if DEBUG_DHT
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Selected %u/%u peers at hop %u for %s (target was %u)\n", off,
GNUNET_CONTAINER_multihashmap_size (all_known_peers),
(unsigned int) hop_count, GNUNET_h2s (key), ret);
-#endif
if (0 == off)
{
GNUNET_free (rtargets);
@@ -1212,11 +1200,9 @@ GDS_NEIGHBOURS_handle_put (enum GNUNET_BLOCK_Type type,
struct GNUNET_PeerIdentity *pp;
GNUNET_assert (NULL != bf);
-#if DEBUG_DHT
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Adding myself (%s) to PUT bloomfilter for %s\n",
GNUNET_i2s (&my_identity), GNUNET_h2s (key));
-#endif
GNUNET_CONTAINER_bloomfilter_add (bf, &my_identity.hashPubKey);
GNUNET_STATISTICS_update (GDS_stats, gettext_noop ("# PUT requests routed"),
1, GNUNET_NO);
@@ -1225,12 +1211,10 @@ GDS_NEIGHBOURS_handle_put (enum GNUNET_BLOCK_Type type,
&targets);
if (0 == target_count)
{
-#if DEBUG_DHT
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Routing PUT for %s terminates after %u hops at %s\n",
GNUNET_h2s (key), (unsigned int) hop_count,
GNUNET_i2s (&my_identity));
-#endif
return;
}
msize =
@@ -1254,11 +1238,9 @@ GDS_NEIGHBOURS_handle_put (enum GNUNET_BLOCK_Type type,
for (i = 0; i < target_count; i++)
{
target = targets[i];
-#if DEBUG_DHT
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Routing PUT for %s after %u hops to %s\n", GNUNET_h2s (key),
(unsigned int) hop_count, GNUNET_i2s (&target->id));
-#endif
pending = GNUNET_malloc (sizeof (struct P2PPendingMessage) + msize);
pending->importance = 0; /* FIXME */
pending->timeout = expiration_time;
@@ -1335,20 +1317,16 @@ GDS_NEIGHBOURS_handle_get (enum GNUNET_BLOCK_Type type,
target_count =
get_target_peers (key, peer_bf, hop_count, desired_replication_level,
&targets);
-#if DEBUG_DHT
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Adding myself (%s) to GET bloomfilter for %s\n",
GNUNET_i2s (&my_identity), GNUNET_h2s (key));
-#endif
GNUNET_CONTAINER_bloomfilter_add (peer_bf, &my_identity.hashPubKey);
if (0 == target_count)
{
-#if DEBUG_DHT
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Routing GET for %s terminates after %u hops at %s\n",
GNUNET_h2s (key), (unsigned int) hop_count,
GNUNET_i2s (&my_identity));
-#endif
return;
}
reply_bf_size = GNUNET_CONTAINER_bloomfilter_get_size (reply_bf);
@@ -1367,11 +1345,9 @@ GDS_NEIGHBOURS_handle_get (enum GNUNET_BLOCK_Type type,
for (i = 0; i < target_count; i++)
{
target = targets[i];
-#if DEBUG_DHT
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Routing GET for %s after %u hops to %s\n", GNUNET_h2s (key),
(unsigned int) hop_count, GNUNET_i2s (&target->id));
-#endif
pending = GNUNET_malloc (sizeof (struct P2PPendingMessage) + msize);
pending->importance = 0; /* FIXME */
pending->timeout = GNUNET_TIME_relative_to_absolute (GET_TIMEOUT);
@@ -1578,10 +1554,8 @@ handle_dht_p2p_put (void *cls, const struct GNUNET_PeerIdentity *peer,
/* cannot verify, good luck */
break;
}
-#if DEBUG_DHT
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "PUT for %s at %s\n",
- GNUNET_h2s (&put->key), GNUNET_i2s (&my_identity));
-#endif
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "PUT for `%s' from %s\n",
+ GNUNET_h2s (&put->key), GNUNET_i2s (peer));
bf = GNUNET_CONTAINER_bloomfilter_init (put->bloomfilter, DHT_BLOOM_SIZE,
GNUNET_CONSTANTS_BLOOMFILTER_K);
GNUNET_break_op (GNUNET_YES ==
@@ -1617,10 +1591,15 @@ handle_dht_p2p_put (void *cls, const struct GNUNET_PeerIdentity *peer,
pp, payload, payload_size);
}
GNUNET_CONTAINER_bloomfilter_free (bf);
- GDS_CLIENTS_process_monitor (GNUNET_MESSAGE_TYPE_DHT_MONITOR_PUT,
- GNUNET_TIME_absolute_ntoh (put->expiration_time), &put->key,
- putlen, put_path, 0, NULL, ntohl(put->desired_replication_level),
- ntohl (put->type), payload, payload_size);
+ GDS_CLIENTS_process_put (options,
+ ntohl (put->type),
+ ntohl (put->hop_count),
+ ntohl (put->desired_replication_level),
+ putlen, put_path,
+ GNUNET_TIME_absolute_ntoh (put->expiration_time),
+ &put->key,
+ payload,
+ payload_size);
return GNUNET_YES;
}
@@ -1795,11 +1774,9 @@ handle_dht_p2p_get (void *cls, const struct GNUNET_PeerIdentity *peer,
/* remember request for routing replies */
GDS_ROUTING_add (peer, type, options, &get->key, xquery, xquery_size,
reply_bf, get->bf_mutator);
-#if DEBUG_DHT
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "GET for %s at %s after %u hops\n",
GNUNET_h2s (&get->key), GNUNET_i2s (&my_identity),
(unsigned int) ntohl (get->hop_count));
-#endif
/* local lookup (this may update the reply_bf) */
if ((0 != (options & GNUNET_DHT_RO_DEMULTIPLEX_EVERYWHERE)) ||
(am_closest_peer (&get->key, peer_bf)))
@@ -1826,9 +1803,13 @@ handle_dht_p2p_get (void *cls, const struct GNUNET_PeerIdentity *peer,
1, GNUNET_NO);
}
- GDS_CLIENTS_process_monitor (GNUNET_MESSAGE_TYPE_DHT_MONITOR_GET,
- GNUNET_TIME_UNIT_FOREVER_ABS, &get->key, 0, NULL, 0, NULL,
- ntohl (get->desired_replication_level), type, NULL, 0);
+ /* FIXME Path */
+ GDS_CLIENTS_process_get (options,
+ type,
+ ntohl(get->hop_count),
+ ntohl(get->desired_replication_level),
+ 0, NULL,
+ &get->key);
/* P2P forwarding */
if (eval != GNUNET_BLOCK_EVALUATION_OK_LAST)
@@ -1962,10 +1943,16 @@ handle_dht_p2p_result (void *cls, const struct GNUNET_PeerIdentity *peer,
xget_path, data, data_size);
}
- GDS_CLIENTS_process_monitor (GNUNET_MESSAGE_TYPE_DHT_MONITOR_GET_RESP,
- GNUNET_TIME_absolute_ntoh (prm->expiration_time), &prm->key,
- put_path_length, put_path, get_path_length, get_path,
- 0, type, data, data_size);
+ GDS_CLIENTS_process_get_resp (type,
+ get_path,
+ get_path_length,
+ put_path,
+ put_path_length,
+ GNUNET_TIME_absolute_ntoh (
+ prm->expiration_time),
+ &prm->key,
+ data,
+ data_size);
return GNUNET_YES;
}
@@ -2025,5 +2012,16 @@ GDS_NEIGHBOURS_done ()
}
}
+/**
+ * Get the ID of the local node.
+ *
+ * @return identity of the local node
+ */
+struct GNUNET_PeerIdentity *
+GDS_NEIGHBOURS_get_id ()
+{
+ return &my_identity;
+}
+
/* end of gnunet-service-dht_neighbours.c */
diff --git a/src/dht/gnunet-service-dht_neighbours.h b/src/dht/gnunet-service-dht_neighbours.h
index b6e0f0e..3297638 100644
--- a/src/dht/gnunet-service-dht_neighbours.h
+++ b/src/dht/gnunet-service-dht_neighbours.h
@@ -135,4 +135,13 @@ void
GDS_NEIGHBOURS_done (void);
+/**
+ * Get the ID of the local node.
+ *
+ * @return identity of the local node
+ */
+struct GNUNET_PeerIdentity *
+GDS_NEIGHBOURS_get_id ();
+
+
#endif
diff --git a/src/dht/gnunet-service-dht_routing.c b/src/dht/gnunet-service-dht_routing.c
index a880bf7..013d856 100644
--- a/src/dht/gnunet-service-dht_routing.c
+++ b/src/dht/gnunet-service-dht_routing.c
@@ -227,6 +227,8 @@ process (void *cls, const GNUNET_HashCode * key, void *value)
1, GNUNET_NO);
return GNUNET_SYSERR;
case GNUNET_BLOCK_EVALUATION_REQUEST_VALID:
+ GNUNET_break (0);
+ return GNUNET_OK;
case GNUNET_BLOCK_EVALUATION_REQUEST_INVALID:
GNUNET_break (0);
return GNUNET_OK;
@@ -280,11 +282,48 @@ GDS_ROUTING_process (enum GNUNET_BLOCK_Type type,
pc.get_path = get_path;
pc.data = data;
pc.data_size = data_size;
+ if (NULL == data)
+ {
+ /* Some apps might have an 'empty' reply as a valid reply; however,
+ 'process' will call GNUNET_BLOCK_evaluate' which treats a 'NULL'
+ reply as request-validation (but we need response-validation).
+ So we set 'data' to a 0-byte non-NULL value just to be sure */
+ GNUNET_break (0 == data_size);
+ data_size = 0;
+ pc.data = ""; /* something not null */
+ }
GNUNET_CONTAINER_multihashmap_get_multiple (recent_map, key, &process, &pc);
}
/**
+ * Remove the oldest entry from the DHT routing table. Must only
+ * be called if it is known that there is at least one entry
+ * in the heap and hashmap.
+ */
+static void
+expire_oldest_entry ()
+{
+ struct RecentRequest *recent_req;
+
+ GNUNET_STATISTICS_update (GDS_stats,
+ gettext_noop
+ ("# Entries removed from routing table"), 1,
+ GNUNET_NO);
+ recent_req = GNUNET_CONTAINER_heap_peek (recent_heap);
+ GNUNET_assert (recent_req != NULL);
+ GNUNET_CONTAINER_heap_remove_node (recent_req->heap_node);
+ GNUNET_CONTAINER_bloomfilter_free (recent_req->reply_bf);
+ GNUNET_assert (GNUNET_YES ==
+ GNUNET_CONTAINER_multihashmap_remove (recent_map,
+ &recent_req->key,
+ recent_req));
+ GNUNET_free (recent_req);
+}
+
+
+
+/**
* Add a new entry to our routing table.
*
* @param sender peer that originated the request
@@ -308,18 +347,7 @@ GDS_ROUTING_add (const struct GNUNET_PeerIdentity *sender,
struct RecentRequest *recent_req;
while (GNUNET_CONTAINER_heap_get_size (recent_heap) >= DHT_MAX_RECENT)
- {
- GNUNET_STATISTICS_update (GDS_stats,
- gettext_noop
- ("# Entries removed from routing table"), 1,
- GNUNET_NO);
- recent_req = GNUNET_CONTAINER_heap_peek (recent_heap);
- GNUNET_assert (recent_req != NULL);
- GNUNET_CONTAINER_heap_remove_node (recent_req->heap_node);
- GNUNET_CONTAINER_bloomfilter_free (recent_req->reply_bf);
- GNUNET_free (recent_req);
- }
-
+ expire_oldest_entry ();
GNUNET_STATISTICS_update (GDS_stats,
gettext_noop ("# Entries added to routing table"),
1, GNUNET_NO);
@@ -359,23 +387,12 @@ GDS_ROUTING_init ()
void
GDS_ROUTING_done ()
{
- struct RecentRequest *recent_req;
-
while (GNUNET_CONTAINER_heap_get_size (recent_heap) > 0)
- {
- GNUNET_STATISTICS_update (GDS_stats,
- gettext_noop
- ("# Entries removed from routing table"), 1,
- GNUNET_NO);
- recent_req = GNUNET_CONTAINER_heap_peek (recent_heap);
- GNUNET_assert (recent_req != NULL);
- GNUNET_CONTAINER_heap_remove_node (recent_req->heap_node);
- GNUNET_CONTAINER_bloomfilter_free (recent_req->reply_bf);
- GNUNET_free (recent_req);
- }
+ expire_oldest_entry ();
GNUNET_assert (0 == GNUNET_CONTAINER_heap_get_size (recent_heap));
GNUNET_CONTAINER_heap_destroy (recent_heap);
recent_heap = NULL;
+ GNUNET_assert (0 == GNUNET_CONTAINER_multihashmap_size (recent_map));
GNUNET_CONTAINER_multihashmap_destroy (recent_map);
recent_map = NULL;
}
diff --git a/src/dht/plugin_block_dht.c b/src/dht/plugin_block_dht.c
index 19467b9..3c016ae 100644
--- a/src/dht/plugin_block_dht.c
+++ b/src/dht/plugin_block_dht.c
@@ -65,17 +65,29 @@ block_plugin_dht_evaluate (void *cls, enum GNUNET_BLOCK_Type type,
if (type != GNUNET_BLOCK_TYPE_DHT_HELLO)
return GNUNET_BLOCK_EVALUATION_TYPE_NOT_SUPPORTED;
if (xquery_size != 0)
+ {
+ GNUNET_break_op (0);
return GNUNET_BLOCK_EVALUATION_REQUEST_INVALID;
- if (reply_block_size == 0)
+ }
+ if (NULL == reply_block)
return GNUNET_BLOCK_EVALUATION_REQUEST_VALID;
if (reply_block_size < sizeof (struct GNUNET_MessageHeader))
+ {
+ GNUNET_break_op (0);
return GNUNET_BLOCK_EVALUATION_RESULT_INVALID;
+ }
msg = reply_block;
if (reply_block_size != ntohs (msg->size))
+ {
+ GNUNET_break_op (0);
return GNUNET_BLOCK_EVALUATION_RESULT_INVALID;
+ }
hello = reply_block;
if (GNUNET_OK != GNUNET_HELLO_get_id (hello, &pid))
+ {
+ GNUNET_break_op (0);
return GNUNET_BLOCK_EVALUATION_RESULT_INVALID;
+ }
if (NULL != bf)
{
GNUNET_BLOCK_mingle_hash (&pid.hashPubKey, bf_mutator, &mhash);
diff --git a/src/dht/test_dht_2dtorus.conf b/src/dht/test_dht_2dtorus.conf
index d420b29..d7a3d8a 100644
--- a/src/dht/test_dht_2dtorus.conf
+++ b/src/dht/test_dht_2dtorus.conf
@@ -65,7 +65,7 @@ CONNECT_TIMEOUT = 60 s
CONNECT_ATTEMPTS = 3
DEBUG = YES
HOSTKEYSFILE = ../../contrib/testing_hostkeys.dat
-MAX_CONCURRENT_SSH = 10
+MAX_CONCURRENT_SSH = 20
USE_PROGRESSBARS = YES
PEERGROUP_TIMEOUT = 2400 s
TOPOLOGY_OUTPUT_FILE = 2dtorus_topo_initial
@@ -77,7 +77,7 @@ MAX_OUTSTANDING_CONNECTIONS = 75
DELETE_FILES = YES
[test_dht_topo]
-CONNECTION_LIMIT = 16
+CONNECTION_LIMIT = 20
#DATA_OUTPUT_FILE=data_output
diff --git a/src/dht/test_dht_api.c b/src/dht/test_dht_api.c
index 182856a..000ad33 100644
--- a/src/dht/test_dht_api.c
+++ b/src/dht/test_dht_api.c
@@ -121,7 +121,7 @@ stop_arm (struct PeerContext *p)
if (0 != GNUNET_OS_process_kill (p->arm_proc, SIGTERM))
GNUNET_log_strerror (GNUNET_ERROR_TYPE_WARNING, "kill");
GNUNET_OS_process_wait (p->arm_proc);
- GNUNET_OS_process_close (p->arm_proc);
+ GNUNET_OS_process_destroy (p->arm_proc);
p->arm_proc = NULL;
#endif
GNUNET_CONFIGURATION_destroy (p->cfg);
@@ -195,10 +195,10 @@ test_get_iterator (void *cls, struct GNUNET_TIME_Absolute exp,
* Signature of the main function of a task.
*
* @param cls closure
- * @param tc context information (why was this task triggered now)
+ * @param success result of PUT
*/
static void
-test_get (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+test_get (void *cls, int success)
{
struct PeerContext *peer = cls;
GNUNET_HashCode hash;
@@ -212,7 +212,7 @@ test_get (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
retry_context.next_timeout = BASE_TIMEOUT;
peer->get_handle =
- GNUNET_DHT_get_start (peer->dht_handle, TOTAL_TIMEOUT,
+ GNUNET_DHT_get_start (peer->dht_handle,
GNUNET_BLOCK_TYPE_TEST, &hash, 1,
GNUNET_DHT_RO_NONE, NULL, 0, &test_get_iterator,
NULL);
diff --git a/src/dht/test_dht_api_data.conf b/src/dht/test_dht_api_data.conf
index 0e34eb7..032416c 100644
--- a/src/dht/test_dht_api_data.conf
+++ b/src/dht/test_dht_api_data.conf
@@ -77,7 +77,8 @@ EXTERNAL_ADDRESS = 127.0.0.1
[dns]
AUTOSTART = NO
-
+[namestore]
+AUTOSTART = NO
[nse]
AUTOSTART = NO
diff --git a/src/dht/test_dht_api_peer1.conf b/src/dht/test_dht_api_peer1.conf
index cacc4da..d9db7c4 100644
--- a/src/dht/test_dht_api_peer1.conf
+++ b/src/dht/test_dht_api_peer1.conf
@@ -10,7 +10,7 @@ AUTOSTART = YES
ACCEPT_FROM6 = ::1;
ACCEPT_FROM = 127.0.0.1;
HOSTNAME = localhost
-PORT = 2100
+PORT = 12100
BINARY = gnunet-service-dht
[block]
diff --git a/src/dht/test_dht_line.conf b/src/dht/test_dht_line.conf
index 8bcb12c..6be3559 100644
--- a/src/dht/test_dht_line.conf
+++ b/src/dht/test_dht_line.conf
@@ -80,6 +80,8 @@ DELETE_FILES = YES
CONNECTION_LIMIT = 5
#DATA_OUTPUT_FILE=data_output
+[namestore]
+AUTOSTART = NO
[nse]
WORKDELAY = 500 ms
diff --git a/src/dht/test_dht_monitor.c b/src/dht/test_dht_monitor.c
index 63af7e9..ca6704a 100644
--- a/src/dht/test_dht_monitor.c
+++ b/src/dht/test_dht_monitor.c
@@ -31,8 +31,6 @@
#include "gnunet_testing_lib.h"
#include "gnunet_dht_service.h"
-#define VERBOSE GNUNET_YES
-
#define REMOVE_DIR GNUNET_YES
@@ -137,17 +135,14 @@ shutdown_callback (void *cls, const char *emsg)
{
if (emsg != NULL)
{
-#if VERBOSE
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "test: Shutdown of peers failed!\n");
-#endif
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "test: Shutdown of peers failed: %s\n",
+ emsg);
ok++;
}
else
{
-#if VERBOSE
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"test: All peers successfully shut down!\n");
-#endif
}
GNUNET_CONFIGURATION_destroy (testing_cfg);
}
@@ -156,16 +151,12 @@ shutdown_callback (void *cls, const char *emsg)
static void
shutdown_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
{
-#if VERBOSE
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "test: Ending test.\n");
-#endif
-
if (disconnect_task != GNUNET_SCHEDULER_NO_TASK)
{
GNUNET_SCHEDULER_cancel (disconnect_task);
disconnect_task = GNUNET_SCHEDULER_NO_TASK;
}
-
if (data_file != NULL)
GNUNET_DISK_file_close (data_file);
GNUNET_TESTING_daemons_stop (pg, TIMEOUT, &shutdown_callback, NULL);
@@ -184,6 +175,7 @@ disconnect_peers (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
GNUNET_DHT_get_stop (get_h_far);
for (i = 0; i < num_peers; i++)
{
+ GNUNET_DHT_monitor_stop(mhs[i]);
GNUNET_DHT_disconnect (hs[i]);
}
GNUNET_SCHEDULER_cancel (shutdown_handle);
@@ -258,7 +250,7 @@ do_test (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
GNUNET_h2s_full (&d_far->id.hashPubKey));
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "test: from %s\n",
GNUNET_h2s_full (&o->id.hashPubKey));
- get_h_far = GNUNET_DHT_get_start (hs[0], GNUNET_TIME_UNIT_FOREVER_REL, /* timeout */
+ get_h_far = GNUNET_DHT_get_start (hs[0],
GNUNET_BLOCK_TYPE_TEST, /* type */
&d_far->id.hashPubKey, /*key to search */
4U, /* replication level */
@@ -302,65 +294,123 @@ put_id (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
}
/**
- * Callback called on each request going through the DHT.
+ * Callback called on each GET request going through the DHT.
+ * Prints the info about the intercepted packet and increments a counter.
+ *
+ * @param cls Closure.
+ * @param options Options, for instance RecordRoute, DemultiplexEverywhere.
+ * @param type The type of data in the request.
+ * @param hop_count Hop count so far.
+ * @param path_length number of entries in path (or 0 if not recorded).
+ * @param path peers on the GET path (or NULL if not recorded).
+ * @param desired_replication_level Desired replication level.
+ * @param key Key of the requested data.
+ */
+void
+monitor_get_cb (void *cls,
+ enum GNUNET_DHT_RouteOption options,
+ enum GNUNET_BLOCK_Type type,
+ uint32_t hop_count,
+ uint32_t desired_replication_level,
+ unsigned int path_length,
+ const struct GNUNET_PeerIdentity *path,
+ const GNUNET_HashCode * key)
+{
+ const char *s_key;
+ unsigned int i;
+
+ i = (unsigned int) (long) cls;
+ s_key = GNUNET_h2s(key);
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "%u got a GET message for key %s\n",
+ i, s_key);
+
+ if (strncmp (s_key, id_far, 4) == 0 && in_test == GNUNET_YES)
+ monitor_counter++;
+}
+
+
+/**
+ * Callback called on each PUT request going through the DHT.
+ * Prints the info about the intercepted packet and increments a counter.
+ *
+ * @param cls Closure.
+ * @param options Options, for instance RecordRoute, DemultiplexEverywhere.
+ * @param type The type of data in the request.
+ * @param hop_count Hop count so far.
+ * @param path_length number of entries in path (or 0 if not recorded).
+ * @param path peers on the PUT path (or NULL if not recorded).
+ * @param desired_replication_level Desired replication level.
+ * @param exp Expiration time of the data.
+ * @param key Key under which data is to be stored.
+ * @param data Pointer to the data carried.
+ * @param size Number of bytes in data.
+ */
+void
+monitor_put_cb (void *cls,
+ enum GNUNET_DHT_RouteOption options,
+ enum GNUNET_BLOCK_Type type,
+ uint32_t hop_count,
+ uint32_t desired_replication_level,
+ unsigned int path_length,
+ const struct GNUNET_PeerIdentity *path,
+ struct GNUNET_TIME_Absolute exp,
+ const GNUNET_HashCode * key,
+ const void *data,
+ size_t size)
+{
+ const char *s_key;
+ unsigned int i;
+
+ i = (unsigned int) (long) cls;
+ s_key = GNUNET_h2s(key);
+
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "%u got a PUT message for key %s with %u bytes\n",
+ i, s_key, size);
+
+ if (strncmp (s_key, id_far, 4) == 0 && in_test == GNUNET_YES)
+ monitor_counter++;
+}
+
+
+/**
+ * Callback called on each GET reply going through the DHT.
* Prints the info about the intercepted packet and increments a counter.
*
- * @param cls Closure (long) # of daemon that got the monitor event.
- * @param mtype Type of the DHT message monitored.
- * @param exp When will this value expire.
- * @param key Key of the result/request.
- * @param get_path Peers on reply path (or NULL if not recorded).
+ * @param cls Closure.
+ * @param type The type of data in the result.
+ * @param get_path Peers on GET path (or NULL if not recorded).
* @param get_path_length number of entries in get_path.
* @param put_path peers on the PUT path (or NULL if not recorded).
* @param put_path_length number of entries in get_path.
- * @param desired_replication_level Desired replication level.
- * @param type Type of the result/request.
+ * @param exp Expiration time of the data.
+ * @param key Key of the data.
* @param data Pointer to the result data.
* @param size Number of bytes in data.
*/
void
-monitor_dht_cb (void *cls,
- uint16_t mtype,
- struct GNUNET_TIME_Absolute exp,
- const GNUNET_HashCode * key,
- const struct GNUNET_PeerIdentity * get_path,
+monitor_res_cb (void *cls,
+ enum GNUNET_BLOCK_Type type,
+ const struct GNUNET_PeerIdentity *get_path,
unsigned int get_path_length,
- const struct GNUNET_PeerIdentity * put_path,
+ const struct GNUNET_PeerIdentity *put_path,
unsigned int put_path_length,
- uint32_t desired_replication_level,
- enum GNUNET_DHT_RouteOption options,
- enum GNUNET_BLOCK_Type type,
+ struct GNUNET_TIME_Absolute exp,
+ const GNUNET_HashCode * key,
const void *data,
size_t size)
{
const char *s_key;
- const char *mtype_s;
unsigned int i;
i = (unsigned int) (long) cls;
s_key = GNUNET_h2s(key);
- switch (mtype)
- {
- case 149:
- mtype_s = "GET ";
- break;
- case 150:
- mtype_s = "RESULT";
- break;
- case 151:
- mtype_s = "PUT ";
- break;
- default:
- GNUNET_break (0);
- mtype_s = "UNKNOWN!!!";
- }
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
- "%u got a message of type %s for key %s\n",
- i, mtype_s, s_key);
+ "%u got a REPLY message for key %s with %u bytes\n",
+ i, s_key, size);
- if ((mtype == GNUNET_MESSAGE_TYPE_DHT_MONITOR_GET ||
- mtype == GNUNET_MESSAGE_TYPE_DHT_MONITOR_PUT) &&
- strncmp (s_key, id_far, 4) == 0 && in_test == GNUNET_YES)
+ if (strncmp (s_key, id_far, 4) == 0 && in_test == GNUNET_YES)
monitor_counter++;
}
@@ -389,15 +439,9 @@ peergroup_ready (void *cls, const char *emsg)
GNUNET_TESTING_daemons_stop (pg, TIMEOUT, &shutdown_callback, NULL);
return;
}
-#if VERBOSE
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "************************************************************\n");
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "test: Peer Group started successfully!\n");
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "test: Have %u connections\n",
+ "test: Peer Group started successfully with %u connections\n",
total_connections);
-#endif
-
if (data_file != NULL)
{
buf = NULL;
@@ -419,8 +463,13 @@ peergroup_ready (void *cls, const char *emsg)
{
d = GNUNET_TESTING_daemon_get (pg, i);
hs[i] = GNUNET_DHT_connect (d->cfg, 32);
- mhs[i] = GNUNET_DHT_monitor_start(hs[i], GNUNET_BLOCK_TYPE_ANY, NULL,
- &monitor_dht_cb, (void *)(long)i);
+ mhs[i] = GNUNET_DHT_monitor_start(hs[i],
+ GNUNET_BLOCK_TYPE_ANY,
+ NULL,
+ &monitor_get_cb,
+ &monitor_res_cb,
+ &monitor_put_cb,
+ (void *)(long)i);
}
if ((NULL == o) || (NULL == d_far))
@@ -500,19 +549,12 @@ run (void *cls, char *const *args, const char *cfgfile,
testing_cfg = GNUNET_CONFIGURATION_dup (cfg);
GNUNET_log_setup ("test_dht_monitor",
-#if VERBOSE
- "DEBUG",
-#else
"WARNING",
-#endif
NULL);
-#if VERBOSE
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "test: Starting daemons.\n");
GNUNET_CONFIGURATION_set_value_string (testing_cfg, "testing",
"use_progressbars", "YES");
-#endif
-
if (GNUNET_OK !=
GNUNET_CONFIGURATION_get_value_number (testing_cfg, "testing",
"num_peers", &num_peers))
@@ -600,9 +642,6 @@ main (int xargc, char *xargv[])
char *const argv[] = { "test-dht-monitor",
"-c",
"test_dht_line.conf",
-#if VERBOSE
- "-L", "DEBUG",
-#endif
NULL
};
diff --git a/src/dht/test_dht_multipeer.c b/src/dht/test_dht_multipeer.c
index 94e39d2..ab7d90e 100644
--- a/src/dht/test_dht_multipeer.c
+++ b/src/dht/test_dht_multipeer.c
@@ -27,9 +27,6 @@
#include "gnunet_core_service.h"
#include "gnunet_dht_service.h"
-/* DEFINES */
-#define VERBOSE GNUNET_NO
-
/* Timeout for entire testcase */
#define TIMEOUT GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MINUTES, 30)
@@ -279,7 +276,7 @@ static struct StatValues stats[] = {
{"core", "# bytes encrypted", 0},
{"core", "# type maps received", 0},
{"core", "# session keys confirmed via PONG", 0},
- {"core", "# entries in session map", 0},
+ {"core", "# peers connected", 0},
{"core", "# key exchanges initiated", 0},
{"core", "# send requests dropped (disconnected)", 0},
{"core", "# transmissions delayed due to corking", 0},
@@ -624,7 +621,7 @@ do_get (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
GNUNET_assert (test_get->dht_handle != NULL);
outstanding_gets++;
test_get->get_handle =
- GNUNET_DHT_get_start (test_get->dht_handle, GNUNET_TIME_UNIT_FOREVER_REL,
+ GNUNET_DHT_get_start (test_get->dht_handle,
GNUNET_BLOCK_TYPE_TEST, &key, 1, route_option, NULL,
0, &get_result_iterator, test_get);
test_get->task =
@@ -658,10 +655,9 @@ start_gets (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
unsigned long long j;
struct TestGetContext *test_get;
-#if VERBOSE
- FPRINTF (stderr, "Issuing %llu GETs\n",
- (unsigned long long) (num_peers * num_peers));
-#endif
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Issuing %llu GETs\n",
+ (unsigned long long) (num_peers * num_peers));
for (i = 0; i < num_peers; i++)
for (j = 0; j < num_peers; j++)
{
@@ -678,7 +674,7 @@ start_gets (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
* Called when the PUT request has been transmitted to the DHT service.
*/
static void
-put_finished (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+put_finished (void *cls, int success)
{
struct TestPutContext *test_put = cls;
@@ -719,10 +715,9 @@ do_put (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
test_put->dht_handle = GNUNET_DHT_connect (test_put->daemon->cfg, 10);
GNUNET_assert (test_put->dht_handle != NULL);
outstanding_puts++;
-#if VERBOSE > 2
- FPRINTF (stderr, "PUT %u at `%s'\n", test_put->uid,
- GNUNET_i2s (&test_put->daemon->id));
-#endif
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "PUT %u at `%s'\n", test_put->uid,
+ GNUNET_i2s (&test_put->daemon->id));
GNUNET_DHT_put (test_put->dht_handle, &key, 1, route_option,
GNUNET_BLOCK_TYPE_TEST, sizeof (data), data,
GNUNET_TIME_UNIT_FOREVER_ABS, GNUNET_TIME_UNIT_FOREVER_REL,
@@ -822,9 +817,6 @@ check ()
char *const argv[] = { "test-dht-multipeer", /* Name to give running binary */
"-c",
"test_dht_multipeer_data.conf", /* Config file to use */
-#if VERBOSE
- "-L", "DEBUG",
-#endif
NULL
};
struct GNUNET_GETOPT_CommandLineOption options[] = {
@@ -848,13 +840,8 @@ main (int argc, char *argv[])
{
int ret;
-
GNUNET_log_setup ("test-dht-multipeer",
-#if VERBOSE
- "DEBUG",
-#else
"WARNING",
-#endif
NULL);
ret = check ();
/**
diff --git a/src/dht/test_dht_multipeer_data.conf b/src/dht/test_dht_multipeer_data.conf
index ff87698..f181594 100644
--- a/src/dht/test_dht_multipeer_data.conf
+++ b/src/dht/test_dht_multipeer_data.conf
@@ -16,7 +16,7 @@ ACCEPT_FROM = 127.0.0.1;
CONFIG = $DEFAULTCONFIG
HOME = $SERVICEHOME
HOSTNAME = localhost
-PORT = 2100
+PORT = 12100
STOP_FOUND = YES
USE_MAX_HOPS = YES
MAX_HOPS = 16
@@ -115,12 +115,13 @@ BEHIND_NAT = NO
ALLOW_NAT = NO
INTERNAL_ADDRESS = 127.0.0.1
EXTERNAL_ADDRESS = 127.0.0.1
-USE_LOCALADDR = NO
+USE_LOCALADDR = YES
[dns]
AUTOSTART = NO
-
+[namestore]
+AUTOSTART = NO
[nse]
AUTOSTART = NO
diff --git a/src/dht/test_dht_topo.c b/src/dht/test_dht_topo.c
index 81dc7cb..f51f3a6 100644
--- a/src/dht/test_dht_topo.c
+++ b/src/dht/test_dht_topo.c
@@ -30,8 +30,6 @@
#include "gnunet_testing_lib.h"
#include "gnunet_dht_service.h"
-#define VERBOSE GNUNET_NO
-
#define REMOVE_DIR GNUNET_YES
/**
@@ -55,11 +53,6 @@
static int ok;
/**
- * Be verbose
- */
-static int verbose;
-
-/**
* Total number of peers in the test.
*/
static unsigned long long num_peers;
@@ -116,27 +109,26 @@ static GNUNET_SCHEDULER_TaskIdentifier shutdown_handle;
static char *topology_file;
-struct GNUNET_TESTING_Daemon *d1;
+static struct GNUNET_DHT_Handle **hs;
-struct GNUNET_TESTING_Daemon *d2;
+static struct GNUNET_DHT_GetHandle *get_h;
-struct GNUNET_DHT_Handle **hs;
+static struct GNUNET_DHT_GetHandle *get_h_2;
-struct GNUNET_DHT_GetHandle *get_h;
+static struct GNUNET_DHT_GetHandle *get_h_far;
-struct GNUNET_DHT_GetHandle *get_h_2;
+static int found_1;
-struct GNUNET_DHT_GetHandle *get_h_far;
+static int found_2;
-int found_1;
-int found_2;
-int found_far;
+static int found_far;
/**
* Which topology are we to run
*/
static int test_topology;
+
/**
* Check whether peers successfully shut down.
*/
@@ -145,17 +137,12 @@ shutdown_callback (void *cls, const char *emsg)
{
if (emsg != NULL)
{
-#if VERBOSE
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Shutdown of peers failed!\n");
-#endif
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Shutdown of peers failed!\n");
ok++;
}
else
{
-#if VERBOSE
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "All peers successfully shut down!\n");
-#endif
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO, "All peers successfully shut down!\n");
}
GNUNET_CONFIGURATION_destroy (testing_cfg);
}
@@ -164,9 +151,7 @@ shutdown_callback (void *cls, const char *emsg)
static void
shutdown_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
{
-#if VERBOSE
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Ending test.\n");
-#endif
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Ending test.\n");
if (disconnect_task != GNUNET_SCHEDULER_NO_TASK)
{
@@ -185,7 +170,7 @@ disconnect_peers (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
{
unsigned int i;
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "disconnecting peers\n");
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO, "disconnecting peers\n");
disconnect_task = GNUNET_SCHEDULER_NO_TASK;
GNUNET_SCHEDULER_cancel (put_task);
if (NULL != get_h)
@@ -202,6 +187,7 @@ disconnect_peers (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
shutdown_handle = GNUNET_SCHEDULER_add_now (&shutdown_task, NULL);
}
+
static void
dht_get_id_handler (void *cls, struct GNUNET_TIME_Absolute exp,
const GNUNET_HashCode * key,
@@ -255,14 +241,15 @@ dht_get_id_handler (void *cls, struct GNUNET_TIME_Absolute exp,
default:
GNUNET_break(0);
}
- if (TORUS == test_topology &&
- (found_1 == 0 || found_2 == 0 || found_far == 0))
+ if ( (TORUS == test_topology) &&
+ ( (found_1 == 0) || (found_2 == 0) || (found_far == 0)) )
return;
ok = 0;
GNUNET_SCHEDULER_cancel (disconnect_task);
disconnect_task = GNUNET_SCHEDULER_add_now (&disconnect_peers, NULL);
}
+
static void
do_test (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
{
@@ -313,11 +300,11 @@ do_test (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
{
GNUNET_assert (0);
}
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "test_task\ntest: from %s\n",
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "test_task\nfrom %s\n",
GNUNET_h2s_full (&o->id.hashPubKey));
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " looking for %s\n",
GNUNET_h2s_full (&d->id.hashPubKey));
- get_h = GNUNET_DHT_get_start (hs[0], GNUNET_TIME_UNIT_FOREVER_REL, /* timeout */
+ get_h = GNUNET_DHT_get_start (hs[0],
GNUNET_BLOCK_TYPE_TEST, /* type */
&d->id.hashPubKey, /*key to search */
4U, /* replication level */
@@ -328,7 +315,7 @@ do_test (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
{
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " looking for %s\n",
GNUNET_h2s_full (&d2->id.hashPubKey));
- get_h_2 = GNUNET_DHT_get_start (hs[0], GNUNET_TIME_UNIT_FOREVER_REL, /* timeout */
+ get_h_2 = GNUNET_DHT_get_start (hs[0],
GNUNET_BLOCK_TYPE_TEST, /* type */
&d2->id.hashPubKey, /*key to search */
4U, /* replication level */
@@ -337,7 +324,7 @@ do_test (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
&dht_get_id_handler, (void *)2);
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " looking for %s\n",
GNUNET_h2s_full (&d_far->id.hashPubKey));
- get_h_far = GNUNET_DHT_get_start (hs[0], GNUNET_TIME_UNIT_FOREVER_REL, /* timeout */
+ get_h_far = GNUNET_DHT_get_start (hs[0],
GNUNET_BLOCK_TYPE_TEST, /* type */
&d_far->id.hashPubKey, /*key to search */
4U, /* replication level */
@@ -402,21 +389,16 @@ peergroup_ready (void *cls, const char *emsg)
{
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Peergroup callback called with error, aborting test!\n");
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Error from testing: `%s'\n",
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ "Error from testing: `%s'\n",
emsg);
ok++;
GNUNET_TESTING_daemons_stop (pg, TIMEOUT, &shutdown_callback, NULL);
return;
}
-#if VERBOSE
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "************************************************************\n");
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Peer Group started successfully!\n");
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Have %u connections\n",
+ "Peer Group started successfully with %u connections\n",
total_connections);
-#endif
-
if (data_file != NULL)
{
buf = NULL;
@@ -465,7 +447,6 @@ connect_cb (void *cls, const struct GNUNET_PeerIdentity *first,
struct GNUNET_TESTING_Daemon *first_daemon,
struct GNUNET_TESTING_Daemon *second_daemon, const char *emsg)
{
-
if (emsg == NULL)
{
total_connections++;
@@ -474,10 +455,9 @@ connect_cb (void *cls, const struct GNUNET_PeerIdentity *first,
}
else
{
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
"Problem with new connection (%s)\n", emsg);
}
-
}
@@ -500,19 +480,11 @@ run (void *cls, char *const *args, const char *cfgfile,
testing_cfg = GNUNET_CONFIGURATION_dup (cfg);
GNUNET_log_setup ("test_dht_topo",
-#if VERBOSE
- "DEBUG",
-#else
"WARNING",
-#endif
NULL);
-
-#if VERBOSE
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Starting daemons.\n");
GNUNET_CONFIGURATION_set_value_string (testing_cfg, "testing",
"use_progressbars", "YES");
-#endif
-
if (GNUNET_OK !=
GNUNET_CONFIGURATION_get_value_number (testing_cfg, "testing",
"num_peers", &num_peers))
@@ -579,38 +551,23 @@ run (void *cls, char *const *args, const char *cfgfile,
}
-
-/**
- * test_dht_2d command line options
- */
-static struct GNUNET_GETOPT_CommandLineOption options[] = {
- {'V', "verbose", NULL,
- gettext_noop ("be verbose (print progress information)"),
- 0, &GNUNET_GETOPT_set_one, &verbose},
- GNUNET_GETOPT_OPTION_END
-};
-
-
/**
* Main: start test
*/
int
main (int xargc, char *xargv[])
{
- char *const argv_torus[] = { "test-dht-2dtorus",
+ static struct GNUNET_GETOPT_CommandLineOption options[] = {
+ GNUNET_GETOPT_OPTION_END
+ };
+ static char *const argv_torus[] = { "test-dht-2dtorus",
"-c",
"test_dht_2dtorus.conf",
-#if VERBOSE
- "-L", "DEBUG",
-#endif
NULL
};
- char *const argv_line[] = { "test-dht-line",
+ static char *const argv_line[] = { "test-dht-line",
"-c",
"test_dht_line.conf",
-#if VERBOSE
- "-L", "DEBUG",
-#endif
NULL
};
char *const *argv;
@@ -641,17 +598,17 @@ main (int xargc, char *xargv[])
#if REMOVE_DIR
GNUNET_DISK_directory_remove ("/tmp/test_dht_topo");
#endif
- if (found_1 == 0)
+ if (0 == found_1)
{
GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "ID 1 not found!\n");
}
if (TORUS == test_topology)
{
- if (found_2 == 0)
+ if (0 == found_2)
{
GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "ID 2 not found!\n");
}
- if (found_far == 0)
+ if (0 == found_far)
{
GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "ID far not found!\n");
}
diff --git a/src/dht/test_dht_twopeer.c b/src/dht/test_dht_twopeer.c
index a3b6e4a..f0ac05b 100644
--- a/src/dht/test_dht_twopeer.c
+++ b/src/dht/test_dht_twopeer.c
@@ -28,8 +28,6 @@
#include "gnunet_dht_service.h"
/* DEFINES */
-#define VERBOSE GNUNET_NO
-
#define MAX_GET_ATTEMPTS 10
#define TIMEOUT GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MINUTES, 5)
@@ -250,8 +248,6 @@ get_stop_finished (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
&stop_retry_get, get_context);
get_context->get_handle =
GNUNET_DHT_get_start (get_context->dht_handle,
- GNUNET_TIME_relative_multiply
- (GNUNET_TIME_UNIT_SECONDS, 5),
GNUNET_BLOCK_TYPE_DHT_HELLO,
&get_context->peer->hashPubKey, 1,
GNUNET_DHT_RO_NONE, NULL, 0, &get_result_iterator,
@@ -285,8 +281,6 @@ do_get (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
&stop_retry_get, get_context);
get_context->get_handle =
GNUNET_DHT_get_start (get_context->dht_handle,
- GNUNET_TIME_relative_multiply
- (GNUNET_TIME_UNIT_SECONDS, 5),
GNUNET_BLOCK_TYPE_DHT_HELLO,
&get_context->peer->hashPubKey, 1,
GNUNET_DHT_RO_FIND_PEER, NULL, 0,
@@ -306,7 +300,6 @@ topology_callback (void *cls, const struct GNUNET_PeerIdentity *first,
if (emsg == NULL)
{
total_connections++;
-#if VERBOSE
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"connected peer %s to peer %s, distance %u\n",
first_daemon->shortname, second_daemon->shortname, distance);
@@ -314,19 +307,16 @@ topology_callback (void *cls, const struct GNUNET_PeerIdentity *first,
else
{
failed_connections++;
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
"Failed to connect peer %s to peer %s with error :\n%s\n",
first_daemon->shortname, second_daemon->shortname, emsg);
-#endif
}
if (total_connections == expected_connections)
{
-#if VERBOSE
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Created %d total connections, which is our target number! Starting next phase of testing.\n",
total_connections);
-#endif
GNUNET_SCHEDULER_cancel (die_task);
die_task =
GNUNET_SCHEDULER_add_delayed (TIMEOUT, &end_badly,
@@ -409,10 +399,8 @@ peers_started_callback (void *cls, const struct GNUNET_PeerIdentity *id,
if (peers_left == 0)
{
-#if VERBOSE
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"All %d daemons started, now connecting peers!\n", num_peers);
-#endif
GNUNET_SCHEDULER_cancel (die_task);
/* Set up task in case topology creation doesn't finish
* within a reasonable amount of time */
@@ -466,9 +454,6 @@ check ()
char *const argv[] = { "test-dht-twopeer",
"-c",
"test_dht_twopeer_data.conf",
-#if VERBOSE
- "-L", "DEBUG",
-#endif
NULL
};
struct GNUNET_GETOPT_CommandLineOption options[] = {
@@ -491,11 +476,7 @@ main (int argc, char *argv[])
int ret;
GNUNET_log_setup ("test-dht-twopeer",
-#if VERBOSE
- "DEBUG",
-#else
"WARNING",
-#endif
NULL);
ret = check ();
/**
diff --git a/src/dht/test_dht_twopeer_data.conf b/src/dht/test_dht_twopeer_data.conf
index ba1e22b..2889a41 100644
--- a/src/dht/test_dht_twopeer_data.conf
+++ b/src/dht/test_dht_twopeer_data.conf
@@ -9,7 +9,7 @@ AUTOSTART = YES
DEBUG = NO
AUTOSTART = YES
#PREFIX = xterm -T dht -e gdb --args
-PORT = 2100
+PORT = 12100
BINARY = gnunet-service-dht
[block]
@@ -34,7 +34,7 @@ HOSTNAME = localhost
PORT = 12092
[arm]
-DEFAULTSERVICES = core
+DEFAULTSERVICES = core dht
PORT = 12366
DEBUG = NO
@@ -58,7 +58,7 @@ BEHIND_NAT = NO
ALLOW_NAT = NO
INTERNAL_ADDRESS = 127.0.0.1
EXTERNAL_ADDRESS = 127.0.0.1
-USE_LOCALADDR = NO
+USE_LOCALADDR = YES
[dns]
AUTOSTART = NO
@@ -72,3 +72,5 @@ AUTOSTART = NO
[fs]
AUTOSTART = NO
+[namestore]
+AUTOSTART = NO
diff --git a/src/dht/test_dht_twopeer_get_put.c b/src/dht/test_dht_twopeer_get_put.c
index 0bb9fac..3271d04 100644
--- a/src/dht/test_dht_twopeer_get_put.c
+++ b/src/dht/test_dht_twopeer_get_put.c
@@ -272,7 +272,7 @@ do_get (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
memset (&key, 42, sizeof (GNUNET_HashCode)); /* Set the key to the same thing as when data was inserted */
#endif
global_get_handle =
- GNUNET_DHT_get_start (peer2dht, GNUNET_TIME_relative_get_forever (),
+ GNUNET_DHT_get_start (peer2dht,
#if DNS
GNUNET_BLOCK_TYPE_DNS,
#else
@@ -289,7 +289,7 @@ do_get (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
* Schedule the GET request for some time in the future.
*/
static void
-put_finished (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+put_finished (void *cls, int success)
{
GNUNET_SCHEDULER_cancel (die_task);
die_task =
@@ -391,29 +391,23 @@ topology_callback (void *cls, const struct GNUNET_PeerIdentity *first,
if (emsg == NULL)
{
total_connections++;
-#if VERBOSE
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"connected peer %s to peer %s, distance %u\n",
first_daemon->shortname, second_daemon->shortname, distance);
-#endif
}
-#if VERBOSE
else
{
failed_connections++;
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
"Failed to connect peer %s to peer %s with error :\n%s\n",
first_daemon->shortname, second_daemon->shortname, emsg);
}
-#endif
if (total_connections == expected_connections)
{
-#if VERBOSE
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Created %d total connections, which is our target number! Starting next phase of testing.\n",
total_connections);
-#endif
GNUNET_SCHEDULER_cancel (die_task);
die_task =
GNUNET_SCHEDULER_add_delayed (TIMEOUT, &end_badly, "from test gets");
@@ -482,10 +476,8 @@ peers_started_callback (void *cls, const struct GNUNET_PeerIdentity *id,
if (peers_left == 0) /* Indicates all peers started */
{
-#if VERBOSE
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"All %d daemons started, now connecting peers!\n", num_peers);
-#endif
expected_connections = -1;
if ((pg != NULL)) /* Sanity check */
{
@@ -556,9 +548,6 @@ check ()
char *const argv[] = { "test-dht-twopeer-get-put", /* Name to give running binary */
"-c",
"test_dht_twopeer_data.conf", /* Config file to use */
-#if VERBOSE
- "-L", "DEBUG",
-#endif
NULL
};
struct GNUNET_GETOPT_CommandLineOption options[] = {
@@ -583,11 +572,7 @@ main (int argc, char *argv[])
int ret;
GNUNET_log_setup ("test-dht-twopeer",
-#if VERBOSE
- "DEBUG",
-#else
"WARNING",
-#endif
NULL);
ret = check ();
/**
diff --git a/src/dht/test_dht_twopeer_path_tracking.c b/src/dht/test_dht_twopeer_path_tracking.c
index 6e764a3..6ecf6a3 100644
--- a/src/dht/test_dht_twopeer_path_tracking.c
+++ b/src/dht/test_dht_twopeer_path_tracking.c
@@ -249,7 +249,7 @@ get_result_iterator (void *cls, struct GNUNET_TIME_Absolute exp,
* Schedule the GET request for some time in the future.
*/
static void
-put_finished (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+put_finished (void *cls, int success)
{
GNUNET_HashCode key; /* Key for data lookup */
@@ -259,7 +259,7 @@ put_finished (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
"waiting for get response (data not found)");
memset (&key, 42, sizeof (GNUNET_HashCode)); /* Set the key to the same thing as when data was inserted */
global_get_handle =
- GNUNET_DHT_get_start (peer2dht, GNUNET_TIME_relative_get_forever (),
+ GNUNET_DHT_get_start (peer2dht,
GNUNET_BLOCK_TYPE_TEST, &key, 1,
GNUNET_DHT_RO_RECORD_ROUTE, NULL, 0,
&get_result_iterator, NULL);
diff --git a/src/dht/test_dht_twopeer_put_get.c b/src/dht/test_dht_twopeer_put_get.c
index 48bf9f8..44150e3 100644
--- a/src/dht/test_dht_twopeer_put_get.c
+++ b/src/dht/test_dht_twopeer_put_get.c
@@ -129,9 +129,15 @@ static struct GNUNET_DHT_Handle *peer1dht;
static struct GNUNET_DHT_Handle *peer2dht;
/**
+ * Handle for our PUT operation.
+ */
+static struct GNUNET_DHT_PutHandle *put_op;
+
+
+/**
* Check whether peers successfully shut down.
*/
-void
+static void
shutdown_callback (void *cls, const char *emsg)
{
if (emsg != NULL)
@@ -164,6 +170,11 @@ finish_testing (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
static void
end_badly_cont (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
{
+ if (NULL != put_op)
+ {
+ GNUNET_DHT_put_cancel (put_op);
+ put_op = NULL;
+ }
if (peer1dht != NULL)
GNUNET_DHT_disconnect (peer1dht);
@@ -174,6 +185,7 @@ end_badly_cont (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
GNUNET_TESTING_daemons_stop (pg, TIMEOUT, &shutdown_callback, NULL);
}
+
/**
* Check if the get_handle is being used, if so stop the request. Either
* way, schedule the end_badly_cont function which actually shuts down the
@@ -242,10 +254,11 @@ get_result_iterator (void *cls, struct GNUNET_TIME_Absolute exp,
* Schedule the GET request for some time in the future.
*/
static void
-put_finished (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+put_finished (void *cls, int success)
{
GNUNET_HashCode key; /* Key for data lookup */
+ put_op = NULL;
GNUNET_SCHEDULER_cancel (die_task);
die_task =
GNUNET_SCHEDULER_add_delayed (GET_TIMEOUT, &end_badly,
@@ -253,7 +266,7 @@ put_finished (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
memset (&key, 42, sizeof (GNUNET_HashCode)); /* Set the key to the same thing as when data was inserted */
global_get_handle =
- GNUNET_DHT_get_start (peer2dht, GNUNET_TIME_UNIT_FOREVER_REL,
+ GNUNET_DHT_get_start (peer2dht,
GNUNET_BLOCK_TYPE_TEST, &key, 1, GNUNET_DHT_RO_NONE,
NULL, 0, &get_result_iterator, NULL);
}
@@ -272,9 +285,9 @@ do_put (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
memset (data, 43, sizeof (data));
/* Insert the data at the first peer */
- GNUNET_DHT_put (peer1dht, &key, 1, GNUNET_DHT_RO_NONE, GNUNET_BLOCK_TYPE_TEST,
- sizeof (data), data, GNUNET_TIME_UNIT_FOREVER_ABS,
- GNUNET_TIME_UNIT_FOREVER_REL, &put_finished, NULL);
+ put_op = GNUNET_DHT_put (peer1dht, &key, 1, GNUNET_DHT_RO_NONE, GNUNET_BLOCK_TYPE_TEST,
+ sizeof (data), data, GNUNET_TIME_UNIT_FOREVER_ABS,
+ GNUNET_TIME_UNIT_FOREVER_REL, &put_finished, NULL);
}
@@ -299,29 +312,23 @@ topology_callback (void *cls, const struct GNUNET_PeerIdentity *first,
if (emsg == NULL)
{
total_connections++;
-#if VERBOSE
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"connected peer %s to peer %s, distance %u\n",
first_daemon->shortname, second_daemon->shortname, distance);
-#endif
}
-#if VERBOSE
else
{
failed_connections++;
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
"Failed to connect peer %s to peer %s with error :\n%s\n",
first_daemon->shortname, second_daemon->shortname, emsg);
}
-#endif
if (total_connections == expected_connections)
{
-#if VERBOSE
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Created %d total connections, which is our target number! Starting next phase of testing.\n",
total_connections);
-#endif
GNUNET_SCHEDULER_cancel (die_task);
die_task =
GNUNET_SCHEDULER_add_delayed (TIMEOUT, &end_badly, "from test gets");
@@ -389,10 +396,8 @@ peers_started_callback (void *cls, const struct GNUNET_PeerIdentity *id,
if (peers_left == 0) /* Indicates all peers started */
{
-#if VERBOSE
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"All %d daemons started, now connecting peers!\n", num_peers);
-#endif
expected_connections = -1;
if ((pg != NULL)) /* Sanity check */
{
@@ -463,9 +468,6 @@ check ()
char *const argv[] = { "test-dht-twopeer-put-get", /* Name to give running binary */
"-c",
"test_dht_twopeer_data.conf", /* Config file to use */
-#if VERBOSE
- "-L", "DEBUG",
-#endif
NULL
};
struct GNUNET_GETOPT_CommandLineOption options[] = {
@@ -490,11 +492,7 @@ main (int argc, char *argv[])
int ret;
GNUNET_log_setup ("test-dht-twopeer",
-#if VERBOSE
- "DEBUG",
-#else
"WARNING",
-#endif
NULL);
ret = check ();
/**