diff options
author | Bertrand Marc <beberking@gmail.com> | 2012-06-06 20:47:48 +0200 |
---|---|---|
committer | Bertrand Marc <beberking@gmail.com> | 2012-06-06 20:47:48 +0200 |
commit | 740b30688bd745a527f96f9116c19acb3480971a (patch) | |
tree | 2709a3f4dba11c174aa9e1ba3612e30c578e76a9 /src/dht | |
parent | 2b81464a43485fcc8ce079fafdee7b7a171835f4 (diff) |
Imported Upstream version 0.9.3upstream/0.9.3
Diffstat (limited to 'src/dht')
29 files changed, 1740 insertions, 591 deletions
diff --git a/src/dht/Makefile.am b/src/dht/Makefile.am index 93880c2..b2d18d2 100644 --- a/src/dht/Makefile.am +++ b/src/dht/Makefile.am @@ -50,6 +50,7 @@ libgnunet_plugin_block_dht_la_DEPENDENCIES = \ bin_PROGRAMS = \ gnunet-service-dht \ + gnunet-dht-monitor \ gnunet-dht-get \ gnunet-dht-put @@ -92,6 +93,16 @@ gnunet_dht_put_LDADD = \ gnunet_dht_put_DEPENDENCIES = \ libgnunetdht.la +gnunet_dht_monitor_SOURCES = \ + gnunet-dht-monitor.c +gnunet_dht_monitor_LDADD = \ + $(top_builddir)/src/dht/libgnunetdht.la \ + $(top_builddir)/src/core/libgnunetcore.la \ + $(top_builddir)/src/util/libgnunetutil.la +gnunet_dht_monitor_DEPENDENCIES = \ + libgnunetdht.la + + check_PROGRAMS = \ test_dht_api \ test_dht_twopeer \ diff --git a/src/dht/Makefile.in b/src/dht/Makefile.in index 97e9c59..12ce558 100644 --- a/src/dht/Makefile.in +++ b/src/dht/Makefile.in @@ -37,8 +37,8 @@ POST_UNINSTALL = : build_triplet = @build@ host_triplet = @host@ target_triplet = @target@ -bin_PROGRAMS = gnunet-service-dht$(EXEEXT) gnunet-dht-get$(EXEEXT) \ - gnunet-dht-put$(EXEEXT) +bin_PROGRAMS = gnunet-service-dht$(EXEEXT) gnunet-dht-monitor$(EXEEXT) \ + gnunet-dht-get$(EXEEXT) gnunet-dht-put$(EXEEXT) check_PROGRAMS = test_dht_api$(EXEEXT) test_dht_twopeer$(EXEEXT) \ test_dht_twopeer_put_get$(EXEEXT) \ test_dht_twopeer_get_put$(EXEEXT) \ @@ -123,6 +123,8 @@ libgnunetdht_la_LINK = $(LIBTOOL) $(AM_V_lt) --tag=CC \ PROGRAMS = $(bin_PROGRAMS) am_gnunet_dht_get_OBJECTS = gnunet-dht-get.$(OBJEXT) gnunet_dht_get_OBJECTS = $(am_gnunet_dht_get_OBJECTS) +am_gnunet_dht_monitor_OBJECTS = gnunet-dht-monitor.$(OBJEXT) +gnunet_dht_monitor_OBJECTS = $(am_gnunet_dht_monitor_OBJECTS) am_gnunet_dht_put_OBJECTS = gnunet-dht-put.$(OBJEXT) gnunet_dht_put_OBJECTS = $(am_gnunet_dht_put_OBJECTS) am_gnunet_service_dht_OBJECTS = gnunet-service-dht.$(OBJEXT) \ @@ -208,19 +210,21 @@ am__v_GEN_ = $(am__v_GEN_$(AM_DEFAULT_VERBOSITY)) am__v_GEN_0 = @echo " GEN " $@; SOURCES = $(libgnunet_plugin_block_dht_la_SOURCES) \ $(libgnunetdht_la_SOURCES) $(gnunet_dht_get_SOURCES) \ - $(gnunet_dht_put_SOURCES) $(gnunet_service_dht_SOURCES) \ - $(test_dht_2dtorus_SOURCES) $(test_dht_api_SOURCES) \ - $(test_dht_line_SOURCES) $(test_dht_monitor_SOURCES) \ - $(test_dht_multipeer_SOURCES) $(test_dht_twopeer_SOURCES) \ + $(gnunet_dht_monitor_SOURCES) $(gnunet_dht_put_SOURCES) \ + $(gnunet_service_dht_SOURCES) $(test_dht_2dtorus_SOURCES) \ + $(test_dht_api_SOURCES) $(test_dht_line_SOURCES) \ + $(test_dht_monitor_SOURCES) $(test_dht_multipeer_SOURCES) \ + $(test_dht_twopeer_SOURCES) \ $(test_dht_twopeer_get_put_SOURCES) \ $(test_dht_twopeer_path_tracking_SOURCES) \ $(test_dht_twopeer_put_get_SOURCES) DIST_SOURCES = $(libgnunet_plugin_block_dht_la_SOURCES) \ $(libgnunetdht_la_SOURCES) $(gnunet_dht_get_SOURCES) \ - $(gnunet_dht_put_SOURCES) $(gnunet_service_dht_SOURCES) \ - $(test_dht_2dtorus_SOURCES) $(test_dht_api_SOURCES) \ - $(test_dht_line_SOURCES) $(test_dht_monitor_SOURCES) \ - $(test_dht_multipeer_SOURCES) $(test_dht_twopeer_SOURCES) \ + $(gnunet_dht_monitor_SOURCES) $(gnunet_dht_put_SOURCES) \ + $(gnunet_service_dht_SOURCES) $(test_dht_2dtorus_SOURCES) \ + $(test_dht_api_SOURCES) $(test_dht_line_SOURCES) \ + $(test_dht_monitor_SOURCES) $(test_dht_multipeer_SOURCES) \ + $(test_dht_twopeer_SOURCES) \ $(test_dht_twopeer_get_put_SOURCES) \ $(test_dht_twopeer_path_tracking_SOURCES) \ $(test_dht_twopeer_put_get_SOURCES) @@ -285,6 +289,7 @@ INSTALL_SCRIPT = @INSTALL_SCRIPT@ INSTALL_STRIP_PROGRAM = @INSTALL_STRIP_PROGRAM@ INTLLIBS = @INTLLIBS@ INTL_MACOSX_LIBS = @INTL_MACOSX_LIBS@ +JAVAPORT = @JAVAPORT@ LD = @LD@ LDFLAGS = @LDFLAGS@ LIBADD_DL = @LIBADD_DL@ @@ -318,6 +323,7 @@ LT_DLLOADERS = @LT_DLLOADERS@ LT_DLPREOPEN = @LT_DLPREOPEN@ MAKEINFO = @MAKEINFO@ MKDIR_P = @MKDIR_P@ +MONKEYPREFIX = @MONKEYPREFIX@ MSGFMT = @MSGFMT@ MSGFMT_015 = @MSGFMT_015@ MSGMERGE = @MSGMERGE@ @@ -519,6 +525,17 @@ gnunet_dht_put_LDADD = \ gnunet_dht_put_DEPENDENCIES = \ libgnunetdht.la +gnunet_dht_monitor_SOURCES = \ + gnunet-dht-monitor.c + +gnunet_dht_monitor_LDADD = \ + $(top_builddir)/src/dht/libgnunetdht.la \ + $(top_builddir)/src/core/libgnunetcore.la \ + $(top_builddir)/src/util/libgnunetutil.la + +gnunet_dht_monitor_DEPENDENCIES = \ + libgnunetdht.la + test_dht_api_SOURCES = \ test_dht_api.c @@ -778,6 +795,9 @@ clean-checkPROGRAMS: gnunet-dht-get$(EXEEXT): $(gnunet_dht_get_OBJECTS) $(gnunet_dht_get_DEPENDENCIES) @rm -f gnunet-dht-get$(EXEEXT) $(AM_V_CCLD)$(LINK) $(gnunet_dht_get_OBJECTS) $(gnunet_dht_get_LDADD) $(LIBS) +gnunet-dht-monitor$(EXEEXT): $(gnunet_dht_monitor_OBJECTS) $(gnunet_dht_monitor_DEPENDENCIES) + @rm -f gnunet-dht-monitor$(EXEEXT) + $(AM_V_CCLD)$(LINK) $(gnunet_dht_monitor_OBJECTS) $(gnunet_dht_monitor_LDADD) $(LIBS) gnunet-dht-put$(EXEEXT): $(gnunet_dht_put_OBJECTS) $(gnunet_dht_put_DEPENDENCIES) @rm -f gnunet-dht-put$(EXEEXT) $(AM_V_CCLD)$(LINK) $(gnunet_dht_put_OBJECTS) $(gnunet_dht_put_LDADD) $(LIBS) @@ -820,6 +840,7 @@ distclean-compile: @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/dht_api.Plo@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/gnunet-dht-get.Po@am__quote@ +@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/gnunet-dht-monitor.Po@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/gnunet-dht-put.Po@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/gnunet-service-dht.Po@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/gnunet-service-dht_clients.Po@am__quote@ diff --git a/src/dht/dht.conf.in b/src/dht/dht.conf.in index 17c13e9..59581dc 100644 --- a/src/dht/dht.conf.in +++ b/src/dht/dht.conf.in @@ -1,6 +1,6 @@ [dht] AUTOSTART = YES -@UNIXONLY@ PORT = 2095 +@JAVAPORT@PORT = 2095 HOSTNAME = localhost HOME = $SERVICEHOME CONFIG = $DEFAULTCONFIG diff --git a/src/dht/dht.h b/src/dht/dht.h index 9894be8..8adf49f 100644 --- a/src/dht/dht.h +++ b/src/dht/dht.h @@ -181,6 +181,11 @@ struct GNUNET_DHT_ClientPutMessage uint32_t desired_replication_level GNUNET_PACKED; /** + * Unique ID for the PUT message. + */ + uint64_t unique_id GNUNET_PACKED; + + /** * How long should this data persist? */ struct GNUNET_TIME_AbsoluteNBO expiration; @@ -196,20 +201,38 @@ struct GNUNET_DHT_ClientPutMessage /** - * Message to monitor requests going through peer, clients <--> DHT service. + * Message to confirming receipt of PUT, sent from DHT service to clients. */ -struct GNUNET_DHT_MonitorMessage +struct GNUNET_DHT_ClientPutConfirmationMessage { /** - * Type: GNUNET_MESSAGE_TYPE_DHT_MONITOR_{GET, PUT, GET_RESP, PUT_RESP*} - * (*) not yet implemented, necessary for key randomization + * Type: GNUNET_MESSAGE_TYPE_DHT_CLIENT_PUT_OK */ struct GNUNET_MessageHeader header; /** - * The type of data in the request. + * Always zero. */ - uint32_t type GNUNET_PACKED; + uint32_t reserved GNUNET_PACKED; + + /** + * Unique ID from the PUT message that is being confirmed. + */ + uint64_t unique_id GNUNET_PACKED; + +}; + + + +/** + * Message to monitor put requests going through peer, DHT service -> clients. + */ +struct GNUNET_DHT_MonitorPutMessage +{ + /** + * Type: GNUNET_MESSAGE_TYPE_DHT_MONITOR_PUT + */ + struct GNUNET_MessageHeader header; /** * Message options, actually an 'enum GNUNET_DHT_RouteOption' value. @@ -217,37 +240,165 @@ struct GNUNET_DHT_MonitorMessage uint32_t options GNUNET_PACKED; /** + * The type of data in the request. + */ + uint32_t type GNUNET_PACKED; + + /** + * Hop count so far. + */ + uint32_t hop_count GNUNET_PACKED; + + /** * Replication level for this message */ uint32_t desired_replication_level GNUNET_PACKED; /** * Number of peers recorded in the outgoing path from source to the - * storgage location of this message. + * storage location of this message. */ uint32_t put_path_length GNUNET_PACKED; /** - * The number of peer identities recorded from the storage location - * to this peer. + * How long should this data persist? */ - uint32_t get_path_length GNUNET_PACKED; + struct GNUNET_TIME_AbsoluteNBO expiration_time; /** - * Unique ID for GET / GET responses. + * The key to store the value under. */ - uint64_t unique_id GNUNET_PACKED; + GNUNET_HashCode key; + + /* put path (if tracked) */ + + /* Payload */ +}; + + +/** + * Message to request monitoring messages, clients -> DHT service. + */ +struct GNUNET_DHT_MonitorStartStopMessage +{ /** - * How long should this data persist? + * Type: GNUNET_MESSAGE_TYPE_DHT_MONITOR_(START|STOP) */ - struct GNUNET_TIME_AbsoluteNBO expiration; + struct GNUNET_MessageHeader header; + + /** + * The type of data desired, GNUNET_BLOCK_TYPE_ANY for all. + */ + uint32_t type GNUNET_PACKED; + + /** + * Flag whether to notify about GET messages. + */ + int16_t get GNUNET_PACKED; + + /** + * Flag whether to notify about GET_REPONSE messages. + */ + int16_t get_resp GNUNET_PACKED; + + /** + * Flag whether to notify about PUT messages. + */ + int16_t put GNUNET_PACKED; + + /** + * Flag whether to use the provided key to filter messages. + */ + int16_t filter_key GNUNET_PACKED; + + /** + * The key to filter messages by. + */ + GNUNET_HashCode key; +}; + + +/** + * Message to monitor get requests going through peer, DHT service -> clients. + */ +struct GNUNET_DHT_MonitorGetMessage +{ + /** + * Type: GNUNET_MESSAGE_TYPE_DHT_MONITOR_PUT + */ + struct GNUNET_MessageHeader header; + + /** + * Message options, actually an 'enum GNUNET_DHT_RouteOption' value. + */ + uint32_t options GNUNET_PACKED; + + /** + * The type of data in the request. + */ + uint32_t type GNUNET_PACKED; + + /** + * Hop count + */ + uint32_t hop_count GNUNET_PACKED; + + /** + * Replication level for this message + */ + uint32_t desired_replication_level GNUNET_PACKED; + + /** + * Number of peers recorded in the outgoing path from source to the + * storage location of this message. + */ + uint32_t get_path_length GNUNET_PACKED; /** * The key to store the value under. */ GNUNET_HashCode key; + /* get path (if tracked) */ + +}; + +/** + * Message to monitor get results going through peer, DHT service -> clients. + */ +struct GNUNET_DHT_MonitorGetRespMessage +{ + /** + * Type: GNUNET_MESSAGE_TYPE_DHT_P2P_RESULT + */ + struct GNUNET_MessageHeader header; + + /** + * Content type. + */ + uint32_t type GNUNET_PACKED; + + /** + * Length of the PUT path that follows (if tracked). + */ + uint32_t put_path_length GNUNET_PACKED; + + /** + * Length of the GET path that follows (if tracked). + */ + uint32_t get_path_length GNUNET_PACKED; + + /** + * When does the content expire? + */ + struct GNUNET_TIME_AbsoluteNBO expiration_time; + + /** + * The key of the corresponding GET request. + */ + GNUNET_HashCode key; + /* put path (if tracked) */ /* get path (if tracked) */ diff --git a/src/dht/dht_api.c b/src/dht/dht_api.c index 3cb13b4..420eacb 100644 --- a/src/dht/dht_api.c +++ b/src/dht/dht_api.c @@ -1,6 +1,6 @@ /* This file is part of GNUnet. - (C) 2009, 2010 Christian Grothoff (and other contributing authors) + (C) 2009, 2010, 2011, 2012 Christian Grothoff (and other contributing authors) GNUnet is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published @@ -74,11 +74,6 @@ struct PendingMessage void *cont_cls; /** - * Timeout task for this message - */ - GNUNET_SCHEDULER_TaskIdentifier timeout_task; - - /** * Unique ID for this request */ uint64_t unique_id; @@ -100,6 +95,56 @@ struct PendingMessage /** + * Handle to a PUT request. + */ +struct GNUNET_DHT_PutHandle +{ + /** + * Kept in a DLL. + */ + struct GNUNET_DHT_PutHandle *next; + + /** + * Kept in a DLL. + */ + struct GNUNET_DHT_PutHandle *prev; + + /** + * Continuation to call when done. + */ + GNUNET_DHT_PutContinuation cont; + + /** + * Pending message associated with this PUT operation, + * NULL after the message has been transmitted to the service. + */ + struct PendingMessage *pending; + + /** + * Main handle to this DHT api + */ + struct GNUNET_DHT_Handle *dht_handle; + + /** + * Closure for 'cont'. + */ + void *cont_cls; + + /** + * Timeout task for this operation. + */ + GNUNET_SCHEDULER_TaskIdentifier timeout_task; + + /** + * Unique ID for the PUT operation. + */ + uint64_t unique_id; + +}; + + + +/** * Handle to a GET request */ struct GNUNET_DHT_GetHandle @@ -171,9 +216,19 @@ struct GNUNET_DHT_MonitorHandle GNUNET_HashCode *key; /** - * Callback for each received message of interest. + * Callback for each received message of type get. + */ + GNUNET_DHT_MonitorGetCB get_cb; + + /** + * Callback for each received message of type get response. + */ + GNUNET_DHT_MonitorGetRespCB get_resp_cb; + + /** + * Callback for each received message of type put. */ - GNUNET_DHT_MonitorCB cb; + GNUNET_DHT_MonitorPutCB put_cb; /** * Closure for cb. @@ -225,8 +280,18 @@ struct GNUNET_DHT_Handle struct GNUNET_DHT_MonitorHandle *monitor_tail; /** - * Hash map containing the current outstanding unique requests - * (values are of type 'struct GNUNET_DHT_RouteHandle'). + * Head of active PUT requests. + */ + struct GNUNET_DHT_PutHandle *put_head; + + /** + * Tail of active PUT requests. + */ + struct GNUNET_DHT_PutHandle *put_tail; + + /** + * Hash map containing the current outstanding unique GET requests + * (values are of type 'struct GNUNET_DHT_GetHandle'). */ struct GNUNET_CONTAINER_MultiHashMap *active_requests; @@ -257,6 +322,8 @@ struct GNUNET_DHT_Handle * Handler for messages received from the DHT service * a demultiplexer which handles numerous message types * + * @param cls the 'struct GNUNET_DHT_Handle' + * @param msg the incoming message */ static void service_message_handler (void *cls, const struct GNUNET_MessageHeader *msg); @@ -265,16 +332,17 @@ service_message_handler (void *cls, const struct GNUNET_MessageHeader *msg); /** * Try to (re)connect to the DHT service. * + * @param handle DHT handle to reconnect * @return GNUNET_YES on success, GNUNET_NO on failure. */ static int try_connect (struct GNUNET_DHT_Handle *handle) { - if (handle->client != NULL) + if (NULL != handle->client) return GNUNET_OK; handle->in_receive = GNUNET_NO; handle->client = GNUNET_CLIENT_connect ("dht", handle->cfg); - if (handle->client == NULL) + if (NULL == handle->client) { LOG (GNUNET_ERROR_TYPE_WARNING, _("Failed to connect to the DHT service!\n")); @@ -314,6 +382,7 @@ add_request_to_pending (void *cls, const GNUNET_HashCode * key, void *value) /** * Try to send messages from list of messages to send + * * @param handle DHT_Handle */ static void @@ -359,17 +428,34 @@ try_reconnect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) static void do_disconnect (struct GNUNET_DHT_Handle *handle) { - if (handle->client == NULL) + struct GNUNET_DHT_PutHandle *ph; + struct GNUNET_DHT_PutHandle *next; + + if (NULL == handle->client) return; - GNUNET_assert (handle->reconnect_task == GNUNET_SCHEDULER_NO_TASK); + GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == handle->reconnect_task); if (NULL != handle->th) GNUNET_CLIENT_notify_transmit_ready_cancel (handle->th); handle->th = NULL; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Disconnecting from DHT service, will try to reconnect in %llu ms\n", (unsigned long long) handle->retry_time.rel_value); - GNUNET_CLIENT_disconnect (handle->client, GNUNET_NO); + GNUNET_CLIENT_disconnect (handle->client); handle->client = NULL; + + /* signal disconnect to all PUT requests that were transmitted but waiting + for the put confirmation */ + next = handle->put_head; + while (NULL != (ph = next)) + { + next = ph->next; + if (NULL == ph->pending) + { + if (NULL != ph->cont) + ph->cont (ph->cont_cls, GNUNET_SYSERR); + GNUNET_DHT_put_cancel (ph); + } + } handle->reconnect_task = GNUNET_SCHEDULER_add_delayed (handle->retry_time, &try_reconnect, handle); } @@ -377,6 +463,11 @@ do_disconnect (struct GNUNET_DHT_Handle *handle) /** * Transmit the next pending message, called by notify_transmit_ready + * + * @param cls the DHT handle + * @param size number of bytes available in 'buf' for transmission + * @param buf where to copy messages for the service + * @return number of bytes written to 'buf' */ static size_t transmit_pending (void *cls, size_t size, void *buf); @@ -384,20 +475,22 @@ transmit_pending (void *cls, size_t size, void *buf); /** * Try to send messages from list of messages to send + * + * @param handle handle to DHT */ static void process_pending_messages (struct GNUNET_DHT_Handle *handle) { struct PendingMessage *head; - if (handle->client == NULL) + if (NULL == handle->client) { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "process_pending_messages called, but client is null, reconnecting\n"); + "process_pending_messages called, but client is NULL, reconnecting\n"); do_disconnect (handle); return; } - if (handle->th != NULL) + if (NULL != handle->th) return; if (NULL == (head = handle->pending_head)) return; @@ -417,6 +510,11 @@ process_pending_messages (struct GNUNET_DHT_Handle *handle) /** * Transmit the next pending message, called by notify_transmit_ready + * + * @param cls the DHT handle + * @param size number of bytes available in 'buf' for transmission + * @param buf where to copy messages for the service + * @return number of bytes written to 'buf' */ static size_t transmit_pending (void *cls, size_t size, void *buf) @@ -426,7 +524,7 @@ transmit_pending (void *cls, size_t size, void *buf) size_t tsize; handle->th = NULL; - if (buf == NULL) + if (NULL == buf) { LOG (GNUNET_ERROR_TYPE_DEBUG, "Transmission to DHT service failed! Reconnecting!\n"); @@ -446,11 +544,6 @@ transmit_pending (void *cls, size_t size, void *buf) GNUNET_CONTAINER_DLL_remove (handle->pending_head, handle->pending_tail, head); head->in_pending_queue = GNUNET_NO; - if (head->timeout_task != GNUNET_SCHEDULER_NO_TASK) - { - GNUNET_SCHEDULER_cancel (head->timeout_task); - head->timeout_task = GNUNET_SCHEDULER_NO_TASK; - } if (NULL != head->cont) { head->cont (head->cont_cls, NULL); @@ -533,63 +626,178 @@ process_reply (void *cls, const GNUNET_HashCode * key, void *value) return GNUNET_YES; } - /** - * Process a monitoring message from the service. + * Process a get monitor message from the service. * * @param handle The DHT handle. - * @param msg Message from the service. + * @param msg Monitor get message from the service. * * @return GNUNET_OK if everything went fine, * GNUNET_SYSERR if the message is malformed. */ static int -process_monitor_message (struct GNUNET_DHT_Handle *handle, - const struct GNUNET_MessageHeader *msg) +process_monitor_get_message (struct GNUNET_DHT_Handle *handle, + const struct GNUNET_DHT_MonitorGetMessage *msg) { - struct GNUNET_DHT_MonitorMessage *m; struct GNUNET_DHT_MonitorHandle *h; + + for (h = handle->monitor_head; NULL != h; h = h->next) + { + int type_ok; + int key_ok; + + type_ok = (GNUNET_BLOCK_TYPE_ANY == h->type) || (h->type == ntohl(msg->type)); + key_ok = (NULL == h->key) || (0 == memcmp (h->key, &msg->key, + sizeof (GNUNET_HashCode))); + if (type_ok && key_ok && (NULL != h->get_cb)) + h->get_cb (h->cb_cls, + ntohl (msg->options), + (enum GNUNET_BLOCK_Type) ntohl(msg->type), + ntohl (msg->hop_count), + ntohl (msg->desired_replication_level), + ntohl (msg->get_path_length), + (struct GNUNET_PeerIdentity *) &msg[1], + &msg->key); + } + return GNUNET_OK; +} + + +/** + * Process a get response monitor message from the service. + * + * @param handle The DHT handle. + * @param msg monitor get response message from the service + * @return GNUNET_OK if everything went fine, + * GNUNET_SYSERR if the message is malformed. + */ +static int +process_monitor_get_resp_message (struct GNUNET_DHT_Handle *handle, + const struct GNUNET_DHT_MonitorGetRespMessage + *msg) +{ + struct GNUNET_DHT_MonitorHandle *h; + struct GNUNET_PeerIdentity *path; + uint32_t getl; + uint32_t putl; size_t msize; - if (ntohs (msg->type) < GNUNET_MESSAGE_TYPE_DHT_MONITOR_GET || - ntohs (msg->type) > GNUNET_MESSAGE_TYPE_DHT_MONITOR_PUT) - return GNUNET_SYSERR; - msize = ntohs (msg->size); - if (msize < sizeof (struct GNUNET_DHT_MonitorMessage)) + msize = ntohs (msg->header.size); + path = (struct GNUNET_PeerIdentity *) &msg[1]; + getl = ntohl (msg->get_path_length); + putl = ntohl (msg->put_path_length); + if ( (getl + putl < getl) || + ( ((msize - sizeof (struct GNUNET_DHT_MonitorGetRespMessage)) / sizeof (struct GNUNET_PeerIdentity)) < getl + putl) ) + { + GNUNET_break (0); return GNUNET_SYSERR; + } + for (h = handle->monitor_head; NULL != h; h = h->next) + { + int type_ok; + int key_ok; + + type_ok = (GNUNET_BLOCK_TYPE_ANY == h->type) || (h->type == ntohl(msg->type)); + key_ok = (NULL == h->key) || (0 == memcmp (h->key, &msg->key, + sizeof (GNUNET_HashCode))); + if (type_ok && key_ok && (NULL != h->get_resp_cb)) + h->get_resp_cb (h->cb_cls, + (enum GNUNET_BLOCK_Type) ntohl(msg->type), + path, getl, + &path[getl], putl, + GNUNET_TIME_absolute_ntoh(msg->expiration_time), + &msg->key, + (void *) &path[getl + putl], + msize - + sizeof (struct GNUNET_DHT_MonitorGetRespMessage) - + sizeof (struct GNUNET_PeerIdentity) * (putl + getl)); + } + return GNUNET_OK; +} - m = (struct GNUNET_DHT_MonitorMessage *) msg; - h = handle->monitor_head; - while (NULL != h) + +/** + * Process a put monitor message from the service. + * + * @param handle The DHT handle. + * @param msg Monitor put message from the service. + * + * @return GNUNET_OK if everything went fine, + * GNUNET_SYSERR if the message is malformed. + */ +static int +process_monitor_put_message (struct GNUNET_DHT_Handle *handle, + const struct GNUNET_DHT_MonitorPutMessage *msg) +{ + struct GNUNET_DHT_MonitorHandle *h; + size_t msize; + struct GNUNET_PeerIdentity *path; + uint32_t putl; + + msize = ntohs (msg->header.size); + path = (struct GNUNET_PeerIdentity *) &msg[1]; + putl = ntohl (msg->put_path_length); + if (((msize - sizeof (struct GNUNET_DHT_MonitorGetRespMessage)) / sizeof (struct GNUNET_PeerIdentity)) < putl) { - if (h->type == ntohl(m->type) && - (NULL == h->key || - memcmp (h->key, &m->key, sizeof (GNUNET_HashCode)) == 0)) - { - struct GNUNET_PeerIdentity *path; - uint32_t getl; - uint32_t putl; - - path = (struct GNUNET_PeerIdentity *) &m[1]; - getl = ntohl (m->get_path_length); - putl = ntohl (m->put_path_length); - h->cb (h->cb_cls, ntohs(msg->type), - GNUNET_TIME_absolute_ntoh(m->expiration), - &m->key, - &path[getl], putl, path, getl, - ntohl (m->desired_replication_level), - ntohl (m->options), ntohl (m->type), - (void *) &path[getl + putl], - ntohs (msg->size) - - sizeof (struct GNUNET_DHT_MonitorMessage) - - sizeof (struct GNUNET_PeerIdentity) * (putl + getl)); - } - h = h->next; + GNUNET_break (0); + return GNUNET_SYSERR; + } + for (h = handle->monitor_head; NULL != h; h = h->next) + { + int type_ok; + int key_ok; + + type_ok = (GNUNET_BLOCK_TYPE_ANY == h->type) || (h->type == ntohl(msg->type)); + key_ok = (NULL == h->key) || (0 == memcmp (h->key, &msg->key, + sizeof (GNUNET_HashCode))); + if (type_ok && key_ok && (NULL != h->put_cb)) + h->put_cb (h->cb_cls, + ntohl (msg->options), + (enum GNUNET_BLOCK_Type) ntohl(msg->type), + ntohl (msg->hop_count), + ntohl (msg->desired_replication_level), + putl, path, + GNUNET_TIME_absolute_ntoh(msg->expiration_time), + &msg->key, + (void *) &path[putl], + msize - + sizeof (struct GNUNET_DHT_MonitorPutMessage) - + sizeof (struct GNUNET_PeerIdentity) * putl); } + return GNUNET_OK; +} + + +/** + * Process a put confirmation message from the service. + * + * @param handle The DHT handle. + * @param msg confirmation message from the service. + * @return GNUNET_OK if everything went fine, + * GNUNET_SYSERR if the message is malformed. + */ +static int +process_put_confirmation_message (struct GNUNET_DHT_Handle *handle, + const struct GNUNET_DHT_ClientPutConfirmationMessage *msg) +{ + struct GNUNET_DHT_PutHandle *ph; + GNUNET_DHT_PutContinuation cont; + void *cont_cls; + for (ph = handle->put_head; NULL != ph; ph = ph->next) + if (ph->unique_id == msg->unique_id) + break; + if (NULL == ph) + return GNUNET_OK; + cont = ph->cont; + cont_cls = ph->cont_cls; + GNUNET_DHT_put_cancel (ph); + if (NULL != cont) + cont (cont_cls, GNUNET_OK); retur |