diff options
Diffstat (limited to 'src/dht/dht_api.c')
-rw-r--r-- | src/dht/dht_api.c | 198 |
1 files changed, 159 insertions, 39 deletions
diff --git a/src/dht/dht_api.c b/src/dht/dht_api.c index 420eacb..96ca6ab 100644 --- a/src/dht/dht_api.c +++ b/src/dht/dht_api.c @@ -173,15 +173,42 @@ struct GNUNET_DHT_GetHandle struct PendingMessage *message; /** + * Array of hash codes over the results that we have already + * seen. + */ + struct GNUNET_HashCode *seen_results; + + /** * Key that this get request is for */ - GNUNET_HashCode key; + struct GNUNET_HashCode key; /** * Unique identifier for this request (for key collisions). */ uint64_t unique_id; + /** + * Size of the 'seen_results' array. Note that not + * all positions might be used (as we over-allocate). + */ + unsigned int seen_results_size; + + /** + * Offset into the 'seen_results' array marking the + * end of the positions that are actually used. + */ + unsigned int seen_results_end; + + /** + * Offset into the 'seen_results' array marking the + * position up to where we've send the hash codes to + * the DHT for blocking (needed as we might not be + * able to send all hash codes at once). + */ + unsigned int seen_results_transmission_offset; + + }; @@ -213,7 +240,7 @@ struct GNUNET_DHT_MonitorHandle /** * Key being looked for, NULL == all. */ - GNUNET_HashCode *key; + struct GNUNET_HashCode *key; /** * Callback for each received message of type get. @@ -353,6 +380,50 @@ try_connect (struct GNUNET_DHT_Handle *handle) /** + * Queue messages to DHT to block certain results from the result set. + * + * @param get_handle GET to generate messages for. + */ +static void +queue_filter_messages (struct GNUNET_DHT_GetHandle *get_handle) +{ + struct PendingMessage *pm; + struct GNUNET_DHT_ClientGetResultSeenMessage *msg; + uint16_t msize; + unsigned int delta; + unsigned int max; + + while (get_handle->seen_results_transmission_offset < get_handle->seen_results_end) + { + delta = get_handle->seen_results_end - get_handle->seen_results_transmission_offset; + max = (GNUNET_SERVER_MAX_MESSAGE_SIZE - sizeof (struct GNUNET_DHT_ClientGetResultSeenMessage)) / sizeof (struct GNUNET_HashCode); + if (delta > max) + delta = max; + msize = sizeof (struct GNUNET_DHT_ClientGetResultSeenMessage) + delta * sizeof (struct GNUNET_HashCode); + + pm = GNUNET_malloc (sizeof (struct PendingMessage) + msize); + msg = (struct GNUNET_DHT_ClientGetResultSeenMessage *) &pm[1]; + pm->msg = &msg->header; + pm->handle = get_handle->dht_handle; + pm->unique_id = get_handle->unique_id; + pm->free_on_send = GNUNET_YES; + pm->in_pending_queue = GNUNET_YES; + msg->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_CLIENT_GET_RESULTS_KNOWN); + msg->header.size = htons (msize); + msg->key = get_handle->key; + msg->unique_id = get_handle->unique_id; + memcpy (&msg[1], + &get_handle->seen_results[get_handle->seen_results_transmission_offset], + sizeof (struct GNUNET_HashCode) * delta); + get_handle->seen_results_transmission_offset += delta; + GNUNET_CONTAINER_DLL_insert_tail (get_handle->dht_handle->pending_head, + get_handle->dht_handle->pending_tail, + pm); + } +} + + +/** * Add the request corresponding to the given route handle * to the pending queue (if it is not already in there). * @@ -362,19 +433,21 @@ try_connect (struct GNUNET_DHT_Handle *handle) * @return GNUNET_YES (always) */ static int -add_request_to_pending (void *cls, const GNUNET_HashCode * key, void *value) +add_request_to_pending (void *cls, const struct GNUNET_HashCode * key, void *value) { struct GNUNET_DHT_Handle *handle = cls; - struct GNUNET_DHT_GetHandle *rh = value; + struct GNUNET_DHT_GetHandle *get_handle = value; - if (GNUNET_NO == rh->message->in_pending_queue) + if (GNUNET_NO == get_handle->message->in_pending_queue) { LOG (GNUNET_ERROR_TYPE_DEBUG, "Retransmitting request related to %s to DHT %p\n", GNUNET_h2s (key), handle); + get_handle->seen_results_transmission_offset = 0; GNUNET_CONTAINER_DLL_insert (handle->pending_head, handle->pending_tail, - rh->message); - rh->message->in_pending_queue = GNUNET_YES; + get_handle->message); + queue_filter_messages (get_handle); + get_handle->message->in_pending_queue = GNUNET_YES; } return GNUNET_YES; } @@ -401,13 +474,7 @@ try_reconnect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) struct GNUNET_DHT_Handle *handle = cls; LOG (GNUNET_ERROR_TYPE_DEBUG, "Reconnecting with DHT %p\n", handle); - handle->reconnect_task = GNUNET_SCHEDULER_NO_TASK; - if (handle->retry_time.rel_value < GNUNET_CONSTANTS_SERVICE_RETRY.rel_value) - handle->retry_time = GNUNET_CONSTANTS_SERVICE_RETRY; - else - handle->retry_time = GNUNET_TIME_relative_multiply (handle->retry_time, 2); - if (handle->retry_time.rel_value > GNUNET_CONSTANTS_SERVICE_TIMEOUT.rel_value) - handle->retry_time = GNUNET_CONSTANTS_SERVICE_TIMEOUT; + handle->retry_time = GNUNET_TIME_STD_BACKOFF (handle->retry_time); handle->reconnect_task = GNUNET_SCHEDULER_NO_TASK; if (GNUNET_YES != try_connect (handle)) { @@ -438,8 +505,9 @@ do_disconnect (struct GNUNET_DHT_Handle *handle) GNUNET_CLIENT_notify_transmit_ready_cancel (handle->th); handle->th = NULL; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Disconnecting from DHT service, will try to reconnect in %llu ms\n", - (unsigned long long) handle->retry_time.rel_value); + "Disconnecting from DHT service, will try to reconnect in %s\n", + GNUNET_STRINGS_relative_time_to_string (handle->retry_time, + GNUNET_YES)); GNUNET_CLIENT_disconnect (handle->client); handle->client = NULL; @@ -574,15 +642,16 @@ transmit_pending (void *cls, size_t size, void *buf) * @param key query of the request * @param value the 'struct GNUNET_DHT_RouteHandle' of a request matching the same key * @return GNUNET_YES to continue to iterate over all results, - * GNUNET_NO if the reply is malformed + * GNUNET_NO if the reply is malformed or we found a matching request */ static int -process_reply (void *cls, const GNUNET_HashCode * key, void *value) +process_reply (void *cls, const struct GNUNET_HashCode * key, void *value) { const struct GNUNET_DHT_ClientResultMessage *dht_msg = cls; struct GNUNET_DHT_GetHandle *get_handle = value; const struct GNUNET_PeerIdentity *put_path; const struct GNUNET_PeerIdentity *get_path; + struct GNUNET_HashCode hc; uint32_t put_path_length; uint32_t get_path_length; size_t data_length; @@ -619,11 +688,22 @@ process_reply (void *cls, const GNUNET_HashCode * key, void *value) put_path = (const struct GNUNET_PeerIdentity *) &dht_msg[1]; get_path = &put_path[put_path_length]; data = &get_path[get_path_length]; + /* remember that we've seen this result */ + GNUNET_CRYPTO_hash (data, data_length, &hc); + if (get_handle->seen_results_size == get_handle->seen_results_end) + GNUNET_array_grow (get_handle->seen_results, + get_handle->seen_results_size, + get_handle->seen_results_size * 2 + 1); + GNUNET_assert (get_handle->seen_results_end == get_handle->seen_results_transmission_offset); + get_handle->seen_results[get_handle->seen_results_end++] = hc; + /* no need to block it explicitly, service already knows about it! */ + get_handle->seen_results_transmission_offset++; + get_handle->iter (get_handle->iter_cls, GNUNET_TIME_absolute_ntoh (dht_msg->expiration), key, get_path, get_path_length, put_path, put_path_length, ntohl (dht_msg->type), data_length, data); - return GNUNET_YES; + return GNUNET_NO; } /** @@ -648,7 +728,7 @@ process_monitor_get_message (struct GNUNET_DHT_Handle *handle, type_ok = (GNUNET_BLOCK_TYPE_ANY == h->type) || (h->type == ntohl(msg->type)); key_ok = (NULL == h->key) || (0 == memcmp (h->key, &msg->key, - sizeof (GNUNET_HashCode))); + sizeof (struct GNUNET_HashCode))); if (type_ok && key_ok && (NULL != h->get_cb)) h->get_cb (h->cb_cls, ntohl (msg->options), @@ -699,7 +779,7 @@ process_monitor_get_resp_message (struct GNUNET_DHT_Handle *handle, type_ok = (GNUNET_BLOCK_TYPE_ANY == h->type) || (h->type == ntohl(msg->type)); key_ok = (NULL == h->key) || (0 == memcmp (h->key, &msg->key, - sizeof (GNUNET_HashCode))); + sizeof (struct GNUNET_HashCode))); if (type_ok && key_ok && (NULL != h->get_resp_cb)) h->get_resp_cb (h->cb_cls, (enum GNUNET_BLOCK_Type) ntohl(msg->type), @@ -749,7 +829,7 @@ process_monitor_put_message (struct GNUNET_DHT_Handle *handle, type_ok = (GNUNET_BLOCK_TYPE_ANY == h->type) || (h->type == ntohl(msg->type)); key_ok = (NULL == h->key) || (0 == memcmp (h->key, &msg->key, - sizeof (GNUNET_HashCode))); + sizeof (struct GNUNET_HashCode))); if (type_ok && key_ok && (NULL != h->put_cb)) h->put_cb (h->cb_cls, ntohl (msg->options), @@ -820,6 +900,8 @@ service_message_handler (void *cls, const struct GNUNET_MessageHeader *msg) do_disconnect (handle); return; } + GNUNET_CLIENT_receive (handle->client, &service_message_handler, handle, + GNUNET_TIME_UNIT_FOREVER_REL); ret = GNUNET_SYSERR; msize = ntohs (msg->size); switch (ntohs (msg->type)) @@ -861,13 +943,15 @@ service_message_handler (void *cls, const struct GNUNET_MessageHeader *msg) GNUNET_break (0); break; } - ret = GNUNET_OK; dht_msg = (const struct GNUNET_DHT_ClientResultMessage *) msg; - LOG (GNUNET_ERROR_TYPE_DEBUG, "Received reply for `%s' from DHT service %p\n", - GNUNET_h2s (&dht_msg->key), handle); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Received reply for `%s' from DHT service %p\n", + GNUNET_h2s (&dht_msg->key), handle); GNUNET_CONTAINER_multihashmap_get_multiple (handle->active_requests, - &dht_msg->key, &process_reply, - (void *) dht_msg); + &dht_msg->key, + &process_reply, + (void *) dht_msg); + ret = GNUNET_OK; break; case GNUNET_MESSAGE_TYPE_DHT_CLIENT_PUT_OK: if (ntohs (msg->size) != sizeof (struct GNUNET_DHT_ClientPutConfirmationMessage)) @@ -880,6 +964,9 @@ service_message_handler (void *cls, const struct GNUNET_MessageHeader *msg) break; default: GNUNET_break(0); + LOG (GNUNET_ERROR_TYPE_WARNING, + "Unknown DHT message type: %hu (%hu) size: %hu\n", + ntohs (msg->type), msg->type, msize); break; } if (GNUNET_OK != ret) @@ -888,8 +975,6 @@ service_message_handler (void *cls, const struct GNUNET_MessageHeader *msg) do_disconnect (handle); return; } - GNUNET_CLIENT_receive (handle->client, &service_message_handler, handle, - GNUNET_TIME_UNIT_FOREVER_REL); } @@ -912,7 +997,7 @@ GNUNET_DHT_connect (const struct GNUNET_CONFIGURATION_Handle *cfg, handle->cfg = cfg; handle->uid_gen = GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK, UINT64_MAX); - handle->active_requests = GNUNET_CONTAINER_multihashmap_create (ht_len); + handle->active_requests = GNUNET_CONTAINER_multihashmap_create (ht_len, GNUNET_NO); if (GNUNET_NO == try_connect (handle)) { GNUNET_DHT_disconnect (handle); @@ -1040,10 +1125,10 @@ mark_put_message_gone (void *cls, * @param cont_cls closure for cont */ struct GNUNET_DHT_PutHandle * -GNUNET_DHT_put (struct GNUNET_DHT_Handle *handle, const GNUNET_HashCode * key, +GNUNET_DHT_put (struct GNUNET_DHT_Handle *handle, const struct GNUNET_HashCode * key, uint32_t desired_replication_level, enum GNUNET_DHT_RouteOption options, - enum GNUNET_BLOCK_Type type, size_t size, const char *data, + enum GNUNET_BLOCK_Type type, size_t size, const void *data, struct GNUNET_TIME_Absolute exp, struct GNUNET_TIME_Relative timeout, GNUNET_DHT_PutContinuation cont, void *cont_cls) @@ -1148,7 +1233,7 @@ GNUNET_DHT_put_cancel (struct GNUNET_DHT_PutHandle *ph) */ struct GNUNET_DHT_GetHandle * GNUNET_DHT_get_start (struct GNUNET_DHT_Handle *handle, - enum GNUNET_BLOCK_Type type, const GNUNET_HashCode * key, + enum GNUNET_BLOCK_Type type, const struct GNUNET_HashCode * key, uint32_t desired_replication_level, enum GNUNET_DHT_RouteOption options, const void *xquery, size_t xquery_size, GNUNET_DHT_GetIterator iter, @@ -1185,6 +1270,7 @@ GNUNET_DHT_get_start (struct GNUNET_DHT_Handle *handle, pending); pending->in_pending_queue = GNUNET_YES; get_handle = GNUNET_malloc (sizeof (struct GNUNET_DHT_GetHandle)); + get_handle->dht_handle = handle; get_handle->iter = iter; get_handle->iter_cls = iter_cls; get_handle->message = pending; @@ -1196,6 +1282,38 @@ GNUNET_DHT_get_start (struct GNUNET_DHT_Handle *handle, } + +/** + * Tell the DHT not to return any of the following known results + * to this client. + * + * @param get_handle get operation for which results should be filtered + * @param num_results number of results to be blocked that are + * provided in this call (size of the 'results' array) + * @param results array of hash codes over the 'data' of the results + * to be blocked + */ +void +GNUNET_DHT_get_filter_known_results (struct GNUNET_DHT_GetHandle *get_handle, + unsigned int num_results, + const struct GNUNET_HashCode *results) +{ + unsigned int needed; + + needed = get_handle->seen_results_end + num_results; + if (needed > get_handle->seen_results_size) + GNUNET_array_grow (get_handle->seen_results, + get_handle->seen_results_size, + needed); + memcpy (&get_handle->seen_results[get_handle->seen_results_end], + results, + num_results * sizeof (struct GNUNET_HashCode)); + get_handle->seen_results_end += num_results; + queue_filter_messages (get_handle); + process_pending_messages (get_handle->dht_handle); +} + + /** * Stop async DHT-get. * @@ -1244,8 +1362,10 @@ GNUNET_DHT_get_stop (struct GNUNET_DHT_GetHandle *get_handle) get_handle->message->in_pending_queue = GNUNET_NO; } GNUNET_free (get_handle->message); + GNUNET_array_grow (get_handle->seen_results, + get_handle->seen_results_end, + 0); GNUNET_free (get_handle); - process_pending_messages (handle); } @@ -1266,7 +1386,7 @@ GNUNET_DHT_get_stop (struct GNUNET_DHT_GetHandle *get_handle) struct GNUNET_DHT_MonitorHandle * GNUNET_DHT_monitor_start (struct GNUNET_DHT_Handle *handle, enum GNUNET_BLOCK_Type type, - const GNUNET_HashCode *key, + const struct GNUNET_HashCode *key, GNUNET_DHT_MonitorGetCB get_cb, GNUNET_DHT_MonitorGetRespCB get_resp_cb, GNUNET_DHT_MonitorPutCB put_cb, @@ -1287,8 +1407,8 @@ GNUNET_DHT_monitor_start (struct GNUNET_DHT_Handle *handle, h->dht_handle = handle; if (NULL != key) { - h->key = GNUNET_malloc (sizeof(GNUNET_HashCode)); - memcpy (h->key, key, sizeof(GNUNET_HashCode)); + h->key = GNUNET_malloc (sizeof(struct GNUNET_HashCode)); + memcpy (h->key, key, sizeof(struct GNUNET_HashCode)); } pending = GNUNET_malloc (sizeof (struct GNUNET_DHT_MonitorStartStopMessage) + @@ -1305,7 +1425,7 @@ GNUNET_DHT_monitor_start (struct GNUNET_DHT_Handle *handle, m->put = htons(NULL != put_cb); if (NULL != key) { m->filter_key = htons(1); - memcpy (&m->key, key, sizeof(GNUNET_HashCode)); + memcpy (&m->key, key, sizeof(struct GNUNET_HashCode)); } GNUNET_CONTAINER_DLL_insert (handle->pending_head, handle->pending_tail, pending); @@ -1347,7 +1467,7 @@ GNUNET_DHT_monitor_stop (struct GNUNET_DHT_MonitorHandle *handle) m->put = htons(NULL != handle->put_cb); if (NULL != handle->key) { m->filter_key = htons(1); - memcpy (&m->key, handle->key, sizeof(GNUNET_HashCode)); + memcpy (&m->key, handle->key, sizeof(struct GNUNET_HashCode)); } GNUNET_CONTAINER_DLL_insert (handle->dht_handle->pending_head, handle->dht_handle->pending_tail, |