aboutsummaryrefslogtreecommitdiff
path: root/src/dht/dht_api.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/dht/dht_api.c')
-rw-r--r--src/dht/dht_api.c198
1 files changed, 159 insertions, 39 deletions
diff --git a/src/dht/dht_api.c b/src/dht/dht_api.c
index 420eacb..96ca6ab 100644
--- a/src/dht/dht_api.c
+++ b/src/dht/dht_api.c
@@ -173,15 +173,42 @@ struct GNUNET_DHT_GetHandle
struct PendingMessage *message;
/**
+ * Array of hash codes over the results that we have already
+ * seen.
+ */
+ struct GNUNET_HashCode *seen_results;
+
+ /**
* Key that this get request is for
*/
- GNUNET_HashCode key;
+ struct GNUNET_HashCode key;
/**
* Unique identifier for this request (for key collisions).
*/
uint64_t unique_id;
+ /**
+ * Size of the 'seen_results' array. Note that not
+ * all positions might be used (as we over-allocate).
+ */
+ unsigned int seen_results_size;
+
+ /**
+ * Offset into the 'seen_results' array marking the
+ * end of the positions that are actually used.
+ */
+ unsigned int seen_results_end;
+
+ /**
+ * Offset into the 'seen_results' array marking the
+ * position up to where we've send the hash codes to
+ * the DHT for blocking (needed as we might not be
+ * able to send all hash codes at once).
+ */
+ unsigned int seen_results_transmission_offset;
+
+
};
@@ -213,7 +240,7 @@ struct GNUNET_DHT_MonitorHandle
/**
* Key being looked for, NULL == all.
*/
- GNUNET_HashCode *key;
+ struct GNUNET_HashCode *key;
/**
* Callback for each received message of type get.
@@ -353,6 +380,50 @@ try_connect (struct GNUNET_DHT_Handle *handle)
/**
+ * Queue messages to DHT to block certain results from the result set.
+ *
+ * @param get_handle GET to generate messages for.
+ */
+static void
+queue_filter_messages (struct GNUNET_DHT_GetHandle *get_handle)
+{
+ struct PendingMessage *pm;
+ struct GNUNET_DHT_ClientGetResultSeenMessage *msg;
+ uint16_t msize;
+ unsigned int delta;
+ unsigned int max;
+
+ while (get_handle->seen_results_transmission_offset < get_handle->seen_results_end)
+ {
+ delta = get_handle->seen_results_end - get_handle->seen_results_transmission_offset;
+ max = (GNUNET_SERVER_MAX_MESSAGE_SIZE - sizeof (struct GNUNET_DHT_ClientGetResultSeenMessage)) / sizeof (struct GNUNET_HashCode);
+ if (delta > max)
+ delta = max;
+ msize = sizeof (struct GNUNET_DHT_ClientGetResultSeenMessage) + delta * sizeof (struct GNUNET_HashCode);
+
+ pm = GNUNET_malloc (sizeof (struct PendingMessage) + msize);
+ msg = (struct GNUNET_DHT_ClientGetResultSeenMessage *) &pm[1];
+ pm->msg = &msg->header;
+ pm->handle = get_handle->dht_handle;
+ pm->unique_id = get_handle->unique_id;
+ pm->free_on_send = GNUNET_YES;
+ pm->in_pending_queue = GNUNET_YES;
+ msg->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_CLIENT_GET_RESULTS_KNOWN);
+ msg->header.size = htons (msize);
+ msg->key = get_handle->key;
+ msg->unique_id = get_handle->unique_id;
+ memcpy (&msg[1],
+ &get_handle->seen_results[get_handle->seen_results_transmission_offset],
+ sizeof (struct GNUNET_HashCode) * delta);
+ get_handle->seen_results_transmission_offset += delta;
+ GNUNET_CONTAINER_DLL_insert_tail (get_handle->dht_handle->pending_head,
+ get_handle->dht_handle->pending_tail,
+ pm);
+ }
+}
+
+
+/**
* Add the request corresponding to the given route handle
* to the pending queue (if it is not already in there).
*
@@ -362,19 +433,21 @@ try_connect (struct GNUNET_DHT_Handle *handle)
* @return GNUNET_YES (always)
*/
static int
-add_request_to_pending (void *cls, const GNUNET_HashCode * key, void *value)
+add_request_to_pending (void *cls, const struct GNUNET_HashCode * key, void *value)
{
struct GNUNET_DHT_Handle *handle = cls;
- struct GNUNET_DHT_GetHandle *rh = value;
+ struct GNUNET_DHT_GetHandle *get_handle = value;
- if (GNUNET_NO == rh->message->in_pending_queue)
+ if (GNUNET_NO == get_handle->message->in_pending_queue)
{
LOG (GNUNET_ERROR_TYPE_DEBUG,
"Retransmitting request related to %s to DHT %p\n", GNUNET_h2s (key),
handle);
+ get_handle->seen_results_transmission_offset = 0;
GNUNET_CONTAINER_DLL_insert (handle->pending_head, handle->pending_tail,
- rh->message);
- rh->message->in_pending_queue = GNUNET_YES;
+ get_handle->message);
+ queue_filter_messages (get_handle);
+ get_handle->message->in_pending_queue = GNUNET_YES;
}
return GNUNET_YES;
}
@@ -401,13 +474,7 @@ try_reconnect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
struct GNUNET_DHT_Handle *handle = cls;
LOG (GNUNET_ERROR_TYPE_DEBUG, "Reconnecting with DHT %p\n", handle);
- handle->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
- if (handle->retry_time.rel_value < GNUNET_CONSTANTS_SERVICE_RETRY.rel_value)
- handle->retry_time = GNUNET_CONSTANTS_SERVICE_RETRY;
- else
- handle->retry_time = GNUNET_TIME_relative_multiply (handle->retry_time, 2);
- if (handle->retry_time.rel_value > GNUNET_CONSTANTS_SERVICE_TIMEOUT.rel_value)
- handle->retry_time = GNUNET_CONSTANTS_SERVICE_TIMEOUT;
+ handle->retry_time = GNUNET_TIME_STD_BACKOFF (handle->retry_time);
handle->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
if (GNUNET_YES != try_connect (handle))
{
@@ -438,8 +505,9 @@ do_disconnect (struct GNUNET_DHT_Handle *handle)
GNUNET_CLIENT_notify_transmit_ready_cancel (handle->th);
handle->th = NULL;
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Disconnecting from DHT service, will try to reconnect in %llu ms\n",
- (unsigned long long) handle->retry_time.rel_value);
+ "Disconnecting from DHT service, will try to reconnect in %s\n",
+ GNUNET_STRINGS_relative_time_to_string (handle->retry_time,
+ GNUNET_YES));
GNUNET_CLIENT_disconnect (handle->client);
handle->client = NULL;
@@ -574,15 +642,16 @@ transmit_pending (void *cls, size_t size, void *buf)
* @param key query of the request
* @param value the 'struct GNUNET_DHT_RouteHandle' of a request matching the same key
* @return GNUNET_YES to continue to iterate over all results,
- * GNUNET_NO if the reply is malformed
+ * GNUNET_NO if the reply is malformed or we found a matching request
*/
static int
-process_reply (void *cls, const GNUNET_HashCode * key, void *value)
+process_reply (void *cls, const struct GNUNET_HashCode * key, void *value)
{
const struct GNUNET_DHT_ClientResultMessage *dht_msg = cls;
struct GNUNET_DHT_GetHandle *get_handle = value;
const struct GNUNET_PeerIdentity *put_path;
const struct GNUNET_PeerIdentity *get_path;
+ struct GNUNET_HashCode hc;
uint32_t put_path_length;
uint32_t get_path_length;
size_t data_length;
@@ -619,11 +688,22 @@ process_reply (void *cls, const GNUNET_HashCode * key, void *value)
put_path = (const struct GNUNET_PeerIdentity *) &dht_msg[1];
get_path = &put_path[put_path_length];
data = &get_path[get_path_length];
+ /* remember that we've seen this result */
+ GNUNET_CRYPTO_hash (data, data_length, &hc);
+ if (get_handle->seen_results_size == get_handle->seen_results_end)
+ GNUNET_array_grow (get_handle->seen_results,
+ get_handle->seen_results_size,
+ get_handle->seen_results_size * 2 + 1);
+ GNUNET_assert (get_handle->seen_results_end == get_handle->seen_results_transmission_offset);
+ get_handle->seen_results[get_handle->seen_results_end++] = hc;
+ /* no need to block it explicitly, service already knows about it! */
+ get_handle->seen_results_transmission_offset++;
+
get_handle->iter (get_handle->iter_cls,
GNUNET_TIME_absolute_ntoh (dht_msg->expiration), key,
get_path, get_path_length, put_path, put_path_length,
ntohl (dht_msg->type), data_length, data);
- return GNUNET_YES;
+ return GNUNET_NO;
}
/**
@@ -648,7 +728,7 @@ process_monitor_get_message (struct GNUNET_DHT_Handle *handle,
type_ok = (GNUNET_BLOCK_TYPE_ANY == h->type) || (h->type == ntohl(msg->type));
key_ok = (NULL == h->key) || (0 == memcmp (h->key, &msg->key,
- sizeof (GNUNET_HashCode)));
+ sizeof (struct GNUNET_HashCode)));
if (type_ok && key_ok && (NULL != h->get_cb))
h->get_cb (h->cb_cls,
ntohl (msg->options),
@@ -699,7 +779,7 @@ process_monitor_get_resp_message (struct GNUNET_DHT_Handle *handle,
type_ok = (GNUNET_BLOCK_TYPE_ANY == h->type) || (h->type == ntohl(msg->type));
key_ok = (NULL == h->key) || (0 == memcmp (h->key, &msg->key,
- sizeof (GNUNET_HashCode)));
+ sizeof (struct GNUNET_HashCode)));
if (type_ok && key_ok && (NULL != h->get_resp_cb))
h->get_resp_cb (h->cb_cls,
(enum GNUNET_BLOCK_Type) ntohl(msg->type),
@@ -749,7 +829,7 @@ process_monitor_put_message (struct GNUNET_DHT_Handle *handle,
type_ok = (GNUNET_BLOCK_TYPE_ANY == h->type) || (h->type == ntohl(msg->type));
key_ok = (NULL == h->key) || (0 == memcmp (h->key, &msg->key,
- sizeof (GNUNET_HashCode)));
+ sizeof (struct GNUNET_HashCode)));
if (type_ok && key_ok && (NULL != h->put_cb))
h->put_cb (h->cb_cls,
ntohl (msg->options),
@@ -820,6 +900,8 @@ service_message_handler (void *cls, const struct GNUNET_MessageHeader *msg)
do_disconnect (handle);
return;
}
+ GNUNET_CLIENT_receive (handle->client, &service_message_handler, handle,
+ GNUNET_TIME_UNIT_FOREVER_REL);
ret = GNUNET_SYSERR;
msize = ntohs (msg->size);
switch (ntohs (msg->type))
@@ -861,13 +943,15 @@ service_message_handler (void *cls, const struct GNUNET_MessageHeader *msg)
GNUNET_break (0);
break;
}
- ret = GNUNET_OK;
dht_msg = (const struct GNUNET_DHT_ClientResultMessage *) msg;
- LOG (GNUNET_ERROR_TYPE_DEBUG, "Received reply for `%s' from DHT service %p\n",
- GNUNET_h2s (&dht_msg->key), handle);
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Received reply for `%s' from DHT service %p\n",
+ GNUNET_h2s (&dht_msg->key), handle);
GNUNET_CONTAINER_multihashmap_get_multiple (handle->active_requests,
- &dht_msg->key, &process_reply,
- (void *) dht_msg);
+ &dht_msg->key,
+ &process_reply,
+ (void *) dht_msg);
+ ret = GNUNET_OK;
break;
case GNUNET_MESSAGE_TYPE_DHT_CLIENT_PUT_OK:
if (ntohs (msg->size) != sizeof (struct GNUNET_DHT_ClientPutConfirmationMessage))
@@ -880,6 +964,9 @@ service_message_handler (void *cls, const struct GNUNET_MessageHeader *msg)
break;
default:
GNUNET_break(0);
+ LOG (GNUNET_ERROR_TYPE_WARNING,
+ "Unknown DHT message type: %hu (%hu) size: %hu\n",
+ ntohs (msg->type), msg->type, msize);
break;
}
if (GNUNET_OK != ret)
@@ -888,8 +975,6 @@ service_message_handler (void *cls, const struct GNUNET_MessageHeader *msg)
do_disconnect (handle);
return;
}
- GNUNET_CLIENT_receive (handle->client, &service_message_handler, handle,
- GNUNET_TIME_UNIT_FOREVER_REL);
}
@@ -912,7 +997,7 @@ GNUNET_DHT_connect (const struct GNUNET_CONFIGURATION_Handle *cfg,
handle->cfg = cfg;
handle->uid_gen =
GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK, UINT64_MAX);
- handle->active_requests = GNUNET_CONTAINER_multihashmap_create (ht_len);
+ handle->active_requests = GNUNET_CONTAINER_multihashmap_create (ht_len, GNUNET_NO);
if (GNUNET_NO == try_connect (handle))
{
GNUNET_DHT_disconnect (handle);
@@ -1040,10 +1125,10 @@ mark_put_message_gone (void *cls,
* @param cont_cls closure for cont
*/
struct GNUNET_DHT_PutHandle *
-GNUNET_DHT_put (struct GNUNET_DHT_Handle *handle, const GNUNET_HashCode * key,
+GNUNET_DHT_put (struct GNUNET_DHT_Handle *handle, const struct GNUNET_HashCode * key,
uint32_t desired_replication_level,
enum GNUNET_DHT_RouteOption options,
- enum GNUNET_BLOCK_Type type, size_t size, const char *data,
+ enum GNUNET_BLOCK_Type type, size_t size, const void *data,
struct GNUNET_TIME_Absolute exp,
struct GNUNET_TIME_Relative timeout, GNUNET_DHT_PutContinuation cont,
void *cont_cls)
@@ -1148,7 +1233,7 @@ GNUNET_DHT_put_cancel (struct GNUNET_DHT_PutHandle *ph)
*/
struct GNUNET_DHT_GetHandle *
GNUNET_DHT_get_start (struct GNUNET_DHT_Handle *handle,
- enum GNUNET_BLOCK_Type type, const GNUNET_HashCode * key,
+ enum GNUNET_BLOCK_Type type, const struct GNUNET_HashCode * key,
uint32_t desired_replication_level,
enum GNUNET_DHT_RouteOption options, const void *xquery,
size_t xquery_size, GNUNET_DHT_GetIterator iter,
@@ -1185,6 +1270,7 @@ GNUNET_DHT_get_start (struct GNUNET_DHT_Handle *handle,
pending);
pending->in_pending_queue = GNUNET_YES;
get_handle = GNUNET_malloc (sizeof (struct GNUNET_DHT_GetHandle));
+ get_handle->dht_handle = handle;
get_handle->iter = iter;
get_handle->iter_cls = iter_cls;
get_handle->message = pending;
@@ -1196,6 +1282,38 @@ GNUNET_DHT_get_start (struct GNUNET_DHT_Handle *handle,
}
+
+/**
+ * Tell the DHT not to return any of the following known results
+ * to this client.
+ *
+ * @param get_handle get operation for which results should be filtered
+ * @param num_results number of results to be blocked that are
+ * provided in this call (size of the 'results' array)
+ * @param results array of hash codes over the 'data' of the results
+ * to be blocked
+ */
+void
+GNUNET_DHT_get_filter_known_results (struct GNUNET_DHT_GetHandle *get_handle,
+ unsigned int num_results,
+ const struct GNUNET_HashCode *results)
+{
+ unsigned int needed;
+
+ needed = get_handle->seen_results_end + num_results;
+ if (needed > get_handle->seen_results_size)
+ GNUNET_array_grow (get_handle->seen_results,
+ get_handle->seen_results_size,
+ needed);
+ memcpy (&get_handle->seen_results[get_handle->seen_results_end],
+ results,
+ num_results * sizeof (struct GNUNET_HashCode));
+ get_handle->seen_results_end += num_results;
+ queue_filter_messages (get_handle);
+ process_pending_messages (get_handle->dht_handle);
+}
+
+
/**
* Stop async DHT-get.
*
@@ -1244,8 +1362,10 @@ GNUNET_DHT_get_stop (struct GNUNET_DHT_GetHandle *get_handle)
get_handle->message->in_pending_queue = GNUNET_NO;
}
GNUNET_free (get_handle->message);
+ GNUNET_array_grow (get_handle->seen_results,
+ get_handle->seen_results_end,
+ 0);
GNUNET_free (get_handle);
-
process_pending_messages (handle);
}
@@ -1266,7 +1386,7 @@ GNUNET_DHT_get_stop (struct GNUNET_DHT_GetHandle *get_handle)
struct GNUNET_DHT_MonitorHandle *
GNUNET_DHT_monitor_start (struct GNUNET_DHT_Handle *handle,
enum GNUNET_BLOCK_Type type,
- const GNUNET_HashCode *key,
+ const struct GNUNET_HashCode *key,
GNUNET_DHT_MonitorGetCB get_cb,
GNUNET_DHT_MonitorGetRespCB get_resp_cb,
GNUNET_DHT_MonitorPutCB put_cb,
@@ -1287,8 +1407,8 @@ GNUNET_DHT_monitor_start (struct GNUNET_DHT_Handle *handle,
h->dht_handle = handle;
if (NULL != key)
{
- h->key = GNUNET_malloc (sizeof(GNUNET_HashCode));
- memcpy (h->key, key, sizeof(GNUNET_HashCode));
+ h->key = GNUNET_malloc (sizeof(struct GNUNET_HashCode));
+ memcpy (h->key, key, sizeof(struct GNUNET_HashCode));
}
pending = GNUNET_malloc (sizeof (struct GNUNET_DHT_MonitorStartStopMessage) +
@@ -1305,7 +1425,7 @@ GNUNET_DHT_monitor_start (struct GNUNET_DHT_Handle *handle,
m->put = htons(NULL != put_cb);
if (NULL != key) {
m->filter_key = htons(1);
- memcpy (&m->key, key, sizeof(GNUNET_HashCode));
+ memcpy (&m->key, key, sizeof(struct GNUNET_HashCode));
}
GNUNET_CONTAINER_DLL_insert (handle->pending_head, handle->pending_tail,
pending);
@@ -1347,7 +1467,7 @@ GNUNET_DHT_monitor_stop (struct GNUNET_DHT_MonitorHandle *handle)
m->put = htons(NULL != handle->put_cb);
if (NULL != handle->key) {
m->filter_key = htons(1);
- memcpy (&m->key, handle->key, sizeof(GNUNET_HashCode));
+ memcpy (&m->key, handle->key, sizeof(struct GNUNET_HashCode));
}
GNUNET_CONTAINER_DLL_insert (handle->dht_handle->pending_head,
handle->dht_handle->pending_tail,