diff options
Diffstat (limited to 'src/dht')
29 files changed, 1740 insertions, 591 deletions
diff --git a/src/dht/Makefile.am b/src/dht/Makefile.am index 93880c2..b2d18d2 100644 --- a/src/dht/Makefile.am +++ b/src/dht/Makefile.am @@ -50,6 +50,7 @@ libgnunet_plugin_block_dht_la_DEPENDENCIES = \ bin_PROGRAMS = \ gnunet-service-dht \ + gnunet-dht-monitor \ gnunet-dht-get \ gnunet-dht-put @@ -92,6 +93,16 @@ gnunet_dht_put_LDADD = \ gnunet_dht_put_DEPENDENCIES = \ libgnunetdht.la +gnunet_dht_monitor_SOURCES = \ + gnunet-dht-monitor.c +gnunet_dht_monitor_LDADD = \ + $(top_builddir)/src/dht/libgnunetdht.la \ + $(top_builddir)/src/core/libgnunetcore.la \ + $(top_builddir)/src/util/libgnunetutil.la +gnunet_dht_monitor_DEPENDENCIES = \ + libgnunetdht.la + + check_PROGRAMS = \ test_dht_api \ test_dht_twopeer \ diff --git a/src/dht/Makefile.in b/src/dht/Makefile.in index 97e9c59..12ce558 100644 --- a/src/dht/Makefile.in +++ b/src/dht/Makefile.in @@ -37,8 +37,8 @@ POST_UNINSTALL = : build_triplet = @build@ host_triplet = @host@ target_triplet = @target@ -bin_PROGRAMS = gnunet-service-dht$(EXEEXT) gnunet-dht-get$(EXEEXT) \ - gnunet-dht-put$(EXEEXT) +bin_PROGRAMS = gnunet-service-dht$(EXEEXT) gnunet-dht-monitor$(EXEEXT) \ + gnunet-dht-get$(EXEEXT) gnunet-dht-put$(EXEEXT) check_PROGRAMS = test_dht_api$(EXEEXT) test_dht_twopeer$(EXEEXT) \ test_dht_twopeer_put_get$(EXEEXT) \ test_dht_twopeer_get_put$(EXEEXT) \ @@ -123,6 +123,8 @@ libgnunetdht_la_LINK = $(LIBTOOL) $(AM_V_lt) --tag=CC \ PROGRAMS = $(bin_PROGRAMS) am_gnunet_dht_get_OBJECTS = gnunet-dht-get.$(OBJEXT) gnunet_dht_get_OBJECTS = $(am_gnunet_dht_get_OBJECTS) +am_gnunet_dht_monitor_OBJECTS = gnunet-dht-monitor.$(OBJEXT) +gnunet_dht_monitor_OBJECTS = $(am_gnunet_dht_monitor_OBJECTS) am_gnunet_dht_put_OBJECTS = gnunet-dht-put.$(OBJEXT) gnunet_dht_put_OBJECTS = $(am_gnunet_dht_put_OBJECTS) am_gnunet_service_dht_OBJECTS = gnunet-service-dht.$(OBJEXT) \ @@ -208,19 +210,21 @@ am__v_GEN_ = $(am__v_GEN_$(AM_DEFAULT_VERBOSITY)) am__v_GEN_0 = @echo " GEN " $@; SOURCES = $(libgnunet_plugin_block_dht_la_SOURCES) \ $(libgnunetdht_la_SOURCES) $(gnunet_dht_get_SOURCES) \ - $(gnunet_dht_put_SOURCES) $(gnunet_service_dht_SOURCES) \ - $(test_dht_2dtorus_SOURCES) $(test_dht_api_SOURCES) \ - $(test_dht_line_SOURCES) $(test_dht_monitor_SOURCES) \ - $(test_dht_multipeer_SOURCES) $(test_dht_twopeer_SOURCES) \ + $(gnunet_dht_monitor_SOURCES) $(gnunet_dht_put_SOURCES) \ + $(gnunet_service_dht_SOURCES) $(test_dht_2dtorus_SOURCES) \ + $(test_dht_api_SOURCES) $(test_dht_line_SOURCES) \ + $(test_dht_monitor_SOURCES) $(test_dht_multipeer_SOURCES) \ + $(test_dht_twopeer_SOURCES) \ $(test_dht_twopeer_get_put_SOURCES) \ $(test_dht_twopeer_path_tracking_SOURCES) \ $(test_dht_twopeer_put_get_SOURCES) DIST_SOURCES = $(libgnunet_plugin_block_dht_la_SOURCES) \ $(libgnunetdht_la_SOURCES) $(gnunet_dht_get_SOURCES) \ - $(gnunet_dht_put_SOURCES) $(gnunet_service_dht_SOURCES) \ - $(test_dht_2dtorus_SOURCES) $(test_dht_api_SOURCES) \ - $(test_dht_line_SOURCES) $(test_dht_monitor_SOURCES) \ - $(test_dht_multipeer_SOURCES) $(test_dht_twopeer_SOURCES) \ + $(gnunet_dht_monitor_SOURCES) $(gnunet_dht_put_SOURCES) \ + $(gnunet_service_dht_SOURCES) $(test_dht_2dtorus_SOURCES) \ + $(test_dht_api_SOURCES) $(test_dht_line_SOURCES) \ + $(test_dht_monitor_SOURCES) $(test_dht_multipeer_SOURCES) \ + $(test_dht_twopeer_SOURCES) \ $(test_dht_twopeer_get_put_SOURCES) \ $(test_dht_twopeer_path_tracking_SOURCES) \ $(test_dht_twopeer_put_get_SOURCES) @@ -285,6 +289,7 @@ INSTALL_SCRIPT = @INSTALL_SCRIPT@ INSTALL_STRIP_PROGRAM = @INSTALL_STRIP_PROGRAM@ INTLLIBS = @INTLLIBS@ INTL_MACOSX_LIBS = @INTL_MACOSX_LIBS@ +JAVAPORT = @JAVAPORT@ LD = @LD@ LDFLAGS = @LDFLAGS@ LIBADD_DL = @LIBADD_DL@ @@ -318,6 +323,7 @@ LT_DLLOADERS = @LT_DLLOADERS@ LT_DLPREOPEN = @LT_DLPREOPEN@ MAKEINFO = @MAKEINFO@ MKDIR_P = @MKDIR_P@ +MONKEYPREFIX = @MONKEYPREFIX@ MSGFMT = @MSGFMT@ MSGFMT_015 = @MSGFMT_015@ MSGMERGE = @MSGMERGE@ @@ -519,6 +525,17 @@ gnunet_dht_put_LDADD = \ gnunet_dht_put_DEPENDENCIES = \ libgnunetdht.la +gnunet_dht_monitor_SOURCES = \ + gnunet-dht-monitor.c + +gnunet_dht_monitor_LDADD = \ + $(top_builddir)/src/dht/libgnunetdht.la \ + $(top_builddir)/src/core/libgnunetcore.la \ + $(top_builddir)/src/util/libgnunetutil.la + +gnunet_dht_monitor_DEPENDENCIES = \ + libgnunetdht.la + test_dht_api_SOURCES = \ test_dht_api.c @@ -778,6 +795,9 @@ clean-checkPROGRAMS: gnunet-dht-get$(EXEEXT): $(gnunet_dht_get_OBJECTS) $(gnunet_dht_get_DEPENDENCIES) @rm -f gnunet-dht-get$(EXEEXT) $(AM_V_CCLD)$(LINK) $(gnunet_dht_get_OBJECTS) $(gnunet_dht_get_LDADD) $(LIBS) +gnunet-dht-monitor$(EXEEXT): $(gnunet_dht_monitor_OBJECTS) $(gnunet_dht_monitor_DEPENDENCIES) + @rm -f gnunet-dht-monitor$(EXEEXT) + $(AM_V_CCLD)$(LINK) $(gnunet_dht_monitor_OBJECTS) $(gnunet_dht_monitor_LDADD) $(LIBS) gnunet-dht-put$(EXEEXT): $(gnunet_dht_put_OBJECTS) $(gnunet_dht_put_DEPENDENCIES) @rm -f gnunet-dht-put$(EXEEXT) $(AM_V_CCLD)$(LINK) $(gnunet_dht_put_OBJECTS) $(gnunet_dht_put_LDADD) $(LIBS) @@ -820,6 +840,7 @@ distclean-compile: @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/dht_api.Plo@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/gnunet-dht-get.Po@am__quote@ +@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/gnunet-dht-monitor.Po@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/gnunet-dht-put.Po@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/gnunet-service-dht.Po@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/gnunet-service-dht_clients.Po@am__quote@ diff --git a/src/dht/dht.conf.in b/src/dht/dht.conf.in index 17c13e9..59581dc 100644 --- a/src/dht/dht.conf.in +++ b/src/dht/dht.conf.in @@ -1,6 +1,6 @@ [dht] AUTOSTART = YES -@UNIXONLY@ PORT = 2095 +@JAVAPORT@PORT = 2095 HOSTNAME = localhost HOME = $SERVICEHOME CONFIG = $DEFAULTCONFIG diff --git a/src/dht/dht.h b/src/dht/dht.h index 9894be8..8adf49f 100644 --- a/src/dht/dht.h +++ b/src/dht/dht.h @@ -181,6 +181,11 @@ struct GNUNET_DHT_ClientPutMessage uint32_t desired_replication_level GNUNET_PACKED; /** + * Unique ID for the PUT message. + */ + uint64_t unique_id GNUNET_PACKED; + + /** * How long should this data persist? */ struct GNUNET_TIME_AbsoluteNBO expiration; @@ -196,20 +201,38 @@ struct GNUNET_DHT_ClientPutMessage /** - * Message to monitor requests going through peer, clients <--> DHT service. + * Message to confirming receipt of PUT, sent from DHT service to clients. */ -struct GNUNET_DHT_MonitorMessage +struct GNUNET_DHT_ClientPutConfirmationMessage { /** - * Type: GNUNET_MESSAGE_TYPE_DHT_MONITOR_{GET, PUT, GET_RESP, PUT_RESP*} - * (*) not yet implemented, necessary for key randomization + * Type: GNUNET_MESSAGE_TYPE_DHT_CLIENT_PUT_OK */ struct GNUNET_MessageHeader header; /** - * The type of data in the request. + * Always zero. */ - uint32_t type GNUNET_PACKED; + uint32_t reserved GNUNET_PACKED; + + /** + * Unique ID from the PUT message that is being confirmed. + */ + uint64_t unique_id GNUNET_PACKED; + +}; + + + +/** + * Message to monitor put requests going through peer, DHT service -> clients. + */ +struct GNUNET_DHT_MonitorPutMessage +{ + /** + * Type: GNUNET_MESSAGE_TYPE_DHT_MONITOR_PUT + */ + struct GNUNET_MessageHeader header; /** * Message options, actually an 'enum GNUNET_DHT_RouteOption' value. @@ -217,37 +240,165 @@ struct GNUNET_DHT_MonitorMessage uint32_t options GNUNET_PACKED; /** + * The type of data in the request. + */ + uint32_t type GNUNET_PACKED; + + /** + * Hop count so far. + */ + uint32_t hop_count GNUNET_PACKED; + + /** * Replication level for this message */ uint32_t desired_replication_level GNUNET_PACKED; /** * Number of peers recorded in the outgoing path from source to the - * storgage location of this message. + * storage location of this message. */ uint32_t put_path_length GNUNET_PACKED; /** - * The number of peer identities recorded from the storage location - * to this peer. + * How long should this data persist? */ - uint32_t get_path_length GNUNET_PACKED; + struct GNUNET_TIME_AbsoluteNBO expiration_time; /** - * Unique ID for GET / GET responses. + * The key to store the value under. */ - uint64_t unique_id GNUNET_PACKED; + GNUNET_HashCode key; + + /* put path (if tracked) */ + + /* Payload */ +}; + + +/** + * Message to request monitoring messages, clients -> DHT service. + */ +struct GNUNET_DHT_MonitorStartStopMessage +{ /** - * How long should this data persist? + * Type: GNUNET_MESSAGE_TYPE_DHT_MONITOR_(START|STOP) */ - struct GNUNET_TIME_AbsoluteNBO expiration; + struct GNUNET_MessageHeader header; + + /** + * The type of data desired, GNUNET_BLOCK_TYPE_ANY for all. + */ + uint32_t type GNUNET_PACKED; + + /** + * Flag whether to notify about GET messages. + */ + int16_t get GNUNET_PACKED; + + /** + * Flag whether to notify about GET_REPONSE messages. + */ + int16_t get_resp GNUNET_PACKED; + + /** + * Flag whether to notify about PUT messages. + */ + int16_t put GNUNET_PACKED; + + /** + * Flag whether to use the provided key to filter messages. + */ + int16_t filter_key GNUNET_PACKED; + + /** + * The key to filter messages by. + */ + GNUNET_HashCode key; +}; + + +/** + * Message to monitor get requests going through peer, DHT service -> clients. + */ +struct GNUNET_DHT_MonitorGetMessage +{ + /** + * Type: GNUNET_MESSAGE_TYPE_DHT_MONITOR_PUT + */ + struct GNUNET_MessageHeader header; + + /** + * Message options, actually an 'enum GNUNET_DHT_RouteOption' value. + */ + uint32_t options GNUNET_PACKED; + + /** + * The type of data in the request. + */ + uint32_t type GNUNET_PACKED; + + /** + * Hop count + */ + uint32_t hop_count GNUNET_PACKED; + + /** + * Replication level for this message + */ + uint32_t desired_replication_level GNUNET_PACKED; + + /** + * Number of peers recorded in the outgoing path from source to the + * storage location of this message. + */ + uint32_t get_path_length GNUNET_PACKED; /** * The key to store the value under. */ GNUNET_HashCode key; + /* get path (if tracked) */ + +}; + +/** + * Message to monitor get results going through peer, DHT service -> clients. + */ +struct GNUNET_DHT_MonitorGetRespMessage +{ + /** + * Type: GNUNET_MESSAGE_TYPE_DHT_P2P_RESULT + */ + struct GNUNET_MessageHeader header; + + /** + * Content type. + */ + uint32_t type GNUNET_PACKED; + + /** + * Length of the PUT path that follows (if tracked). + */ + uint32_t put_path_length GNUNET_PACKED; + + /** + * Length of the GET path that follows (if tracked). + */ + uint32_t get_path_length GNUNET_PACKED; + + /** + * When does the content expire? + */ + struct GNUNET_TIME_AbsoluteNBO expiration_time; + + /** + * The key of the corresponding GET request. + */ + GNUNET_HashCode key; + /* put path (if tracked) */ /* get path (if tracked) */ diff --git a/src/dht/dht_api.c b/src/dht/dht_api.c index 3cb13b4..420eacb 100644 --- a/src/dht/dht_api.c +++ b/src/dht/dht_api.c @@ -1,6 +1,6 @@ /* This file is part of GNUnet. - (C) 2009, 2010 Christian Grothoff (and other contributing authors) + (C) 2009, 2010, 2011, 2012 Christian Grothoff (and other contributing authors) GNUnet is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published @@ -74,11 +74,6 @@ struct PendingMessage void *cont_cls; /** - * Timeout task for this message - */ - GNUNET_SCHEDULER_TaskIdentifier timeout_task; - - /** * Unique ID for this request */ uint64_t unique_id; @@ -100,6 +95,56 @@ struct PendingMessage /** + * Handle to a PUT request. + */ +struct GNUNET_DHT_PutHandle +{ + /** + * Kept in a DLL. + */ + struct GNUNET_DHT_PutHandle *next; + + /** + * Kept in a DLL. + */ + struct GNUNET_DHT_PutHandle *prev; + + /** + * Continuation to call when done. + */ + GNUNET_DHT_PutContinuation cont; + + /** + * Pending message associated with this PUT operation, + * NULL after the message has been transmitted to the service. + */ + struct PendingMessage *pending; + + /** + * Main handle to this DHT api + */ + struct GNUNET_DHT_Handle *dht_handle; + + /** + * Closure for 'cont'. + */ + void *cont_cls; + + /** + * Timeout task for this operation. + */ + GNUNET_SCHEDULER_TaskIdentifier timeout_task; + + /** + * Unique ID for the PUT operation. + */ + uint64_t unique_id; + +}; + + + +/** * Handle to a GET request */ struct GNUNET_DHT_GetHandle @@ -171,9 +216,19 @@ struct GNUNET_DHT_MonitorHandle GNUNET_HashCode *key; /** - * Callback for each received message of interest. + * Callback for each received message of type get. + */ + GNUNET_DHT_MonitorGetCB get_cb; + + /** + * Callback for each received message of type get response. + */ + GNUNET_DHT_MonitorGetRespCB get_resp_cb; + + /** + * Callback for each received message of type put. */ - GNUNET_DHT_MonitorCB cb; + GNUNET_DHT_MonitorPutCB put_cb; /** * Closure for cb. @@ -225,8 +280,18 @@ struct GNUNET_DHT_Handle struct GNUNET_DHT_MonitorHandle *monitor_tail; /** - * Hash map containing the current outstanding unique requests - * (values are of type 'struct GNUNET_DHT_RouteHandle'). + * Head of active PUT requests. + */ + struct GNUNET_DHT_PutHandle *put_head; + + /** + * Tail of active PUT requests. + */ + struct GNUNET_DHT_PutHandle *put_tail; + + /** + * Hash map containing the current outstanding unique GET requests + * (values are of type 'struct GNUNET_DHT_GetHandle'). */ struct GNUNET_CONTAINER_MultiHashMap *active_requests; @@ -257,6 +322,8 @@ struct GNUNET_DHT_Handle * Handler for messages received from the DHT service * a demultiplexer which handles numerous message types * + * @param cls the 'struct GNUNET_DHT_Handle' + * @param msg the incoming message */ static void service_message_handler (void *cls, const struct GNUNET_MessageHeader *msg); @@ -265,16 +332,17 @@ service_message_handler (void *cls, const struct GNUNET_MessageHeader *msg); /** * Try to (re)connect to the DHT service. * + * @param handle DHT handle to reconnect * @return GNUNET_YES on success, GNUNET_NO on failure. */ static int try_connect (struct GNUNET_DHT_Handle *handle) { - if (handle->client != NULL) + if (NULL != handle->client) return GNUNET_OK; handle->in_receive = GNUNET_NO; handle->client = GNUNET_CLIENT_connect ("dht", handle->cfg); - if (handle->client == NULL) + if (NULL == handle->client) { LOG (GNUNET_ERROR_TYPE_WARNING, _("Failed to connect to the DHT service!\n")); @@ -314,6 +382,7 @@ add_request_to_pending (void *cls, const GNUNET_HashCode * key, void *value) /** * Try to send messages from list of messages to send + * * @param handle DHT_Handle */ static void @@ -359,17 +428,34 @@ try_reconnect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) static void do_disconnect (struct GNUNET_DHT_Handle *handle) { - if (handle->client == NULL) + struct GNUNET_DHT_PutHandle *ph; + struct GNUNET_DHT_PutHandle *next; + + if (NULL == handle->client) return; - GNUNET_assert (handle->reconnect_task == GNUNET_SCHEDULER_NO_TASK); + GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == handle->reconnect_task); if (NULL != handle->th) GNUNET_CLIENT_notify_transmit_ready_cancel (handle->th); handle->th = NULL; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Disconnecting from DHT service, will try to reconnect in %llu ms\n", (unsigned long long) handle->retry_time.rel_value); - GNUNET_CLIENT_disconnect (handle->client, GNUNET_NO); + GNUNET_CLIENT_disconnect (handle->client); handle->client = NULL; + + /* signal disconnect to all PUT requests that were transmitted but waiting + for the put confirmation */ + next = handle->put_head; + while (NULL != (ph = next)) + { + next = ph->next; + if (NULL == ph->pending) + { + if (NULL != ph->cont) + ph->cont (ph->cont_cls, GNUNET_SYSERR); + GNUNET_DHT_put_cancel (ph); + } + } handle->reconnect_task = GNUNET_SCHEDULER_add_delayed (handle->retry_time, &try_reconnect, handle); } @@ -377,6 +463,11 @@ do_disconnect (struct GNUNET_DHT_Handle *handle) /** * Transmit the next pending message, called by notify_transmit_ready + * + * @param cls the DHT handle + * @param size number of bytes available in 'buf' for transmission + * @param buf where to copy messages for the service + * @return number of bytes written to 'buf' */ static size_t transmit_pending (void *cls, size_t size, void *buf); @@ -384,20 +475,22 @@ transmit_pending (void *cls, size_t size, void *buf); /** * Try to send messages from list of messages to send + * + * @param handle handle to DHT */ static void process_pending_messages (struct GNUNET_DHT_Handle *handle) { struct PendingMessage *head; - if (handle->client == NULL) + if (NULL == handle->client) { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "process_pending_messages called, but client is null, reconnecting\n"); + "process_pending_messages called, but client is NULL, reconnecting\n"); do_disconnect (handle); return; } - if (handle->th != NULL) + if (NULL != handle->th) return; if (NULL == (head = handle->pending_head)) return; @@ -417,6 +510,11 @@ process_pending_messages (struct GNUNET_DHT_Handle *handle) /** * Transmit the next pending message, called by notify_transmit_ready + * + * @param cls the DHT handle + * @param size number of bytes available in 'buf' for transmission + * @param buf where to copy messages for the service + * @return number of bytes written to 'buf' */ static size_t transmit_pending (void *cls, size_t size, void *buf) @@ -426,7 +524,7 @@ transmit_pending (void *cls, size_t size, void *buf) size_t tsize; handle->th = NULL; - if (buf == NULL) + if (NULL == buf) { LOG (GNUNET_ERROR_TYPE_DEBUG, "Transmission to DHT service failed! Reconnecting!\n"); @@ -446,11 +544,6 @@ transmit_pending (void *cls, size_t size, void *buf) GNUNET_CONTAINER_DLL_remove (handle->pending_head, handle->pending_tail, head); head->in_pending_queue = GNUNET_NO; - if (head->timeout_task != GNUNET_SCHEDULER_NO_TASK) - { - GNUNET_SCHEDULER_cancel (head->timeout_task); - head->timeout_task = GNUNET_SCHEDULER_NO_TASK; - } if (NULL != head->cont) { head->cont (head->cont_cls, NULL); @@ -533,63 +626,178 @@ process_reply (void *cls, const GNUNET_HashCode * key, void *value) return GNUNET_YES; } - /** - * Process a monitoring message from the service. + * Process a get monitor message from the service. * * @param handle The DHT handle. - * @param msg Message from the service. + * @param msg Monitor get message from the service. * * @return GNUNET_OK if everything went fine, * GNUNET_SYSERR if the message is malformed. */ static int -process_monitor_message (struct GNUNET_DHT_Handle *handle, - const struct GNUNET_MessageHeader *msg) +process_monitor_get_message (struct GNUNET_DHT_Handle *handle, + const struct GNUNET_DHT_MonitorGetMessage *msg) { - struct GNUNET_DHT_MonitorMessage *m; struct GNUNET_DHT_MonitorHandle *h; + + for (h = handle->monitor_head; NULL != h; h = h->next) + { + int type_ok; + int key_ok; + + type_ok = (GNUNET_BLOCK_TYPE_ANY == h->type) || (h->type == ntohl(msg->type)); + key_ok = (NULL == h->key) || (0 == memcmp (h->key, &msg->key, + sizeof (GNUNET_HashCode))); + if (type_ok && key_ok && (NULL != h->get_cb)) + h->get_cb (h->cb_cls, + ntohl (msg->options), + (enum GNUNET_BLOCK_Type) ntohl(msg->type), + ntohl (msg->hop_count), + ntohl (msg->desired_replication_level), + ntohl (msg->get_path_length), + (struct GNUNET_PeerIdentity *) &msg[1], + &msg->key); + } + return GNUNET_OK; +} + + +/** + * Process a get response monitor message from the service. + * + * @param handle The DHT handle. + * @param msg monitor get response message from the service + * @return GNUNET_OK if everything went fine, + * GNUNET_SYSERR if the message is malformed. + */ +static int +process_monitor_get_resp_message (struct GNUNET_DHT_Handle *handle, + const struct GNUNET_DHT_MonitorGetRespMessage + *msg) +{ + struct GNUNET_DHT_MonitorHandle *h; + struct GNUNET_PeerIdentity *path; + uint32_t getl; + uint32_t putl; size_t msize; - if (ntohs (msg->type) < GNUNET_MESSAGE_TYPE_DHT_MONITOR_GET || - ntohs (msg->type) > GNUNET_MESSAGE_TYPE_DHT_MONITOR_PUT) - return GNUNET_SYSERR; - msize = ntohs (msg->size); - if (msize < sizeof (struct GNUNET_DHT_MonitorMessage)) + msize = ntohs (msg->header.size); + path = (struct GNUNET_PeerIdentity *) &msg[1]; + getl = ntohl (msg->get_path_length); + putl = ntohl (msg->put_path_length); + if ( (getl + putl < getl) || + ( ((msize - sizeof (struct GNUNET_DHT_MonitorGetRespMessage)) / sizeof (struct GNUNET_PeerIdentity)) < getl + putl) ) + { + GNUNET_break (0); return GNUNET_SYSERR; + } + for (h = handle->monitor_head; NULL != h; h = h->next) + { + int type_ok; + int key_ok; + + type_ok = (GNUNET_BLOCK_TYPE_ANY == h->type) || (h->type == ntohl(msg->type)); + key_ok = (NULL == h->key) || (0 == memcmp (h->key, &msg->key, + sizeof (GNUNET_HashCode))); + if (type_ok && key_ok && (NULL != h->get_resp_cb)) + h->get_resp_cb (h->cb_cls, + (enum GNUNET_BLOCK_Type) ntohl(msg->type), + path, getl, + &path[getl], putl, + GNUNET_TIME_absolute_ntoh(msg->expiration_time), + &msg->key, + (void *) &path[getl + putl], + msize - + sizeof (struct GNUNET_DHT_MonitorGetRespMessage) - + sizeof (struct GNUNET_PeerIdentity) * (putl + getl)); + } + return GNUNET_OK; +} - m = (struct GNUNET_DHT_MonitorMessage *) msg; - h = handle->monitor_head; - while (NULL != h) + +/** + * Process a put monitor message from the service. + * + * @param handle The DHT handle. + * @param msg Monitor put message from the service. + * + * @return GNUNET_OK if everything went fine, + * GNUNET_SYSERR if the message is malformed. + */ +static int +process_monitor_put_message (struct GNUNET_DHT_Handle *handle, + const struct GNUNET_DHT_MonitorPutMessage *msg) +{ + struct GNUNET_DHT_MonitorHandle *h; + size_t msize; + struct GNUNET_PeerIdentity *path; + uint32_t putl; + + msize = ntohs (msg->header.size); + path = (struct GNUNET_PeerIdentity *) &msg[1]; + putl = ntohl (msg->put_path_length); + if (((msize - sizeof (struct GNUNET_DHT_MonitorGetRespMessage)) / sizeof (struct GNUNET_PeerIdentity)) < putl) { - if (h->type == ntohl(m->type) && - (NULL == h->key || - memcmp (h->key, &m->key, sizeof (GNUNET_HashCode)) == 0)) - { - struct GNUNET_PeerIdentity *path; - uint32_t getl; - uint32_t putl; - - path = (struct GNUNET_PeerIdentity *) &m[1]; - getl = ntohl (m->get_path_length); - putl = ntohl (m->put_path_length); - h->cb (h->cb_cls, ntohs(msg->type), - GNUNET_TIME_absolute_ntoh(m->expiration), - &m->key, - &path[getl], putl, path, getl, - ntohl (m->desired_replication_level), - ntohl (m->options), ntohl (m->type), - (void *) &path[getl + putl], - ntohs (msg->size) - - sizeof (struct GNUNET_DHT_MonitorMessage) - - sizeof (struct GNUNET_PeerIdentity) * (putl + getl)); - } - h = h->next; + GNUNET_break (0); + return GNUNET_SYSERR; + } + for (h = handle->monitor_head; NULL != h; h = h->next) + { + int type_ok; + int key_ok; + + type_ok = (GNUNET_BLOCK_TYPE_ANY == h->type) || (h->type == ntohl(msg->type)); + key_ok = (NULL == h->key) || (0 == memcmp (h->key, &msg->key, + sizeof (GNUNET_HashCode))); + if (type_ok && key_ok && (NULL != h->put_cb)) + h->put_cb (h->cb_cls, + ntohl (msg->options), + (enum GNUNET_BLOCK_Type) ntohl(msg->type), + ntohl (msg->hop_count), + ntohl (msg->desired_replication_level), + putl, path, + GNUNET_TIME_absolute_ntoh(msg->expiration_time), + &msg->key, + (void *) &path[putl], + msize - + sizeof (struct GNUNET_DHT_MonitorPutMessage) - + sizeof (struct GNUNET_PeerIdentity) * putl); } + return GNUNET_OK; +} + + +/** + * Process a put confirmation message from the service. + * + * @param handle The DHT handle. + * @param msg confirmation message from the service. + * @return GNUNET_OK if everything went fine, + * GNUNET_SYSERR if the message is malformed. + */ +static int +process_put_confirmation_message (struct GNUNET_DHT_Handle *handle, + const struct GNUNET_DHT_ClientPutConfirmationMessage *msg) +{ + struct GNUNET_DHT_PutHandle *ph; + GNUNET_DHT_PutContinuation cont; + void *cont_cls; + for (ph = handle->put_head; NULL != ph; ph = ph->next) + if (ph->unique_id == msg->unique_id) + break; + if (NULL == ph) + return GNUNET_OK; + cont = ph->cont; + cont_cls = ph->cont_cls; + GNUNET_DHT_put_cancel (ph); + if (NULL != cont) + cont (cont_cls, GNUNET_OK); return GNUNET_OK; } + /** * Handler for messages received from the DHT service * a demultiplexer which handles numerous message types @@ -602,38 +810,84 @@ service_message_handler (void *cls, const struct GNUNET_MessageHeader *msg) { struct GNUNET_DHT_Handle *handle = cls; const struct GNUNET_DHT_ClientResultMessage *dht_msg; + uint16_t msize; + int ret; - if (msg == NULL) + if (NULL == msg) { LOG (GNUNET_ERROR_TYPE_DEBUG, "Error receiving data from DHT service, reconnecting\n"); do_disconnect (handle); return; } - if (ntohs (msg->type) != GNUNET_MESSAGE_TYPE_DHT_CLIENT_RESULT) + ret = GNUNET_SYSERR; + msize = ntohs (msg->size); + switch (ntohs (msg->type)) { - if (process_monitor_message (handle, msg) == GNUNET_OK) + case GNUNET_MESSAGE_TYPE_DHT_MONITOR_GET: + if (msize < sizeof (struct GNUNET_DHT_MonitorGetMessage)) { - GNUNET_CLIENT_receive (handle->client, &service_message_handler, handle, - GNUNET_TIME_UNIT_FOREVER_REL); - return; + GNUNET_break (0); + break; } - GNUNET_break (0); - do_disconnect (handle); - return; + ret = process_monitor_get_message(handle, + (const struct GNUNET_DHT_MonitorGetMessage *) msg); + break; + case GNUNET_MESSAGE_TYPE_DHT_MONITOR_GET_RESP: + if (msize < sizeof (struct GNUNET_DHT_MonitorGetRespMessage)) + { + GNUNET_break (0); + break; + } + ret = process_monitor_get_resp_message(handle, + (const struct GNUNET_DHT_MonitorGetRespMessage *) msg); + break; + case GNUNET_MESSAGE_TYPE_DHT_MONITOR_PUT: + if (msize < sizeof (struct GNUNET_DHT_MonitorPutMessage)) + { + GNUNET_break (0); + break; + } + ret = process_monitor_put_message(handle, + (const struct GNUNET_DHT_MonitorPutMessage *) msg); + break; + case GNUNET_MESSAGE_TYPE_DHT_MONITOR_PUT_RESP: + /* Not implemented yet */ + GNUNET_break(0); + break; + case GNUNET_MESSAGE_TYPE_DHT_CLIENT_RESULT: + if (ntohs (msg->size) < sizeof (struct GNUNET_DHT_ClientResultMessage)) + { + GNUNET_break (0); + break; + } + ret = GNUNET_OK; + dht_msg = (const struct GNUNET_DHT_ClientResultMessage *) msg; + LOG (GNUNET_ERROR_TYPE_DEBUG, "Received reply for `%s' from DHT service %p\n", + GNUNET_h2s (&dht_msg->key), handle); + GNUNET_CONTAINER_multihashmap_get_multiple (handle->active_requests, + &dht_msg->key, &process_reply, + (void *) dht_msg); + break; + case GNUNET_MESSAGE_TYPE_DHT_CLIENT_PUT_OK: + if (ntohs (msg->size) != sizeof (struct GNUNET_DHT_ClientPutConfirmationMessage)) + { + GNUNET_break (0); + break; + } + ret = process_put_confirmation_message (handle, + (const struct GNUNET_DHT_ClientPutConfirmationMessage*) msg); + break; + default: + GNUNET_break(0); + break; } - if (ntohs (msg->size) < sizeof (struct GNUNET_DHT_ClientResultMessage)) + if (GNUNET_OK != ret) { GNUNET_break (0); do_disconnect (handle); return; } - dht_msg = (const struct GNUNET_DHT_ClientResultMessage *) msg; - LOG (GNUNET_ERROR_TYPE_DEBUG, "Received reply for `%s' from DHT service %p\n", - GNUNET_h2s (&dht_msg->key), handle); - GNUNET_CONTAINER_multihashmap_get_multiple (handle->active_requests, - &dht_msg->key, &process_reply, - (void *) dht_msg); GNUNET_CLIENT_receive (handle->client, &service_message_handler, handle, GNUNET_TIME_UNIT_FOREVER_REL); } @@ -677,11 +931,12 @@ void GNUNET_DHT_disconnect (struct GNUNET_DHT_Handle *handle) { struct PendingMessage *pm; + struct GNUNET_DHT_PutHandle *ph; - GNUNET_assert (handle != NULL); + GNUNET_assert (NULL != handle); GNUNET_assert (0 == GNUNET_CONTAINER_multihashmap_size (handle->active_requests)); - if (handle->th != NULL) + if (NULL != handle->th) { GNUNET_CLIENT_notify_transmit_ready_cancel (handle->th); handle->th = NULL; @@ -693,18 +948,24 @@ GNUNET_DHT_disconnect (struct GNUNET_DHT_Handle *handle) pm); pm->in_pending_queue = GNUNET_NO; GNUNET_assert (GNUNET_YES == pm->free_on_send); - if (GNUNET_SCHEDULER_NO_TASK != pm->timeout_task) - GNUNET_SCHEDULER_cancel (pm->timeout_task); if (NULL != pm->cont) pm->cont (pm->cont_cls, NULL); GNUNET_free (pm); } - if (handle->client != NULL) + while (NULL != (ph = handle->put_head)) { - GNUNET_CLIENT_disconnect (handle->client, GNUNET_YES); + GNUNET_break (NULL == ph->pending); + if (NULL != ph->cont) + ph->cont (ph->cont_cls, GNUNET_SYSERR); + GNUNET_DHT_put_cancel (ph); + } + + if (NULL != handle->client) + { + GNUNET_CLIENT_disconnect (handle->client); handle->client = NULL; } - if (handle->reconnect_task != GNUNET_SCHEDULER_NO_TASK) + if (GNUNET_SCHEDULER_NO_TASK != handle->reconnect_task) GNUNET_SCHEDULER_cancel (handle->reconnect_task); GNUNET_CONTAINER_multihashmap_destroy (handle->active_requests); GNUNET_free (handle); @@ -720,22 +981,49 @@ GNUNET_DHT_disconnect (struct GNUNET_DHT_Handle *handle) static void timeout_put_request (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) { - struct PendingMessage *pending = cls; - struct GNUNET_DHT_Handle *handle; + struct GNUNET_DHT_PutHandle *ph = cls; + struct GNUNET_DHT_Handle *handle = ph->dht_handle; - handle = pending->handle; - GNUNET_assert (GNUNET_YES == pending->in_pending_queue); - GNUNET_CONTAINER_DLL_remove (handle->pending_head, handle->pending_tail, - pending); - pending->in_pending_queue = GNUNET_NO; - if (pending->cont != NULL) - pending->cont (pending->cont_cls, tc); - GNUNET_free (pending); + ph->timeout_task = GNUNET_SCHEDULER_NO_TASK; + if (NULL != ph->pending) + { + GNUNET_CONTAINER_DLL_remove (handle->pending_head, handle->pending_tail, + ph->pending); + ph->pending->in_pending_queue = GNUNET_NO; + GNUNET_free (ph->pending); + } + if (NULL != ph->cont) + ph->cont (ph->cont_cls, GNUNET_NO); + GNUNET_CONTAINER_DLL_remove (handle->put_head, + handle->put_tail, + ph); + GNUNET_free (ph); +} + + +/** + * Function called whenever the PUT message leaves the queue. Sets + * the message pointer in the put handle to NULL. + * + * @param cls the 'struct GNUNET_DHT_PutHandle' + * @param tc unused + */ +static void +mark_put_message_gone (void *cls, + const struct GNUNET_SCHEDULER_TaskContext *tc) +{ + struct GNUNET_DHT_PutHandle *ph = cls; + + ph->pending = NULL; } /** - * Perform a PUT operation storing data in the DHT. + * Perform a PUT operation storing data in the DHT. FIXME: we should + * change the protocol to get a confirmation for the PUT from the DHT + * and call 'cont' only after getting the confirmation; otherwise, the + * client has no good way of telling if the 'PUT' message actually got + * to the DHT service! * * @param handle handle to DHT service * @param key the key to store under @@ -748,51 +1036,97 @@ timeout_put_request (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) * @param exp desired expiration time for the value * @param timeout how long to wait for transmission of this request * @param cont continuation to call when done (transmitting request to service) + * You must not call GNUNET_DHT_DISCONNECT in this continuation * @param cont_cls closure for cont */ -void +struct GNUNET_DHT_PutHandle * GNUNET_DHT_put (struct GNUNET_DHT_Handle *handle, const GNUNET_HashCode * key, uint32_t desired_replication_level, enum GNUNET_DHT_RouteOption options, enum GNUNET_BLOCK_Type type, size_t size, const char *data, struct GNUNET_TIME_Absolute exp, - struct GNUNET_TIME_Relative timeout, GNUNET_SCHEDULER_Task cont, + struct GNUNET_TIME_Relative timeout, GNUNET_DHT_PutContinuation cont, void *cont_cls) { struct GNUNET_DHT_ClientPutMessage *put_msg; size_t msize; struct PendingMessage *pending; + struct GNUNET_DHT_PutHandle *ph; msize = sizeof (struct GNUNET_DHT_ClientPutMessage) + size; if ((msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE) || (size >= GNUNET_SERVER_MAX_MESSAGE_SIZE)) { GNUNET_break (0); - if (NULL != cont) - cont (cont_cls, NULL); - return; + return NULL; } + ph = GNUNET_malloc (sizeof (struct GNUNET_DHT_PutHandle)); + ph->dht_handle = handle; + ph->timeout_task = GNUNET_SCHEDULER_add_delayed (timeout, &timeout_put_request, ph); + ph->cont = cont; + ph->cont_cls = cont_cls; + ph->unique_id = ++handle->uid_gen; pending = GNUNET_malloc (sizeof (struct PendingMessage) + msize); + ph->pending = pending; put_msg = (struct GNUNET_DHT_ClientPutMessage *) &pending[1]; pending->msg = &put_msg->header; pending->handle = handle; - pending->cont = cont; - pending->cont_cls = cont_cls; + pending->cont = &mark_put_message_gone; + pending->cont_cls = ph; pending->free_on_send = GNUNET_YES; - pending->timeout_task = - GNUNET_SCHEDULER_add_delayed (timeout, &timeout_put_request, pending); put_msg->header.size = htons (msize); put_msg->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_CLIENT_PUT); put_msg->type = htonl (type); put_msg->options = htonl ((uint32_t) options); put_msg->desired_replication_level = htonl (desired_replication_level); + put_msg->unique_id = ph->unique_id; put_msg->expiration = GNUNET_TIME_absolute_hton (exp); put_msg->key = *key; memcpy (&put_msg[1], data, size); GNUNET_CONTAINER_DLL_insert (handle->pending_head, handle->pending_tail, pending); pending->in_pending_queue = GNUNET_YES; + GNUNET_CONTAINER_DLL_insert_tail (handle->put_head, + handle->put_tail, + ph); process_pending_messages (handle); + return ph; +} + + +/** + * Cancels a DHT PUT operation. Note that the PUT request may still + * go out over the network (we can't stop that); However, if the PUT + * has not yet been sent to the service, cancelling the PUT will stop + * this from happening (but there is no way for the user of this API + * to tell if that is the case). The only use for this API is to + * prevent a later call to 'cont' from "GNUNET_DHT_put" (i.e. because + * the system is shutting down). + * + * @param ph put operation to cancel ('cont' will no longer be called) + */ +void +GNUNET_DHT_put_cancel (struct GNUNET_DHT_PutHandle *ph) +{ + struct GNUNET_DHT_Handle *handle = ph->dht_handle; + + if (NULL != ph->pending) + { + GNUNET_CONTAINER_DLL_remove (handle->pending_head, + handle->pending_tail, + ph->pending); + GNUNET_free (ph->pending); + ph->pending = NULL; + } + if (ph->timeout_task != GNUNET_SCHEDULER_NO_TASK) + { + GNUNET_SCHEDULER_cancel (ph->timeout_task); + ph->timeout_task = GNUNET_SCHEDULER_NO_TASK; + } + GNUNET_CONTAINER_DLL_remove (handle->put_head, + handle->put_tail, + ph); + GNUNET_free (ph); } @@ -801,7 +1135,6 @@ GNUNET_DHT_put (struct GNUNET_DHT_Handle *handle, const GNUNET_HashCode * key, * also "GNUNET_BLOCK_evaluate". * * @param handle handle to the DHT service - * @param timeout how long to wait for transmission of this request to the service * @param type expected type of the response object * @param key the key to look up * @param desired_replication_level estimate of how many @@ -815,7 +1148,6 @@ GNUNET_DHT_put (struct GNUNET_DHT_Handle *handle, const GNUNET_HashCode * key, */ struct GNUNET_DHT_GetHandle * GNUNET_DHT_get_start (struct GNUNET_DHT_Handle *handle, - struct GNUNET_TIME_Relative timeout, enum GNUNET_BLOCK_Type type, const GNUNET_HashCode * key, uint32_t desired_replication_level, enum GNUNET_DHT_RouteOption options, const void *xquery, @@ -847,8 +1179,7 @@ GNUNET_DHT_get_start (struct GNUNET_DHT_Handle *handle, get_msg->desired_replication_level = htonl (desired_replication_level); get_msg->type = htonl (type); get_msg->key = *key; - handle->uid_gen++; - get_msg->unique_id = handle->uid_gen; + get_msg->unique_id = ++handle->uid_gen; memcpy (&get_msg[1], xquery, xquery_size); GNUNET_CONTAINER_DLL_insert (handle->pending_head, handle->pending_tail, pending); @@ -925,7 +1256,9 @@ GNUNET_DHT_get_stop (struct GNUNET_DHT_GetHandle *get_handle) * @param handle Handle to the DHT service. * @param type Type of blocks that are of interest. * @param key Key of data of interest, NULL for all. - * @param cb Callback to process all monitored data. + * @param get_cb Callback to process monitored get messages. + * @param get_resp_cb Callback to process monitored get response messages. + * @param put_cb Callback to process monitored put messages. * @param cb_cls Closure for cb. * * @return Handle to stop monitoring. @@ -934,18 +1267,21 @@ struct GNUNET_DHT_MonitorHandle * GNUNET_DHT_monitor_start (struct GNUNET_DHT_Handle *handle, enum GNUNET_BLOCK_Type type, const GNUNET_HashCode *key, - GNUNET_DHT_MonitorCB cb, + GNUNET_DHT_MonitorGetCB get_cb, + GNUNET_DHT_MonitorGetRespCB get_resp_cb, + GNUNET_DHT_MonitorPutCB put_cb, void *cb_cls) { struct GNUNET_DHT_MonitorHandle *h; - struct GNUNET_DHT_MonitorMessage *m; + struct GNUNET_DHT_MonitorStartStopMessage *m; struct PendingMessage *pending; h = GNUNET_malloc (sizeof (struct GNUNET_DHT_MonitorHandle)); GNUNET_CONTAINER_DLL_insert(handle->monitor_head, handle->monitor_tail, h); - GNUNET_assert (NULL != cb); - h->cb = cb; + h->get_cb = get_cb; + h->get_resp_cb = get_resp_cb; + h->put_cb = put_cb; h->cb_cls = cb_cls; h->type = type; h->dht_handle = handle; @@ -955,17 +1291,22 @@ GNUNET_DHT_monitor_start (struct GNUNET_DHT_Handle *handle, memcpy (h->key, key, sizeof(GNUNET_HashCode)); } - pending = GNUNET_malloc (sizeof (struct GNUNET_DHT_MonitorMessage) + + pending = GNUNET_malloc (sizeof (struct GNUNET_DHT_MonitorStartStopMessage) + sizeof (struct PendingMessage)); - m = (struct GNUNET_DHT_MonitorMessage *) &pending[1]; + m = (struct GNUNET_DHT_MonitorStartStopMessage *) &pending[1]; pending->msg = &m->header; pending->handle = handle; pending->free_on_send = GNUNET_YES; - m->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_MONITOR_GET); - m->header.size = htons (sizeof (struct GNUNET_DHT_MonitorMessage)); + m->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_MONITOR_START); + m->header.size = htons (sizeof (struct GNUNET_DHT_MonitorStartStopMessage)); m->type = htonl(type); - if (NULL != key) + m->get = htons(NULL != get_cb); + m->get_resp = htons(NULL != get_resp_cb); + m->put = htons(NULL != put_cb); + if (NULL != key) { + m->filter_key = htons(1); memcpy (&m->key, key, sizeof(GNUNET_HashCode)); + } GNUNET_CONTAINER_DLL_insert (handle->pending_head, handle->pending_tail, pending); pending->in_pending_queue = GNUNET_YES; @@ -985,10 +1326,36 @@ GNUNET_DHT_monitor_start (struct GNUNET_DHT_Handle *handle, void GNUNET_DHT_monitor_stop (struct GNUNET_DHT_MonitorHandle *handle) { - GNUNET_free_non_null (handle->key); + struct GNUNET_DHT_MonitorStartStopMessage *m; + struct PendingMessage *pending; + GNUNET_CONTAINER_DLL_remove (handle->dht_handle->monitor_head, handle->dht_handle->monitor_tail, handle); + + pending = GNUNET_malloc (sizeof (struct GNUNET_DHT_MonitorStartStopMessage) + + sizeof (struct PendingMessage)); + m = (struct GNUNET_DHT_MonitorStartStopMessage *) &pending[1]; + pending->msg = &m->header; + pending->handle = handle->dht_handle; + pending->free_on_send = GNUNET_YES; + m->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_MONITOR_STOP); + m->header.size = htons (sizeof (struct GNUNET_DHT_MonitorStartStopMessage)); + m->type = htonl(handle->type); + m->get = htons(NULL != handle->get_cb); + m->get_resp = htons(NULL != handle->get_resp_cb); + m->put = htons(NULL != handle->put_cb); + if (NULL != handle->key) { + m->filter_key = htons(1); + memcpy (&m->key, handle->key, sizeof(GNUNET_HashCode)); + } + GNUNET_CONTAINER_DLL_insert (handle->dht_handle->pending_head, + handle->dht_handle->pending_tail, + pending); + pending->in_pending_queue = GNUNET_YES; + process_pending_messages (handle->dht_handle); + + GNUNET_free_non_null (handle->key); GNUNET_free (handle); } diff --git a/src/dht/gnunet-dht-get.c b/src/dht/gnunet-dht-get.c index 6ad4b30..fb185c4 100644 --- a/src/dht/gnunet-dht-get.c +++ b/src/dht/gnunet-dht-get.c @@ -186,7 +186,7 @@ run (void *cls, char *const *args, const char *cfgfile, GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_absolute_get_remaining (absolute_timeout), &cleanup_task, NULL); get_handle = - GNUNET_DHT_get_start (dht_handle, timeout, query_type, &key, replication, + GNUNET_DHT_get_start (dht_handle, query_type, &key, replication, GNUNET_DHT_RO_NONE, NULL, 0, &get_result_iterator, NULL); diff --git a/src/dht/gnunet-dht-monitor.c b/src/dht/gnunet-dht-monitor.c new file mode 100644 index 0000000..8ca3beb --- /dev/null +++ b/src/dht/gnunet-dht-monitor.c @@ -0,0 +1,325 @@ +/* + This file is part of GNUnet. + (C) 2012 Christian Grothoff (and other contributing authors) + + GNUnet is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published + by the Free Software Foundation; either version 3, or (at your + option) any later version. + + GNUnet is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + General Public License for more details. + + You should have received a copy of the GNU General Public License + along with GNUnet; see the file COPYING. If not, write to the + Free Software Foundation, Inc., 59 Temple Place - Suite 330, + Boston, MA 02111-1307, USA. +*/ +/** + * @file dht/gnunet-dht-monitor.c + * @brief search for data in DHT + * @author Christian Grothoff + * @author Bartlomiej Polot + */ +#include "platform.h" +#include "gnunet_dht_service.h" + +/** + * The type of the query + */ +static unsigned int block_type; + +/** + * The key to be monitored + */ +static char *query_key; + +/** + * User supplied timeout value (in seconds) + */ +static unsigned long long timeout_request = 5; + +/** + * Be verbose + */ +static int verbose; + +/** +* Handle to the DHT + */ +static struct GNUNET_DHT_Handle *dht_handle; + +/** + * Global handle of the configuration + */ +static const struct GNUNET_CONFIGURATION_Handle *cfg; + +/** + * Handle for the get request + */ +static struct GNUNET_DHT_MonitorHandle *monitor_handle; + +/** + * Count of messages received + */ +static unsigned int result_count; + +/** + * Global status value + */ +static int ret; + + +/** + * Function called on shutdown, disconnects from DHT if necessary. + * + * @param cls closure (unused) + * @param tc Task Context + */ +static void +shutdown_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) +{ + if (verbose) + FPRINTF (stderr, "%s", "Shutting down!\n"); + if (dht_handle != NULL) + { + GNUNET_DHT_disconnect (dht_handle); + dht_handle = NULL; + } +} + + +/** + * Stop monitoring request and start shutdown + * + * @param cls closure (unused) + * @param tc Task Context + */ +static void +cleanup_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) +{ + if (verbose) + FPRINTF (stderr, "%s", "Cleaning up!\n"); + if (monitor_handle != NULL) + { + GNUNET_DHT_monitor_stop (monitor_handle); + monitor_handle = NULL; + } + GNUNET_SCHEDULER_add_now (&shutdown_task, NULL); +} + + +/** + * Callback called on each GET request going through the DHT. + * + * @param cls Closure. + * @param options Options, for instance RecordRoute, DemultiplexEverywhere. + * @param type The type of data in the request. + * @param hop_count Hop count so far. + * @param path_length number of entries in path (or 0 if not recorded). + * @param path peers on the GET path (or NULL if not recorded). + * @param desired_replication_level Desired replication level. + * @param key Key of the requested data. + */ +void +get_callback (void *cls, + enum GNUNET_DHT_RouteOption options, + enum GNUNET_BLOCK_Type type, + uint32_t hop_count, + uint32_t desired_replication_level, + unsigned int path_length, + const struct GNUNET_PeerIdentity *path, + const GNUNET_HashCode * key) +{ + FPRINTF (stdout, "Result %d, operation: %s, type %d\n Key: %s", + result_count, + "GET", + type, + GNUNET_h2s_full(key)); + result_count++; +} + +/** + * Callback called on each GET reply going through the DHT. + * + * @param cls Closure. + * @param type The type of data in the result. + * @param get_path Peers on GET path (or NULL if not recorded). + * @param get_path_length number of entries in get_path. + * @param put_path peers on the PUT path (or NULL if not recorded). + * @param put_path_length number of entries in get_path. + * @param exp Expiration time of the data. + * @param key Key of the data. + * @param data Pointer to the result data. + * @param size Number of bytes in data. + */ +void +get_resp_callback (void *cls, + enum GNUNET_BLOCK_Type type, + const struct GNUNET_PeerIdentity *get_path, + unsigned int get_path_length, + const struct GNUNET_PeerIdentity *put_path, + unsigned int put_path_length, + struct GNUNET_TIME_Absolute exp, + const GNUNET_HashCode * key, + const void *data, + size_t size) +{ + FPRINTF (stdout, "Result %d, operation: %s, type %d:\n Key: %s\n %.*s\n", + result_count, + "GET_RESP", + type, + GNUNET_h2s_full(key), + (unsigned int) size, + (char *) data); + result_count++; +} + +/** + * Callback called on each PUT request going through the DHT. + * + * @param cls Closure. + * @param options Options, for instance RecordRoute, DemultiplexEverywhere. + * @param type The type of data in the request. + * @param hop_count Hop count so far. + * @param path_length number of entries in path (or 0 if not recorded). + * @param path peers on the PUT path (or NULL if not recorded). + * @param desired_replication_level Desired replication level. + * @param exp Expiration time of the data. + * @param key Key under which data is to be stored. + * @param data Pointer to the data carried. + * @param size Number of bytes in data. + */ +void +put_callback (void *cls, + enum GNUNET_DHT_RouteOption options, + enum GNUNET_BLOCK_Type type, + uint32_t hop_count, + uint32_t desired_replication_level, + unsigned int path_length, + const struct GNUNET_PeerIdentity *path, + struct GNUNET_TIME_Absolute exp, + const GNUNET_HashCode * key, + const void *data, + size_t size) +{ + FPRINTF (stdout, "Result %d, operation: %s, type %d:\n Key: %s\n %.*s\n", + result_count, + "PUT", + type, + GNUNET_h2s_full(key), + (unsigned int) size, + (char *) data); + result_count++; +} + +/** + * Main function that will be run by the scheduler. + * + * @param cls closure + * @param args remaining command-line arguments + * @param cfgfile name of the configuration file used (for saving, can be NULL!) + * @param c configuration + */ +static void +run (void *cls, char *const *args, const char *cfgfile, + const struct GNUNET_CONFIGURATION_Handle *c) +{ + struct GNUNET_TIME_Relative timeout; + GNUNET_HashCode *key; + + cfg = c; + + dht_handle = GNUNET_DHT_connect (cfg, 1); + + if (dht_handle == NULL) + { + if (verbose) + FPRINTF (stderr, "%s", "Couldn't connect to DHT service!\n"); + ret = 1; + return; + } + else if (verbose) + FPRINTF (stderr, "%s", "Connected to DHT service!\n"); + + if (block_type == GNUNET_BLOCK_TYPE_ANY) /* Type of data not set */ + block_type = GNUNET_BLOCK_TYPE_TEST; + + if (query_key != NULL) { + key = GNUNET_malloc (sizeof(GNUNET_HashCode)); + GNUNET_CRYPTO_hash (query_key, strlen (query_key), key); + } + else + key = NULL; + + if (0 != timeout_request) + { + timeout = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, + timeout_request); + if (verbose) + FPRINTF (stderr, "Monitoring for %llus\n", timeout_request); + } + else + { + timeout = GNUNET_TIME_UNIT_FOREVER_REL; + if (verbose) + FPRINTF (stderr, "%s", "Monitoring indefinitely (close with Ctrl+C)\n"); + } + + GNUNET_SCHEDULER_add_delayed (timeout, &cleanup_task, NULL); + if (verbose) + FPRINTF (stderr, "Issuing MONITOR request for %s!\n", query_key); + monitor_handle = GNUNET_DHT_monitor_start (dht_handle, + block_type, + key, + &get_callback, + &get_resp_callback, + &put_callback, + NULL); + if (verbose) + FPRINTF (stderr, "%s", "MONITOR started!\n"); + GNUNET_free_non_null (key); + +} + + +/** + * gnunet-dht-get command line options + */ +static struct GNUNET_GETOPT_CommandLineOption options[] = { + {'k', "key", "KEY", + gettext_noop ("the query key"), + 1, &GNUNET_GETOPT_set_string, &query_key}, + {'t', "type", "TYPE", + gettext_noop ("the type of data to look for"), + 1, &GNUNET_GETOPT_set_uint, &block_type}, + {'T', "timeout", "TIMEOUT", + gettext_noop ("how long to execute? 0 = forever"), + 1, &GNUNET_GETOPT_set_ulong, &timeout_request}, + {'V', "verbose", NULL, + gettext_noop ("be verbose (print progress information)"), + 0, &GNUNET_GETOPT_set_one, &verbose}, + GNUNET_GETOPT_OPTION_END +}; + + +/** + * Entry point for gnunet-dht-monitor + * + * @param argc number of arguments from the command line + * @param argv command line arguments + * @return 0 ok, 1 on error + */ +int +main (int argc, char *const *argv) +{ + return (GNUNET_OK == + GNUNET_PROGRAM_run (argc, argv, "gnunet-dht-get", + gettext_noop + ("Prints all packets that go through the DHT."), + options, &run, NULL)) ? ret : 1; +} + +/* end of gnunet-dht-monitor.c */ diff --git a/src/dht/gnunet-dht-put.c b/src/dht/gnunet-dht-put.c index ef5ae5e..59acc79 100644 --- a/src/dht/gnunet-dht-put.c +++ b/src/dht/gnunet-dht-put.c @@ -80,7 +80,7 @@ static char *data; static void shutdown_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) { - if (dht_handle != NULL) + if (NULL != dht_handle) { GNUNET_DHT_disconnect (dht_handle); dht_handle = NULL; @@ -91,13 +91,33 @@ shutdown_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) * Signature of the main function of a task. * * @param cls closure - * @param tc context information (why was this task triggered now) + * @param success GNUNET_OK if the PUT was transmitted, + * GNUNET_NO on timeout, + * GNUNET_SYSERR on disconnect from service + * after the PUT message was transmitted + * (so we don't know if it was received or not) */ -void -message_sent_cont (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) +static void +message_sent_cont (void *cls, int success) { if (verbose) - FPRINTF (stderr, "%s", _("PUT request sent!\n")); + { + switch (success) + { + case GNUNET_OK: + FPRINTF (stderr, "%s", _("PUT request sent!\n")); + break; + case GNUNET_NO: + FPRINTF (stderr, "%s", _("Timeout sending PUT request!\n")); + break; + case GNUNET_SYSERR: + FPRINTF (stderr, "%s", _("PUT request not confirmed!\n")); + break; + default: + GNUNET_break (0); + break; + } + } GNUNET_SCHEDULER_add_now (&shutdown_task, NULL); } diff --git a/src/dht/gnunet-service-dht_clients.c b/src/dht/gnunet-service-dht_clients.c index 96fcd34..d897d1f 100644 --- a/src/dht/gnunet-service-dht_clients.c +++ b/src/dht/gnunet-service-dht_clients.c @@ -87,7 +87,7 @@ struct ClientList * Handle to the current transmission request, NULL * if none pending. */ - struct GNUNET_CONNECTION_TransmitHandle *transmit_handle; + struct GNUNET_SERVER_TransmitHandle *transmit_handle; /** * Linked list of pending messages for this client @@ -204,6 +204,21 @@ struct ClientMonitorRecord GNUNET_HashCode *key; /** + * Flag whether to notify about GET messages. + */ + int16_t get; + + /** + * Flag whether to notify about GET_REPONSE messages. + */ + int16_t get_resp; + + /** + * Flag whether to notify about PUT messages. + */ + uint16_t put; + + /** * Client to notify of these requests. */ struct ClientList *client; @@ -247,6 +262,31 @@ static GNUNET_SCHEDULER_TaskIdentifier retry_task; /** + * Task run to check for messages that need to be sent to a client. + * + * @param client a ClientList, containing the client and any messages to be sent to it + */ +static void +process_pending_messages (struct ClientList *client); + + +/** + * Add a PendingMessage to the clients list of messages to be sent + * + * @param client the active client to send the message to + * @param pending_message the actual message to send + */ +static void +add_pending_message (struct ClientList *client, + struct PendingMessage *pending_message) +{ + GNUNET_CONTAINER_DLL_insert_tail (client->pending_head, client->pending_tail, + pending_message); + process_pending_messages (client); +} + + +/** * Find a client if it exists, add it otherwise. * * @param client the server handle to the client @@ -289,11 +329,9 @@ remove_client_records (void *cls, const GNUNET_HashCode * key, void *value) if (record->client != client) return GNUNET_YES; -#if DEBUG_DHT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Removing client %p's record for key %s\n", client, GNUNET_h2s (key)); -#endif GNUNET_assert (GNUNET_YES == GNUNET_CONTAINER_multihashmap_remove (forward_map, key, record)); @@ -320,13 +358,11 @@ handle_client_disconnect (void *cls, struct GNUNET_SERVER_Client *client) struct PendingMessage *reply; struct ClientMonitorRecord *monitor; -#if DEBUG_DHT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Local client %p disconnects\n", client); -#endif pos = find_active_client (client); GNUNET_CONTAINER_DLL_remove (client_head, client_tail, pos); if (pos->transmit_handle != NULL) - GNUNET_CONNECTION_notify_transmit_ready_cancel (pos->transmit_handle); + GNUNET_SERVER_notify_transmit_ready_cancel (pos->transmit_handle); while (NULL != (reply = pos->pending_head)) { GNUNET_CONTAINER_DLL_remove (pos->pending_head, pos->pending_tail, reply); @@ -449,6 +485,8 @@ handle_dht_local_put (void *cls, struct GNUNET_SERVER_Client *client, const struct GNUNET_DHT_ClientPutMessage *dht_msg; struct GNUNET_CONTAINER_BloomFilter *peer_bf; uint16_t size; + struct PendingMessage *pm; + struct GNUNET_DHT_ClientPutConfirmationMessage *conf; size = ntohs (message->size); if (size < sizeof (struct GNUNET_DHT_ClientPutMessage)) @@ -463,12 +501,10 @@ handle_dht_local_put (void *cls, struct GNUNET_SERVER_Client *client, GNUNET_NO); dht_msg = (const struct GNUNET_DHT_ClientPutMessage *) message; /* give to local clients */ -#if DEBUG_DHT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Handling local PUT of %u-bytes for query %s\n", size - sizeof (struct GNUNET_DHT_ClientPutMessage), GNUNET_h2s (&dht_msg->key)); -#endif GDS_CLIENTS_handle_reply (GNUNET_TIME_absolute_ntoh (dht_msg->expiration), &dht_msg->key, 0, NULL, 0, NULL, ntohl (dht_msg->type), @@ -490,7 +526,26 @@ handle_dht_local_put (void *cls, struct GNUNET_SERVER_Client *client, peer_bf, &dht_msg->key, 0, NULL, &dht_msg[1], size - sizeof (struct GNUNET_DHT_ClientPutMessage)); + GDS_CLIENTS_process_put (ntohl (dht_msg->options), + ntohl (dht_msg->type), + 0, + ntohl (dht_msg->desired_replication_level), + 1, + GDS_NEIGHBOURS_get_id(), + GNUNET_TIME_absolute_ntoh (dht_msg->expiration), + &dht_msg->key, + &dht_msg[1], + size - sizeof (struct GNUNET_DHT_ClientPutMessage)); GNUNET_CONTAINER_bloomfilter_free (peer_bf); + pm = GNUNET_malloc (sizeof (struct PendingMessage) + + sizeof (struct GNUNET_DHT_ClientPutConfirmationMessage)); + conf = (struct GNUNET_DHT_ClientPutConfirmationMessage *) &pm[1]; + conf->header.size = htons (sizeof (struct GNUNET_DHT_ClientPutConfirmationMessage)); + conf->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_CLIENT_PUT_OK); + conf->reserved = htonl (0); + conf->unique_id = dht_msg->unique_id; + pm->msg = &conf->header; + add_pending_message (find_active_client (client), pm); GNUNET_SERVER_receive_done (client, GNUNET_OK); } @@ -528,11 +583,9 @@ handle_dht_local_get (void *cls, struct GNUNET_SERVER_Client *client, gettext_noop ("# GET requests received from clients"), 1, GNUNET_NO); -#if DEBUG_DHT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Received request for %s from local client %p\n", GNUNET_h2s (&get->key), client); -#endif cqr = GNUNET_malloc (sizeof (struct ClientQueryRecord) + xquery_size); cqr->key = get->key; cqr->client = find_active_client (client); @@ -548,6 +601,13 @@ handle_dht_local_get (void *cls, struct GNUNET_SERVER_Client *client, cqr->type = ntohl (get->type); GNUNET_CONTAINER_multihashmap_put (forward_map, &get->key, cqr, GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); + GDS_CLIENTS_process_get (ntohl (get->options), + ntohl (get->type), + 0, + ntohl (get->desired_replication_level), + 1, + GDS_NEIGHBOURS_get_id(), + &get->key); /* start remote requests */ if (GNUNET_SCHEDULER_NO_TASK != retry_task) GNUNET_SCHEDULER_cancel (retry_task); @@ -593,11 +653,9 @@ remove_by_unique_id (void *cls, const GNUNET_HashCode * key, void *value) if (record->unique_id != ctx->unique_id) return GNUNET_YES; -#if DEBUG_DHT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Removing client %p's record for key %s (by unique id)\n", ctx->client->client_handle, GNUNET_h2s (key)); -#endif return remove_client_records (ctx->client, key, record); } @@ -623,10 +681,8 @@ handle_dht_local_get_stop (void *cls, struct GNUNET_SERVER_Client *client, gettext_noop ("# GET STOP requests received from clients"), 1, GNUNET_NO); -#if DEBUG_DHT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Client %p stopped request for key %s\n", client, GNUNET_h2s (&dht_stop_msg->key)); -#endif ctx.client = find_active_client (client); ctx.unique_id = dht_stop_msg->unique_id; GNUNET_CONTAINER_multihashmap_get_multiple (forward_map, &dht_stop_msg->key, @@ -636,7 +692,7 @@ handle_dht_local_get_stop (void *cls, struct GNUNET_SERVER_Client *client, /** - * Handler for monitor messages + * Handler for monitor start messages * * @param cls closure for the service * @param client the client we received this message from @@ -648,37 +704,74 @@ handle_dht_local_monitor (void *cls, struct GNUNET_SERVER_Client *client, const struct GNUNET_MessageHeader *message) { struct ClientMonitorRecord *r; - const struct GNUNET_DHT_MonitorMessage *msg; - unsigned int i; - char *c; + const struct GNUNET_DHT_MonitorStartStopMessage *msg; - msg = (struct GNUNET_DHT_MonitorMessage *) message; + msg = (struct GNUNET_DHT_MonitorStartStopMessage *) message; r = GNUNET_malloc (sizeof(struct ClientMonitorRecord)); r->client = find_active_client(client); r->type = ntohl(msg->type); - c = (char *) &msg->key; - for (i = 0; i < sizeof (GNUNET_HashCode) && c[i] == 0; i++); - if (sizeof (GNUNET_HashCode) == i) - r->key = NULL; + r->get = ntohs(msg->get); + r->get_resp = ntohs(msg->get_resp); + r->put = ntohs(msg->put); + if (0 == ntohs(msg->filter_key)) + r->key = NULL; else { r->key = GNUNET_malloc (sizeof (GNUNET_HashCode)); memcpy (r->key, &msg->key, sizeof (GNUNET_HashCode)); } GNUNET_CONTAINER_DLL_insert (monitor_head, monitor_tail, r); - // FIXME add remove somewhere GNUNET_SERVER_receive_done (client, GNUNET_OK); } - /** - * Task run to check for messages that need to be sent to a client. + * Handler for monitor stop messages + * + * @param cls closure for the service + * @param client the client we received this message from + * @param message the actual message received * - * @param client a ClientList, containing the client and any messages to be sent to it */ static void -process_pending_messages (struct ClientList *client); +handle_dht_local_monitor_stop (void *cls, struct GNUNET_SERVER_Client *client, + const struct GNUNET_MessageHeader *message) +{ + struct ClientMonitorRecord *r; + const struct GNUNET_DHT_MonitorStartStopMessage *msg; + int keys_match; + + msg = (struct GNUNET_DHT_MonitorStartStopMessage *) message; + r = monitor_head; + + while (NULL != r) + { + if (NULL == r->key) + keys_match = (0 == ntohs(msg->filter_key)); + else + { + keys_match = (0 != ntohs(msg->filter_key) + && !memcmp(r->key, &msg->key, sizeof(GNUNET_HashCode))); + } + if (find_active_client(client) == r->client + && ntohl(msg->type) == r->type + && r->get == msg->get + && r->get_resp == msg->get_resp + && r->put == msg->put + && keys_match + ) + { + GNUNET_CONTAINER_DLL_remove (monitor_head, monitor_tail, r); + GNUNET_free_non_null (r->key); + GNUNET_free (r); + GNUNET_SERVER_receive_done (client, GNUNET_OK); + return; /* Delete only ONE entry */ + } + r = r->next; + } + + GNUNET_SERVER_receive_done (client, GNUNET_OK); +} /** @@ -706,11 +799,9 @@ send_reply_to_client (void *cls, size_t size, void *buf) if (buf == NULL) { /* client disconnected */ -#if DEBUG_DHT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Client %p disconnected, pending messages will be discarded\n", client->client_handle); -#endif return 0; } off = 0; @@ -721,17 +812,13 @@ send_reply_to_client (void *cls, size_t size, void *buf) reply); memcpy (&cbuf[off], reply->msg, msize); GNUNET_free (reply); -#if DEBUG_DHT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Transmitting %u bytes to client %p\n", msize, client->client_handle); -#endif off += msize; } process_pending_messages (client); -#if DEBUG_DHT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Transmitted %u/%u bytes to client %p\n", (unsigned int) off, (unsigned int) size, client->client_handle); -#endif return off; } @@ -746,20 +833,16 @@ process_pending_messages (struct ClientList *client) { if ((client->pending_head == NULL) || (client->transmit_handle != NULL)) { -#if DEBUG_DHT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Not asking for transmission to %p now: %s\n", client->client_handle, client->pending_head == NULL ? "no more messages" : "request already pending"); -#endif return; } -#if DEBUG_DHT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Asking for transmission of %u bytes to client %p\n", ntohs (client->pending_head->msg->size), client->client_handle); -#endif client->transmit_handle = GNUNET_SERVER_notify_transmit_ready (client->client_handle, ntohs (client->pending_head-> @@ -770,22 +853,6 @@ process_pending_messages (struct ClientList *client) /** - * Add a PendingMessage to the clients list of messages to be sent - * - * @param client the active client to send the message to - * @param pending_message the actual message to send - */ -static void -add_pending_message (struct ClientList *client, - struct PendingMessage *pending_message) -{ - GNUNET_CONTAINER_DLL_insert_tail (client->pending_head, client->pending_tail, - pending_message); - process_pending_messages (client); -} - - -/** * Closure for 'forward_reply' */ struct ForwardReplyContext @@ -844,11 +911,9 @@ forward_reply (void *cls, const GNUNET_HashCode * key, void *value) if ((record->type != GNUNET_BLOCK_TYPE_ANY) && (record->type != frc->type)) { -#if DEBUG_DHT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Record type missmatch, not passing request for key %s to local client\n", GNUNET_h2s (key)); -#endif GNUNET_STATISTICS_update (GDS_stats, gettext_noop ("# Key match, type mismatches in REPLY to CLIENT"), @@ -859,11 +924,9 @@ forward_reply (void *cls, const GNUNET_HashCode * key, void *value) for (i = 0; i < record->seen_replies_count; i++) if (0 == memcmp (&record->seen_replies[i], &ch, sizeof (GNUNET_HashCode))) { -#if DEBUG_DHT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Duplicate reply, not passing request for key %s to local client\n", GNUNET_h2s (key)); -#endif GNUNET_STATISTICS_update (GDS_stats, gettext_noop ("# Duplicate REPLIES to CLIENT request dropped"), @@ -874,11 +937,9 @@ forward_reply (void *cls, const GNUNET_HashCode * key, void *value) GNUNET_BLOCK_evaluate (GDS_block_context, record->type, key, NULL, 0, record->xquery, record->xquery_size, frc->data, frc->data_size); -#if DEBUG_DHT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Evaluation result is %d for key %s for local client's query\n", (int) eval, GNUNET_h2s (key)); -#endif switch (eval) { case GNUNET_BLOCK_EVALUATION_OK_LAST: @@ -929,11 +990,9 @@ forward_reply (void *cls, const GNUNET_HashCode * key, void *value) GNUNET_NO); reply = (struct GNUNET_DHT_ClientResultMessage *) &pm[1]; reply->unique_id = record->unique_id; -#if DEBUG_DHT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Queueing reply to query %s for client %p\n", GNUNET_h2s (key), record->client->client_handle); -#endif add_pending_message (record->client, pm); if (GNUNET_YES == do_free) remove_client_records (record->client, key, record); @@ -1027,33 +1086,101 @@ GDS_CLIENTS_handle_reply (struct GNUNET_TIME_Absolute expiration, /** - * Check if some client is monitoring messages of this type and notify - * him in that case. + * Check if some client is monitoring GET messages and notify + * them in that case. * - * @param mtype Type of the DHT message. - * @param exp When will this value expire. - * @param key Key of the result/request. - * @param putl number of entries in get_path. - * @param put_path peers on the PUT path (or NULL if not recorded). - * @param getl number of entries in get_path. - * @param get_path Peers on reply path (or NULL if not recorded). + * @param options Options, for instance RecordRoute, DemultiplexEverywhere. + * @param type The type of data in the request. + * @param hop_count Hop count so far. + * @param path_length number of entries in path (or 0 if not recorded). + * @param path peers on the GET path (or NULL if not recorded). * @param desired_replication_level Desired replication level. - * @param type Type of the result/request. + * @param key Key of the requested data. + */ +void +GDS_CLIENTS_process_get (uint32_t options, + enum GNUNET_BLOCK_Type type, + uint32_t hop_count, + uint32_t desired_replication_level, + unsigned int path_length, + const struct GNUNET_PeerIdentity *path, + const GNUNET_HashCode * key) +{ + struct ClientMonitorRecord *m; + struct ClientList **cl; + unsigned int cl_size; + + cl = NULL; + cl_size = 0; + for (m = monitor_head; NULL != m; m = m->next) + { + if ((GNUNET_BLOCK_TYPE_ANY == m->type || m->type == type) && + (NULL == m->key || + memcmp (key, m->key, sizeof(GNUNET_HashCode)) == 0)) + { + struct PendingMessage *pm; + struct GNUNET_DHT_MonitorGetMessage *mmsg; + struct GNUNET_PeerIdentity *msg_path; + size_t msize; + unsigned int i; + + /* Don't send duplicates */ + for (i = 0; i < cl_size; i++) + if (cl[i] == m->client) + break; + if (i < cl_size) + continue; + GNUNET_array_append (cl, cl_size, m->client); + + msize = path_length * sizeof (struct GNUNET_PeerIdentity); + msize += sizeof (struct GNUNET_DHT_MonitorGetMessage); + msize += sizeof (struct PendingMessage); + pm = (struct PendingMessage *) GNUNET_malloc (msize); + mmsg = (struct GNUNET_DHT_MonitorGetMessage *) &pm[1]; + pm->msg = &mmsg->header; + mmsg->header.size = htons (msize - sizeof (struct PendingMessage)); + mmsg->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_MONITOR_GET); + mmsg->options = htonl(options); + mmsg->type = htonl(type); + mmsg->hop_count = htonl(hop_count); + mmsg->desired_replication_level = htonl(desired_replication_level); + mmsg->get_path_length = htonl(path_length); + memcpy (&mmsg->key, key, sizeof (GNUNET_HashCode)); + msg_path = (struct GNUNET_PeerIdentity *) &mmsg[1]; + if (path_length > 0) + memcpy (msg_path, path, + path_length * sizeof (struct GNUNET_PeerIdentity)); + add_pending_message (m->client, pm); + } + } + GNUNET_free_non_null (cl); +} + + +/** + * Check if some client is monitoring GET RESP messages and notify + * them in that case. + * + * @param type The type of data in the result. + * @param get_path Peers on GET path (or NULL if not recorded). + * @param get_path_length number of entries in get_path. + * @param put_path peers on the PUT path (or NULL if not recorded). + * @param put_path_length number of entries in get_path. + * @param exp Expiration time of the data. + * @param key Key of the data. * @param data Pointer to the result data. * @param size Number of bytes in data. */ void -GDS_CLIENTS_process_monitor (uint16_t mtype, - const struct GNUNET_TIME_Absolute exp, - const GNUNET_HashCode *key, - uint32_t putl, - const struct GNUNET_PeerIdentity *put_path, - uint32_t getl, - const struct GNUNET_PeerIdentity *get_path, - uint32_t desired_replication_level, - enum GNUNET_BLOCK_Type type, - const struct GNUNET_MessageHeader *data, - uint16_t size) +GDS_CLIENTS_process_get_resp (enum GNUNET_BLOCK_Type type, + const struct GNUNET_PeerIdentity *get_path, + unsigned int get_path_length, + const struct GNUNET_PeerIdentity *put_path, + unsigned int put_path_length, + struct GNUNET_TIME_Absolute exp, + const GNUNET_HashCode * key, + const void *data, + size_t size) { struct ClientMonitorRecord *m; struct ClientList **cl; @@ -1068,7 +1195,7 @@ GDS_CLIENTS_process_monitor (uint16_t mtype, memcmp (key, m->key, sizeof(GNUNET_HashCode)) == 0)) { struct PendingMessage *pm; - struct GNUNET_DHT_MonitorMessage *mmsg; + struct GNUNET_DHT_MonitorGetRespMessage *mmsg; struct GNUNET_PeerIdentity *path; size_t msize; unsigned int i; @@ -1082,29 +1209,116 @@ GDS_CLIENTS_process_monitor (uint16_t mtype, GNUNET_array_append (cl, cl_size, m->client); msize = size; - msize += (getl + putl) * sizeof (struct GNUNET_PeerIdentity); - msize += sizeof (struct GNUNET_DHT_MonitorMessage); + msize += (get_path_length + put_path_length) + * sizeof (struct GNUNET_PeerIdentity); + msize += sizeof (struct GNUNET_DHT_MonitorGetRespMessage); msize += sizeof (struct PendingMessage); pm = (struct PendingMessage *) GNUNET_malloc (msize); - mmsg = (struct GNUNET_DHT_MonitorMessage *) &pm[1]; + mmsg = (struct GNUNET_DHT_MonitorGetRespMessage *) &pm[1]; pm->msg = (struct GNUNET_MessageHeader *) mmsg; mmsg->header.size = htons (msize - sizeof (struct PendingMessage)); - mmsg->header.type = htons (mtype); - mmsg->expiration = GNUNET_TIME_absolute_hton(exp); - memcpy (&mmsg->key, key, sizeof (GNUNET_HashCode)); - mmsg->put_path_length = htonl(putl); - mmsg->get_path_length = htonl(getl); - mmsg->desired_replication_level = htonl (desired_replication_level); + mmsg->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_MONITOR_GET_RESP); + mmsg->type = htonl(type); + mmsg->put_path_length = htonl(put_path_length); + mmsg->get_path_length = htonl(get_path_length); path = (struct GNUNET_PeerIdentity *) &mmsg[1]; - if (putl > 0) + if (put_path_length > 0) + { + memcpy (path, put_path, + put_path_length * sizeof (struct GNUNET_PeerIdentity)); + path = &path[put_path_length]; + } + if (get_path_length > 0) + memcpy (path, get_path, + get_path_length * sizeof (struct GNUNET_PeerIdentity)); + mmsg->expiration_time = GNUNET_TIME_absolute_hton(exp); + memcpy (&mmsg->key, key, sizeof (GNUNET_HashCode)); + if (size > 0) + memcpy (&path[get_path_length], data, size); + add_pending_message (m->client, pm); + } + } + GNUNET_free_non_null (cl); +} + + +/** + * Check if some client is monitoring PUT messages and notify + * them in that case. + * + * @param options Options, for instance RecordRoute, DemultiplexEverywhere. + * @param type The type of data in the request. + * @param hop_count Hop count so far. + * @param path_length number of entries in path (or 0 if not recorded). + * @param path peers on the PUT path (or NULL if not recorded). + * @param desired_replication_level Desired replication level. + * @param exp Expiration time of the data. + * @param key Key under which data is to be stored. + * @param data Pointer to the data carried. + * @param size Number of bytes in data. + */ +void +GDS_CLIENTS_process_put (uint32_t options, + enum GNUNET_BLOCK_Type type, + uint32_t hop_count, + uint32_t desired_replication_level, + unsigned int path_length, + const struct GNUNET_PeerIdentity *path, + struct GNUNET_TIME_Absolute exp, + const GNUNET_HashCode * key, + const void *data, + size_t size) +{ + struct ClientMonitorRecord *m; + struct ClientList **cl; + unsigned int cl_size; + + cl = NULL; + cl_size = 0; + for (m = monitor_head; NULL != m; m = m->next) + { + if ((GNUNET_BLOCK_TYPE_ANY == m->type || m->type == type) && + (NULL == m->key || + memcmp (key, m->key, sizeof(GNUNET_HashCode)) == 0)) + { + struct PendingMessage *pm; + struct GNUNET_DHT_MonitorPutMessage *mmsg; + struct GNUNET_PeerIdentity *msg_path; + size_t msize; + unsigned int i; + + /* Don't send duplicates */ + for (i = 0; i < cl_size; i++) + if (cl[i] == m->client) + break; + if (i < cl_size) + continue; + GNUNET_array_append (cl, cl_size, m->client); + + msize = size; + msize += path_length * sizeof (struct GNUNET_PeerIdentity); + msize += sizeof (struct GNUNET_DHT_MonitorPutMessage); + msize += sizeof (struct PendingMessage); + pm = (struct PendingMessage *) GNUNET_malloc (msize); + mmsg = (struct GNUNET_DHT_MonitorPutMessage *) &pm[1]; + pm->msg = (struct GNUNET_MessageHeader *) mmsg; + mmsg->header.size = htons (msize - sizeof (struct PendingMessage)); + mmsg->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_MONITOR_PUT); + mmsg->options = htonl(options); + mmsg->type = htonl(type); + mmsg->hop_count = htonl(hop_count); + mmsg->desired_replication_level = htonl(desired_replication_level); + mmsg->put_path_length = htonl(path_length); + msg_path = (struct GNUNET_PeerIdentity *) &mmsg[1]; + if (path_length > 0) { - memcpy (path, put_path, putl * sizeof (struct GNUNET_PeerIdentity)); - path = &path[putl]; + memcpy (msg_path, path, + path_length * sizeof (struct GNUNET_PeerIdentity)); } - if (getl > 0) - memcpy (path, get_path, getl * sizeof (struct GNUNET_PeerIdentity)); + mmsg->expiration_time = GNUNET_TIME_absolute_hton(exp); + memcpy (&mmsg->key, key, sizeof (GNUNET_HashCode)); if (size > 0) - memcpy (&path[getl], data, size); + memcpy (&msg_path[path_length], data, size); add_pending_message (m->client, pm); } } @@ -1129,8 +1343,11 @@ GDS_CLIENTS_init (struct GNUNET_SERVER_Handle *server) GNUNET_MESSAGE_TYPE_DHT_CLIENT_GET_STOP, sizeof (struct GNUNET_DHT_ClientGetStopMessage)}, {&handle_dht_local_monitor, NULL, - GNUNET_MESSAGE_TYPE_DHT_MONITOR_GET, - sizeof (struct GNUNET_DHT_MonitorMessage)}, + GNUNET_MESSAGE_TYPE_DHT_MONITOR_START, + sizeof (struct GNUNET_DHT_MonitorStartStopMessage)}, + {&handle_dht_local_monitor_stop, NULL, + GNUNET_MESSAGE_TYPE_DHT_MONITOR_STOP, + sizeof (struct GNUNET_DHT_MonitorStartStopMessage)}, {NULL, NULL, 0, 0} }; forward_map = GNUNET_CONTAINER_multihashmap_create (1024); @@ -1153,12 +1370,18 @@ GDS_CLIENTS_done () GNUNET_SCHEDULER_cancel (retry_task); retry_task = GNUNET_SCHEDULER_NO_TASK; } - GNUNET_assert (0 == GNUNET_CONTAINER_heap_get_size (retry_heap)); - GNUNET_CONTAINER_heap_destroy (retry_heap); - retry_heap = NULL; - GNUNET_assert (0 == GNUNET_CONTAINER_multihashmap_size (forward_map)); - GNUNET_CONTAINER_multihashmap_destroy (forward_map); - forward_map = NULL; + if (NULL != retry_heap) + { + GNUNET_assert (0 == GNUNET_CONTAINER_heap_get_size (retry_heap)); + GNUNET_CONTAINER_heap_destroy (retry_heap); + retry_heap = NULL; + } + if (NULL != forward_map) + { + GNUNET_assert (0 == GNUNET_CONTAINER_multihashmap_size (forward_map)); + GNUNET_CONTAINER_multihashmap_destroy (forward_map); + forward_map = NULL; + } } /* end of gnunet-service-dht_clients.c */ diff --git a/src/dht/gnunet-service-dht_clients.h b/src/dht/gnunet-service-dht_clients.h index a477456..9f3d2dd 100644 --- a/src/dht/gnunet-service-dht_clients.h +++ b/src/dht/gnunet-service-dht_clients.h @@ -57,33 +57,77 @@ GDS_CLIENTS_handle_reply (struct GNUNET_TIME_Absolute expiration, /** - * Check if some client is monitoring messages of this type and notify - * him in that case. + * Check if some client is monitoring GET messages and notify + * them in that case. * - * @param mtype Type of the DHT message. - * @param exp When will this value expire. - * @param key Key of the result/request. - * @param putl number of entries in get_path. - * @param put_path peers on the PUT path (or NULL if not recorded). - * @param getl number of entries in get_path. - * @param get_path Peers on reply path (or NULL if not recorded). + * @param options Options, for instance RecordRoute, DemultiplexEverywhere. + * @param type The type of data in the request. + * @param hop_count Hop count so far. + * @param path_length number of entries in path (or 0 if not recorded). + * @param path peers on the GET path (or NULL if not recorded). * @param desired_replication_level Desired replication level. - * @param type Type of the result/request. + * @param key Key of the requested data. + */ +void +GDS_CLIENTS_process_get (uint32_t options, + enum GNUNET_BLOCK_Type type, + uint32_t hop_count, + uint32_t desired_replication_level, + unsigned int path_length, + const struct GNUNET_PeerIdentity *path, + const GNUNET_HashCode * key); + +/** + * Check if some client is monitoring GET RESP messages and notify + * them in that case. + * + * @param type The type of data in the result. + * @param get_path Peers on GET path (or NULL if not recorded). + * @param get_path_length number of entries in get_path. + * @param put_path peers on the PUT path (or NULL if not recorded). + * @param put_path_length number of entries in get_path. + * @param exp Expiration time of the data. + * @param key Key of the data. * @param data Pointer to the result data. * @param size Number of bytes in data. */ void -GDS_CLIENTS_process_monitor (uint16_t mtype, - const struct GNUNET_TIME_Absolute exp, - const GNUNET_HashCode *key, - uint32_t putl, - const struct GNUNET_PeerIdentity *put_path, - uint32_t getl, - const struct GNUNET_PeerIdentity *get_path, - uint32_t desired_replication_level, - enum GNUNET_BLOCK_Type type, - const struct GNUNET_MessageHeader *data, - uint16_t size); +GDS_CLIENTS_process_get_resp (enum GNUNET_BLOCK_Type type, + const struct GNUNET_PeerIdentity *get_path, + unsigned int get_path_length, + const struct GNUNET_PeerIdentity *put_path, + unsigned int put_path_length, + struct GNUNET_TIME_Absolute exp, + const GNUNET_HashCode * key, + const void *data, + size_t size); + +/** + * Check if some client is monitoring PUT messages and notify + * them in that case. + * + * @param options Options, for instance RecordRoute, DemultiplexEverywhere. + * @param type The type of data in the request. + * @param hop_count Hop count so far. + * @param path_length number of entries in path (or 0 if not recorded). + * @param path peers on the PUT path (or NULL if not recorded). + * @param desired_replication_level Desired replication level. + * @param exp Expiration time of the data. + * @param key Key under which data is to be stored. + * @param data Pointer to the data carried. + * @param size Number of bytes in data. + */ +void +GDS_CLIENTS_process_put (uint32_t options, + enum GNUNET_BLOCK_Type type, + uint32_t hop_count, + uint32_t desired_replication_level, + unsigned int path_length, + const struct GNUNET_PeerIdentity *path, + struct GNUNET_TIME_Absolute exp, + const GNUNET_HashCode * key, + const void *data, + size_t size); /** * Initialize client subsystem. diff --git a/src/dht/gnunet-service-dht_datacache.c b/src/dht/gnunet-service-dht_datacache.c index 82cd067..4d1dd6f 100644 --- a/src/dht/gnunet-service-dht_datacache.c +++ b/src/dht/gnunet-service-dht_datacache.c @@ -193,11 +193,9 @@ datacache_get_iterator (void *cls, struct GNUNET_TIME_Absolute exp, GNUNET_BLOCK_evaluate (GDS_block_context, type, key, ctx->reply_bf, ctx->reply_bf_mutator, ctx->xquery, ctx->xquery_size, rdata, rdata_size); -#if DEBUG_DHT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Found reply for query %s in datacache, evaluation result is %d\n", GNUNET_h2s (key), (int) eval); -#endif ctx->eval = eval; switch (eval) { diff --git a/src/dht/gnunet-service-dht_neighbours.c b/src/dht/gnunet-service-dht_neighbours.c index 4ea5dd6..083b499 100644 --- a/src/dht/gnunet-service-dht_neighbours.c +++ b/src/dht/gnunet-service-dht_neighbours.c @@ -524,11 +524,9 @@ add_known_to_bloom (void *cls, const GNUNET_HashCode * key, void *value) GNUNET_HashCode mh; GNUNET_BLOCK_mingle_hash (key, ctx->bf_mutator, &mh); -#if DEBUG_DHT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Adding known peer (%s) to bloomfilter for FIND PEER with mutation %u\n", GNUNET_h2s (key), ctx->bf_mutator); -#endif GNUNET_CONTAINER_bloomfilter_add (ctx->bloom, &mh); return GNUNET_YES; } @@ -615,10 +613,8 @@ handle_core_connect (void *cls, const struct GNUNET_PeerIdentity *peer, /* Check for connect to self message */ if (0 == memcmp (&my_identity, peer, sizeof (struct GNUNET_PeerIdentity))) return; -#if DEBUG_DHT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Connected %s to %s\n", GNUNET_i2s (&my_identity), GNUNET_h2s (&peer->hashPubKey)); -#endif if (GNUNET_YES == GNUNET_CONTAINER_multihashmap_contains (all_known_peers, &peer->hashPubKey)) @@ -626,7 +622,7 @@ handle_core_connect (void *cls, const struct GNUNET_PeerIdentity *peer, GNUNET_break (0); return; } - GNUNET_STATISTICS_update (GDS_stats, gettext_noop ("# Peers connected"), 1, + GNUNET_STATISTICS_update (GDS_stats, gettext_noop ("# peers connected"), 1, GNUNET_NO); peer_bucket = find_bucket (&peer->hashPubKey); GNUNET_assert ((peer_bucket >= 0) && (peer_bucket < MAX_BUCKETS)); @@ -675,10 +671,8 @@ handle_core_disconnect (void *cls, const struct GNUNET_PeerIdentity *peer) /* Check for disconnect from self message */ if (0 == memcmp (&my_identity, peer, sizeof (struct GNUNET_PeerIdentity))) return; -#if DEBUG_DHT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Disconnected %s from %s\n", GNUNET_i2s (&my_identity), GNUNET_h2s (&peer->hashPubKey)); -#endif to_remove = GNUNET_CONTAINER_multihashmap_get (all_known_peers, &peer->hashPubKey); if (NULL == to_remove) @@ -686,7 +680,7 @@ handle_core_disconnect (void *cls, const struct GNUNET_PeerIdentity *peer) GNUNET_break (0); return; } - GNUNET_STATISTICS_update (GDS_stats, gettext_noop ("# Peers connected"), -1, + GNUNET_STATISTICS_update (GDS_stats, gettext_noop ("# peers connected"), -1, GNUNET_NO); GNUNET_assert (GNUNET_YES == GNUNET_CONTAINER_multihashmap_remove (all_known_peers, @@ -1030,11 +1024,9 @@ select_peer (const GNUNET_HashCode * key, } else { -#if DEBUG_DHT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Excluded peer `%s' due to BF match in greedy routing for %s\n", GNUNET_i2s (&pos->id), GNUNET_h2s (key)); -#endif GNUNET_STATISTICS_update (GDS_stats, gettext_noop ("# Peers excluded from routing due to Bloomfilter"), @@ -1067,11 +1059,9 @@ select_peer (const GNUNET_HashCode * key, gettext_noop ("# Peers excluded from routing due to Bloomfilter"), 1, GNUNET_NO); -#if DEBUG_DHT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Excluded peer `%s' due to BF match in random routing for %s\n", GNUNET_i2s (&pos->id), GNUNET_h2s (key)); -#endif pos = pos->next; continue; /* Ignore bloomfiltered peers */ } @@ -1154,12 +1144,10 @@ get_target_peers (const GNUNET_HashCode * key, &nxt->id.hashPubKey)); GNUNET_CONTAINER_bloomfilter_add (bloom, &rtargets[off]->id.hashPubKey); } -#if DEBUG_DHT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Selected %u/%u peers at hop %u for %s (target was %u)\n", off, GNUNET_CONTAINER_multihashmap_size (all_known_peers), (unsigned int) hop_count, GNUNET_h2s (key), ret); -#endif if (0 == off) { GNUNET_free (rtargets); @@ -1212,11 +1200,9 @@ GDS_NEIGHBOURS_handle_put (enum GNUNET_BLOCK_Type type, struct GNUNET_PeerIdentity *pp; GNUNET_assert (NULL != bf); -#if DEBUG_DHT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Adding myself (%s) to PUT bloomfilter for %s\n", GNUNET_i2s (&my_identity), GNUNET_h2s (key)); -#endif GNUNET_CONTAINER_bloomfilter_add (bf, &my_identity.hashPubKey); GNUNET_STATISTICS_update (GDS_stats, gettext_noop ("# PUT requests routed"), 1, GNUNET_NO); @@ -1225,12 +1211,10 @@ GDS_NEIGHBOURS_handle_put (enum GNUNET_BLOCK_Type type, &targets); if (0 == target_count) { -#if DEBUG_DHT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Routing PUT for %s terminates after %u hops at %s\n", GNUNET_h2s (key), (unsigned int) hop_count, GNUNET_i2s (&my_identity)); -#endif return; } msize = @@ -1254,11 +1238,9 @@ GDS_NEIGHBOURS_handle_put (enum GNUNET_BLOCK_Type type, for (i = 0; i < target_count; i++) { target = targets[i]; -#if DEBUG_DHT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Routing PUT for %s after %u hops to %s\n", GNUNET_h2s (key), (unsigned int) hop_count, GNUNET_i2s (&target->id)); -#endif pending = GNUNET_malloc (sizeof (struct P2PPendingMessage) + msize); pending->importance = 0; /* FIXME */ pending->timeout = expiration_time; @@ -1335,20 +1317,16 @@ GDS_NEIGHBOURS_handle_get (enum GNUNET_BLOCK_Type type, target_count = get_target_peers (key, peer_bf, hop_count, desired_replication_level, &targets); -#if DEBUG_DHT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Adding myself (%s) to GET bloomfilter for %s\n", GNUNET_i2s (&my_identity), GNUNET_h2s (key)); -#endif GNUNET_CONTAINER_bloomfilter_add (peer_bf, &my_identity.hashPubKey); if (0 == target_count) { -#if DEBUG_DHT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Routing GET for %s terminates after %u hops at %s\n", GNUNET_h2s (key), (unsigned int) hop_count, GNUNET_i2s (&my_identity)); -#endif return; } reply_bf_size = GNUNET_CONTAINER_bloomfilter_get_size (reply_bf); @@ -1367,11 +1345,9 @@ GDS_NEIGHBOURS_handle_get (enum GNUNET_BLOCK_Type type, for (i = 0; i < target_count; i++) { target = targets[i]; -#if DEBUG_DHT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Routing GET for %s after %u hops to %s\n", GNUNET_h2s (key), (unsigned int) hop_count, GNUNET_i2s (&target->id)); -#endif pending = GNUNET_malloc (sizeof (struct P2PPendingMessage) + msize); pending->importance = 0; /* FIXME */ pending->timeout = GNUNET_TIME_relative_to_absolute (GET_TIMEOUT); @@ -1578,10 +1554,8 @@ handle_dht_p2p_put (void *cls, const struct GNUNET_PeerIdentity *peer, /* cannot verify, good luck */ break; } -#if DEBUG_DHT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "PUT for %s at %s\n", - GNUNET_h2s (&put->key), GNUNET_i2s (&my_identity)); -#endif + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "PUT for `%s' from %s\n", + GNUNET_h2s (&put->key), GNUNET_i2s (peer)); bf = GNUNET_CONTAINER_bloomfilter_init (put->bloomfilter, DHT_BLOOM_SIZE, GNUNET_CONSTANTS_BLOOMFILTER_K); GNUNET_break_op (GNUNET_YES == @@ -1617,10 +1591,15 @@ handle_dht_p2p_put (void *cls, const struct GNUNET_PeerIdentity *peer, pp, payload, payload_size); } GNUNET_CONTAINER_bloomfilter_free (bf); - GDS_CLIENTS_process_monitor (GNUNET_MESSAGE_TYPE_DHT_MONITOR_PUT, - GNUNET_TIME_absolute_ntoh (put->expiration_time), &put->key, - putlen, put_path, 0, NULL, ntohl(put->desired_replication_level), - ntohl (put->type), payload, payload_size); + GDS_CLIENTS_process_put (options, + ntohl (put->type), + ntohl (put->hop_count), + ntohl (put->desired_replication_level), + putlen, put_path, + GNUNET_TIME_absolute_ntoh (put->expiration_time), + &put->key, + payload, + payload_size); return GNUNET_YES; } @@ -1795,11 +1774,9 @@ handle_dht_p2p_get (void *cls, const struct GNUNET_PeerIdentity *peer, /* remember request for routing replies */ GDS_ROUTING_add (peer, type, options, &get->key, xquery, xquery_size, reply_bf, get->bf_mutator); -#if DEBUG_DHT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "GET for %s at %s after %u hops\n", GNUNET_h2s (&get->key), GNUNET_i2s (&my_identity), (unsigned int) ntohl (get->hop_count)); -#endif /* local lookup (this may update the reply_bf) */ if ((0 != (options & GNUNET_DHT_RO_DEMULTIPLEX_EVERYWHERE)) || (am_closest_peer (&get->key, peer_bf))) @@ -1826,9 +1803,13 @@ handle_dht_p2p_get (void *cls, const struct GNUNET_PeerIdentity *peer, 1, GNUNET_NO); } - GDS_CLIENTS_process_monitor (GNUNET_MESSAGE_TYPE_DHT_MONITOR_GET, - GNUNET_TIME_UNIT_FOREVER_ABS, &get->key, 0, NULL, 0, NULL, - ntohl (get->desired_replication_level), type, NULL, 0); + /* FIXME Path */ + GDS_CLIENTS_process_get (options, + type, + ntohl(get->hop_count), + ntohl(get->desired_replication_level), + 0, NULL, + &get->key); /* P2P forwarding */ if (eval != GNUNET_BLOCK_EVALUATION_OK_LAST) @@ -1962,10 +1943,16 @@ handle_dht_p2p_result (void *cls, const struct GNUNET_PeerIdentity *peer, xget_path, data, data_size); } - GDS_CLIENTS_process_monitor (GNUNET_MESSAGE_TYPE_DHT_MONITOR_GET_RESP, - GNUNET_TIME_absolute_ntoh (prm->expiration_time), &prm->key, - put_path_length, put_path, get_path_length, get_path, - 0, type, data, data_size); + GDS_CLIENTS_process_get_resp (type, + get_path, + get_path_length, + put_path, + put_path_length, + GNUNET_TIME_absolute_ntoh ( + prm->expiration_time), + &prm->key, + data, + data_size); return GNUNET_YES; } @@ -2025,5 +2012,16 @@ GDS_NEIGHBOURS_done () } } +/** + * Get the ID of the local node. + * + * @return identity of the local node + */ +struct GNUNET_PeerIdentity * +GDS_NEIGHBOURS_get_id () +{ + return &my_identity; +} + /* end of gnunet-service-dht_neighbours.c */ diff --git a/src/dht/gnunet-service-dht_neighbours.h b/src/dht/gnunet-service-dht_neighbours.h index b6e0f0e..3297638 100644 --- a/src/dht/gnunet-service-dht_neighbours.h +++ b/src/dht/gnunet-service-dht_neighbours.h @@ -135,4 +135,13 @@ void GDS_NEIGHBOURS_done (void); +/** + * Get the ID of the local node. + * + * @return identity of the local node + */ +struct GNUNET_PeerIdentity * +GDS_NEIGHBOURS_get_id (); + + #endif diff --git a/src/dht/gnunet-service-dht_routing.c b/src/dht/gnunet-service-dht_routing.c index a880bf7..013d856 100644 --- a/src/dht/gnunet-service-dht_routing.c +++ b/src/dht/gnunet-service-dht_routing.c @@ -227,6 +227,8 @@ process (void *cls, const GNUNET_HashCode * key, void *value) 1, GNUNET_NO); return GNUNET_SYSERR; case GNUNET_BLOCK_EVALUATION_REQUEST_VALID: + GNUNET_break (0); + return GNUNET_OK; case GNUNET_BLOCK_EVALUATION_REQUEST_INVALID: GNUNET_break (0); return GNUNET_OK; @@ -280,11 +282,48 @@ GDS_ROUTING_process (enum GNUNET_BLOCK_Type type, pc.get_path = get_path; pc.data = data; pc.data_size = data_size; + if (NULL == data) + { + /* Some apps might have an 'empty' reply as a valid reply; however, + 'process' will call GNUNET_BLOCK_evaluate' which treats a 'NULL' + reply as request-validation (but we need response-validation). + So we set 'data' to a 0-byte non-NULL value just to be sure */ + GNUNET_break (0 == data_size); + data_size = 0; + pc.data = ""; /* something not null */ + } GNUNET_CONTAINER_multihashmap_get_multiple (recent_map, key, &process, &pc); } /** + * Remove the oldest entry from the DHT routing table. Must only + * be called if it is known that there is at least one entry + * in the heap and hashmap. + */ +static void +expire_oldest_entry () +{ + struct RecentRequest *recent_req; + + GNUNET_STATISTICS_update (GDS_stats, + gettext_noop + ("# Entries removed from routing table"), 1, + GNUNET_NO); + recent_req = GNUNET_CONTAINER_heap_peek (recent_heap); + GNUNET_assert (recent_req != NULL); + GNUNET_CONTAINER_heap_remove_node (recent_req->heap_node); + GNUNET_CONTAINER_bloomfilter_free (recent_req->reply_bf); + GNUNET_assert (GNUNET_YES == + GNUNET_CONTAINER_multihashmap_remove (recent_map, + &recent_req->key, + recent_req)); + GNUNET_free (recent_req); +} + + + +/** * Add a new entry to our routing table. * * @param sender peer that originated the request @@ -308,18 +347,7 @@ GDS_ROUTING_add (const struct GNUNET_PeerIdentity *sender, struct RecentRequest *recent_req; while (GNUNET_CONTAINER_heap_get_size (recent_heap) >= DHT_MAX_RECENT) - { - GNUNET_STATISTICS_update (GDS_stats, - gettext_noop - ("# Entries removed from routing table"), 1, - GNUNET_NO); - recent_req = GNUNET_CONTAINER_heap_peek (recent_heap); - GNUNET_assert (recent_req != NULL); - GNUNET_CONTAINER_heap_remove_node (recent_req->heap_node); - GNUNET_CONTAINER_bloomfilter_free (recent_req->reply_bf); - GNUNET_free (recent_req); - } - + expire_oldest_entry (); GNUNET_STATISTICS_update (GDS_stats, gettext_noop ("# Entries added to routing table"), 1, GNUNET_NO); @@ -359,23 +387,12 @@ GDS_ROUTING_init () void GDS_ROUTING_done () { - struct RecentRequest *recent_req; - while (GNUNET_CONTAINER_heap_get_size (recent_heap) > 0) - { - GNUNET_STATISTICS_update (GDS_stats, - gettext_noop - ("# Entries removed from routing table"), 1, - GNUNET_NO); - recent_req = GNUNET_CONTAINER_heap_peek (recent_heap); - GNUNET_assert (recent_req != NULL); - GNUNET_CONTAINER_heap_remove_node (recent_req->heap_node); - GNUNET_CONTAINER_bloomfilter_free (recent_req->reply_bf); - GNUNET_free (recent_req); - } + expire_oldest_entry (); GNUNET_assert (0 == GNUNET_CONTAINER_heap_get_size (recent_heap)); GNUNET_CONTAINER_heap_destroy (recent_heap); recent_heap = NULL; + GNUNET_assert (0 == GNUNET_CONTAINER_multihashmap_size (recent_map)); GNUNET_CONTAINER_multihashmap_destroy (recent_map); recent_map = NULL; } diff --git a/src/dht/plugin_block_dht.c b/src/dht/plugin_block_dht.c index 19467b9..3c016ae 100644 --- a/src/dht/plugin_block_dht.c +++ b/src/dht/plugin_block_dht.c @@ -65,17 +65,29 @@ block_plugin_dht_evaluate (void *cls, enum GNUNET_BLOCK_Type type, if (type != GNUNET_BLOCK_TYPE_DHT_HELLO) return GNUNET_BLOCK_EVALUATION_TYPE_NOT_SUPPORTED; if (xquery_size != 0) + { + GNUNET_break_op (0); return GNUNET_BLOCK_EVALUATION_REQUEST_INVALID; - if (reply_block_size == 0) + } + if (NULL == reply_block) return GNUNET_BLOCK_EVALUATION_REQUEST_VALID; if (reply_block_size < sizeof (struct GNUNET_MessageHeader)) + { + GNUNET_break_op (0); return GNUNET_BLOCK_EVALUATION_RESULT_INVALID; + } msg = reply_block; if (reply_block_size != ntohs (msg->size)) + { + GNUNET_break_op (0); return GNUNET_BLOCK_EVALUATION_RESULT_INVALID; + } hello = reply_block; if (GNUNET_OK != GNUNET_HELLO_get_id (hello, &pid)) + { + GNUNET_break_op (0); return GNUNET_BLOCK_EVALUATION_RESULT_INVALID; + } if (NULL != bf) { GNUNET_BLOCK_mingle_hash (&pid.hashPubKey, bf_mutator, &mhash); diff --git a/src/dht/test_dht_2dtorus.conf b/src/dht/test_dht_2dtorus.conf index d420b29..d7a3d8a 100644 --- a/src/dht/test_dht_2dtorus.conf +++ b/src/dht/test_dht_2dtorus.conf @@ -65,7 +65,7 @@ CONNECT_TIMEOUT = 60 s CONNECT_ATTEMPTS = 3 DEBUG = YES HOSTKEYSFILE = ../../contrib/testing_hostkeys.dat -MAX_CONCURRENT_SSH = 10 +MAX_CONCURRENT_SSH = 20 USE_PROGRESSBARS = YES PEERGROUP_TIMEOUT = 2400 s TOPOLOGY_OUTPUT_FILE = 2dtorus_topo_initial @@ -77,7 +77,7 @@ MAX_OUTSTANDING_CONNECTIONS = 75 DELETE_FILES = YES [test_dht_topo] -CONNECTION_LIMIT = 16 +CONNECTION_LIMIT = 20 #DATA_OUTPUT_FILE=data_output diff --git a/src/dht/test_dht_api.c b/src/dht/test_dht_api.c index 182856a..000ad33 100644 --- a/src/dht/test_dht_api.c +++ b/src/dht/test_dht_api.c @@ -121,7 +121,7 @@ stop_arm (struct PeerContext *p) if (0 != GNUNET_OS_process_kill (p->arm_proc, SIGTERM)) GNUNET_log_strerror (GNUNET_ERROR_TYPE_WARNING, "kill"); GNUNET_OS_process_wait (p->arm_proc); - GNUNET_OS_process_close (p->arm_proc); + GNUNET_OS_process_destroy (p->arm_proc); p->arm_proc = NULL; #endif GNUNET_CONFIGURATION_destroy (p->cfg); @@ -195,10 +195,10 @@ test_get_iterator (void *cls, struct GNUNET_TIME_Absolute exp, * Signature of the main function of a task. * * @param cls closure - * @param tc context information (why was this task triggered now) + * @param success result of PUT */ static void -test_get (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) +test_get (void *cls, int success) { struct PeerContext *peer = cls; GNUNET_HashCode hash; @@ -212,7 +212,7 @@ test_get (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) retry_context.next_timeout = BASE_TIMEOUT; peer->get_handle = - GNUNET_DHT_get_start (peer->dht_handle, TOTAL_TIMEOUT, + GNUNET_DHT_get_start (peer->dht_handle, GNUNET_BLOCK_TYPE_TEST, &hash, 1, GNUNET_DHT_RO_NONE, NULL, 0, &test_get_iterator, NULL); diff --git a/src/dht/test_dht_api_data.conf b/src/dht/test_dht_api_data.conf index 0e34eb7..032416c 100644 --- a/src/dht/test_dht_api_data.conf +++ b/src/dht/test_dht_api_data.conf @@ -77,7 +77,8 @@ EXTERNAL_ADDRESS = 127.0.0.1 [dns] AUTOSTART = NO - +[namestore] +AUTOSTART = NO [nse] AUTOSTART = NO diff --git a/src/dht/test_dht_api_peer1.conf b/src/dht/test_dht_api_peer1.conf index cacc4da..d9db7c4 100644 --- a/src/dht/test_dht_api_peer1.conf +++ b/src/dht/test_dht_api_peer1.conf @@ -10,7 +10,7 @@ AUTOSTART = YES ACCEPT_FROM6 = ::1; ACCEPT_FROM = 127.0.0.1; HOSTNAME = localhost -PORT = 2100 +PORT = 12100 BINARY = gnunet-service-dht [block] diff --git a/src/dht/test_dht_line.conf b/src/dht/test_dht_line.conf index 8bcb12c..6be3559 100644 --- a/src/dht/test_dht_line.conf +++ b/src/dht/test_dht_line.conf @@ -80,6 +80,8 @@ DELETE_FILES = YES CONNECTION_LIMIT = 5 #DATA_OUTPUT_FILE=data_output +[namestore] +AUTOSTART = NO [nse] WORKDELAY = 500 ms diff --git a/src/dht/test_dht_monitor.c b/src/dht/test_dht_monitor.c index 63af7e9..ca6704a 100644 --- a/src/dht/test_dht_monitor.c +++ b/src/dht/test_dht_monitor.c @@ -31,8 +31,6 @@ #include "gnunet_testing_lib.h" #include "gnunet_dht_service.h" -#define VERBOSE GNUNET_YES - #define REMOVE_DIR GNUNET_YES @@ -137,17 +135,14 @@ shutdown_callback (void *cls, const char *emsg) { if (emsg != NULL) { -#if VERBOSE - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "test: Shutdown of peers failed!\n"); -#endif + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "test: Shutdown of peers failed: %s\n", + emsg); ok++; } else { -#if VERBOSE GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "test: All peers successfully shut down!\n"); -#endif } GNUNET_CONFIGURATION_destroy (testing_cfg); } @@ -156,16 +151,12 @@ shutdown_callback (void *cls, const char *emsg) static void shutdown_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) { -#if VERBOSE GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "test: Ending test.\n"); -#endif - if (disconnect_task != GNUNET_SCHEDULER_NO_TASK) { GNUNET_SCHEDULER_cancel (disconnect_task); disconnect_task = GNUNET_SCHEDULER_NO_TASK; } - if (data_file != NULL) GNUNET_DISK_file_close (data_file); GNUNET_TESTING_daemons_stop (pg, TIMEOUT, &shutdown_callback, NULL); @@ -184,6 +175,7 @@ disconnect_peers (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) GNUNET_DHT_get_stop (get_h_far); for (i = 0; i < num_peers; i++) { + GNUNET_DHT_monitor_stop(mhs[i]); GNUNET_DHT_disconnect (hs[i]); } GNUNET_SCHEDULER_cancel (shutdown_handle); @@ -258,7 +250,7 @@ do_test (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) GNUNET_h2s_full (&d_far->id.hashPubKey)); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "test: from %s\n", GNUNET_h2s_full (&o->id.hashPubKey)); - get_h_far = GNUNET_DHT_get_start (hs[0], GNUNET_TIME_UNIT_FOREVER_REL, /* timeout */ + get_h_far = GNUNET_DHT_get_start (hs[0], GNUNET_BLOCK_TYPE_TEST, /* type */ &d_far->id.hashPubKey, /*key to search */ 4U, /* replication level */ @@ -302,65 +294,123 @@ put_id (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) } /** - * Callback called on each request going through the DHT. + * Callback called on each GET request going through the DHT. + * Prints the info about the intercepted packet and increments a counter. + * + * @param cls Closure. + * @param options Options, for instance RecordRoute, DemultiplexEverywhere. + * @param type The type of data in the request. + * @param hop_count Hop count so far. + * @param path_length number of entries in path (or 0 if not recorded). + * @param path peers on the GET path (or NULL if not recorded). + * @param desired_replication_level Desired replication level. + * @param key Key of the requested data. + */ +void +monitor_get_cb (void *cls, + enum GNUNET_DHT_RouteOption options, + enum GNUNET_BLOCK_Type type, + uint32_t hop_count, + uint32_t desired_replication_level, + unsigned int path_length, + const struct GNUNET_PeerIdentity *path, + const GNUNET_HashCode * key) +{ + const char *s_key; + unsigned int i; + + i = (unsigned int) (long) cls; + s_key = GNUNET_h2s(key); + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "%u got a GET message for key %s\n", + i, s_key); + + if (strncmp (s_key, id_far, 4) == 0 && in_test == GNUNET_YES) + monitor_counter++; +} + + +/** + * Callback called on each PUT request going through the DHT. + * Prints the info about the intercepted packet and increments a counter. + * + * @param cls Closure. + * @param options Options, for instance RecordRoute, DemultiplexEverywhere. + * @param type The type of data in the request. + * @param hop_count Hop count so far. + * @param path_length number of entries in path (or 0 if not recorded). + * @param path peers on the PUT path (or NULL if not recorded). + * @param desired_replication_level Desired replication level. + * @param exp Expiration time of the data. + * @param key Key under which data is to be stored. + * @param data Pointer to the data carried. + * @param size Number of bytes in data. + */ +void +monitor_put_cb (void *cls, + enum GNUNET_DHT_RouteOption options, + enum GNUNET_BLOCK_Type type, + uint32_t hop_count, + uint32_t desired_replication_level, + unsigned int path_length, + const struct GNUNET_PeerIdentity *path, + struct GNUNET_TIME_Absolute exp, + const GNUNET_HashCode * key, + const void *data, + size_t size) +{ + const char *s_key; + unsigned int i; + + i = (unsigned int) (long) cls; + s_key = GNUNET_h2s(key); + + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "%u got a PUT message for key %s with %u bytes\n", + i, s_key, size); + + if (strncmp (s_key, id_far, 4) == 0 && in_test == GNUNET_YES) + monitor_counter++; +} + + +/** + * Callback called on each GET reply going through the DHT. * Prints the info about the intercepted packet and increments a counter. * - * @param cls Closure (long) # of daemon that got the monitor event. - * @param mtype Type of the DHT message monitored. - * @param exp When will this value expire. - * @param key Key of the result/request. - * @param get_path Peers on reply path (or NULL if not recorded). + * @param cls Closure. + * @param type The type of data in the result. + * @param get_path Peers on GET path (or NULL if not recorded). * @param get_path_length number of entries in get_path. * @param put_path peers on the PUT path (or NULL if not recorded). * @param put_path_length number of entries in get_path. - * @param desired_replication_level Desired replication level. - * @param type Type of the result/request. + * @param exp Expiration time of the data. + * @param key Key of the data. * @param data Pointer to the result data. * @param size Number of bytes in data. */ void -monitor_dht_cb (void *cls, - uint16_t mtype, - struct GNUNET_TIME_Absolute exp, - const GNUNET_HashCode * key, - const struct GNUNET_PeerIdentity * get_path, +monitor_res_cb (void *cls, + enum GNUNET_BLOCK_Type type, + const struct GNUNET_PeerIdentity *get_path, unsigned int get_path_length, - const struct GNUNET_PeerIdentity * put_path, + const struct GNUNET_PeerIdentity *put_path, unsigned int put_path_length, - uint32_t desired_replication_level, - enum GNUNET_DHT_RouteOption options, - enum GNUNET_BLOCK_Type type, + struct GNUNET_TIME_Absolute exp, + const GNUNET_HashCode * key, const void *data, size_t size) { const char *s_key; - const char *mtype_s; unsigned int i; i = (unsigned int) (long) cls; s_key = GNUNET_h2s(key); - switch (mtype) - { - case 149: - mtype_s = "GET "; - break; - case 150: - mtype_s = "RESULT"; - break; - case 151: - mtype_s = "PUT "; - break; - default: - GNUNET_break (0); - mtype_s = "UNKNOWN!!!"; - } GNUNET_log (GNUNET_ERROR_TYPE_INFO, - "%u got a message of type %s for key %s\n", - i, mtype_s, s_key); + "%u got a REPLY message for key %s with %u bytes\n", + i, s_key, size); - if ((mtype == GNUNET_MESSAGE_TYPE_DHT_MONITOR_GET || - mtype == GNUNET_MESSAGE_TYPE_DHT_MONITOR_PUT) && - strncmp (s_key, id_far, 4) == 0 && in_test == GNUNET_YES) + if (strncmp (s_key, id_far, 4) == 0 && in_test == GNUNET_YES) monitor_counter++; } @@ -389,15 +439,9 @@ peergroup_ready (void *cls, const char *emsg) GNUNET_TESTING_daemons_stop (pg, TIMEOUT, &shutdown_callback, NULL); return; } -#if VERBOSE - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "************************************************************\n"); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "test: Peer Group started successfully!\n"); - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "test: Have %u connections\n", + "test: Peer Group started successfully with %u connections\n", total_connections); -#endif - if (data_file != NULL) { buf = NULL; @@ -419,8 +463,13 @@ peergroup_ready (void *cls, const char *emsg) { d = GNUNET_TESTING_daemon_get (pg, i); hs[i] = GNUNET_DHT_connect (d->cfg, 32); - mhs[i] = GNUNET_DHT_monitor_start(hs[i], GNUNET_BLOCK_TYPE_ANY, NULL, - &monitor_dht_cb, (void *)(long)i); + mhs[i] = GNUNET_DHT_monitor_start(hs[i], + GNUNET_BLOCK_TYPE_ANY, + NULL, + &monitor_get_cb, + &monitor_res_cb, + &monitor_put_cb, + (void *)(long)i); } if ((NULL == o) || (NULL == d_far)) @@ -500,19 +549,12 @@ run (void *cls, char *const *args, const char *cfgfile, testing_cfg = GNUNET_CONFIGURATION_dup (cfg); GNUNET_log_setup ("test_dht_monitor", -#if VERBOSE - "DEBUG", -#else "WARNING", -#endif NULL); -#if VERBOSE GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "test: Starting daemons.\n"); GNUNET_CONFIGURATION_set_value_string (testing_cfg, "testing", "use_progressbars", "YES"); -#endif - if (GNUNET_OK != GNUNET_CONFIGURATION_get_value_number (testing_cfg, "testing", "num_peers", &num_peers)) @@ -600,9 +642,6 @@ main (int xargc, char *xargv[]) char *const argv[] = { "test-dht-monitor", "-c", "test_dht_line.conf", -#if VERBOSE - "-L", "DEBUG", -#endif NULL }; diff --git a/src/dht/test_dht_multipeer.c b/src/dht/test_dht_multipeer.c index 94e39d2..ab7d90e 100644 --- a/src/dht/test_dht_multipeer.c +++ b/src/dht/test_dht_multipeer.c @@ -27,9 +27,6 @@ #include "gnunet_core_service.h" #include "gnunet_dht_service.h" -/* DEFINES */ -#define VERBOSE GNUNET_NO - /* Timeout for entire testcase */ #define TIMEOUT GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MINUTES, 30) @@ -279,7 +276,7 @@ static struct StatValues stats[] = { {"core", "# bytes encrypted", 0}, {"core", "# type maps received", 0}, {"core", "# session keys confirmed via PONG", 0}, - {"core", "# entries in session map", 0}, + {"core", "# peers connected", 0}, {"core", "# key exchanges initiated", 0}, {"core", "# send requests dropped (disconnected)", 0}, {"core", "# transmissions delayed due to corking", 0}, @@ -624,7 +621,7 @@ do_get (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) GNUNET_assert (test_get->dht_handle != NULL); outstanding_gets++; test_get->get_handle = - GNUNET_DHT_get_start (test_get->dht_handle, GNUNET_TIME_UNIT_FOREVER_REL, + GNUNET_DHT_get_start (test_get->dht_handle, GNUNET_BLOCK_TYPE_TEST, &key, 1, route_option, NULL, 0, &get_result_iterator, test_get); test_get->task = @@ -658,10 +655,9 @@ start_gets (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) unsigned long long j; struct TestGetContext *test_get; -#if VERBOSE - FPRINTF (stderr, "Issuing %llu GETs\n", - (unsigned long long) (num_peers * num_peers)); -#endif + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Issuing %llu GETs\n", + (unsigned long long) (num_peers * num_peers)); for (i = 0; i < num_peers; i++) for (j = 0; j < num_peers; j++) { @@ -678,7 +674,7 @@ start_gets (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) * Called when the PUT request has been transmitted to the DHT service. */ static void -put_finished (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) +put_finished (void *cls, int success) { struct TestPutContext *test_put = cls; @@ -719,10 +715,9 @@ do_put (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) test_put->dht_handle = GNUNET_DHT_connect (test_put->daemon->cfg, 10); GNUNET_assert (test_put->dht_handle != NULL); outstanding_puts++; -#if VERBOSE > 2 - FPRINTF (stderr, "PUT %u at `%s'\n", test_put->uid, - GNUNET_i2s (&test_put->daemon->id)); -#endif + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "PUT %u at `%s'\n", test_put->uid, + GNUNET_i2s (&test_put->daemon->id)); GNUNET_DHT_put (test_put->dht_handle, &key, 1, route_option, GNUNET_BLOCK_TYPE_TEST, sizeof (data), data, GNUNET_TIME_UNIT_FOREVER_ABS, GNUNET_TIME_UNIT_FOREVER_REL, @@ -822,9 +817,6 @@ check () char *const argv[] = { "test-dht-multipeer", /* Name to give running binary */ "-c", "test_dht_multipeer_data.conf", /* Config file to use */ -#if VERBOSE - "-L", "DEBUG", -#endif NULL }; struct GNUNET_GETOPT_CommandLineOption options[] = { @@ -848,13 +840,8 @@ main (int argc, char *argv[]) { int ret; - GNUNET_log_setup ("test-dht-multipeer", -#if VERBOSE - "DEBUG", -#else "WARNING", -#endif NULL); ret = check (); /** diff --git a/src/dht/test_dht_multipeer_data.conf b/src/dht/test_dht_multipeer_data.conf index ff87698..f181594 100644 --- a/src/dht/test_dht_multipeer_data.conf +++ b/src/dht/test_dht_multipeer_data.conf @@ -16,7 +16,7 @@ ACCEPT_FROM = 127.0.0.1; CONFIG = $DEFAULTCONFIG HOME = $SERVICEHOME HOSTNAME = localhost -PORT = 2100 +PORT = 12100 STOP_FOUND = YES USE_MAX_HOPS = YES MAX_HOPS = 16 @@ -115,12 +115,13 @@ BEHIND_NAT = NO ALLOW_NAT = NO INTERNAL_ADDRESS = 127.0.0.1 EXTERNAL_ADDRESS = 127.0.0.1 -USE_LOCALADDR = NO +USE_LOCALADDR = YES [dns] AUTOSTART = NO - +[namestore] +AUTOSTART = NO [nse] AUTOSTART = NO diff --git a/src/dht/test_dht_topo.c b/src/dht/test_dht_topo.c index 81dc7cb..f51f3a6 100644 --- a/src/dht/test_dht_topo.c +++ b/src/dht/test_dht_topo.c @@ -30,8 +30,6 @@ #include "gnunet_testing_lib.h" #include "gnunet_dht_service.h" -#define VERBOSE GNUNET_NO - #define REMOVE_DIR GNUNET_YES /** @@ -55,11 +53,6 @@ static int ok; /** - * Be verbose - */ -static int verbose; - -/** * Total number of peers in the test. */ static unsigned long long num_peers; @@ -116,27 +109,26 @@ static GNUNET_SCHEDULER_TaskIdentifier shutdown_handle; static char *topology_file; -struct GNUNET_TESTING_Daemon *d1; +static struct GNUNET_DHT_Handle **hs; -struct GNUNET_TESTING_Daemon *d2; +static struct GNUNET_DHT_GetHandle *get_h; -struct GNUNET_DHT_Handle **hs; +static struct GNUNET_DHT_GetHandle *get_h_2; -struct GNUNET_DHT_GetHandle *get_h; +static struct GNUNET_DHT_GetHandle *get_h_far; -struct GNUNET_DHT_GetHandle *get_h_2; +static int found_1; -struct GNUNET_DHT_GetHandle *get_h_far; +static int found_2; -int found_1; -int found_2; -int found_far; +static int found_far; /** * Which topology are we to run */ static int test_topology; + /** * Check whether peers successfully shut down. */ @@ -145,17 +137,12 @@ shutdown_callback (void *cls, const char *emsg) { if (emsg != NULL) { -#if VERBOSE - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Shutdown of peers failed!\n"); -#endif + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Shutdown of peers failed!\n"); ok++; } else { -#if VERBOSE - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "All peers successfully shut down!\n"); -#endif + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "All peers successfully shut down!\n"); } GNUNET_CONFIGURATION_destroy (testing_cfg); } @@ -164,9 +151,7 @@ shutdown_callback (void *cls, const char *emsg) static void shutdown_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) { -#if VERBOSE - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Ending test.\n"); -#endif + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Ending test.\n"); if (disconnect_task != GNUNET_SCHEDULER_NO_TASK) { @@ -185,7 +170,7 @@ disconnect_peers (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) { unsigned int i; - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "disconnecting peers\n"); + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "disconnecting peers\n"); disconnect_task = GNUNET_SCHEDULER_NO_TASK; GNUNET_SCHEDULER_cancel (put_task); if (NULL != get_h) @@ -202,6 +187,7 @@ disconnect_peers (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) shutdown_handle = GNUNET_SCHEDULER_add_now (&shutdown_task, NULL); } + static void dht_get_id_handler (void *cls, struct GNUNET_TIME_Absolute exp, const GNUNET_HashCode * key, @@ -255,14 +241,15 @@ dht_get_id_handler (void *cls, struct GNUNET_TIME_Absolute exp, default: GNUNET_break(0); } - if (TORUS == test_topology && - (found_1 == 0 || found_2 == 0 || found_far == 0)) + if ( (TORUS == test_topology) && + ( (found_1 == 0) || (found_2 == 0) || (found_far == 0)) ) return; ok = 0; GNUNET_SCHEDULER_cancel (disconnect_task); disconnect_task = GNUNET_SCHEDULER_add_now (&disconnect_peers, NULL); } + static void do_test (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) { @@ -313,11 +300,11 @@ do_test (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) { GNUNET_assert (0); } - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "test_task\ntest: from %s\n", + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "test_task\nfrom %s\n", GNUNET_h2s_full (&o->id.hashPubKey)); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " looking for %s\n", GNUNET_h2s_full (&d->id.hashPubKey)); - get_h = GNUNET_DHT_get_start (hs[0], GNUNET_TIME_UNIT_FOREVER_REL, /* timeout */ + get_h = GNUNET_DHT_get_start (hs[0], GNUNET_BLOCK_TYPE_TEST, /* type */ &d->id.hashPubKey, /*key to search */ 4U, /* replication level */ @@ -328,7 +315,7 @@ do_test (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " looking for %s\n", GNUNET_h2s_full (&d2->id.hashPubKey)); - get_h_2 = GNUNET_DHT_get_start (hs[0], GNUNET_TIME_UNIT_FOREVER_REL, /* timeout */ + get_h_2 = GNUNET_DHT_get_start (hs[0], GNUNET_BLOCK_TYPE_TEST, /* type */ &d2->id.hashPubKey, /*key to search */ 4U, /* replication level */ @@ -337,7 +324,7 @@ do_test (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) &dht_get_id_handler, (void *)2); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " looking for %s\n", GNUNET_h2s_full (&d_far->id.hashPubKey)); - get_h_far = GNUNET_DHT_get_start (hs[0], GNUNET_TIME_UNIT_FOREVER_REL, /* timeout */ + get_h_far = GNUNET_DHT_get_start (hs[0], GNUNET_BLOCK_TYPE_TEST, /* type */ &d_far->id.hashPubKey, /*key to search */ 4U, /* replication level */ @@ -402,21 +389,16 @@ peergroup_ready (void *cls, const char *emsg) { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Peergroup callback called with error, aborting test!\n"); - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Error from testing: `%s'\n", + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Error from testing: `%s'\n", emsg); ok++; GNUNET_TESTING_daemons_stop (pg, TIMEOUT, &shutdown_callback, NULL); return; } -#if VERBOSE - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "************************************************************\n"); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Peer Group started successfully!\n"); - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Have %u connections\n", + "Peer Group started successfully with %u connections\n", total_connections); -#endif - if (data_file != NULL) { buf = NULL; @@ -465,7 +447,6 @@ connect_cb (void *cls, const struct GNUNET_PeerIdentity *first, struct GNUNET_TESTING_Daemon *first_daemon, struct GNUNET_TESTING_Daemon *second_daemon, const char *emsg) { - if (emsg == NULL) { total_connections++; @@ -474,10 +455,9 @@ connect_cb (void *cls, const struct GNUNET_PeerIdentity *first, } else { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Problem with new connection (%s)\n", emsg); } - } @@ -500,19 +480,11 @@ run (void *cls, char *const *args, const char *cfgfile, testing_cfg = GNUNET_CONFIGURATION_dup (cfg); GNUNET_log_setup ("test_dht_topo", -#if VERBOSE - "DEBUG", -#else "WARNING", -#endif NULL); - -#if VERBOSE GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Starting daemons.\n"); GNUNET_CONFIGURATION_set_value_string (testing_cfg, "testing", "use_progressbars", "YES"); -#endif - if (GNUNET_OK != GNUNET_CONFIGURATION_get_value_number (testing_cfg, "testing", "num_peers", &num_peers)) @@ -579,38 +551,23 @@ run (void *cls, char *const *args, const char *cfgfile, } - -/** - * test_dht_2d command line options - */ -static struct GNUNET_GETOPT_CommandLineOption options[] = { - {'V', "verbose", NULL, - gettext_noop ("be verbose (print progress information)"), - 0, &GNUNET_GETOPT_set_one, &verbose}, - GNUNET_GETOPT_OPTION_END -}; - - /** * Main: start test */ int main (int xargc, char *xargv[]) { - char *const argv_torus[] = { "test-dht-2dtorus", + static struct GNUNET_GETOPT_CommandLineOption options[] = { + GNUNET_GETOPT_OPTION_END + }; + static char *const argv_torus[] = { "test-dht-2dtorus", "-c", "test_dht_2dtorus.conf", -#if VERBOSE - "-L", "DEBUG", -#endif NULL }; - char *const argv_line[] = { "test-dht-line", + static char *const argv_line[] = { "test-dht-line", "-c", "test_dht_line.conf", -#if VERBOSE - "-L", "DEBUG", -#endif NULL }; char *const *argv; @@ -641,17 +598,17 @@ main (int xargc, char *xargv[]) #if REMOVE_DIR GNUNET_DISK_directory_remove ("/tmp/test_dht_topo"); #endif - if (found_1 == 0) + if (0 == found_1) { GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "ID 1 not found!\n"); } if (TORUS == test_topology) { - if (found_2 == 0) + if (0 == found_2) { GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "ID 2 not found!\n"); } - if (found_far == 0) + if (0 == found_far) { GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "ID far not found!\n"); } diff --git a/src/dht/test_dht_twopeer.c b/src/dht/test_dht_twopeer.c index a3b6e4a..f0ac05b 100644 --- a/src/dht/test_dht_twopeer.c +++ b/src/dht/test_dht_twopeer.c @@ -28,8 +28,6 @@ #include "gnunet_dht_service.h" /* DEFINES */ -#define VERBOSE GNUNET_NO - #define MAX_GET_ATTEMPTS 10 #define TIMEOUT GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MINUTES, 5) @@ -250,8 +248,6 @@ get_stop_finished (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) &stop_retry_get, get_context); get_context->get_handle = GNUNET_DHT_get_start (get_context->dht_handle, - GNUNET_TIME_relative_multiply - (GNUNET_TIME_UNIT_SECONDS, 5), GNUNET_BLOCK_TYPE_DHT_HELLO, &get_context->peer->hashPubKey, 1, GNUNET_DHT_RO_NONE, NULL, 0, &get_result_iterator, @@ -285,8 +281,6 @@ do_get (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) &stop_retry_get, get_context); get_context->get_handle = GNUNET_DHT_get_start (get_context->dht_handle, - GNUNET_TIME_relative_multiply - (GNUNET_TIME_UNIT_SECONDS, 5), GNUNET_BLOCK_TYPE_DHT_HELLO, &get_context->peer->hashPubKey, 1, GNUNET_DHT_RO_FIND_PEER, NULL, 0, @@ -306,7 +300,6 @@ topology_callback (void *cls, const struct GNUNET_PeerIdentity *first, if (emsg == NULL) { total_connections++; -#if VERBOSE GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "connected peer %s to peer %s, distance %u\n", first_daemon->shortname, second_daemon->shortname, distance); @@ -314,19 +307,16 @@ topology_callback (void *cls, const struct GNUNET_PeerIdentity *first, else { failed_connections++; - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Failed to connect peer %s to peer %s with error :\n%s\n", first_daemon->shortname, second_daemon->shortname, emsg); -#endif } if (total_connections == expected_connections) { -#if VERBOSE GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Created %d total connections, which is our target number! Starting next phase of testing.\n", total_connections); -#endif GNUNET_SCHEDULER_cancel (die_task); die_task = GNUNET_SCHEDULER_add_delayed (TIMEOUT, &end_badly, @@ -409,10 +399,8 @@ peers_started_callback (void *cls, const struct GNUNET_PeerIdentity *id, if (peers_left == 0) { -#if VERBOSE GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "All %d daemons started, now connecting peers!\n", num_peers); -#endif GNUNET_SCHEDULER_cancel (die_task); /* Set up task in case topology creation doesn't finish * within a reasonable amount of time */ @@ -466,9 +454,6 @@ check () char *const argv[] = { "test-dht-twopeer", "-c", "test_dht_twopeer_data.conf", -#if VERBOSE - "-L", "DEBUG", -#endif NULL }; struct GNUNET_GETOPT_CommandLineOption options[] = { @@ -491,11 +476,7 @@ main (int argc, char *argv[]) int ret; GNUNET_log_setup ("test-dht-twopeer", -#if VERBOSE - "DEBUG", -#else "WARNING", -#endif NULL); ret = check (); /** diff --git a/src/dht/test_dht_twopeer_data.conf b/src/dht/test_dht_twopeer_data.conf index ba1e22b..2889a41 100644 --- a/src/dht/test_dht_twopeer_data.conf +++ b/src/dht/test_dht_twopeer_data.conf @@ -9,7 +9,7 @@ AUTOSTART = YES DEBUG = NO AUTOSTART = YES #PREFIX = xterm -T dht -e gdb --args -PORT = 2100 +PORT = 12100 BINARY = gnunet-service-dht [block] @@ -34,7 +34,7 @@ HOSTNAME = localhost PORT = 12092 [arm] -DEFAULTSERVICES = core +DEFAULTSERVICES = core dht PORT = 12366 DEBUG = NO @@ -58,7 +58,7 @@ BEHIND_NAT = NO ALLOW_NAT = NO INTERNAL_ADDRESS = 127.0.0.1 EXTERNAL_ADDRESS = 127.0.0.1 -USE_LOCALADDR = NO +USE_LOCALADDR = YES [dns] AUTOSTART = NO @@ -72,3 +72,5 @@ AUTOSTART = NO [fs] AUTOSTART = NO +[namestore] +AUTOSTART = NO diff --git a/src/dht/test_dht_twopeer_get_put.c b/src/dht/test_dht_twopeer_get_put.c index 0bb9fac..3271d04 100644 --- a/src/dht/test_dht_twopeer_get_put.c +++ b/src/dht/test_dht_twopeer_get_put.c @@ -272,7 +272,7 @@ do_get (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) memset (&key, 42, sizeof (GNUNET_HashCode)); /* Set the key to the same thing as when data was inserted */ #endif global_get_handle = - GNUNET_DHT_get_start (peer2dht, GNUNET_TIME_relative_get_forever (), + GNUNET_DHT_get_start (peer2dht, #if DNS GNUNET_BLOCK_TYPE_DNS, #else @@ -289,7 +289,7 @@ do_get (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) * Schedule the GET request for some time in the future. */ static void -put_finished (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) +put_finished (void *cls, int success) { GNUNET_SCHEDULER_cancel (die_task); die_task = @@ -391,29 +391,23 @@ topology_callback (void *cls, const struct GNUNET_PeerIdentity *first, if (emsg == NULL) { total_connections++; -#if VERBOSE GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "connected peer %s to peer %s, distance %u\n", first_daemon->shortname, second_daemon->shortname, distance); -#endif } -#if VERBOSE else { failed_connections++; - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Failed to connect peer %s to peer %s with error :\n%s\n", first_daemon->shortname, second_daemon->shortname, emsg); } -#endif if (total_connections == expected_connections) { -#if VERBOSE GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Created %d total connections, which is our target number! Starting next phase of testing.\n", total_connections); -#endif GNUNET_SCHEDULER_cancel (die_task); die_task = GNUNET_SCHEDULER_add_delayed (TIMEOUT, &end_badly, "from test gets"); @@ -482,10 +476,8 @@ peers_started_callback (void *cls, const struct GNUNET_PeerIdentity *id, if (peers_left == 0) /* Indicates all peers started */ { -#if VERBOSE GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "All %d daemons started, now connecting peers!\n", num_peers); -#endif expected_connections = -1; if ((pg != NULL)) /* Sanity check */ { @@ -556,9 +548,6 @@ check () char *const argv[] = { "test-dht-twopeer-get-put", /* Name to give running binary */ "-c", "test_dht_twopeer_data.conf", /* Config file to use */ -#if VERBOSE - "-L", "DEBUG", -#endif NULL }; struct GNUNET_GETOPT_CommandLineOption options[] = { @@ -583,11 +572,7 @@ main (int argc, char *argv[]) int ret; GNUNET_log_setup ("test-dht-twopeer", -#if VERBOSE - "DEBUG", -#else "WARNING", -#endif NULL); ret = check (); /** diff --git a/src/dht/test_dht_twopeer_path_tracking.c b/src/dht/test_dht_twopeer_path_tracking.c index 6e764a3..6ecf6a3 100644 --- a/src/dht/test_dht_twopeer_path_tracking.c +++ b/src/dht/test_dht_twopeer_path_tracking.c @@ -249,7 +249,7 @@ get_result_iterator (void *cls, struct GNUNET_TIME_Absolute exp, * Schedule the GET request for some time in the future. */ static void -put_finished (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) +put_finished (void *cls, int success) { GNUNET_HashCode key; /* Key for data lookup */ @@ -259,7 +259,7 @@ put_finished (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) "waiting for get response (data not found)"); memset (&key, 42, sizeof (GNUNET_HashCode)); /* Set the key to the same thing as when data was inserted */ global_get_handle = - GNUNET_DHT_get_start (peer2dht, GNUNET_TIME_relative_get_forever (), + GNUNET_DHT_get_start (peer2dht, GNUNET_BLOCK_TYPE_TEST, &key, 1, GNUNET_DHT_RO_RECORD_ROUTE, NULL, 0, &get_result_iterator, NULL); diff --git a/src/dht/test_dht_twopeer_put_get.c b/src/dht/test_dht_twopeer_put_get.c index 48bf9f8..44150e3 100644 --- a/src/dht/test_dht_twopeer_put_get.c +++ b/src/dht/test_dht_twopeer_put_get.c @@ -129,9 +129,15 @@ static struct GNUNET_DHT_Handle *peer1dht; static struct GNUNET_DHT_Handle *peer2dht; /** + * Handle for our PUT operation. + */ +static struct GNUNET_DHT_PutHandle *put_op; + + +/** * Check whether peers successfully shut down. */ -void +static void shutdown_callback (void *cls, const char *emsg) { if (emsg != NULL) @@ -164,6 +170,11 @@ finish_testing (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) static void end_badly_cont (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) { + if (NULL != put_op) + { + GNUNET_DHT_put_cancel (put_op); + put_op = NULL; + } if (peer1dht != NULL) GNUNET_DHT_disconnect (peer1dht); @@ -174,6 +185,7 @@ end_badly_cont (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) GNUNET_TESTING_daemons_stop (pg, TIMEOUT, &shutdown_callback, NULL); } + /** * Check if the get_handle is being used, if so stop the request. Either * way, schedule the end_badly_cont function which actually shuts down the @@ -242,10 +254,11 @@ get_result_iterator (void *cls, struct GNUNET_TIME_Absolute exp, * Schedule the GET request for some time in the future. */ static void -put_finished (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) +put_finished (void *cls, int success) { GNUNET_HashCode key; /* Key for data lookup */ + put_op = NULL; GNUNET_SCHEDULER_cancel (die_task); die_task = GNUNET_SCHEDULER_add_delayed (GET_TIMEOUT, &end_badly, @@ -253,7 +266,7 @@ put_finished (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) memset (&key, 42, sizeof (GNUNET_HashCode)); /* Set the key to the same thing as when data was inserted */ global_get_handle = - GNUNET_DHT_get_start (peer2dht, GNUNET_TIME_UNIT_FOREVER_REL, + GNUNET_DHT_get_start (peer2dht, GNUNET_BLOCK_TYPE_TEST, &key, 1, GNUNET_DHT_RO_NONE, NULL, 0, &get_result_iterator, NULL); } @@ -272,9 +285,9 @@ do_put (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) memset (data, 43, sizeof (data)); /* Insert the data at the first peer */ - GNUNET_DHT_put (peer1dht, &key, 1, GNUNET_DHT_RO_NONE, GNUNET_BLOCK_TYPE_TEST, - sizeof (data), data, GNUNET_TIME_UNIT_FOREVER_ABS, - GNUNET_TIME_UNIT_FOREVER_REL, &put_finished, NULL); + put_op = GNUNET_DHT_put (peer1dht, &key, 1, GNUNET_DHT_RO_NONE, GNUNET_BLOCK_TYPE_TEST, + sizeof (data), data, GNUNET_TIME_UNIT_FOREVER_ABS, + GNUNET_TIME_UNIT_FOREVER_REL, &put_finished, NULL); } @@ -299,29 +312,23 @@ topology_callback (void *cls, const struct GNUNET_PeerIdentity *first, if (emsg == NULL) { total_connections++; -#if VERBOSE GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "connected peer %s to peer %s, distance %u\n", first_daemon->shortname, second_daemon->shortname, distance); -#endif } -#if VERBOSE else { failed_connections++; - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Failed to connect peer %s to peer %s with error :\n%s\n", first_daemon->shortname, second_daemon->shortname, emsg); } -#endif if (total_connections == expected_connections) { -#if VERBOSE GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Created %d total connections, which is our target number! Starting next phase of testing.\n", total_connections); -#endif GNUNET_SCHEDULER_cancel (die_task); die_task = GNUNET_SCHEDULER_add_delayed (TIMEOUT, &end_badly, "from test gets"); @@ -389,10 +396,8 @@ peers_started_callback (void *cls, const struct GNUNET_PeerIdentity *id, if (peers_left == 0) /* Indicates all peers started */ { -#if VERBOSE GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "All %d daemons started, now connecting peers!\n", num_peers); -#endif expected_connections = -1; if ((pg != NULL)) /* Sanity check */ { @@ -463,9 +468,6 @@ check () char *const argv[] = { "test-dht-twopeer-put-get", /* Name to give running binary */ "-c", "test_dht_twopeer_data.conf", /* Config file to use */ -#if VERBOSE - "-L", "DEBUG", -#endif NULL }; struct GNUNET_GETOPT_CommandLineOption options[] = { @@ -490,11 +492,7 @@ main (int argc, char *argv[]) int ret; GNUNET_log_setup ("test-dht-twopeer", -#if VERBOSE - "DEBUG", -#else "WARNING", -#endif NULL); ret = check (); /** |