aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorgrothoff <grothoff@140774ce-b5e7-0310-ab8b-a85725594a96>2012-11-10 21:21:01 +0000
committergrothoff <grothoff@140774ce-b5e7-0310-ab8b-a85725594a96>2012-11-10 21:21:01 +0000
commitb719a21195de43cb31fa793b2120d7f2f427deb9 (patch)
tree9dbdac6a4cabe0367bffd864c69c0ac8358a2c45
parent8ab3e8b96cc9315a90ff83b1e3240ef07630aed0 (diff)
-implementing #2435
git-svn-id: https://gnunet.org/svn/gnunet@24891 140774ce-b5e7-0310-ab8b-a85725594a96
-rw-r--r--src/dht/dht.h35
-rw-r--r--src/dht/dht_api.c131
-rw-r--r--src/dht/gnunet-service-dht_clients.c103
-rw-r--r--src/include/gnunet_dht_service.h24
-rw-r--r--src/include/gnunet_protocols.h5
5 files changed, 284 insertions, 14 deletions
diff --git a/src/dht/dht.h b/src/dht/dht.h
index 772471a7c4..f736c8d757 100644
--- a/src/dht/dht.h
+++ b/src/dht/dht.h
@@ -109,6 +109,39 @@ struct GNUNET_DHT_ClientGetMessage
/**
+ * DHT GET RESULTS KNOWN message sent from clients to service. Indicates that a GET
+ * request should exclude certain results which are already known.
+ */
+struct GNUNET_DHT_ClientGetResultSeenMessage
+{
+ /**
+ * Type: GNUNET_MESSAGE_TYPE_DHT_CLIENT_GET_RESULTS_KNOWN
+ */
+ struct GNUNET_MessageHeader header;
+
+ /**
+ * Reserved, always 0.
+ */
+ uint32_t reserved GNUNET_PACKED;
+
+ /**
+ * The key we are searching for (to make it easy to find the corresponding
+ * GET inside the service).
+ */
+ struct GNUNET_HashCode key;
+
+ /**
+ * Unique ID identifying this request.
+ */
+ uint64_t unique_id GNUNET_PACKED;
+
+ /* Followed by an array of the hash codes of known results */
+
+};
+
+
+
+/**
* Reply to a GET send from the service to a client.
*/
struct GNUNET_DHT_ClientResultMessage
@@ -325,7 +358,7 @@ struct GNUNET_DHT_MonitorStartStopMessage
struct GNUNET_DHT_MonitorGetMessage
{
/**
- * Type: GNUNET_MESSAGE_TYPE_DHT_MONITOR_PUT
+ * Type: GNUNET_MESSAGE_TYPE_DHT_MONITOR_GET
*/
struct GNUNET_MessageHeader header;
diff --git a/src/dht/dht_api.c b/src/dht/dht_api.c
index 7964ba98f9..f469007784 100644
--- a/src/dht/dht_api.c
+++ b/src/dht/dht_api.c
@@ -173,15 +173,42 @@ struct GNUNET_DHT_GetHandle
struct PendingMessage *message;
/**
+ * Array of hash codes over the results that we have already
+ * seen.
+ */
+ struct GNUNET_HashCode *seen_results;
+
+ /**
* Key that this get request is for
*/
- struct GNUNET_HashCode key;
+ struct GNUNET_HashCode key;
/**
* Unique identifier for this request (for key collisions).
*/
uint64_t unique_id;
+ /**
+ * Size of the 'seen_results' array. Note that not
+ * all positions might be used (as we over-allocate).
+ */
+ unsigned int seen_results_size;
+
+ /**
+ * Offset into the 'seen_results' array marking the
+ * end of the positions that are actually used.
+ */
+ unsigned int seen_results_end;
+
+ /**
+ * Offset into the 'seen_results' array marking the
+ * position up to where we've send the hash codes to
+ * the DHT for blocking (needed as we might not be
+ * able to send all hash codes at once).
+ */
+ unsigned int seen_results_transmission_offset;
+
+
};
@@ -353,6 +380,50 @@ try_connect (struct GNUNET_DHT_Handle *handle)
/**
+ * Queue messages to DHT to block certain results from the result set.
+ *
+ * @param get_handle GET to generate messages for.
+ */
+static void
+queue_filter_messages (struct GNUNET_DHT_GetHandle *get_handle)
+{
+ struct PendingMessage *pm;
+ struct GNUNET_DHT_ClientGetResultSeenMessage *msg;
+ uint16_t msize;
+ unsigned int delta;
+ unsigned int max;
+
+ while (get_handle->seen_results_transmission_offset < get_handle->seen_results_end)
+ {
+ delta = get_handle->seen_results_end - get_handle->seen_results_transmission_offset;
+ max = (GNUNET_SERVER_MAX_MESSAGE_SIZE - sizeof (struct GNUNET_DHT_ClientGetResultSeenMessage)) / sizeof (struct GNUNET_HashCode);
+ if (delta > max)
+ delta = max;
+ msize = sizeof (struct GNUNET_DHT_ClientGetResultSeenMessage) + delta * sizeof (struct GNUNET_HashCode);
+
+ pm = GNUNET_malloc (sizeof (struct PendingMessage) + msize);
+ msg = (struct GNUNET_DHT_ClientGetResultSeenMessage *) &pm[1];
+ pm->msg = &msg->header;
+ pm->handle = get_handle->dht_handle;
+ pm->unique_id = get_handle->unique_id;
+ pm->free_on_send = GNUNET_YES;
+ pm->in_pending_queue = GNUNET_YES;
+ msg->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_CLIENT_GET_RESULTS_KNOWN);
+ msg->header.size = htons (msize);
+ msg->key = get_handle->key;
+ msg->unique_id = get_handle->unique_id;
+ memcpy (&msg[1],
+ &get_handle->seen_results[get_handle->seen_results_transmission_offset],
+ sizeof (struct GNUNET_HashCode) * delta);
+ get_handle->seen_results_transmission_offset += delta;
+ GNUNET_CONTAINER_DLL_insert_tail (get_handle->dht_handle->pending_head,
+ get_handle->dht_handle->pending_tail,
+ pm);
+ }
+}
+
+
+/**
* Add the request corresponding to the given route handle
* to the pending queue (if it is not already in there).
*
@@ -365,16 +436,18 @@ static int
add_request_to_pending (void *cls, const struct GNUNET_HashCode * key, void *value)
{
struct GNUNET_DHT_Handle *handle = cls;
- struct GNUNET_DHT_GetHandle *rh = value;
+ struct GNUNET_DHT_GetHandle *get_handle = value;
- if (GNUNET_NO == rh->message->in_pending_queue)
+ if (GNUNET_NO == get_handle->message->in_pending_queue)
{
LOG (GNUNET_ERROR_TYPE_DEBUG,
"Retransmitting request related to %s to DHT %p\n", GNUNET_h2s (key),
handle);
+ get_handle->seen_results_transmission_offset = 0;
GNUNET_CONTAINER_DLL_insert (handle->pending_head, handle->pending_tail,
- rh->message);
- rh->message->in_pending_queue = GNUNET_YES;
+ get_handle->message);
+ queue_filter_messages (get_handle);
+ get_handle->message->in_pending_queue = GNUNET_YES;
}
return GNUNET_YES;
}
@@ -578,6 +651,7 @@ process_reply (void *cls, const struct GNUNET_HashCode * key, void *value)
struct GNUNET_DHT_GetHandle *get_handle = value;
const struct GNUNET_PeerIdentity *put_path;
const struct GNUNET_PeerIdentity *get_path;
+ struct GNUNET_HashCode hc;
uint32_t put_path_length;
uint32_t get_path_length;
size_t data_length;
@@ -614,6 +688,17 @@ process_reply (void *cls, const struct GNUNET_HashCode * key, void *value)
put_path = (const struct GNUNET_PeerIdentity *) &dht_msg[1];
get_path = &put_path[put_path_length];
data = &get_path[get_path_length];
+ /* remember that we've seen this result */
+ GNUNET_CRYPTO_hash (data, data_length, &hc);
+ if (get_handle->seen_results_size == get_handle->seen_results_end)
+ GNUNET_array_grow (get_handle->seen_results,
+ get_handle->seen_results_size,
+ get_handle->seen_results_size * 2 + 1);
+ GNUNET_assert (get_handle->seen_results_end == get_handle->seen_results_transmission_offset);
+ get_handle->seen_results[get_handle->seen_results_end++] = hc;
+ /* no need to block it explicitly, service already knows about it! */
+ get_handle->seen_results_transmission_offset++;
+
get_handle->iter (get_handle->iter_cls,
GNUNET_TIME_absolute_ntoh (dht_msg->expiration), key,
get_path, get_path_length, put_path, put_path_length,
@@ -1194,6 +1279,38 @@ GNUNET_DHT_get_start (struct GNUNET_DHT_Handle *handle,
}
+
+/**
+ * Tell the DHT not to return any of the following known results
+ * to this client.
+ *
+ * @param get_handle get operation for which results should be filtered
+ * @param num_results number of results to be blocked that are
+ * provided in this call (size of the 'results' array)
+ * @param results array of hash codes over the 'data' of the results
+ * to be blocked
+ */
+void
+GNUNET_DHT_get_filter_known_results (struct GNUNET_DHT_GetHandle *get_handle,
+ unsigned int num_results,
+ const struct GNUNET_HashCode *results)
+{
+ unsigned int needed;
+
+ needed = get_handle->seen_results_end + num_results;
+ if (needed > get_handle->seen_results_size)
+ GNUNET_array_grow (get_handle->seen_results,
+ get_handle->seen_results_size,
+ needed);
+ memcpy (&get_handle->seen_results[get_handle->seen_results_end],
+ results,
+ num_results * sizeof (struct GNUNET_HashCode));
+ get_handle->seen_results_end += num_results;
+ queue_filter_messages (get_handle);
+ process_pending_messages (get_handle->dht_handle);
+}
+
+
/**
* Stop async DHT-get.
*
@@ -1242,8 +1359,10 @@ GNUNET_DHT_get_stop (struct GNUNET_DHT_GetHandle *get_handle)
get_handle->message->in_pending_queue = GNUNET_NO;
}
GNUNET_free (get_handle->message);
+ GNUNET_array_grow (get_handle->seen_results,
+ get_handle->seen_results_end,
+ 0);
GNUNET_free (get_handle);
-
process_pending_messages (handle);
}
diff --git a/src/dht/gnunet-service-dht_clients.c b/src/dht/gnunet-service-dht_clients.c
index 1d2e1e9bbb..5936745698 100644
--- a/src/dht/gnunet-service-dht_clients.c
+++ b/src/dht/gnunet-service-dht_clients.c
@@ -551,9 +551,7 @@ handle_dht_local_put (void *cls, struct GNUNET_SERVER_Client *client,
/**
- * Handler for any generic DHT messages, calls the appropriate handler
- * depending on message type, sends confirmation if responses aren't otherwise
- * expected.
+ * Handler for DHT GET messages from the client.
*
* @param cls closure for the service
* @param client the client we received this message from
@@ -621,6 +619,103 @@ handle_dht_local_get (void *cls, struct GNUNET_SERVER_Client *client,
/**
+ * Closure for 'find_by_unique_id'.
+ */
+struct FindByUniqueIdContext
+{
+ /**
+ * Where to store the result, if found.
+ */
+ struct ClientQueryRecord *cqr;
+
+ uint64_t unique_id;
+};
+
+
+/**
+ * Function called for each existing DHT record for the given
+ * query. Checks if it matches the UID given in the closure
+ * and if so returns the entry as a result.
+ *
+ * @param cls the search context
+ * @param key query for the lookup (not used)
+ * @param value the 'struct ClientQueryRecord'
+ * @return GNUNET_YES to continue iteration (result not yet found)
+ */
+static int
+find_by_unique_id (void *cls,
+ const struct GNUNET_HashCode *key,
+ void *value)
+{
+ struct FindByUniqueIdContext *fui_ctx = cls;
+ struct ClientQueryRecord *cqr = value;
+
+ if (cqr->unique_id != fui_ctx->unique_id)
+ return GNUNET_YES;
+ fui_ctx->cqr = cqr;
+ return GNUNET_NO;
+}
+
+
+/**
+ * Handler for "GET result seen" messages from the client.
+ *
+ * @param cls closure for the service
+ * @param client the client we received this message from
+ * @param message the actual message received
+ */
+static void
+handle_dht_local_get_result_seen (void *cls, struct GNUNET_SERVER_Client *client,
+ const struct GNUNET_MessageHeader *message)
+{
+ const struct GNUNET_DHT_ClientGetResultSeenMessage *seen;
+ uint16_t size;
+ unsigned int hash_count;
+ unsigned int old_count;
+ const struct GNUNET_HashCode *hc;
+ struct FindByUniqueIdContext fui_ctx;
+ struct ClientQueryRecord *cqr;
+
+ size = ntohs (message->size);
+ if (size < sizeof (struct GNUNET_DHT_ClientGetResultSeenMessage))
+ {
+ GNUNET_break (0);
+ GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
+ return;
+ }
+ seen = (const struct GNUNET_DHT_ClientGetResultSeenMessage *) message;
+ hash_count = (size - sizeof (struct GNUNET_DHT_ClientGetResultSeenMessage)) / sizeof (struct GNUNET_HashCode);
+ if (size != sizeof (struct GNUNET_DHT_ClientGetResultSeenMessage) + hash_count * sizeof (struct GNUNET_HashCode))
+ {
+ GNUNET_break (0);
+ GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
+ return;
+ }
+ hc = (const struct GNUNET_HashCode*) &seen[1];
+ fui_ctx.unique_id = seen->unique_id;
+ fui_ctx.cqr = NULL;
+ GNUNET_CONTAINER_multihashmap_get_multiple (forward_map,
+ &seen->key,
+ &find_by_unique_id,
+ &fui_ctx);
+ if (NULL == (cqr = fui_ctx.cqr))
+ {
+ GNUNET_break (0);
+ GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
+ return;
+ }
+ /* finally, update 'seen' list */
+ old_count = cqr->seen_replies_count;
+ GNUNET_array_grow (cqr->seen_replies,
+ cqr->seen_replies_count,
+ cqr->seen_replies_count + hash_count);
+ memcpy (&cqr->seen_replies[old_count],
+ hc,
+ sizeof (struct GNUNET_HashCode) * hash_count);
+}
+
+
+/**
* Closure for 'remove_by_unique_id'.
*/
struct RemoveByUniqueIdContext
@@ -1350,6 +1445,8 @@ GDS_CLIENTS_init (struct GNUNET_SERVER_Handle *server)
{&handle_dht_local_monitor_stop, NULL,
GNUNET_MESSAGE_TYPE_DHT_MONITOR_STOP,
sizeof (struct GNUNET_DHT_MonitorStartStopMessage)},
+ {&handle_dht_local_get_result_seen, NULL,
+ GNUNET_MESSAGE_TYPE_DHT_CLIENT_GET_RESULTS_KNOWN, 0},
{NULL, NULL, 0, 0}
};
forward_map = GNUNET_CONTAINER_multihashmap_create (1024, GNUNET_NO);
diff --git a/src/include/gnunet_dht_service.h b/src/include/gnunet_dht_service.h
index 89f42acb48..0de7551a45 100644
--- a/src/include/gnunet_dht_service.h
+++ b/src/include/gnunet_dht_service.h
@@ -236,14 +236,30 @@ typedef void (*GNUNET_DHT_GetIterator) (void *cls,
*/
struct GNUNET_DHT_GetHandle *
GNUNET_DHT_get_start (struct GNUNET_DHT_Handle *handle,
- enum GNUNET_BLOCK_Type type, const struct GNUNET_HashCode * key,
+ enum GNUNET_BLOCK_Type type,
+ const struct GNUNET_HashCode *key,
uint32_t desired_replication_level,
- enum GNUNET_DHT_RouteOption options, const void *xquery,
- size_t xquery_size, GNUNET_DHT_GetIterator iter,
- void *iter_cls);
+ enum GNUNET_DHT_RouteOption options,
+ const void *xquery, size_t xquery_size,
+ GNUNET_DHT_GetIterator iter, void *iter_cls);
/**
+ * Tell the DHT not to return any of the following known results
+ * to this client.
+ *
+ * @param get_handle get operation for which results should be filtered
+ * @param num_results number of results to be blocked that are
+ * provided in this call (size of the 'results' array)
+ * @param results array of hash codes over the 'data' of the results
+ * to be blocked
+ */
+void
+GNUNET_DHT_get_filter_known_results (struct GNUNET_DHT_GetHandle *get_handle,
+ unsigned int num_results,
+ const struct GNUNET_HashCode *results);
+
+/**
* Stop async DHT-get. Frees associated resources.
*
* @param get_handle GET operation to stop.
diff --git a/src/include/gnunet_protocols.h b/src/include/gnunet_protocols.h
index df123ceb19..8abaf35b40 100644
--- a/src/include/gnunet_protocols.h
+++ b/src/include/gnunet_protocols.h
@@ -558,6 +558,11 @@ extern "C"
*/
#define GNUNET_MESSAGE_TYPE_DHT_CLIENT_PUT_OK 155
+/**
+ * Certain results are already known to the client, filter those.
+ */
+#define GNUNET_MESSAGE_TYPE_DHT_CLIENT_GET_RESULTS_KNOWN 156
+
/*******************************************************************************
* HOSTLIST message types