/*
This file is part of GNUnet.
Copyright (C) 2009-2017 GNUnet e.V.
GNUnet is free software: you can redistribute it and/or modify it
under the terms of the GNU Affero General Public License as published
by the Free Software Foundation, either version 3 of the License,
or (at your option) any later version.
GNUnet is distributed in the hope that it will be useful, but
WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
* * Note that we should not ALWAYS select the closest peer to the * target, peers further away from the target should be chosen with * exponentially declining probability. * * FIXME: double-check that this is fine * * * @param key the key we are selecting a peer to route to * @param bloom a bloomfilter containing entries this request has seen already * @param hops how many hops has this message traversed thus far * @return Peer to route to, or NULL on error */ static struct PeerInfo * select_peer (const struct GNUNET_HashCode *key, const struct GNUNET_CONTAINER_BloomFilter *bloom, uint32_t hops) { unsigned int bc; unsigned int count; unsigned int selected; struct PeerInfo *pos; unsigned int dist; unsigned int smallest_distance; struct PeerInfo *chosen; if (hops >= GDS_NSE_get ()) { /* greedy selection (closest peer that is not in bloomfilter) */ smallest_distance = UINT_MAX; chosen = NULL; for (bc = 0; bc <= closest_bucket; bc++) { pos = k_buckets[bc].head; count = 0; while ((pos != NULL) && (count < bucket_size)) { if ( (NULL == bloom) || (GNUNET_NO == GNUNET_CONTAINER_bloomfilter_test (bloom, &pos->phash))) { dist = get_distance (key, &pos->phash); if (dist < smallest_distance) { chosen = pos; smallest_distance = dist; } } else { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Excluded peer `%s' due to BF match in greedy routing for %s\n", GNUNET_i2s (pos->id), GNUNET_h2s (key)); GNUNET_STATISTICS_update (GDS_stats, gettext_noop ("# Peers excluded from routing due to Bloomfilter"), 1, GNUNET_NO); dist = get_distance (key, &pos->phash); if (dist < smallest_distance) { chosen = NULL; smallest_distance = dist; } } count++; pos = pos->next; } } if (NULL == chosen) GNUNET_STATISTICS_update (GDS_stats, gettext_noop ("# Peer selection failed"), 1, GNUNET_NO); else GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Selected peer `%s' in greedy routing for %s\n", GNUNET_i2s (chosen->id), GNUNET_h2s (key)); return chosen; } /* select "random" peer */ /* count number of peers that are available 
and not filtered */ count = 0; for (bc = 0; bc <= closest_bucket; bc++) { pos = k_buckets[bc].head; while ( (NULL != pos) && (count < bucket_size) ) { if ( (NULL != bloom) && (GNUNET_YES == GNUNET_CONTAINER_bloomfilter_test (bloom, &pos->phash)) ) { GNUNET_STATISTICS_update (GDS_stats, gettext_noop ("# Peers excluded from routing due to Bloomfilter"), 1, GNUNET_NO); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Excluded peer `%s' due to BF match in random routing for %s\n", GNUNET_i2s (pos->id), GNUNET_h2s (key)); pos = pos->next; continue; /* Ignore bloomfiltered peers */ } count++; pos = pos->next; } } if (0 == count) /* No peers to select from! */ { GNUNET_STATISTICS_update (GDS_stats, gettext_noop ("# Peer selection failed"), 1, GNUNET_NO); return NULL; } /* Now actually choose a peer */ selected = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, count); count = 0; for (bc = 0; bc <= closest_bucket; bc++) { for (pos = k_buckets[bc].head; ((pos != NULL) && (count < bucket_size)); pos = pos->next) { if ((bloom != NULL) && (GNUNET_YES == GNUNET_CONTAINER_bloomfilter_test (bloom, &pos->phash))) { continue; /* Ignore bloomfiltered peers */ } if (0 == selected--) { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Selected peer `%s' in random routing for %s\n", GNUNET_i2s (pos->id), GNUNET_h2s (key)); return pos; } } } GNUNET_break (0); return NULL; } /** * Compute the set of peers that the given request should be * forwarded to. * * @param key routing key * @param bloom bloom filter excluding peers as targets, all selected * peers will be added to the bloom filter * @param hop_count number of hops the request has traversed so far * @param target_replication desired number of replicas * @param targets where to store an array of target peers (to be * free'd by the caller) * @return number of peers returned in 'targets'. 
*/
static unsigned int
get_target_peers (const struct GNUNET_HashCode *key,
                  struct GNUNET_CONTAINER_BloomFilter *bloom,
                  uint32_t hop_count,
                  uint32_t target_replication,
                  struct PeerInfo ***targets)
{
  unsigned int goal;
  unsigned int found;
  struct PeerInfo **selection;

  GNUNET_assert (NULL != bloom);
  goal = get_forward_count (hop_count,
                            target_replication);
  if (0 == goal)
  {
    *targets = NULL;
    return 0;
  }
  selection = GNUNET_new_array (goal,
                                struct PeerInfo *);
  found = 0;
  while (found < goal)
  {
    struct PeerInfo *candidate;

    candidate = select_peer (key,
                             bloom,
                             hop_count);
    if (NULL == candidate)
      break;
    selection[found++] = candidate;
    /* select_peer() must never hand out a peer that is already filtered;
       adding it now keeps the next round from picking it again */
    GNUNET_break (GNUNET_NO ==
                  GNUNET_CONTAINER_bloomfilter_test (bloom,
                                                     &candidate->phash));
    GNUNET_CONTAINER_bloomfilter_add (bloom,
                                      &candidate->phash);
  }
  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
              "Selected %u/%u peers at hop %u for %s (target was %u)\n",
              found,
              GNUNET_CONTAINER_multipeermap_size (all_connected_peers),
              (unsigned int) hop_count,
              GNUNET_h2s (key),
              goal);
  if (0 == found)
  {
    GNUNET_free (selection);
    *targets = NULL;
    return 0;
  }
  *targets = selection;
  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
              "Forwarding query `%s' to %u peers (goal was %u peers)\n",
              GNUNET_h2s (key),
              found,
              goal);
  return found;
}


/**
 * Perform a PUT operation.  Forwards the given request to other
 * peers.  Does not store the data locally.  Does not give the
 * data to local clients.  May do nothing if this is the only
 * peer in the network (or if we are the closest peer in the
 * network).
* * @param type type of the block * @param options routing options * @param desired_replication_level desired replication count * @param expiration_time when does the content expire * @param hop_count how many hops has this message traversed so far * @param bf Bloom filter of peers this PUT has already traversed * @param key key for the content * @param put_path_length number of entries in @a put_path * @param put_path peers this request has traversed so far (if tracked) * @param data payload to store * @param data_size number of bytes in @a data * @return #GNUNET_OK if the request was forwarded, #GNUNET_NO if not */ int GDS_NEIGHBOURS_handle_put (enum GNUNET_BLOCK_Type type, enum GNUNET_DHT_RouteOption options, uint32_t desired_replication_level, struct GNUNET_TIME_Absolute expiration_time, uint32_t hop_count, struct GNUNET_CONTAINER_BloomFilter *bf, const struct GNUNET_HashCode *key, unsigned int put_path_length, struct GNUNET_PeerIdentity *put_path, const void *data, size_t data_size) { unsigned int target_count; unsigned int i; struct PeerInfo **targets; struct PeerInfo *target; size_t msize; struct GNUNET_MQ_Envelope *env; struct PeerPutMessage *ppm; struct GNUNET_PeerIdentity *pp; unsigned int skip_count; GNUNET_assert (NULL != bf); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Adding myself (%s) to PUT bloomfilter for %s\n", GNUNET_i2s (&my_identity), GNUNET_h2s (key)); GNUNET_CONTAINER_bloomfilter_add (bf, &my_identity_hash); GNUNET_STATISTICS_update (GDS_stats, gettext_noop ("# PUT requests routed"), 1, GNUNET_NO); target_count = get_target_peers (key, bf, hop_count, desired_replication_level, &targets); if (0 == target_count) { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Routing PUT for %s terminates after %u hops at %s\n", GNUNET_h2s (key), (unsigned int) hop_count, GNUNET_i2s (&my_identity)); return GNUNET_NO; } msize = put_path_length * sizeof (struct GNUNET_PeerIdentity) + data_size; if (msize + sizeof (struct PeerPutMessage) >= 
GNUNET_CONSTANTS_MAX_ENCRYPTED_MESSAGE_SIZE) { put_path_length = 0; msize = data_size; } if (msize + sizeof (struct PeerPutMessage) >= GNUNET_CONSTANTS_MAX_ENCRYPTED_MESSAGE_SIZE) { GNUNET_break (0); GNUNET_free (targets); return GNUNET_NO; } GNUNET_STATISTICS_update (GDS_stats, gettext_noop ("# PUT messages queued for transmission"), target_count, GNUNET_NO); skip_count = 0; for (i = 0; i < target_count; i++) { target = targets[i]; if (GNUNET_MQ_get_length (target->mq) >= MAXIMUM_PENDING_PER_PEER) { /* skip */ GNUNET_STATISTICS_update (GDS_stats, gettext_noop ("# P2P messages dropped due to full queue"), 1, GNUNET_NO); skip_count++; continue; } GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Routing PUT for %s after %u hops to %s\n", GNUNET_h2s (key), (unsigned int) hop_count, GNUNET_i2s (target->id)); env = GNUNET_MQ_msg_extra (ppm, msize, GNUNET_MESSAGE_TYPE_DHT_P2P_PUT); ppm->options = htonl (options); ppm->type = htonl (type); ppm->hop_count = htonl (hop_count + 1); ppm->desired_replication_level = htonl (desired_replication_level); ppm->put_path_length = htonl (put_path_length); ppm->expiration_time = GNUNET_TIME_absolute_hton (expiration_time); GNUNET_break (GNUNET_YES == GNUNET_CONTAINER_bloomfilter_test (bf, &target->phash)); GNUNET_assert (GNUNET_OK == GNUNET_CONTAINER_bloomfilter_get_raw_data (bf, ppm->bloomfilter, DHT_BLOOM_SIZE)); ppm->key = *key; pp = (struct GNUNET_PeerIdentity *) &ppm[1]; GNUNET_memcpy (pp, put_path, sizeof (struct GNUNET_PeerIdentity) * put_path_length); GNUNET_memcpy (&pp[put_path_length], data, data_size); GNUNET_MQ_send (target->mq, env); } GNUNET_free (targets); return (skip_count < target_count) ? GNUNET_OK : GNUNET_NO; } /** * Perform a GET operation. Forwards the given request to other * peers. Does not lookup the key locally. May do nothing if this is * the only peer in the network (or if we are the closest peer in the * network). 
* * @param type type of the block * @param options routing options * @param desired_replication_level desired replication count * @param hop_count how many hops did this request traverse so far? * @param key key for the content * @param xquery extended query * @param xquery_size number of bytes in @a xquery * @param bg group to use for filtering replies * @param peer_bf filter for peers not to select (again) * @return #GNUNET_OK if the request was forwarded, #GNUNET_NO if not */ int GDS_NEIGHBOURS_handle_get (enum GNUNET_BLOCK_Type type, enum GNUNET_DHT_RouteOption options, uint32_t desired_replication_level, uint32_t hop_count, const struct GNUNET_HashCode *key, const void *xquery, size_t xquery_size, struct GNUNET_BLOCK_Group *bg, struct GNUNET_CONTAINER_BloomFilter *peer_bf) { unsigned int target_count; struct PeerInfo **targets; struct PeerInfo *target; struct GNUNET_MQ_Envelope *env; size_t msize; struct PeerGetMessage *pgm; char *xq; size_t reply_bf_size; void *reply_bf; unsigned int skip_count; uint32_t bf_nonce; GNUNET_assert (NULL != peer_bf); GNUNET_STATISTICS_update (GDS_stats, gettext_noop ("# GET requests routed"), 1, GNUNET_NO); target_count = get_target_peers (key, peer_bf, hop_count, desired_replication_level, &targets); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Adding myself (%s) to GET bloomfilter for %s\n", GNUNET_i2s (&my_identity), GNUNET_h2s (key)); GNUNET_CONTAINER_bloomfilter_add (peer_bf, &my_identity_hash); if (0 == target_count) { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Routing GET for %s terminates after %u hops at %s\n", GNUNET_h2s (key), (unsigned int) hop_count, GNUNET_i2s (&my_identity)); return GNUNET_NO; } if (GNUNET_OK != GNUNET_BLOCK_group_serialize (bg, &bf_nonce, &reply_bf, &reply_bf_size)) { reply_bf = NULL; reply_bf_size = 0; bf_nonce = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, UINT32_MAX); } msize = xquery_size + reply_bf_size; if (msize + sizeof (struct PeerGetMessage) >= GNUNET_MAX_MESSAGE_SIZE) { GNUNET_break (0); 
GNUNET_free_non_null (reply_bf); GNUNET_free (targets); return GNUNET_NO; } GNUNET_STATISTICS_update (GDS_stats, gettext_noop ("# GET messages queued for transmission"), target_count, GNUNET_NO); /* forward request */ skip_count = 0; for (unsigned int i = 0; i < target_count; i++) { target = targets[i]; if (GNUNET_MQ_get_length (target->mq) >= MAXIMUM_PENDING_PER_PEER) { /* skip */ GNUNET_STATISTICS_update (GDS_stats, gettext_noop ("# P2P messages dropped due to full queue"), 1, GNUNET_NO); skip_count++; continue; } GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Routing GET for %s after %u hops to %s\n", GNUNET_h2s (key), (unsigned int) hop_count, GNUNET_i2s (target->id)); env = GNUNET_MQ_msg_extra (pgm, msize, GNUNET_MESSAGE_TYPE_DHT_P2P_GET); pgm->options = htonl (options); pgm->type = htonl (type); pgm->hop_count = htonl (hop_count + 1); pgm->desired_replication_level = htonl (desired_replication_level); pgm->xquery_size = htonl (xquery_size); pgm->bf_mutator = bf_nonce; GNUNET_break (GNUNET_YES == GNUNET_CONTAINER_bloomfilter_test (peer_bf, &target->phash)); GNUNET_assert (GNUNET_OK == GNUNET_CONTAINER_bloomfilter_get_raw_data (peer_bf, pgm->bloomfilter, DHT_BLOOM_SIZE)); pgm->key = *key; xq = (char *) &pgm[1]; GNUNET_memcpy (xq, xquery, xquery_size); GNUNET_memcpy (&xq[xquery_size], reply_bf, reply_bf_size); GNUNET_MQ_send (target->mq, env); } GNUNET_free (targets); GNUNET_free_non_null (reply_bf); return (skip_count < target_count) ? GNUNET_OK : GNUNET_NO; } /** * Handle a reply (route to origin). Only forwards the reply back to * the given peer. Does not do local caching or forwarding to local * clients. 
* * @param target neighbour that should receive the block (if still connected) * @param type type of the block * @param expiration_time when does the content expire * @param key key for the content * @param put_path_length number of entries in @a put_path * @param put_path peers the original PUT traversed (if tracked) * @param get_path_length number of entries in @a get_path * @param get_path peers this reply has traversed so far (if tracked) * @param data payload of the reply * @param data_size number of bytes in @a data */ void GDS_NEIGHBOURS_handle_reply (const struct GNUNET_PeerIdentity *target, enum GNUNET_BLOCK_Type type, struct GNUNET_TIME_Absolute expiration_time, const struct GNUNET_HashCode *key, unsigned int put_path_length, const struct GNUNET_PeerIdentity *put_path, unsigned int get_path_length, const struct GNUNET_PeerIdentity *get_path, const void *data, size_t data_size) { struct PeerInfo *pi; struct GNUNET_MQ_Envelope *env; size_t msize; struct PeerResultMessage *prm; struct GNUNET_PeerIdentity *paths; msize = data_size + (get_path_length + put_path_length) * sizeof (struct GNUNET_PeerIdentity); if ((msize + sizeof (struct PeerResultMessage) >= GNUNET_MAX_MESSAGE_SIZE) || (get_path_length > GNUNET_MAX_MESSAGE_SIZE / sizeof (struct GNUNET_PeerIdentity)) || (put_path_length > GNUNET_MAX_MESSAGE_SIZE / sizeof (struct GNUNET_PeerIdentity)) || (data_size > GNUNET_MAX_MESSAGE_SIZE)) { GNUNET_break (0); return; } pi = GNUNET_CONTAINER_multipeermap_get (all_connected_peers, target); if (NULL == pi) { /* peer disconnected in the meantime, drop reply */ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "No matching peer for reply for key %s\n", GNUNET_h2s (key)); return; } if (GNUNET_MQ_get_length (pi->mq) >= MAXIMUM_PENDING_PER_PEER) { /* skip */ GNUNET_STATISTICS_update (GDS_stats, gettext_noop ("# P2P messages dropped due to full queue"), 1, GNUNET_NO); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Peer queue full, ignoring reply for key %s\n", GNUNET_h2s (key)); return; } 
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Forwarding reply for key %s to peer %s\n", GNUNET_h2s (key), GNUNET_i2s (target)); GNUNET_STATISTICS_update (GDS_stats, gettext_noop ("# RESULT messages queued for transmission"), 1, GNUNET_NO); env = GNUNET_MQ_msg_extra (prm, msize, GNUNET_MESSAGE_TYPE_DHT_P2P_RESULT); prm->type = htonl (type); prm->put_path_length = htonl (put_path_length); prm->get_path_length = htonl (get_path_length); prm->expiration_time = GNUNET_TIME_absolute_hton (expiration_time); prm->key = *key; paths = (struct GNUNET_PeerIdentity *) &prm[1]; GNUNET_memcpy (paths, put_path, put_path_length * sizeof (struct GNUNET_PeerIdentity)); GNUNET_memcpy (&paths[put_path_length], get_path, get_path_length * sizeof (struct GNUNET_PeerIdentity)); GNUNET_memcpy (&paths[put_path_length + get_path_length], data, data_size); GNUNET_MQ_send (pi->mq, env); } /** * To be called on core init/fail. * * @param cls service closure * @param identity the public identity of this peer */ static void core_init (void *cls, const struct GNUNET_PeerIdentity *identity) { (void) cls; GNUNET_log (GNUNET_ERROR_TYPE_INFO, "CORE called, I am %s\n", GNUNET_i2s (identity)); my_identity = *identity; GNUNET_CRYPTO_hash (identity, sizeof (struct GNUNET_PeerIdentity), &my_identity_hash); GNUNET_SERVICE_resume (GDS_service); } /** * Check validity of a p2p put request. * * @param cls closure with the `struct PeerInfo` of the sender * @param message message * @return #GNUNET_OK if the message is valid */ static int check_dht_p2p_put (void *cls, const struct PeerPutMessage *put) { uint32_t putlen; uint16_t msize; (void) cls; msize = ntohs (put->header.size); putlen = ntohl (put->put_path_length); if ((msize < sizeof (struct PeerPutMessage) + putlen * sizeof (struct GNUNET_PeerIdentity)) || (putlen > GNUNET_MAX_MESSAGE_SIZE / sizeof (struct GNUNET_PeerIdentity))) { GNUNET_break_op (0); return GNUNET_SYSERR; } return GNUNET_OK; } /** * Core handler for p2p put requests. 
* * @param cls closure with the `struct PeerInfo` of the sender * @param message message */ static void handle_dht_p2p_put (void *cls, const struct PeerPutMessage *put) { struct PeerInfo *peer = cls; const struct GNUNET_PeerIdentity *put_path; const void *payload; uint32_t putlen; uint16_t msize; size_t payload_size; enum GNUNET_DHT_RouteOption options; struct GNUNET_CONTAINER_BloomFilter *bf; struct GNUNET_HashCode test_key; int forwarded; struct GNUNET_TIME_Absolute exp_time; exp_time = GNUNET_TIME_absolute_ntoh (put->expiration_time); if (0 == GNUNET_TIME_absolute_get_remaining (exp_time).rel_value_us) { GNUNET_STATISTICS_update (GDS_stats, gettext_noop ("# Expired PUTs discarded"), 1, GNUNET_NO); return; } msize = ntohs (put->header.size); putlen = ntohl (put->put_path_length); GNUNET_STATISTICS_update (GDS_stats, gettext_noop ("# P2P PUT requests received"), 1, GNUNET_NO); GNUNET_STATISTICS_update (GDS_stats, gettext_noop ("# P2P PUT bytes received"), msize, GNUNET_NO); put_path = (const struct GNUNET_PeerIdentity *) &put[1]; payload = &put_path[putlen]; options = ntohl (put->options); payload_size = msize - (sizeof (struct PeerPutMessage) + putlen * sizeof (struct GNUNET_PeerIdentity)); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "PUT for `%s' from %s\n", GNUNET_h2s (&put->key), GNUNET_i2s (peer->id)); if (GNUNET_YES == log_route_details_stderr) { char *tmp; char *pp; pp = GNUNET_STRINGS_pp2s (put_path, putlen); tmp = GNUNET_strdup (GNUNET_i2s (&my_identity)); LOG_TRAFFIC (GNUNET_ERROR_TYPE_DEBUG, "R5N PUT %s: %s->%s (%u, %u=>%u, PP: %s)\n", GNUNET_h2s (&put->key), GNUNET_i2s (peer->id), tmp, ntohl(put->hop_count), GNUNET_CRYPTO_hash_matching_bits (&peer->phash, &put->key), GNUNET_CRYPTO_hash_matching_bits (&my_identity_hash, &put->key), pp); GNUNET_free (pp); GNUNET_free (tmp); } switch (GNUNET_BLOCK_get_key (GDS_block_context, ntohl (put->type), payload, payload_size, &test_key)) { case GNUNET_YES: if (0 != memcmp (&test_key, &put->key, sizeof (struct 
GNUNET_HashCode))) { char *put_s = GNUNET_strdup (GNUNET_h2s_full (&put->key)); GNUNET_break_op (0); GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "PUT with key `%s' for block with key %s\n", put_s, GNUNET_h2s_full (&test_key)); GNUNET_free (put_s); return; } break; case GNUNET_NO: GNUNET_break_op (0); return; case GNUNET_SYSERR: /* cannot verify, good luck */ break; } if (ntohl (put->type) == GNUNET_BLOCK_TYPE_REGEX) /* FIXME: do for all tpyes */ { switch (GNUNET_BLOCK_evaluate (GDS_block_context, ntohl (put->type), NULL, /* query group */ GNUNET_BLOCK_EO_NONE, NULL, /* query */ NULL, 0, /* xquery */ payload, payload_size)) { case GNUNET_BLOCK_EVALUATION_OK_MORE: case GNUNET_BLOCK_EVALUATION_OK_LAST: break; case GNUNET_BLOCK_EVALUATION_OK_DUPLICATE: case GNUNET_BLOCK_EVALUATION_RESULT_INVALID: case GNUNET_BLOCK_EVALUATION_RESULT_IRRELEVANT: case GNUNET_BLOCK_EVALUATION_REQUEST_VALID: case GNUNET_BLOCK_EVALUATION_REQUEST_INVALID: case GNUNET_BLOCK_EVALUATION_TYPE_NOT_SUPPORTED: default: GNUNET_break_op (0); return; } } bf = GNUNET_CONTAINER_bloomfilter_init (put->bloomfilter, DHT_BLOOM_SIZE, GNUNET_CONSTANTS_BLOOMFILTER_K); GNUNET_break_op (GNUNET_YES == GNUNET_CONTAINER_bloomfilter_test (bf, &peer->phash)); { struct GNUNET_PeerIdentity pp[putlen + 1]; /* extend 'put path' by sender */ if (0 != (options & GNUNET_DHT_RO_RECORD_ROUTE)) { #if SANITY_CHECKS for (unsigned int i=0;i<=putlen;i++) { for (unsigned int j=0;jid, sizeof (struct GNUNET_PeerIdentity))); } #endif GNUNET_memcpy (pp, put_path, putlen * sizeof (struct GNUNET_PeerIdentity)); pp[putlen] = *peer->id; putlen++; } else putlen = 0; /* give to local clients */ GDS_CLIENTS_handle_reply (exp_time, &put->key, 0, NULL, putlen, pp, ntohl (put->type), payload_size, payload); /* store locally */ if ((0 != (options & GNUNET_DHT_RO_DEMULTIPLEX_EVERYWHERE)) || (GDS_am_closest_peer (&put->key, bf))) GDS_DATACACHE_handle_put (exp_time, &put->key, putlen, pp, ntohl (put->type), payload_size, payload); /* route to other peers 
*/ forwarded = GDS_NEIGHBOURS_handle_put (ntohl (put->type), options, ntohl (put->desired_replication_level), exp_time, ntohl (put->hop_count), bf, &put->key, putlen, pp, payload, payload_size); /* notify monitoring clients */ GDS_CLIENTS_process_put (options | ( (GNUNET_OK == forwarded) ? GNUNET_DHT_RO_LAST_HOP : 0 ), ntohl (put->type), ntohl (put->hop_count), ntohl (put->desired_replication_level), putlen, pp, exp_time, &put->key, payload, payload_size); } GNUNET_CONTAINER_bloomfilter_free (bf); } /** * We have received a FIND PEER request. Send matching * HELLOs back. * * @param sender sender of the FIND PEER request * @param key peers close to this key are desired * @param bg group for filtering peers */ static void handle_find_peer (const struct GNUNET_PeerIdentity *sender, const struct GNUNET_HashCode *key, struct GNUNET_BLOCK_Group *bg) { int bucket_idx; struct PeerBucket *bucket; struct PeerInfo *peer; unsigned int choice; const struct GNUNET_HELLO_Message *hello; size_t hello_size; /* first, check about our own HELLO */ if (NULL != GDS_my_hello) { hello_size = GNUNET_HELLO_size ((const struct GNUNET_HELLO_Message *) GDS_my_hello); GNUNET_break (hello_size >= sizeof (struct GNUNET_MessageHeader)); if (GNUNET_BLOCK_EVALUATION_OK_MORE == GNUNET_BLOCK_evaluate (GDS_block_context, GNUNET_BLOCK_TYPE_DHT_HELLO, bg, GNUNET_BLOCK_EO_LOCAL_SKIP_CRYPTO, &my_identity_hash, NULL, 0, GDS_my_hello, hello_size)) { GDS_NEIGHBOURS_handle_reply (sender, GNUNET_BLOCK_TYPE_DHT_HELLO, GNUNET_TIME_relative_to_absolute (hello_expiration), key, 0, NULL, 0, NULL, GDS_my_hello, hello_size); } else { GNUNET_STATISTICS_update (GDS_stats, gettext_noop ("# FIND PEER requests ignored due to Bloomfilter"), 1, GNUNET_NO); } } else { GNUNET_STATISTICS_update (GDS_stats, gettext_noop ("# FIND PEER requests ignored due to lack of HELLO"), 1, GNUNET_NO); } /* then, also consider sending a random HELLO from the closest bucket */ if (0 == memcmp (&my_identity_hash, key, sizeof (struct 
GNUNET_HashCode))) bucket_idx = closest_bucket; else bucket_idx = GNUNET_MIN ((int) closest_bucket, find_bucket (key)); if (bucket_idx < 0) return; bucket = &k_buckets[bucket_idx]; if (bucket->peers_size == 0) return; choice = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, bucket->peers_size); peer = bucket->head; while (choice > 0) { GNUNET_assert (NULL != peer); peer = peer->next; choice--; } choice = bucket->peers_size; do { peer = peer->next; if (0 == choice--) return; /* no non-masked peer available */ if (NULL == peer) peer = bucket->head; hello = GDS_HELLO_get (peer->id); } while ( (NULL == hello) || (GNUNET_BLOCK_EVALUATION_OK_MORE != GNUNET_BLOCK_evaluate (GDS_block_context, GNUNET_BLOCK_TYPE_DHT_HELLO, bg, GNUNET_BLOCK_EO_LOCAL_SKIP_CRYPTO, &peer->phash, NULL, 0, hello, (hello_size = GNUNET_HELLO_size (hello)))) ); GDS_NEIGHBOURS_handle_reply (sender, GNUNET_BLOCK_TYPE_DHT_HELLO, GNUNET_TIME_relative_to_absolute (GNUNET_CONSTANTS_HELLO_ADDRESS_EXPIRATION), key, 0, NULL, 0, NULL, hello, hello_size); } /** * Handle a result from local datacache for a GET operation. 
* * @param cls the `struct PeerInfo` for which this is a reply * @param type type of the block * @param expiration_time when does the content expire * @param key key for the content * @param put_path_length number of entries in @a put_path * @param put_path peers the original PUT traversed (if tracked) * @param get_path_length number of entries in @a get_path * @param get_path peers this reply has traversed so far (if tracked) * @param data payload of the reply * @param data_size number of bytes in @a data */ static void handle_local_result (void *cls, enum GNUNET_BLOCK_Type type, struct GNUNET_TIME_Absolute expiration_time, const struct GNUNET_HashCode *key, unsigned int put_path_length, const struct GNUNET_PeerIdentity *put_path, unsigned int get_path_length, const struct GNUNET_PeerIdentity *get_path, const void *data, size_t data_size) { struct PeerInfo *peer = cls; char *pp; pp = GNUNET_STRINGS_pp2s (put_path, put_path_length); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Found local result for %s (PP: %s)\n", GNUNET_h2s (key), pp); GNUNET_free (pp); GDS_NEIGHBOURS_handle_reply (peer->id, type, expiration_time, key, put_path_length, put_path, get_path_length, get_path, data, data_size); } /** * Check validity of p2p get request. * * @param cls closure with the `struct PeerInfo` of the sender * @param get the message * @return #GNUNET_OK if the message is well-formed */ static int check_dht_p2p_get (void *cls, const struct PeerGetMessage *get) { uint32_t xquery_size; uint16_t msize; (void) cls; msize = ntohs (get->header.size); xquery_size = ntohl (get->xquery_size); if (msize < sizeof (struct PeerGetMessage) + xquery_size) { GNUNET_break_op (0); return GNUNET_SYSERR; } return GNUNET_OK; } /** * Core handler for p2p get requests. 
* * @param cls closure with the `struct PeerInfo` of the sender * @param get the message */ static void handle_dht_p2p_get (void *cls, const struct PeerGetMessage *get) { struct PeerInfo *peer = cls; uint32_t xquery_size; size_t reply_bf_size; uint16_t msize; enum GNUNET_BLOCK_Type type; enum GNUNET_DHT_RouteOption options; enum GNUNET_BLOCK_EvaluationResult eval; struct GNUNET_BLOCK_Group *bg; struct GNUNET_CONTAINER_BloomFilter *peer_bf; const char *xquery; int forwarded; /* parse and validate message */ msize = ntohs (get->header.size); xquery_size = ntohl (get->xquery_size); reply_bf_size = msize - (sizeof (struct PeerGetMessage) + xquery_size); type = ntohl (get->type); options = ntohl (get->options); xquery = (const char *) &get[1]; GNUNET_STATISTICS_update (GDS_stats, gettext_noop ("# P2P GET requests received"), 1, GNUNET_NO); GNUNET_STATISTICS_update (GDS_stats, gettext_noop ("# P2P GET bytes received"), msize, GNUNET_NO); if (GNUNET_YES == log_route_details_stderr) { char *tmp; tmp = GNUNET_strdup (GNUNET_i2s (&my_identity)); LOG_TRAFFIC (GNUNET_ERROR_TYPE_DEBUG, "R5N GET %s: %s->%s (%u, %u=>%u) xq: %.*s\n", GNUNET_h2s (&get->key), GNUNET_i2s (peer->id), tmp, ntohl(get->hop_count), GNUNET_CRYPTO_hash_matching_bits (&peer->phash, &get->key), GNUNET_CRYPTO_hash_matching_bits (&my_identity_hash, &get->key), ntohl(get->xquery_size), xquery); GNUNET_free (tmp); } eval = GNUNET_BLOCK_evaluate (GDS_block_context, type, NULL, GNUNET_BLOCK_EO_NONE, &get->key, xquery, xquery_size, NULL, 0); if (eval != GNUNET_BLOCK_EVALUATION_REQUEST_VALID) { /* request invalid or block type not supported */ GNUNET_break_op (eval == GNUNET_BLOCK_EVALUATION_TYPE_NOT_SUPPORTED); return; } peer_bf = GNUNET_CONTAINER_bloomfilter_init (get->bloomfilter, DHT_BLOOM_SIZE, GNUNET_CONSTANTS_BLOOMFILTER_K); GNUNET_break_op (GNUNET_YES == GNUNET_CONTAINER_bloomfilter_test (peer_bf, &peer->phash)); bg = GNUNET_BLOCK_group_create (GDS_block_context, type, get->bf_mutator, &xquery[xquery_size], 
reply_bf_size, "filter-size", reply_bf_size, NULL); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "GET for %s at %s after %u hops\n", GNUNET_h2s (&get->key), GNUNET_i2s (&my_identity), (unsigned int) ntohl (get->hop_count)); /* local lookup (this may update the reply_bf) */ if ((0 != (options & GNUNET_DHT_RO_DEMULTIPLEX_EVERYWHERE)) || (GDS_am_closest_peer (&get->key, peer_bf))) { if ((0 != (options & GNUNET_DHT_RO_FIND_PEER))) { GNUNET_STATISTICS_update (GDS_stats, gettext_noop ("# P2P FIND PEER requests processed"), 1, GNUNET_NO); handle_find_peer (peer->id, &get->key, bg); } else { eval = GDS_DATACACHE_handle_get (&get->key, type, xquery, xquery_size, bg, &handle_local_result, peer); } } else { GNUNET_STATISTICS_update (GDS_stats, gettext_noop ("# P2P GET requests ONLY routed"), 1, GNUNET_NO); } /* remember request for routing replies */ GDS_ROUTING_add (peer->id, type, bg, /* bg now owned by routing, but valid at least until end of this function! */ options, &get->key, xquery, xquery_size); /* P2P forwarding */ forwarded = GNUNET_NO; if (eval != GNUNET_BLOCK_EVALUATION_OK_LAST) forwarded = GDS_NEIGHBOURS_handle_get (type, options, ntohl (get->desired_replication_level), ntohl (get->hop_count), &get->key, xquery, xquery_size, bg, peer_bf); GDS_CLIENTS_process_get (options | (GNUNET_OK == forwarded) ? GNUNET_DHT_RO_LAST_HOP : 0, type, ntohl (get->hop_count), ntohl (get->desired_replication_level), 0, NULL, &get->key); /* clean up; note that 'bg' is owned by routing now! */ GNUNET_CONTAINER_bloomfilter_free (peer_bf); } /** * Check validity of p2p result message. 
* * @param cls closure * @param message message * @return #GNUNET_YES if the message is well-formed */ static int check_dht_p2p_result (void *cls, const struct PeerResultMessage *prm) { uint32_t get_path_length; uint32_t put_path_length; uint16_t msize; (void) cls; msize = ntohs (prm->header.size); put_path_length = ntohl (prm->put_path_length); get_path_length = ntohl (prm->get_path_length); if ((msize < sizeof (struct PeerResultMessage) + (get_path_length + put_path_length) * sizeof (struct GNUNET_PeerIdentity)) || (get_path_length > GNUNET_MAX_MESSAGE_SIZE / sizeof (struct GNUNET_PeerIdentity)) || (put_path_length > GNUNET_MAX_MESSAGE_SIZE / sizeof (struct GNUNET_PeerIdentity))) { GNUNET_break_op (0); return GNUNET_SYSERR; } return GNUNET_OK; } /** * Process a reply, after the @a get_path has been updated. * * @param expiration_time when does the reply expire * @param key key matching the query * @param get_path_length number of entries in @a get_path * @param get_path path the reply has taken * @param put_path_length number of entries in @a put_path * @param put_path path the PUT has taken * @param type type of the block * @param data_size number of bytes in @a data * @param data payload of the reply */ static void process_reply_with_path (struct GNUNET_TIME_Absolute expiration_time, const struct GNUNET_HashCode *key, unsigned int get_path_length, const struct GNUNET_PeerIdentity *get_path, unsigned int put_path_length, const struct GNUNET_PeerIdentity *put_path, enum GNUNET_BLOCK_Type type, size_t data_size, const void *data) { /* forward to local clients */ GDS_CLIENTS_handle_reply (expiration_time, key, get_path_length, get_path, put_path_length, put_path, type, data_size, data); GDS_CLIENTS_process_get_resp (type, get_path, get_path_length, put_path, put_path_length, expiration_time, key, data, data_size); if (GNUNET_YES == cache_results) { struct GNUNET_PeerIdentity xput_path[get_path_length + 1 + put_path_length]; GNUNET_memcpy (xput_path, put_path, 
put_path_length * sizeof (struct GNUNET_PeerIdentity)); GNUNET_memcpy (&xput_path[put_path_length], get_path, get_path_length * sizeof (struct GNUNET_PeerIdentity)); GDS_DATACACHE_handle_put (expiration_time, key, get_path_length + put_path_length, xput_path, type, data_size, data); } /* forward to other peers */ GDS_ROUTING_process (type, expiration_time, key, put_path_length, put_path, get_path_length, get_path, data, data_size); } /** * Core handler for p2p result messages. * * @param cls closure * @param message message */ static void handle_dht_p2p_result (void *cls, const struct PeerResultMessage *prm) { struct PeerInfo *peer = cls; const struct GNUNET_PeerIdentity *put_path; const struct GNUNET_PeerIdentity *get_path; const void *data; uint32_t get_path_length; uint32_t put_path_length; uint16_t msize; size_t data_size; enum GNUNET_BLOCK_Type type; struct GNUNET_TIME_Absolute exp_time; /* parse and validate message */ exp_time = GNUNET_TIME_absolute_ntoh (prm->expiration_time); if (0 == GNUNET_TIME_absolute_get_remaining (exp_time).rel_value_us) { GNUNET_STATISTICS_update (GDS_stats, gettext_noop ("# Expired results discarded"), 1, GNUNET_NO); return; } msize = ntohs (prm->header.size); put_path_length = ntohl (prm->put_path_length); get_path_length = ntohl (prm->get_path_length); put_path = (const struct GNUNET_PeerIdentity *) &prm[1]; get_path = &put_path[put_path_length]; type = ntohl (prm->type); data = (const void *) &get_path[get_path_length]; data_size = msize - (sizeof (struct PeerResultMessage) + (get_path_length + put_path_length) * sizeof (struct GNUNET_PeerIdentity)); GNUNET_STATISTICS_update (GDS_stats, gettext_noop ("# P2P RESULTS received"), 1, GNUNET_NO); GNUNET_STATISTICS_update (GDS_stats, gettext_noop ("# P2P RESULT bytes received"), msize, GNUNET_NO); if (GNUNET_YES == log_route_details_stderr) { char *tmp; char *pp; char *gp; gp = GNUNET_STRINGS_pp2s (get_path, get_path_length); pp = GNUNET_STRINGS_pp2s (put_path, put_path_length); tmp = 
GNUNET_strdup (GNUNET_i2s (&my_identity)); LOG_TRAFFIC (GNUNET_ERROR_TYPE_DEBUG, "R5N RESULT %s: %s->%s (GP: %s, PP: %s)\n", GNUNET_h2s (&prm->key), GNUNET_i2s (peer->id), tmp, gp, pp); GNUNET_free (gp); GNUNET_free (pp); GNUNET_free (tmp); } /* if we got a HELLO, consider it for our own routing table */ if (GNUNET_BLOCK_TYPE_DHT_HELLO == type) { const struct GNUNET_MessageHeader *h; struct GNUNET_PeerIdentity pid; /* Should be a HELLO, validate and consider using it! */ if (data_size < sizeof (struct GNUNET_HELLO_Message)) { GNUNET_break_op (0); return; } h = data; if (data_size != ntohs (h->size)) { GNUNET_break_op (0); return; } if (GNUNET_OK != GNUNET_HELLO_get_id ((const struct GNUNET_HELLO_Message *) h, &pid)) { GNUNET_break_op (0); return; } if ( (GNUNET_YES != disable_try_connect) && (0 != memcmp (&my_identity, &pid, sizeof (struct GNUNET_PeerIdentity))) ) try_connect (&pid, h); } /* First, check if 'peer' is already on the path, and if so, truncate it instead of expanding. */ for (unsigned int i=0;i<=get_path_length;i++) if (0 == memcmp (&get_path[i], peer->id, sizeof (struct GNUNET_PeerIdentity))) { process_reply_with_path (exp_time, &prm->key, i, get_path, put_path_length, put_path, type, data_size, data); return; } /* Need to append 'peer' to 'get_path' (normal case) */ { struct GNUNET_PeerIdentity xget_path[get_path_length + 1]; GNUNET_memcpy (xget_path, get_path, get_path_length * sizeof (struct GNUNET_PeerIdentity)); xget_path[get_path_length] = *peer->id; process_reply_with_path (exp_time, &prm->key, get_path_length + 1, xget_path, put_path_length, put_path, type, data_size, data); } } /** * Initialize neighbours subsystem. 
* * @return #GNUNET_OK on success, #GNUNET_SYSERR on error */ int GDS_NEIGHBOURS_init () { struct GNUNET_MQ_MessageHandler core_handlers[] = { GNUNET_MQ_hd_var_size (dht_p2p_get, GNUNET_MESSAGE_TYPE_DHT_P2P_GET, struct PeerGetMessage, NULL), GNUNET_MQ_hd_var_size (dht_p2p_put, GNUNET_MESSAGE_TYPE_DHT_P2P_PUT, struct PeerPutMessage, NULL), GNUNET_MQ_hd_var_size (dht_p2p_result, GNUNET_MESSAGE_TYPE_DHT_P2P_RESULT, struct PeerResultMessage, NULL), GNUNET_MQ_handler_end () }; unsigned long long temp_config_num; disable_try_connect = GNUNET_CONFIGURATION_get_value_yesno (GDS_cfg, "DHT", "DISABLE_TRY_CONNECT"); if (GNUNET_OK == GNUNET_CONFIGURATION_get_value_number (GDS_cfg, "DHT", "bucket_size", &temp_config_num)) bucket_size = (unsigned int) temp_config_num; cache_results = GNUNET_CONFIGURATION_get_value_yesno (GDS_cfg, "DHT", "CACHE_RESULTS"); log_route_details_stderr = (NULL != getenv("GNUNET_DHT_ROUTE_DEBUG")) ? GNUNET_YES : GNUNET_NO; ats_ch = GNUNET_ATS_connectivity_init (GDS_cfg); core_api = GNUNET_CORE_connect (GDS_cfg, NULL, &core_init, &handle_core_connect, &handle_core_disconnect, core_handlers); if (NULL == core_api) return GNUNET_SYSERR; all_connected_peers = GNUNET_CONTAINER_multipeermap_create (256, GNUNET_YES); all_desired_peers = GNUNET_CONTAINER_multipeermap_create (256, GNUNET_NO); return GNUNET_OK; } /** * Shutdown neighbours subsystem. */ void GDS_NEIGHBOURS_done () { if (NULL == core_api) return; GNUNET_CORE_disconnect (core_api); core_api = NULL; GNUNET_assert (0 == GNUNET_CONTAINER_multipeermap_size (all_connected_peers)); GNUNET_CONTAINER_multipeermap_destroy (all_connected_peers); all_connected_peers = NULL; GNUNET_CONTAINER_multipeermap_iterate (all_desired_peers, &free_connect_info, NULL); GNUNET_CONTAINER_multipeermap_destroy (all_desired_peers); all_desired_peers = NULL; GNUNET_ATS_connectivity_done (ats_ch); ats_ch = NULL; GNUNET_assert (NULL == find_peer_task); } /** * Get the ID of the local node. 
* * @return identity of the local node */ struct GNUNET_PeerIdentity * GDS_NEIGHBOURS_get_id () { return &my_identity; } /* end of gnunet-service-dht_neighbours.c */