diff options
Diffstat (limited to 'src/dht/gnunet-service-dht_clients.c')
-rw-r--r-- | src/dht/gnunet-service-dht_clients.c | 190 |
1 files changed, 155 insertions, 35 deletions
diff --git a/src/dht/gnunet-service-dht_clients.c b/src/dht/gnunet-service-dht_clients.c index d897d1f..3d9d5be 100644 --- a/src/dht/gnunet-service-dht_clients.c +++ b/src/dht/gnunet-service-dht_clients.c @@ -37,6 +37,12 @@ /** + * Should routing details be logged to stderr (for debugging)? + */ +#define LOG_ROUTE_DETAILS_STDERR GNUNET_NO + + +/** * Linked list of messages to send to clients. */ struct PendingMessage @@ -111,7 +117,7 @@ struct ClientQueryRecord /** * The key this request was about */ - GNUNET_HashCode key; + struct GNUNET_HashCode key; /** * Client responsible for the request. @@ -126,7 +132,7 @@ struct ClientQueryRecord /** * Replies we have already seen for this request. */ - GNUNET_HashCode *seen_replies; + struct GNUNET_HashCode *seen_replies; /** * Pointer to this nodes heap location in the retry-heap (for fast removal) @@ -201,7 +207,7 @@ struct ClientMonitorRecord /** * Key of data of interest, NULL for all. */ - GNUNET_HashCode *key; + struct GNUNET_HashCode *key; /** * Flag whether to notify about GET messages. @@ -322,7 +328,7 @@ find_active_client (struct GNUNET_SERVER_Client *client) * @return GNUNET_YES (we should continue to iterate) */ static int -remove_client_records (void *cls, const GNUNET_HashCode * key, void *value) +remove_client_records (void *cls, const struct GNUNET_HashCode * key, void *value) { struct ClientList *client = cls; struct ClientQueryRecord *record = value; @@ -423,10 +429,7 @@ transmit_request (struct ClientQueryRecord *cqr) GNUNET_CONTAINER_bloomfilter_free (peer_bf); /* exponential back-off for retries, max 1h */ - cqr->retry_frequency = - GNUNET_TIME_relative_min (GNUNET_TIME_UNIT_HOURS, - GNUNET_TIME_relative_multiply - (cqr->retry_frequency, 2)); + cqr->retry_frequency = GNUNET_TIME_STD_BACKOFF (cqr->retry_frequency); cqr->retry_time = GNUNET_TIME_relative_to_absolute (cqr->retry_frequency); } @@ -551,9 +554,7 @@ handle_dht_local_put (void *cls, struct GNUNET_SERVER_Client *client, /** - * Handler for any generic DHT messages, calls the appropriate handler - * depending on message type, sends confirmation if responses aren't otherwise - * expected. + * Handler for DHT GET messages from the client. * * @param cls closure for the service * @param client the client we received this message from @@ -586,19 +587,30 @@ handle_dht_local_get (void *cls, struct GNUNET_SERVER_Client *client, GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Received request for %s from local client %p\n", GNUNET_h2s (&get->key), client); + + if (LOG_ROUTE_DETAILS_STDERR) + { + fprintf (stderr, + "XDHT CLIENT-GET %s @ %u\n", + GNUNET_h2s (&get->key), + getpid ()); + } + + cqr = GNUNET_malloc (sizeof (struct ClientQueryRecord) + xquery_size); cqr->key = get->key; cqr->client = find_active_client (client); cqr->xquery = (void *) &cqr[1]; memcpy (&cqr[1], xquery, xquery_size); cqr->hnode = GNUNET_CONTAINER_heap_insert (retry_heap, cqr, 0); - cqr->retry_frequency = GNUNET_TIME_UNIT_MILLISECONDS; + cqr->retry_frequency = GNUNET_TIME_UNIT_SECONDS; cqr->retry_time = GNUNET_TIME_absolute_get (); cqr->unique_id = get->unique_id; cqr->xquery_size = xquery_size; cqr->replication = ntohl (get->desired_replication_level); cqr->msg_options = ntohl (get->options); cqr->type = ntohl (get->type); + // FIXME use cqr->key, set multihashmap create to GNUNET_YES GNUNET_CONTAINER_multihashmap_put (forward_map, &get->key, cqr, GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); GDS_CLIENTS_process_get (ntohl (get->options), @@ -620,6 +632,103 @@ handle_dht_local_get (void *cls, struct GNUNET_SERVER_Client *client, /** + * Closure for 'find_by_unique_id'. + */ +struct FindByUniqueIdContext +{ + /** + * Where to store the result, if found. + */ + struct ClientQueryRecord *cqr; + + uint64_t unique_id; +}; + + +/** + * Function called for each existing DHT record for the given + * query. Checks if it matches the UID given in the closure + * and if so returns the entry as a result. + * + * @param cls the search context + * @param key query for the lookup (not used) + * @param value the 'struct ClientQueryRecord' + * @return GNUNET_YES to continue iteration (result not yet found) + */ +static int +find_by_unique_id (void *cls, + const struct GNUNET_HashCode *key, + void *value) +{ + struct FindByUniqueIdContext *fui_ctx = cls; + struct ClientQueryRecord *cqr = value; + + if (cqr->unique_id != fui_ctx->unique_id) + return GNUNET_YES; + fui_ctx->cqr = cqr; + return GNUNET_NO; +} + + +/** + * Handler for "GET result seen" messages from the client. + * + * @param cls closure for the service + * @param client the client we received this message from + * @param message the actual message received + */ +static void +handle_dht_local_get_result_seen (void *cls, struct GNUNET_SERVER_Client *client, + const struct GNUNET_MessageHeader *message) +{ + const struct GNUNET_DHT_ClientGetResultSeenMessage *seen; + uint16_t size; + unsigned int hash_count; + unsigned int old_count; + const struct GNUNET_HashCode *hc; + struct FindByUniqueIdContext fui_ctx; + struct ClientQueryRecord *cqr; + + size = ntohs (message->size); + if (size < sizeof (struct GNUNET_DHT_ClientGetResultSeenMessage)) + { + GNUNET_break (0); + GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); + return; + } + seen = (const struct GNUNET_DHT_ClientGetResultSeenMessage *) message; + hash_count = (size - sizeof (struct GNUNET_DHT_ClientGetResultSeenMessage)) / sizeof (struct GNUNET_HashCode); + if (size != sizeof (struct GNUNET_DHT_ClientGetResultSeenMessage) + hash_count * sizeof (struct GNUNET_HashCode)) + { + GNUNET_break (0); + GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); + return; + } + hc = (const struct GNUNET_HashCode*) &seen[1]; + fui_ctx.unique_id = seen->unique_id; + fui_ctx.cqr = NULL; + GNUNET_CONTAINER_multihashmap_get_multiple (forward_map, + &seen->key, + &find_by_unique_id, + &fui_ctx); + if (NULL == (cqr = fui_ctx.cqr)) + { + GNUNET_break (0); + GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); + return; + } + /* finally, update 'seen' list */ + old_count = cqr->seen_replies_count; + GNUNET_array_grow (cqr->seen_replies, + cqr->seen_replies_count, + cqr->seen_replies_count + hash_count); + memcpy (&cqr->seen_replies[old_count], + hc, + sizeof (struct GNUNET_HashCode) * hash_count); +} + + +/** * Closure for 'remove_by_unique_id'. */ struct RemoveByUniqueIdContext @@ -646,7 +755,7 @@ struct RemoveByUniqueIdContext * @return GNUNET_YES (we should continue to iterate) */ static int -remove_by_unique_id (void *cls, const GNUNET_HashCode * key, void *value) +remove_by_unique_id (void *cls, const struct GNUNET_HashCode * key, void *value) { const struct RemoveByUniqueIdContext *ctx = cls; struct ClientQueryRecord *record = value; @@ -718,8 +827,8 @@ handle_dht_local_monitor (void *cls, struct GNUNET_SERVER_Client *client, r->key = NULL; else { - r->key = GNUNET_malloc (sizeof (GNUNET_HashCode)); - memcpy (r->key, &msg->key, sizeof (GNUNET_HashCode)); + r->key = GNUNET_malloc (sizeof (struct GNUNET_HashCode)); + memcpy (r->key, &msg->key, sizeof (struct GNUNET_HashCode)); } GNUNET_CONTAINER_DLL_insert (monitor_head, monitor_tail, r); GNUNET_SERVER_receive_done (client, GNUNET_OK); @@ -751,7 +860,7 @@ handle_dht_local_monitor_stop (void *cls, struct GNUNET_SERVER_Client *client, else { keys_match = (0 != ntohs(msg->filter_key) - && !memcmp(r->key, &msg->key, sizeof(GNUNET_HashCode))); + && !memcmp(r->key, &msg->key, sizeof(struct GNUNET_HashCode))); } if (find_active_client(client) == r->client && ntohl(msg->type) == r->type @@ -898,7 +1007,7 @@ struct ForwardReplyContext * if the result is mal-formed, GNUNET_NO */ static int -forward_reply (void *cls, const GNUNET_HashCode * key, void *value) +forward_reply (void *cls, const struct GNUNET_HashCode * key, void *value) { struct ForwardReplyContext *frc = cls; struct ClientQueryRecord *record = value; @@ -906,9 +1015,16 @@ forward_reply (void *cls, const GNUNET_HashCode * key, void *value) struct GNUNET_DHT_ClientResultMessage *reply; enum GNUNET_BLOCK_EvaluationResult eval; int do_free; - GNUNET_HashCode ch; + struct GNUNET_HashCode ch; unsigned int i; + if (LOG_ROUTE_DETAILS_STDERR) + { + fprintf (stderr, + "XDHT CLIENT-RESULT %s @ %u\n", + GNUNET_h2s (key), + getpid ()); + } if ((record->type != GNUNET_BLOCK_TYPE_ANY) && (record->type != frc->type)) { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, @@ -922,7 +1038,7 @@ forward_reply (void *cls, const GNUNET_HashCode * key, void *value) } GNUNET_CRYPTO_hash (frc->data, frc->data_size, &ch); for (i = 0; i < record->seen_replies_count; i++) - if (0 == memcmp (&record->seen_replies[i], &ch, sizeof (GNUNET_HashCode))) + if (0 == memcmp (&record->seen_replies[i], &ch, sizeof (struct GNUNET_HashCode))) { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Duplicate reply, not passing request for key %s to local client\n", @@ -962,6 +1078,8 @@ forward_reply (void *cls, const GNUNET_HashCode * key, void *value) case GNUNET_BLOCK_EVALUATION_REQUEST_INVALID: GNUNET_break (0); return GNUNET_NO; + case GNUNET_BLOCK_EVALUATION_RESULT_IRRELEVANT: + return GNUNET_YES; case GNUNET_BLOCK_EVALUATION_TYPE_NOT_SUPPORTED: GNUNET_log (GNUNET_ERROR_TYPE_WARNING, _("Unsupported block type (%u) in request!\n"), record->type); @@ -984,6 +1102,7 @@ forward_reply (void *cls, const GNUNET_HashCode * key, void *value) memcpy (pm, frc->pm, sizeof (struct PendingMessage) + ntohs (frc->pm->msg->size)); pm->next = pm->prev = NULL; + pm->msg = (struct GNUNET_MessageHeader *) &pm[1]; } GNUNET_STATISTICS_update (GDS_stats, gettext_noop ("# RESULTS queued for clients"), 1, @@ -1017,7 +1136,7 @@ forward_reply (void *cls, const GNUNET_HashCode * key, void *value) */ void GDS_CLIENTS_handle_reply (struct GNUNET_TIME_Absolute expiration, - const GNUNET_HashCode * key, + const struct GNUNET_HashCode *key, unsigned int get_path_length, const struct GNUNET_PeerIdentity *get_path, unsigned int put_path_length, @@ -1048,8 +1167,7 @@ GDS_CLIENTS_handle_reply (struct GNUNET_TIME_Absolute expiration, _("Could not pass reply to client, message too big!\n")); return; } - pm = (struct PendingMessage *) GNUNET_malloc (msize + - sizeof (struct PendingMessage)); + pm = GNUNET_malloc (msize + sizeof (struct PendingMessage)); reply = (struct GNUNET_DHT_ClientResultMessage *) &pm[1]; pm->msg = &reply->header; reply->header.size = htons ((uint16_t) msize); @@ -1104,7 +1222,7 @@ GDS_CLIENTS_process_get (uint32_t options, uint32_t desired_replication_level, unsigned int path_length, const struct GNUNET_PeerIdentity *path, - const GNUNET_HashCode * key) + const struct GNUNET_HashCode * key) { struct ClientMonitorRecord *m; struct ClientList **cl; @@ -1116,7 +1234,7 @@ GDS_CLIENTS_process_get (uint32_t options, { if ((GNUNET_BLOCK_TYPE_ANY == m->type || m->type == type) && (NULL == m->key || - memcmp (key, m->key, sizeof(GNUNET_HashCode)) == 0)) + memcmp (key, m->key, sizeof(struct GNUNET_HashCode)) == 0)) { struct PendingMessage *pm; struct GNUNET_DHT_MonitorGetMessage *mmsg; @@ -1135,7 +1253,7 @@ GDS_CLIENTS_process_get (uint32_t options, msize = path_length * sizeof (struct GNUNET_PeerIdentity); msize += sizeof (struct GNUNET_DHT_MonitorGetMessage); msize += sizeof (struct PendingMessage); - pm = (struct PendingMessage *) GNUNET_malloc (msize); + pm = GNUNET_malloc (msize); mmsg = (struct GNUNET_DHT_MonitorGetMessage *) &pm[1]; pm->msg = &mmsg->header; mmsg->header.size = htons (msize - sizeof (struct PendingMessage)); @@ -1145,7 +1263,7 @@ GDS_CLIENTS_process_get (uint32_t options, mmsg->hop_count = htonl(hop_count); mmsg->desired_replication_level = htonl(desired_replication_level); mmsg->get_path_length = htonl(path_length); - memcpy (&mmsg->key, key, sizeof (GNUNET_HashCode)); + memcpy (&mmsg->key, key, sizeof (struct GNUNET_HashCode)); msg_path = (struct GNUNET_PeerIdentity *) &mmsg[1]; if (path_length > 0) memcpy (msg_path, path, @@ -1178,7 +1296,7 @@ GDS_CLIENTS_process_get_resp (enum GNUNET_BLOCK_Type type, const struct GNUNET_PeerIdentity *put_path, unsigned int put_path_length, struct GNUNET_TIME_Absolute exp, - const GNUNET_HashCode * key, + const struct GNUNET_HashCode * key, const void *data, size_t size) { @@ -1192,7 +1310,7 @@ GDS_CLIENTS_process_get_resp (enum GNUNET_BLOCK_Type type, { if ((GNUNET_BLOCK_TYPE_ANY == m->type || m->type == type) && (NULL == m->key || - memcmp (key, m->key, sizeof(GNUNET_HashCode)) == 0)) + memcmp (key, m->key, sizeof(struct GNUNET_HashCode)) == 0)) { struct PendingMessage *pm; struct GNUNET_DHT_MonitorGetRespMessage *mmsg; @@ -1213,7 +1331,7 @@ GDS_CLIENTS_process_get_resp (enum GNUNET_BLOCK_Type type, * sizeof (struct GNUNET_PeerIdentity); msize += sizeof (struct GNUNET_DHT_MonitorGetRespMessage); msize += sizeof (struct PendingMessage); - pm = (struct PendingMessage *) GNUNET_malloc (msize); + pm = GNUNET_malloc (msize); mmsg = (struct GNUNET_DHT_MonitorGetRespMessage *) &pm[1]; pm->msg = (struct GNUNET_MessageHeader *) mmsg; mmsg->header.size = htons (msize - sizeof (struct PendingMessage)); @@ -1232,7 +1350,7 @@ GDS_CLIENTS_process_get_resp (enum GNUNET_BLOCK_Type type, memcpy (path, get_path, get_path_length * sizeof (struct GNUNET_PeerIdentity)); mmsg->expiration_time = GNUNET_TIME_absolute_hton(exp); - memcpy (&mmsg->key, key, sizeof (GNUNET_HashCode)); + memcpy (&mmsg->key, key, sizeof (struct GNUNET_HashCode)); if (size > 0) memcpy (&path[get_path_length], data, size); add_pending_message (m->client, pm); @@ -1265,7 +1383,7 @@ GDS_CLIENTS_process_put (uint32_t options, unsigned int path_length, const struct GNUNET_PeerIdentity *path, struct GNUNET_TIME_Absolute exp, - const GNUNET_HashCode * key, + const struct GNUNET_HashCode * key, const void *data, size_t size) { @@ -1279,7 +1397,7 @@ GDS_CLIENTS_process_put (uint32_t options, { if ((GNUNET_BLOCK_TYPE_ANY == m->type || m->type == type) && (NULL == m->key || - memcmp (key, m->key, sizeof(GNUNET_HashCode)) == 0)) + memcmp (key, m->key, sizeof(struct GNUNET_HashCode)) == 0)) { struct PendingMessage *pm; struct GNUNET_DHT_MonitorPutMessage *mmsg; @@ -1299,7 +1417,7 @@ GDS_CLIENTS_process_put (uint32_t options, msize += path_length * sizeof (struct GNUNET_PeerIdentity); msize += sizeof (struct GNUNET_DHT_MonitorPutMessage); msize += sizeof (struct PendingMessage); - pm = (struct PendingMessage *) GNUNET_malloc (msize); + pm = GNUNET_malloc (msize); mmsg = (struct GNUNET_DHT_MonitorPutMessage *) &pm[1]; pm->msg = (struct GNUNET_MessageHeader *) mmsg; mmsg->header.size = htons (msize - sizeof (struct PendingMessage)); @@ -1316,7 +1434,7 @@ GDS_CLIENTS_process_put (uint32_t options, path_length * sizeof (struct GNUNET_PeerIdentity)); } mmsg->expiration_time = GNUNET_TIME_absolute_hton(exp); - memcpy (&mmsg->key, key, sizeof (GNUNET_HashCode)); + memcpy (&mmsg->key, key, sizeof (struct GNUNET_HashCode)); if (size > 0) memcpy (&msg_path[path_length], data, size); add_pending_message (m->client, pm); @@ -1348,9 +1466,11 @@ GDS_CLIENTS_init (struct GNUNET_SERVER_Handle *server) {&handle_dht_local_monitor_stop, NULL, GNUNET_MESSAGE_TYPE_DHT_MONITOR_STOP, sizeof (struct GNUNET_DHT_MonitorStartStopMessage)}, + {&handle_dht_local_get_result_seen, NULL, + GNUNET_MESSAGE_TYPE_DHT_CLIENT_GET_RESULTS_KNOWN, 0}, {NULL, NULL, 0, 0} }; - forward_map = GNUNET_CONTAINER_multihashmap_create (1024); + forward_map = GNUNET_CONTAINER_multihashmap_create (1024, GNUNET_NO); retry_heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN); GNUNET_SERVER_add_handlers (server, plugin_handlers); GNUNET_SERVER_disconnect_notify (server, &handle_client_disconnect, NULL); |