diff options
author | Bertrand Marc <beberking@gmail.com> | 2012-06-06 20:47:48 +0200 |
---|---|---|
committer | Bertrand Marc <beberking@gmail.com> | 2012-06-06 20:47:48 +0200 |
commit | 740b30688bd745a527f96f9116c19acb3480971a (patch) | |
tree | 2709a3f4dba11c174aa9e1ba3612e30c578e76a9 /src/dht/dht_api.c | |
parent | 2b81464a43485fcc8ce079fafdee7b7a171835f4 (diff) |
Imported Upstream version 0.9.3upstream/0.9.3
Diffstat (limited to 'src/dht/dht_api.c')
-rw-r--r-- | src/dht/dht_api.c | 605 |
1 files changed, 486 insertions, 119 deletions
diff --git a/src/dht/dht_api.c b/src/dht/dht_api.c index 3cb13b4..420eacb 100644 --- a/src/dht/dht_api.c +++ b/src/dht/dht_api.c @@ -1,6 +1,6 @@ /* This file is part of GNUnet. - (C) 2009, 2010 Christian Grothoff (and other contributing authors) + (C) 2009, 2010, 2011, 2012 Christian Grothoff (and other contributing authors) GNUnet is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published @@ -74,11 +74,6 @@ struct PendingMessage void *cont_cls; /** - * Timeout task for this message - */ - GNUNET_SCHEDULER_TaskIdentifier timeout_task; - - /** * Unique ID for this request */ uint64_t unique_id; @@ -100,6 +95,56 @@ struct PendingMessage /** + * Handle to a PUT request. + */ +struct GNUNET_DHT_PutHandle +{ + /** + * Kept in a DLL. + */ + struct GNUNET_DHT_PutHandle *next; + + /** + * Kept in a DLL. + */ + struct GNUNET_DHT_PutHandle *prev; + + /** + * Continuation to call when done. + */ + GNUNET_DHT_PutContinuation cont; + + /** + * Pending message associated with this PUT operation, + * NULL after the message has been transmitted to the service. + */ + struct PendingMessage *pending; + + /** + * Main handle to this DHT api + */ + struct GNUNET_DHT_Handle *dht_handle; + + /** + * Closure for 'cont'. + */ + void *cont_cls; + + /** + * Timeout task for this operation. + */ + GNUNET_SCHEDULER_TaskIdentifier timeout_task; + + /** + * Unique ID for the PUT operation. + */ + uint64_t unique_id; + +}; + + + +/** * Handle to a GET request */ struct GNUNET_DHT_GetHandle @@ -171,9 +216,19 @@ struct GNUNET_DHT_MonitorHandle GNUNET_HashCode *key; /** - * Callback for each received message of interest. + * Callback for each received message of type get. + */ + GNUNET_DHT_MonitorGetCB get_cb; + + /** + * Callback for each received message of type get response. + */ + GNUNET_DHT_MonitorGetRespCB get_resp_cb; + + /** + * Callback for each received message of type put. */ - GNUNET_DHT_MonitorCB cb; + GNUNET_DHT_MonitorPutCB put_cb; /** * Closure for cb. @@ -225,8 +280,18 @@ struct GNUNET_DHT_Handle struct GNUNET_DHT_MonitorHandle *monitor_tail; /** - * Hash map containing the current outstanding unique requests - * (values are of type 'struct GNUNET_DHT_RouteHandle'). + * Head of active PUT requests. + */ + struct GNUNET_DHT_PutHandle *put_head; + + /** + * Tail of active PUT requests. + */ + struct GNUNET_DHT_PutHandle *put_tail; + + /** + * Hash map containing the current outstanding unique GET requests + * (values are of type 'struct GNUNET_DHT_GetHandle'). */ struct GNUNET_CONTAINER_MultiHashMap *active_requests; @@ -257,6 +322,8 @@ struct GNUNET_DHT_Handle * Handler for messages received from the DHT service * a demultiplexer which handles numerous message types * + * @param cls the 'struct GNUNET_DHT_Handle' + * @param msg the incoming message */ static void service_message_handler (void *cls, const struct GNUNET_MessageHeader *msg); @@ -265,16 +332,17 @@ service_message_handler (void *cls, const struct GNUNET_MessageHeader *msg); /** * Try to (re)connect to the DHT service. * + * @param handle DHT handle to reconnect * @return GNUNET_YES on success, GNUNET_NO on failure. */ static int try_connect (struct GNUNET_DHT_Handle *handle) { - if (handle->client != NULL) + if (NULL != handle->client) return GNUNET_OK; handle->in_receive = GNUNET_NO; handle->client = GNUNET_CLIENT_connect ("dht", handle->cfg); - if (handle->client == NULL) + if (NULL == handle->client) { LOG (GNUNET_ERROR_TYPE_WARNING, _("Failed to connect to the DHT service!\n")); @@ -314,6 +382,7 @@ add_request_to_pending (void *cls, const GNUNET_HashCode * key, void *value) /** * Try to send messages from list of messages to send + * * @param handle DHT_Handle */ static void @@ -359,17 +428,34 @@ try_reconnect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) static void do_disconnect (struct GNUNET_DHT_Handle *handle) { - if (handle->client == NULL) + struct GNUNET_DHT_PutHandle *ph; + struct GNUNET_DHT_PutHandle *next; + + if (NULL == handle->client) return; - GNUNET_assert (handle->reconnect_task == GNUNET_SCHEDULER_NO_TASK); + GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == handle->reconnect_task); if (NULL != handle->th) GNUNET_CLIENT_notify_transmit_ready_cancel (handle->th); handle->th = NULL; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Disconnecting from DHT service, will try to reconnect in %llu ms\n", (unsigned long long) handle->retry_time.rel_value); - GNUNET_CLIENT_disconnect (handle->client, GNUNET_NO); + GNUNET_CLIENT_disconnect (handle->client); handle->client = NULL; + + /* signal disconnect to all PUT requests that were transmitted but waiting + for the put confirmation */ + next = handle->put_head; + while (NULL != (ph = next)) + { + next = ph->next; + if (NULL == ph->pending) + { + if (NULL != ph->cont) + ph->cont (ph->cont_cls, GNUNET_SYSERR); + GNUNET_DHT_put_cancel (ph); + } + } handle->reconnect_task = GNUNET_SCHEDULER_add_delayed (handle->retry_time, &try_reconnect, handle); } @@ -377,6 +463,11 @@ do_disconnect (struct GNUNET_DHT_Handle *handle) /** * Transmit the next pending message, called by notify_transmit_ready + * + * @param cls the DHT handle + * @param size number of bytes available in 'buf' for transmission + * @param buf where to copy messages for the service + * @return number of bytes written to 'buf' */ static size_t transmit_pending (void *cls, size_t size, void *buf); @@ -384,20 +475,22 @@ transmit_pending (void *cls, size_t size, void *buf); /** * Try to send messages from list of messages to send + * + * @param handle handle to DHT */ static void process_pending_messages (struct GNUNET_DHT_Handle *handle) { struct PendingMessage *head; - if (handle->client == NULL) + if (NULL == handle->client) { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "process_pending_messages called, but client is null, reconnecting\n"); + "process_pending_messages called, but client is NULL, reconnecting\n"); do_disconnect (handle); return; } - if (handle->th != NULL) + if (NULL != handle->th) return; if (NULL == (head = handle->pending_head)) return; @@ -417,6 +510,11 @@ process_pending_messages (struct GNUNET_DHT_Handle *handle) /** * Transmit the next pending message, called by notify_transmit_ready + * + * @param cls the DHT handle + * @param size number of bytes available in 'buf' for transmission + * @param buf where to copy messages for the service + * @return number of bytes written to 'buf' */ static size_t transmit_pending (void *cls, size_t size, void *buf) @@ -426,7 +524,7 @@ transmit_pending (void *cls, size_t size, void *buf) size_t tsize; handle->th = NULL; - if (buf == NULL) + if (NULL == buf) { LOG (GNUNET_ERROR_TYPE_DEBUG, "Transmission to DHT service failed! Reconnecting!\n"); @@ -446,11 +544,6 @@ transmit_pending (void *cls, size_t size, void *buf) GNUNET_CONTAINER_DLL_remove (handle->pending_head, handle->pending_tail, head); head->in_pending_queue = GNUNET_NO; - if (head->timeout_task != GNUNET_SCHEDULER_NO_TASK) - { - GNUNET_SCHEDULER_cancel (head->timeout_task); - head->timeout_task = GNUNET_SCHEDULER_NO_TASK; - } if (NULL != head->cont) { head->cont (head->cont_cls, NULL); @@ -533,63 +626,178 @@ process_reply (void *cls, const GNUNET_HashCode * key, void *value) return GNUNET_YES; } - /** - * Process a monitoring message from the service. + * Process a get monitor message from the service. * * @param handle The DHT handle. - * @param msg Message from the service. + * @param msg Monitor get message from the service. * * @return GNUNET_OK if everything went fine, * GNUNET_SYSERR if the message is malformed. */ static int -process_monitor_message (struct GNUNET_DHT_Handle *handle, - const struct GNUNET_MessageHeader *msg) +process_monitor_get_message (struct GNUNET_DHT_Handle *handle, + const struct GNUNET_DHT_MonitorGetMessage *msg) { - struct GNUNET_DHT_MonitorMessage *m; struct GNUNET_DHT_MonitorHandle *h; + + for (h = handle->monitor_head; NULL != h; h = h->next) + { + int type_ok; + int key_ok; + + type_ok = (GNUNET_BLOCK_TYPE_ANY == h->type) || (h->type == ntohl(msg->type)); + key_ok = (NULL == h->key) || (0 == memcmp (h->key, &msg->key, + sizeof (GNUNET_HashCode))); + if (type_ok && key_ok && (NULL != h->get_cb)) + h->get_cb (h->cb_cls, + ntohl (msg->options), + (enum GNUNET_BLOCK_Type) ntohl(msg->type), + ntohl (msg->hop_count), + ntohl (msg->desired_replication_level), + ntohl (msg->get_path_length), + (struct GNUNET_PeerIdentity *) &msg[1], + &msg->key); + } + return GNUNET_OK; +} + + +/** + * Process a get response monitor message from the service. + * + * @param handle The DHT handle. + * @param msg monitor get response message from the service + * @return GNUNET_OK if everything went fine, + * GNUNET_SYSERR if the message is malformed. + */ +static int +process_monitor_get_resp_message (struct GNUNET_DHT_Handle *handle, + const struct GNUNET_DHT_MonitorGetRespMessage + *msg) +{ + struct GNUNET_DHT_MonitorHandle *h; + struct GNUNET_PeerIdentity *path; + uint32_t getl; + uint32_t putl; size_t msize; - if (ntohs (msg->type) < GNUNET_MESSAGE_TYPE_DHT_MONITOR_GET || - ntohs (msg->type) > GNUNET_MESSAGE_TYPE_DHT_MONITOR_PUT) - return GNUNET_SYSERR; - msize = ntohs (msg->size); - if (msize < sizeof (struct GNUNET_DHT_MonitorMessage)) + msize = ntohs (msg->header.size); + path = (struct GNUNET_PeerIdentity *) &msg[1]; + getl = ntohl (msg->get_path_length); + putl = ntohl (msg->put_path_length); + if ( (getl + putl < getl) || + ( ((msize - sizeof (struct GNUNET_DHT_MonitorGetRespMessage)) / sizeof (struct GNUNET_PeerIdentity)) < getl + putl) ) + { + GNUNET_break (0); return GNUNET_SYSERR; + } + for (h = handle->monitor_head; NULL != h; h = h->next) + { + int type_ok; + int key_ok; + + type_ok = (GNUNET_BLOCK_TYPE_ANY == h->type) || (h->type == ntohl(msg->type)); + key_ok = (NULL == h->key) || (0 == memcmp (h->key, &msg->key, + sizeof (GNUNET_HashCode))); + if (type_ok && key_ok && (NULL != h->get_resp_cb)) + h->get_resp_cb (h->cb_cls, + (enum GNUNET_BLOCK_Type) ntohl(msg->type), + path, getl, + &path[getl], putl, + GNUNET_TIME_absolute_ntoh(msg->expiration_time), + &msg->key, + (void *) &path[getl + putl], + msize - + sizeof (struct GNUNET_DHT_MonitorGetRespMessage) - + sizeof (struct GNUNET_PeerIdentity) * (putl + getl)); + } + return GNUNET_OK; +} - m = (struct GNUNET_DHT_MonitorMessage *) msg; - h = handle->monitor_head; - while (NULL != h) + +/** + * Process a put monitor message from the service. + * + * @param handle The DHT handle. + * @param msg Monitor put message from the service. + * + * @return GNUNET_OK if everything went fine, + * GNUNET_SYSERR if the message is malformed. + */ +static int +process_monitor_put_message (struct GNUNET_DHT_Handle *handle, + const struct GNUNET_DHT_MonitorPutMessage *msg) +{ + struct GNUNET_DHT_MonitorHandle *h; + size_t msize; + struct GNUNET_PeerIdentity *path; + uint32_t putl; + + msize = ntohs (msg->header.size); + path = (struct GNUNET_PeerIdentity *) &msg[1]; + putl = ntohl (msg->put_path_length); + if (((msize - sizeof (struct GNUNET_DHT_MonitorGetRespMessage)) / sizeof (struct GNUNET_PeerIdentity)) < putl) { - if (h->type == ntohl(m->type) && - (NULL == h->key || - memcmp (h->key, &m->key, sizeof (GNUNET_HashCode)) == 0)) - { - struct GNUNET_PeerIdentity *path; - uint32_t getl; - uint32_t putl; - - path = (struct GNUNET_PeerIdentity *) &m[1]; - getl = ntohl (m->get_path_length); - putl = ntohl (m->put_path_length); - h->cb (h->cb_cls, ntohs(msg->type), - GNUNET_TIME_absolute_ntoh(m->expiration), - &m->key, - &path[getl], putl, path, getl, - ntohl (m->desired_replication_level), - ntohl (m->options), ntohl (m->type), - (void *) &path[getl + putl], - ntohs (msg->size) - - sizeof (struct GNUNET_DHT_MonitorMessage) - - sizeof (struct GNUNET_PeerIdentity) * (putl + getl)); - } - h = h->next; + GNUNET_break (0); + return GNUNET_SYSERR; + } + for (h = handle->monitor_head; NULL != h; h = h->next) + { + int type_ok; + int key_ok; + + type_ok = (GNUNET_BLOCK_TYPE_ANY == h->type) || (h->type == ntohl(msg->type)); + key_ok = (NULL == h->key) || (0 == memcmp (h->key, &msg->key, + sizeof (GNUNET_HashCode))); + if (type_ok && key_ok && (NULL != h->put_cb)) + h->put_cb (h->cb_cls, + ntohl (msg->options), + (enum GNUNET_BLOCK_Type) ntohl(msg->type), + ntohl (msg->hop_count), + ntohl (msg->desired_replication_level), + putl, path, + GNUNET_TIME_absolute_ntoh(msg->expiration_time), + &msg->key, + (void *) &path[putl], + msize - + sizeof (struct GNUNET_DHT_MonitorPutMessage) - + sizeof (struct GNUNET_PeerIdentity) * putl); } + return GNUNET_OK; +} + + +/** + * Process a put confirmation message from the service. + * + * @param handle The DHT handle. + * @param msg confirmation message from the service. + * @return GNUNET_OK if everything went fine, + * GNUNET_SYSERR if the message is malformed. + */ +static int +process_put_confirmation_message (struct GNUNET_DHT_Handle *handle, + const struct GNUNET_DHT_ClientPutConfirmationMessage *msg) +{ + struct GNUNET_DHT_PutHandle *ph; + GNUNET_DHT_PutContinuation cont; + void *cont_cls; + for (ph = handle->put_head; NULL != ph; ph = ph->next) + if (ph->unique_id == msg->unique_id) + break; + if (NULL == ph) + return GNUNET_OK; + cont = ph->cont; + cont_cls = ph->cont_cls; + GNUNET_DHT_put_cancel (ph); + if (NULL != cont) + cont (cont_cls, GNUNET_OK); return GNUNET_OK; } + /** * Handler for messages received from the DHT service * a demultiplexer which handles numerous message types @@ -602,38 +810,84 @@ service_message_handler (void *cls, const struct GNUNET_MessageHeader *msg) { struct GNUNET_DHT_Handle *handle = cls; const struct GNUNET_DHT_ClientResultMessage *dht_msg; + uint16_t msize; + int ret; - if (msg == NULL) + if (NULL == msg) { LOG (GNUNET_ERROR_TYPE_DEBUG, "Error receiving data from DHT service, reconnecting\n"); do_disconnect (handle); return; } - if (ntohs (msg->type) != GNUNET_MESSAGE_TYPE_DHT_CLIENT_RESULT) + ret = GNUNET_SYSERR; + msize = ntohs (msg->size); + switch (ntohs (msg->type)) { - if (process_monitor_message (handle, msg) == GNUNET_OK) + case GNUNET_MESSAGE_TYPE_DHT_MONITOR_GET: + if (msize < sizeof (struct GNUNET_DHT_MonitorGetMessage)) { - GNUNET_CLIENT_receive (handle->client, &service_message_handler, handle, - GNUNET_TIME_UNIT_FOREVER_REL); - return; + GNUNET_break (0); + break; } - GNUNET_break (0); - do_disconnect (handle); - return; + ret = process_monitor_get_message(handle, + (const struct GNUNET_DHT_MonitorGetMessage *) msg); + break; + case GNUNET_MESSAGE_TYPE_DHT_MONITOR_GET_RESP: + if (msize < sizeof (struct GNUNET_DHT_MonitorGetRespMessage)) + { + GNUNET_break (0); + break; + } + ret = process_monitor_get_resp_message(handle, + (const struct GNUNET_DHT_MonitorGetRespMessage *) msg); + break; + case GNUNET_MESSAGE_TYPE_DHT_MONITOR_PUT: + if (msize < sizeof (struct GNUNET_DHT_MonitorPutMessage)) + { + GNUNET_break (0); + break; + } + ret = process_monitor_put_message(handle, + (const struct GNUNET_DHT_MonitorPutMessage *) msg); + break; + case GNUNET_MESSAGE_TYPE_DHT_MONITOR_PUT_RESP: + /* Not implemented yet */ + GNUNET_break(0); + break; + case GNUNET_MESSAGE_TYPE_DHT_CLIENT_RESULT: + if (ntohs (msg->size) < sizeof (struct GNUNET_DHT_ClientResultMessage)) + { + GNUNET_break (0); + break; + } + ret = GNUNET_OK; + dht_msg = (const struct GNUNET_DHT_ClientResultMessage *) msg; + LOG (GNUNET_ERROR_TYPE_DEBUG, "Received reply for `%s' from DHT service %p\n", + GNUNET_h2s (&dht_msg->key), handle); + GNUNET_CONTAINER_multihashmap_get_multiple (handle->active_requests, + &dht_msg->key, &process_reply, + (void *) dht_msg); + break; + case GNUNET_MESSAGE_TYPE_DHT_CLIENT_PUT_OK: + if (ntohs (msg->size) != sizeof (struct GNUNET_DHT_ClientPutConfirmationMessage)) + { + GNUNET_break (0); + break; + } + ret = process_put_confirmation_message (handle, + (const struct GNUNET_DHT_ClientPutConfirmationMessage*) msg); + break; + default: + GNUNET_break(0); + break; } - if (ntohs (msg->size) < sizeof (struct GNUNET_DHT_ClientResultMessage)) + if (GNUNET_OK != ret) { GNUNET_break (0); do_disconnect (handle); return; } - dht_msg = (const struct GNUNET_DHT_ClientResultMessage *) msg; - LOG (GNUNET_ERROR_TYPE_DEBUG, "Received reply for `%s' from DHT service %p\n", - GNUNET_h2s (&dht_msg->key), handle); - GNUNET_CONTAINER_multihashmap_get_multiple (handle->active_requests, - &dht_msg->key, &process_reply, - (void *) dht_msg); GNUNET_CLIENT_receive (handle->client, &service_message_handler, handle, GNUNET_TIME_UNIT_FOREVER_REL); } @@ -677,11 +931,12 @@ void GNUNET_DHT_disconnect (struct GNUNET_DHT_Handle *handle) { struct PendingMessage *pm; + struct GNUNET_DHT_PutHandle *ph; - GNUNET_assert (handle != NULL); + GNUNET_assert (NULL != handle); GNUNET_assert (0 == GNUNET_CONTAINER_multihashmap_size (handle->active_requests)); - if (handle->th != NULL) + if (NULL != handle->th) { GNUNET_CLIENT_notify_transmit_ready_cancel (handle->th); handle->th = NULL; @@ -693,18 +948,24 @@ GNUNET_DHT_disconnect (struct GNUNET_DHT_Handle *handle) pm); pm->in_pending_queue = GNUNET_NO; GNUNET_assert (GNUNET_YES == pm->free_on_send); - if (GNUNET_SCHEDULER_NO_TASK != pm->timeout_task) - GNUNET_SCHEDULER_cancel (pm->timeout_task); if (NULL != pm->cont) pm->cont (pm->cont_cls, NULL); GNUNET_free (pm); } - if (handle->client != NULL) + while (NULL != (ph = handle->put_head)) { - GNUNET_CLIENT_disconnect (handle->client, GNUNET_YES); + GNUNET_break (NULL == ph->pending); + if (NULL != ph->cont) + ph->cont (ph->cont_cls, GNUNET_SYSERR); + GNUNET_DHT_put_cancel (ph); + } + + if (NULL != handle->client) + { + GNUNET_CLIENT_disconnect (handle->client); handle->client = NULL; } - if (handle->reconnect_task != GNUNET_SCHEDULER_NO_TASK) + if (GNUNET_SCHEDULER_NO_TASK != handle->reconnect_task) GNUNET_SCHEDULER_cancel (handle->reconnect_task); GNUNET_CONTAINER_multihashmap_destroy (handle->active_requests); GNUNET_free (handle); @@ -720,22 +981,49 @@ GNUNET_DHT_disconnect (struct GNUNET_DHT_Handle *handle) static void timeout_put_request (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) { - struct PendingMessage *pending = cls; - struct GNUNET_DHT_Handle *handle; + struct GNUNET_DHT_PutHandle *ph = cls; + struct GNUNET_DHT_Handle *handle = ph->dht_handle; - handle = pending->handle; - GNUNET_assert (GNUNET_YES == pending->in_pending_queue); - GNUNET_CONTAINER_DLL_remove (handle->pending_head, handle->pending_tail, - pending); - pending->in_pending_queue = GNUNET_NO; - if (pending->cont != NULL) - pending->cont (pending->cont_cls, tc); - GNUNET_free (pending); + ph->timeout_task = GNUNET_SCHEDULER_NO_TASK; + if (NULL != ph->pending) + { + GNUNET_CONTAINER_DLL_remove (handle->pending_head, handle->pending_tail, + ph->pending); + ph->pending->in_pending_queue = GNUNET_NO; + GNUNET_free (ph->pending); + } + if (NULL != ph->cont) + ph->cont (ph->cont_cls, GNUNET_NO); + GNUNET_CONTAINER_DLL_remove (handle->put_head, + handle->put_tail, + ph); + GNUNET_free (ph); +} + + +/** + * Function called whenever the PUT message leaves the queue. Sets + * the message pointer in the put handle to NULL. + * + * @param cls the 'struct GNUNET_DHT_PutHandle' + * @param tc unused + */ +static void +mark_put_message_gone (void *cls, + const struct GNUNET_SCHEDULER_TaskContext *tc) +{ + struct GNUNET_DHT_PutHandle *ph = cls; + + ph->pending = NULL; } /** - * Perform a PUT operation storing data in the DHT. + * Perform a PUT operation storing data in the DHT. FIXME: we should + * change the protocol to get a confirmation for the PUT from the DHT + * and call 'cont' only after getting the confirmation; otherwise, the + * client has no good way of telling if the 'PUT' message actually got + * to the DHT service! * * @param handle handle to DHT service * @param key the key to store under @@ -748,51 +1036,97 @@ timeout_put_request (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) * @param exp desired expiration time for the value * @param timeout how long to wait for transmission of this request * @param cont continuation to call when done (transmitting request to service) + * You must not call GNUNET_DHT_DISCONNECT in this continuation * @param cont_cls closure for cont */ -void +struct GNUNET_DHT_PutHandle * GNUNET_DHT_put (struct GNUNET_DHT_Handle *handle, const GNUNET_HashCode * key, uint32_t desired_replication_level, enum GNUNET_DHT_RouteOption options, enum GNUNET_BLOCK_Type type, size_t size, const char *data, struct GNUNET_TIME_Absolute exp, - struct GNUNET_TIME_Relative timeout, GNUNET_SCHEDULER_Task cont, + struct GNUNET_TIME_Relative timeout, GNUNET_DHT_PutContinuation cont, void *cont_cls) { struct GNUNET_DHT_ClientPutMessage *put_msg; size_t msize; struct PendingMessage *pending; + struct GNUNET_DHT_PutHandle *ph; msize = sizeof (struct GNUNET_DHT_ClientPutMessage) + size; if ((msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE) || (size >= GNUNET_SERVER_MAX_MESSAGE_SIZE)) { GNUNET_break (0); - if (NULL != cont) - cont (cont_cls, NULL); - return; + return NULL; } + ph = GNUNET_malloc (sizeof (struct GNUNET_DHT_PutHandle)); + ph->dht_handle = handle; + ph->timeout_task = GNUNET_SCHEDULER_add_delayed (timeout, &timeout_put_request, ph); + ph->cont = cont; + ph->cont_cls = cont_cls; + ph->unique_id = ++handle->uid_gen; pending = GNUNET_malloc (sizeof (struct PendingMessage) + msize); + ph->pending = pending; put_msg = (struct GNUNET_DHT_ClientPutMessage *) &pending[1]; pending->msg = &put_msg->header; pending->handle = handle; - pending->cont = cont; - pending->cont_cls = cont_cls; + pending->cont = &mark_put_message_gone; + pending->cont_cls = ph; pending->free_on_send = GNUNET_YES; - pending->timeout_task = - GNUNET_SCHEDULER_add_delayed (timeout, &timeout_put_request, pending); put_msg->header.size = htons (msize); put_msg->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_CLIENT_PUT); put_msg->type = htonl (type); put_msg->options = htonl ((uint32_t) options); put_msg->desired_replication_level = htonl (desired_replication_level); + put_msg->unique_id = ph->unique_id; put_msg->expiration = GNUNET_TIME_absolute_hton (exp); put_msg->key = *key; memcpy (&put_msg[1], data, size); GNUNET_CONTAINER_DLL_insert (handle->pending_head, handle->pending_tail, pending); pending->in_pending_queue = GNUNET_YES; + GNUNET_CONTAINER_DLL_insert_tail (handle->put_head, + handle->put_tail, + ph); process_pending_messages (handle); + return ph; +} + + +/** + * Cancels a DHT PUT operation. Note that the PUT request may still + * go out over the network (we can't stop that); However, if the PUT + * has not yet been sent to the service, cancelling the PUT will stop + * this from happening (but there is no way for the user of this API + * to tell if that is the case). The only use for this API is to + * prevent a later call to 'cont' from "GNUNET_DHT_put" (i.e. because + * the system is shutting down). + * + * @param ph put operation to cancel ('cont' will no longer be called) + */ +void +GNUNET_DHT_put_cancel (struct GNUNET_DHT_PutHandle *ph) +{ + struct GNUNET_DHT_Handle *handle = ph->dht_handle; + + if (NULL != ph->pending) + { + GNUNET_CONTAINER_DLL_remove (handle->pending_head, + handle->pending_tail, + ph->pending); + GNUNET_free (ph->pending); + ph->pending = NULL; + } + if (ph->timeout_task != GNUNET_SCHEDULER_NO_TASK) + { + GNUNET_SCHEDULER_cancel (ph->timeout_task); + ph->timeout_task = GNUNET_SCHEDULER_NO_TASK; + } + GNUNET_CONTAINER_DLL_remove (handle->put_head, + handle->put_tail, + ph); + GNUNET_free (ph); } @@ -801,7 +1135,6 @@ GNUNET_DHT_put (struct GNUNET_DHT_Handle *handle, const GNUNET_HashCode * key, * also "GNUNET_BLOCK_evaluate". * * @param handle handle to the DHT service - * @param timeout how long to wait for transmission of this request to the service * @param type expected type of the response object * @param key the key to look up * @param desired_replication_level estimate of how many @@ -815,7 +1148,6 @@ GNUNET_DHT_put (struct GNUNET_DHT_Handle *handle, const GNUNET_HashCode * key, */ struct GNUNET_DHT_GetHandle * GNUNET_DHT_get_start (struct GNUNET_DHT_Handle *handle, - struct GNUNET_TIME_Relative timeout, enum GNUNET_BLOCK_Type type, const GNUNET_HashCode * key, uint32_t desired_replication_level, enum GNUNET_DHT_RouteOption options, const void *xquery, @@ -847,8 +1179,7 @@ GNUNET_DHT_get_start (struct GNUNET_DHT_Handle *handle, get_msg->desired_replication_level = htonl (desired_replication_level); get_msg->type = htonl (type); get_msg->key = *key; - handle->uid_gen++; - get_msg->unique_id = handle->uid_gen; + get_msg->unique_id = ++handle->uid_gen; memcpy (&get_msg[1], xquery, xquery_size); GNUNET_CONTAINER_DLL_insert (handle->pending_head, handle->pending_tail, pending); @@ -925,7 +1256,9 @@ GNUNET_DHT_get_stop (struct GNUNET_DHT_GetHandle *get_handle) * @param handle Handle to the DHT service. * @param type Type of blocks that are of interest. * @param key Key of data of interest, NULL for all. - * @param cb Callback to process all monitored data. + * @param get_cb Callback to process monitored get messages. + * @param get_resp_cb Callback to process monitored get response messages. + * @param put_cb Callback to process monitored put messages. * @param cb_cls Closure for cb. * * @return Handle to stop monitoring. @@ -934,18 +1267,21 @@ struct GNUNET_DHT_MonitorHandle * GNUNET_DHT_monitor_start (struct GNUNET_DHT_Handle *handle, enum GNUNET_BLOCK_Type type, const GNUNET_HashCode *key, - GNUNET_DHT_MonitorCB cb, + GNUNET_DHT_MonitorGetCB get_cb, + GNUNET_DHT_MonitorGetRespCB get_resp_cb, + GNUNET_DHT_MonitorPutCB put_cb, void *cb_cls) { struct GNUNET_DHT_MonitorHandle *h; - struct GNUNET_DHT_MonitorMessage *m; + struct GNUNET_DHT_MonitorStartStopMessage *m; struct PendingMessage *pending; h = GNUNET_malloc (sizeof (struct GNUNET_DHT_MonitorHandle)); GNUNET_CONTAINER_DLL_insert(handle->monitor_head, handle->monitor_tail, h); - GNUNET_assert (NULL != cb); - h->cb = cb; + h->get_cb = get_cb; + h->get_resp_cb = get_resp_cb; + h->put_cb = put_cb; h->cb_cls = cb_cls; h->type = type; h->dht_handle = handle; @@ -955,17 +1291,22 @@ GNUNET_DHT_monitor_start (struct GNUNET_DHT_Handle *handle, memcpy (h->key, key, sizeof(GNUNET_HashCode)); } - pending = GNUNET_malloc (sizeof (struct GNUNET_DHT_MonitorMessage) + + pending = GNUNET_malloc (sizeof (struct GNUNET_DHT_MonitorStartStopMessage) + sizeof (struct PendingMessage)); - m = (struct GNUNET_DHT_MonitorMessage *) &pending[1]; + m = (struct GNUNET_DHT_MonitorStartStopMessage *) &pending[1]; pending->msg = &m->header; pending->handle = handle; pending->free_on_send = GNUNET_YES; - m->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_MONITOR_GET); - m->header.size = htons (sizeof (struct GNUNET_DHT_MonitorMessage)); + m->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_MONITOR_START); + m->header.size = htons (sizeof (struct GNUNET_DHT_MonitorStartStopMessage)); m->type = htonl(type); - if (NULL != key) + m->get = htons(NULL != get_cb); + m->get_resp = htons(NULL != get_resp_cb); + m->put = htons(NULL != put_cb); + if (NULL != key) { + m->filter_key = htons(1); memcpy (&m->key, key, sizeof(GNUNET_HashCode)); + } GNUNET_CONTAINER_DLL_insert (handle->pending_head, handle->pending_tail, pending); pending->in_pending_queue = GNUNET_YES; @@ -985,10 +1326,36 @@ GNUNET_DHT_monitor_start (struct GNUNET_DHT_Handle *handle, void GNUNET_DHT_monitor_stop (struct GNUNET_DHT_MonitorHandle *handle) { - GNUNET_free_non_null (handle->key); + struct GNUNET_DHT_MonitorStartStopMessage *m; + struct PendingMessage *pending; + GNUNET_CONTAINER_DLL_remove (handle->dht_handle->monitor_head, handle->dht_handle->monitor_tail, handle); + + pending = GNUNET_malloc (sizeof (struct GNUNET_DHT_MonitorStartStopMessage) + + sizeof (struct PendingMessage)); + m = (struct GNUNET_DHT_MonitorStartStopMessage *) &pending[1]; + pending->msg = &m->header; + pending->handle = handle->dht_handle; + pending->free_on_send = GNUNET_YES; + m->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_MONITOR_STOP); + m->header.size = htons (sizeof (struct GNUNET_DHT_MonitorStartStopMessage)); + m->type = htonl(handle->type); + m->get = htons(NULL != handle->get_cb); + m->get_resp = htons(NULL != handle->get_resp_cb); + m->put = htons(NULL != handle->put_cb); + if (NULL != handle->key) { + m->filter_key = htons(1); + memcpy (&m->key, handle->key, sizeof(GNUNET_HashCode)); + } + GNUNET_CONTAINER_DLL_insert (handle->dht_handle->pending_head, + handle->dht_handle->pending_tail, + pending); + pending->in_pending_queue = GNUNET_YES; + process_pending_messages (handle->dht_handle); + + GNUNET_free_non_null (handle->key); GNUNET_free (handle); } |