diff options
Diffstat (limited to 'src/dht/gnunet-service-dht_neighbours.c')
-rw-r--r-- | src/dht/gnunet-service-dht_neighbours.c | 2029 |
1 files changed, 2029 insertions, 0 deletions
diff --git a/src/dht/gnunet-service-dht_neighbours.c b/src/dht/gnunet-service-dht_neighbours.c new file mode 100644 index 0000000..4ea5dd6 --- /dev/null +++ b/src/dht/gnunet-service-dht_neighbours.c @@ -0,0 +1,2029 @@ +/* + This file is part of GNUnet. + (C) 2009, 2010, 2011 Christian Grothoff (and other contributing authors) + + GNUnet is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published + by the Free Software Foundation; either version 3, or (at your + option) any later version. + + GNUnet is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + General Public License for more details. + + You should have received a copy of the GNU General Public License + along with GNUnet; see the file COPYING. If not, write to the + Free Software Foundation, Inc., 59 Temple Place - Suite 330, + Boston, MA 02111-1307, USA. +*/ + +/** + * @file dht/gnunet-service-dht_neighbours.c + * @brief GNUnet DHT service's bucket and neighbour management code + * @author Christian Grothoff + * @author Nathan Evans + */ + +#include "platform.h" +#include "gnunet_block_lib.h" +#include "gnunet_util_lib.h" +#include "gnunet_hello_lib.h" +#include "gnunet_constants.h" +#include "gnunet_protocols.h" +#include "gnunet_nse_service.h" +#include "gnunet_ats_service.h" +#include "gnunet_core_service.h" +#include "gnunet_datacache_lib.h" +#include "gnunet_transport_service.h" +#include "gnunet_hello_lib.h" +#include "gnunet_dht_service.h" +#include "gnunet_statistics_service.h" +#include "gnunet-service-dht.h" +#include "gnunet-service-dht_clients.h" +#include "gnunet-service-dht_datacache.h" +#include "gnunet-service-dht_hello.h" +#include "gnunet-service-dht_neighbours.h" +#include "gnunet-service-dht_nse.h" +#include "gnunet-service-dht_routing.h" +#include <fenv.h> +#include "dht.h" + +/** + * How many buckets will we allow total. + */ +#define MAX_BUCKETS sizeof (GNUNET_HashCode) * 8 + +/** + * What is the maximum number of peers in a given bucket. + */ +#define DEFAULT_BUCKET_SIZE 8 + +/** + * Desired replication level for FIND PEER requests + */ +#define FIND_PEER_REPLICATION_LEVEL 4 + +/** + * Maximum allowed replication level for all requests. + */ +#define MAXIMUM_REPLICATION_LEVEL 16 + +/** + * How often to update our preference levels for peers in our routing tables. + */ +#define DHT_DEFAULT_PREFERENCE_INTERVAL GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MINUTES, 2) + +/** + * How long at least to wait before sending another find peer request. + */ +#define DHT_MINIMUM_FIND_PEER_INTERVAL GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_SECONDS, 30) + +/** + * How long at most to wait before sending another find peer request. + */ +#define DHT_MAXIMUM_FIND_PEER_INTERVAL GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MINUTES, 10) + +/** + * How long at most to wait for transmission of a GET request to another peer? + */ +#define GET_TIMEOUT GNUNET_TIME_relative_multiply(GNUNET_TIME_UNIT_MINUTES, 2) + + +GNUNET_NETWORK_STRUCT_BEGIN + +/** + * P2P PUT message + */ +struct PeerPutMessage +{ + /** + * Type: GNUNET_MESSAGE_TYPE_DHT_P2P_PUT + */ + struct GNUNET_MessageHeader header; + + /** + * Processing options + */ + uint32_t options GNUNET_PACKED; + + /** + * Content type. + */ + uint32_t type GNUNET_PACKED; + + /** + * Hop count + */ + uint32_t hop_count GNUNET_PACKED; + + /** + * Replication level for this message + */ + uint32_t desired_replication_level GNUNET_PACKED; + + /** + * Length of the PUT path that follows (if tracked). + */ + uint32_t put_path_length GNUNET_PACKED; + + /** + * When does the content expire? + */ + struct GNUNET_TIME_AbsoluteNBO expiration_time; + + /** + * Bloomfilter (for peer identities) to stop circular routes + */ + char bloomfilter[DHT_BLOOM_SIZE]; + + /** + * The key we are storing under. + */ + GNUNET_HashCode key; + + /* put path (if tracked) */ + + /* Payload */ + +}; + + +/** + * P2P Result message + */ +struct PeerResultMessage +{ + /** + * Type: GNUNET_MESSAGE_TYPE_DHT_P2P_RESULT + */ + struct GNUNET_MessageHeader header; + + /** + * Content type. + */ + uint32_t type GNUNET_PACKED; + + /** + * Length of the PUT path that follows (if tracked). + */ + uint32_t put_path_length GNUNET_PACKED; + + /** + * Length of the GET path that follows (if tracked). + */ + uint32_t get_path_length GNUNET_PACKED; + + /** + * When does the content expire? + */ + struct GNUNET_TIME_AbsoluteNBO expiration_time; + + /** + * The key of the corresponding GET request. + */ + GNUNET_HashCode key; + + /* put path (if tracked) */ + + /* get path (if tracked) */ + + /* Payload */ + +}; + + +/** + * P2P GET message + */ +struct PeerGetMessage +{ + /** + * Type: GNUNET_MESSAGE_TYPE_DHT_P2P_GET + */ + struct GNUNET_MessageHeader header; + + /** + * Processing options + */ + uint32_t options GNUNET_PACKED; + + /** + * Desired content type. + */ + uint32_t type GNUNET_PACKED; + + /** + * Hop count + */ + uint32_t hop_count GNUNET_PACKED; + + /** + * Desired replication level for this request. + */ + uint32_t desired_replication_level GNUNET_PACKED; + + /** + * Size of the extended query. + */ + uint32_t xquery_size; + + /** + * Bloomfilter mutator. + */ + uint32_t bf_mutator; + + /** + * Bloomfilter (for peer identities) to stop circular routes + */ + char bloomfilter[DHT_BLOOM_SIZE]; + + /** + * The key we are looking for. + */ + GNUNET_HashCode key; + + /* xquery */ + + /* result bloomfilter */ + +}; +GNUNET_NETWORK_STRUCT_END + +/** + * Linked list of messages to send to a particular other peer. + */ +struct P2PPendingMessage +{ + /** + * Pointer to next item in the list + */ + struct P2PPendingMessage *next; + + /** + * Pointer to previous item in the list + */ + struct P2PPendingMessage *prev; + + /** + * Message importance level. FIXME: used? useful? + */ + unsigned int importance; + + /** + * When does this message time out? + */ + struct GNUNET_TIME_Absolute timeout; + + /** + * Actual message to be sent, allocated at the end of the struct: + * // msg = (cast) &pm[1]; + * // memcpy (&pm[1], data, len); + */ + const struct GNUNET_MessageHeader *msg; + +}; + + +/** + * Entry for a peer in a bucket. + */ +struct PeerInfo +{ + /** + * Next peer entry (DLL) + */ + struct PeerInfo *next; + + /** + * Prev peer entry (DLL) + */ + struct PeerInfo *prev; + + /** + * Count of outstanding messages for peer. FIXME: NEEDED? + * FIXME: bound queue size!? + */ + unsigned int pending_count; + + /** + * Head of pending messages to be sent to this peer. + */ + struct P2PPendingMessage *head; + + /** + * Tail of pending messages to be sent to this peer. + */ + struct P2PPendingMessage *tail; + + /** + * Core handle for sending messages to this peer. + */ + struct GNUNET_CORE_TransmitHandle *th; + + /** + * Task for scheduling preference updates + */ + GNUNET_SCHEDULER_TaskIdentifier preference_task; + + /** + * What is the identity of the peer? + */ + struct GNUNET_PeerIdentity id; + +#if 0 + /** + * What is the average latency for replies received? + */ + struct GNUNET_TIME_Relative latency; + + /** + * Transport level distance to peer. + */ + unsigned int distance; +#endif + +}; + + +/** + * Peers are grouped into buckets. + */ +struct PeerBucket +{ + /** + * Head of DLL + */ + struct PeerInfo *head; + + /** + * Tail of DLL + */ + struct PeerInfo *tail; + + /** + * Number of peers in the bucket. + */ + unsigned int peers_size; +}; + + +/** + * The lowest currently used bucket, initially 0 (for 0-bits matching bucket). + */ +static unsigned int closest_bucket; + +/** + * How many peers have we added since we sent out our last + * find peer request? + */ +static unsigned int newly_found_peers; + +/** + * The buckets. Array of size MAX_BUCKET_SIZE. Offset 0 means 0 bits matching. + */ +static struct PeerBucket k_buckets[MAX_BUCKETS]; + +/** + * Hash map of all known peers, for easy removal from k_buckets on disconnect. + */ +static struct GNUNET_CONTAINER_MultiHashMap *all_known_peers; + +/** + * Maximum size for each bucket. + */ +static unsigned int bucket_size = DEFAULT_BUCKET_SIZE; + +/** + * Task that sends FIND PEER requests. + */ +static GNUNET_SCHEDULER_TaskIdentifier find_peer_task; + +/** + * Identity of this peer. + */ +static struct GNUNET_PeerIdentity my_identity; + +/** + * Handle to CORE. + */ +static struct GNUNET_CORE_Handle *coreAPI; + +/** + * Handle to ATS. + */ +static struct GNUNET_ATS_PerformanceHandle *atsAPI; + + + +/** + * Find the optimal bucket for this key. + * + * @param hc the hashcode to compare our identity to + * @return the proper bucket index, or GNUNET_SYSERR + * on error (same hashcode) + */ +static int +find_bucket (const GNUNET_HashCode * hc) +{ + unsigned int bits; + + bits = GNUNET_CRYPTO_hash_matching_bits (&my_identity.hashPubKey, hc); + if (bits == MAX_BUCKETS) + { + /* How can all bits match? Got my own ID? */ + GNUNET_break (0); + return GNUNET_SYSERR; + } + return MAX_BUCKETS - bits - 1; +} + + +/** + * Let GNUnet core know that we like the given peer. + * + * @param cls the 'struct PeerInfo' of the peer + * @param tc scheduler context. + */ +static void +update_core_preference (void *cls, + const struct GNUNET_SCHEDULER_TaskContext *tc) +{ + struct PeerInfo *peer = cls; + uint64_t preference; + unsigned int matching; + int bucket; + + peer->preference_task = GNUNET_SCHEDULER_NO_TASK; + if ((tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN) != 0) + return; + matching = + GNUNET_CRYPTO_hash_matching_bits (&my_identity.hashPubKey, + &peer->id.hashPubKey); + if (matching >= 64) + matching = 63; + bucket = find_bucket (&peer->id.hashPubKey); + if (bucket == GNUNET_SYSERR) + preference = 0; + else + { + GNUNET_assert (k_buckets[bucket].peers_size != 0); + preference = (1LL << matching) / k_buckets[bucket].peers_size; + } + if (preference == 0) + { + peer->preference_task = + GNUNET_SCHEDULER_add_delayed (DHT_DEFAULT_PREFERENCE_INTERVAL, + &update_core_preference, peer); + return; + } + GNUNET_STATISTICS_update (GDS_stats, + gettext_noop ("# Preference updates given to core"), + 1, GNUNET_NO); + GNUNET_ATS_change_preference (atsAPI, &peer->id, + GNUNET_ATS_PREFERENCE_BANDWIDTH, + (double) preference, GNUNET_ATS_PREFERENCE_END); + peer->preference_task = + GNUNET_SCHEDULER_add_delayed (DHT_DEFAULT_PREFERENCE_INTERVAL, + &update_core_preference, peer); + + +} + + +/** + * Closure for 'add_known_to_bloom'. + */ +struct BloomConstructorContext +{ + /** + * Bloom filter under construction. + */ + struct GNUNET_CONTAINER_BloomFilter *bloom; + + /** + * Mutator to use. + */ + uint32_t bf_mutator; +}; + + +/** + * Add each of the peers we already know to the bloom filter of + * the request so that we don't get duplicate HELLOs. + * + * @param cls the 'struct BloomConstructorContext'. + * @param key peer identity to add to the bloom filter + * @param value value the peer information (unused) + * @return GNUNET_YES (we should continue to iterate) + */ +static int +add_known_to_bloom (void *cls, const GNUNET_HashCode * key, void *value) +{ + struct BloomConstructorContext *ctx = cls; + GNUNET_HashCode mh; + + GNUNET_BLOCK_mingle_hash (key, ctx->bf_mutator, &mh); +#if DEBUG_DHT + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Adding known peer (%s) to bloomfilter for FIND PEER with mutation %u\n", + GNUNET_h2s (key), ctx->bf_mutator); +#endif + GNUNET_CONTAINER_bloomfilter_add (ctx->bloom, &mh); + return GNUNET_YES; +} + + +/** + * Task to send a find peer message for our own peer identifier + * so that we can find the closest peers in the network to ourselves + * and attempt to connect to them. + * + * @param cls closure for this task + * @param tc the context under which the task is running + */ +static void +send_find_peer_message (void *cls, + const struct GNUNET_SCHEDULER_TaskContext *tc) +{ + struct GNUNET_TIME_Relative next_send_time; + struct BloomConstructorContext bcc; + struct GNUNET_CONTAINER_BloomFilter *peer_bf; + + find_peer_task = GNUNET_SCHEDULER_NO_TASK; + if ((tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN) != 0) + return; + if (newly_found_peers > bucket_size) + { + /* If we are finding many peers already, no need to send out our request right now! */ + find_peer_task = + GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES, + &send_find_peer_message, NULL); + newly_found_peers = 0; + return; + } + bcc.bf_mutator = + GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, UINT32_MAX); + bcc.bloom = + GNUNET_CONTAINER_bloomfilter_init (NULL, DHT_BLOOM_SIZE, + GNUNET_CONSTANTS_BLOOMFILTER_K); + GNUNET_CONTAINER_multihashmap_iterate (all_known_peers, &add_known_to_bloom, + &bcc); + GNUNET_STATISTICS_update (GDS_stats, + gettext_noop ("# FIND PEER messages initiated"), 1, + GNUNET_NO); + peer_bf = + GNUNET_CONTAINER_bloomfilter_init (NULL, DHT_BLOOM_SIZE, + GNUNET_CONSTANTS_BLOOMFILTER_K); + // FIXME: pass priority!? + GDS_NEIGHBOURS_handle_get (GNUNET_BLOCK_TYPE_DHT_HELLO, + GNUNET_DHT_RO_FIND_PEER, + FIND_PEER_REPLICATION_LEVEL, 0, + &my_identity.hashPubKey, NULL, 0, bcc.bloom, + bcc.bf_mutator, peer_bf); + GNUNET_CONTAINER_bloomfilter_free (peer_bf); + GNUNET_CONTAINER_bloomfilter_free (bcc.bloom); + /* schedule next round */ + next_send_time.rel_value = + DHT_MINIMUM_FIND_PEER_INTERVAL.rel_value + + GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK, + DHT_MAXIMUM_FIND_PEER_INTERVAL.rel_value / + (newly_found_peers + 1)); + newly_found_peers = 0; + find_peer_task = + GNUNET_SCHEDULER_add_delayed (next_send_time, &send_find_peer_message, + NULL); +} + + +/** + * Method called whenever a peer connects. + * + * @param cls closure + * @param peer peer identity this notification is about + * @param atsi performance data + * @param atsi_count number of records in 'atsi' + */ +static void +handle_core_connect (void *cls, const struct GNUNET_PeerIdentity *peer, + const struct GNUNET_ATS_Information *atsi, + unsigned int atsi_count) +{ + struct PeerInfo *ret; + int peer_bucket; + + /* Check for connect to self message */ + if (0 == memcmp (&my_identity, peer, sizeof (struct GNUNET_PeerIdentity))) + return; +#if DEBUG_DHT + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Connected %s to %s\n", + GNUNET_i2s (&my_identity), GNUNET_h2s (&peer->hashPubKey)); +#endif + if (GNUNET_YES == + GNUNET_CONTAINER_multihashmap_contains (all_known_peers, + &peer->hashPubKey)) + { + GNUNET_break (0); + return; + } + GNUNET_STATISTICS_update (GDS_stats, gettext_noop ("# Peers connected"), 1, + GNUNET_NO); + peer_bucket = find_bucket (&peer->hashPubKey); + GNUNET_assert ((peer_bucket >= 0) && (peer_bucket < MAX_BUCKETS)); + ret = GNUNET_malloc (sizeof (struct PeerInfo)); +#if 0 + ret->latency = latency; + ret->distance = distance; +#endif + ret->id = *peer; + GNUNET_CONTAINER_DLL_insert_tail (k_buckets[peer_bucket].head, + k_buckets[peer_bucket].tail, ret); + k_buckets[peer_bucket].peers_size++; + closest_bucket = GNUNET_MAX (closest_bucket, peer_bucket); + if ((peer_bucket > 0) && (k_buckets[peer_bucket].peers_size <= bucket_size)) + { + ret->preference_task = + GNUNET_SCHEDULER_add_now (&update_core_preference, ret); + newly_found_peers++; + } + GNUNET_assert (GNUNET_OK == + GNUNET_CONTAINER_multihashmap_put (all_known_peers, + &peer->hashPubKey, ret, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); + if (1 == GNUNET_CONTAINER_multihashmap_size (all_known_peers)) + { + /* got a first connection, good time to start with FIND PEER requests... */ + find_peer_task = GNUNET_SCHEDULER_add_now (&send_find_peer_message, NULL); + } +} + + +/** + * Method called whenever a peer disconnects. + * + * @param cls closure + * @param peer peer identity this notification is about + */ +static void +handle_core_disconnect (void *cls, const struct GNUNET_PeerIdentity *peer) +{ + struct PeerInfo *to_remove; + int current_bucket; + struct P2PPendingMessage *pos; + unsigned int discarded; + + /* Check for disconnect from self message */ + if (0 == memcmp (&my_identity, peer, sizeof (struct GNUNET_PeerIdentity))) + return; +#if DEBUG_DHT + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Disconnected %s from %s\n", + GNUNET_i2s (&my_identity), GNUNET_h2s (&peer->hashPubKey)); +#endif + to_remove = + GNUNET_CONTAINER_multihashmap_get (all_known_peers, &peer->hashPubKey); + if (NULL == to_remove) + { + GNUNET_break (0); + return; + } + GNUNET_STATISTICS_update (GDS_stats, gettext_noop ("# Peers connected"), -1, + GNUNET_NO); + GNUNET_assert (GNUNET_YES == + GNUNET_CONTAINER_multihashmap_remove (all_known_peers, + &peer->hashPubKey, + to_remove)); + if (GNUNET_SCHEDULER_NO_TASK != to_remove->preference_task) + { + GNUNET_SCHEDULER_cancel (to_remove->preference_task); + to_remove->preference_task = GNUNET_SCHEDULER_NO_TASK; + } + current_bucket = find_bucket (&to_remove->id.hashPubKey); + GNUNET_assert (current_bucket >= 0); + GNUNET_CONTAINER_DLL_remove (k_buckets[current_bucket].head, + k_buckets[current_bucket].tail, to_remove); + GNUNET_assert (k_buckets[current_bucket].peers_size > 0); + k_buckets[current_bucket].peers_size--; + while ((closest_bucket > 0) && (k_buckets[closest_bucket].peers_size == 0)) + closest_bucket--; + + if (to_remove->th != NULL) + { + GNUNET_CORE_notify_transmit_ready_cancel (to_remove->th); + to_remove->th = NULL; + } + discarded = 0; + while (NULL != (pos = to_remove->head)) + { + GNUNET_CONTAINER_DLL_remove (to_remove->head, to_remove->tail, pos); + discarded++; + GNUNET_free (pos); + } + GNUNET_STATISTICS_update (GDS_stats, + gettext_noop + ("# Queued messages discarded (peer disconnected)"), + discarded, GNUNET_NO); + GNUNET_free (to_remove); +} + + +/** + * Called when core is ready to send a message we asked for + * out to the destination. + * + * @param cls the 'struct PeerInfo' of the target peer + * @param size number of bytes available in buf + * @param buf where the callee should write the message + * @return number of bytes written to buf + */ +static size_t +core_transmit_notify (void *cls, size_t size, void *buf) +{ + struct PeerInfo *peer = cls; + char *cbuf = buf; + struct P2PPendingMessage *pending; + size_t off; + size_t msize; + + peer->th = NULL; + while ((NULL != (pending = peer->head)) && + (GNUNET_TIME_absolute_get_remaining (pending->timeout).rel_value == 0)) + { + peer->pending_count--; + GNUNET_CONTAINER_DLL_remove (peer->head, peer->tail, pending); + GNUNET_free (pending); + } + if (pending == NULL) + { + /* no messages pending */ + return 0; + } + if (buf == NULL) + { + peer->th = + GNUNET_CORE_notify_transmit_ready (coreAPI, GNUNET_YES, + pending->importance, + GNUNET_TIME_absolute_get_remaining + (pending->timeout), &peer->id, + ntohs (pending->msg->size), + &core_transmit_notify, peer); + GNUNET_break (NULL != peer->th); + return 0; + } + off = 0; + while ((NULL != (pending = peer->head)) && + (size - off >= (msize = ntohs (pending->msg->size)))) + { + GNUNET_STATISTICS_update (GDS_stats, + gettext_noop + ("# Bytes transmitted to other peers"), msize, + GNUNET_NO); + memcpy (&cbuf[off], pending->msg, msize); + off += msize; + peer->pending_count--; + GNUNET_CONTAINER_DLL_remove (peer->head, peer->tail, pending); + GNUNET_free (pending); + } + if (peer->head != NULL) + { + peer->th = + GNUNET_CORE_notify_transmit_ready (coreAPI, GNUNET_YES, + pending->importance, + GNUNET_TIME_absolute_get_remaining + (pending->timeout), &peer->id, msize, + &core_transmit_notify, peer); + GNUNET_break (NULL != peer->th); + } + return off; +} + + +/** + * Transmit all messages in the peer's message queue. + * + * @param peer message queue to process + */ +static void +process_peer_queue (struct PeerInfo *peer) +{ + struct P2PPendingMessage *pending; + + if (NULL == (pending = peer->head)) + return; + if (NULL != peer->th) + return; + GNUNET_STATISTICS_update (GDS_stats, + gettext_noop + ("# Bytes of bandwdith requested from core"), + ntohs (pending->msg->size), GNUNET_NO); + peer->th = + GNUNET_CORE_notify_transmit_ready (coreAPI, GNUNET_YES, + pending->importance, + GNUNET_TIME_absolute_get_remaining + (pending->timeout), &peer->id, + ntohs (pending->msg->size), + &core_transmit_notify, peer); + GNUNET_break (NULL != peer->th); +} + + +/** + * To how many peers should we (on average) forward the request to + * obtain the desired target_replication count (on average). + * + * @param hop_count number of hops the message has traversed + * @param target_replication the number of total paths desired + * @return Some number of peers to forward the message to + */ +static unsigned int +get_forward_count (uint32_t hop_count, uint32_t target_replication) +{ + uint32_t random_value; + uint32_t forward_count; + float target_value; + + if (hop_count > GDS_NSE_get () * 6.0) + { + /* forcefully terminate */ + return 0; + } + if (hop_count > GDS_NSE_get () * 4.0) + { + /* Once we have reached our ideal number of hops, only forward to 1 peer */ + return 1; + } + /* bound by system-wide maximum */ + target_replication = + GNUNET_MIN (MAXIMUM_REPLICATION_LEVEL, target_replication); + target_value = + 1 + (target_replication - 1.0) / (GDS_NSE_get () + + ((float) (target_replication - 1.0) * + hop_count)); + /* Set forward count to floor of target_value */ + forward_count = (uint32_t) target_value; + /* Subtract forward_count (floor) from target_value (yields value between 0 and 1) */ + target_value = target_value - forward_count; + random_value = + GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, UINT32_MAX); + if (random_value < (target_value * UINT32_MAX)) + forward_count++; + return forward_count; +} + + +/** + * Compute the distance between have and target as a 32-bit value. + * Differences in the lower bits must count stronger than differences + * in the higher bits. + * + * @return 0 if have==target, otherwise a number + * that is larger as the distance between + * the two hash codes increases + */ +static unsigned int +get_distance (const GNUNET_HashCode * target, const GNUNET_HashCode * have) +{ + unsigned int bucket; + unsigned int msb; + unsigned int lsb; + unsigned int i; + + /* We have to represent the distance between two 2^9 (=512)-bit + * numbers as a 2^5 (=32)-bit number with "0" being used for the + * two numbers being identical; furthermore, we need to + * guarantee that a difference in the number of matching + * bits is always represented in the result. + * + * We use 2^32/2^9 numerical values to distinguish between + * hash codes that have the same LSB bit distance and + * use the highest 2^9 bits of the result to signify the + * number of (mis)matching LSB bits; if we have 0 matching + * and hence 512 mismatching LSB bits we return -1 (since + * 512 itself cannot be represented with 9 bits) */ + + /* first, calculate the most significant 9 bits of our + * result, aka the number of LSBs */ + bucket = GNUNET_CRYPTO_hash_matching_bits (target, have); + /* bucket is now a value between 0 and 512 */ + if (bucket == 512) + return 0; /* perfect match */ + if (bucket == 0) + return (unsigned int) -1; /* LSB differs; use max (if we did the bit-shifting + * below, we'd end up with max+1 (overflow)) */ + + /* calculate the most significant bits of the final result */ + msb = (512 - bucket) << (32 - 9); + /* calculate the 32-9 least significant bits of the final result by + * looking at the differences in the 32-9 bits following the + * mismatching bit at 'bucket' */ + lsb = 0; + for (i = bucket + 1; + (i < sizeof (GNUNET_HashCode) * 8) && (i < bucket + 1 + 32 - 9); i++) + { + if (GNUNET_CRYPTO_hash_get_bit (target, i) != + GNUNET_CRYPTO_hash_get_bit (have, i)) + lsb |= (1 << (bucket + 32 - 9 - i)); /* first bit set will be 10, + * last bit set will be 31 -- if + * i does not reach 512 first... */ + } + return msb | lsb; +} + + +/** + * Check whether my identity is closer than any known peers. If a + * non-null bloomfilter is given, check if this is the closest peer + * that hasn't already been routed to. + * + * @param key hash code to check closeness to + * @param bloom bloomfilter, exclude these entries from the decision + * @return GNUNET_YES if node location is closest, + * GNUNET_NO otherwise. + */ +static int +am_closest_peer (const GNUNET_HashCode * key, + const struct GNUNET_CONTAINER_BloomFilter *bloom) +{ + int bits; + int other_bits; + int bucket_num; + int count; + struct PeerInfo *pos; + + if (0 == memcmp (&my_identity.hashPubKey, key, sizeof (GNUNET_HashCode))) + return GNUNET_YES; + bucket_num = find_bucket (key); + GNUNET_assert (bucket_num >= 0); + bits = GNUNET_CRYPTO_hash_matching_bits (&my_identity.hashPubKey, key); + pos = k_buckets[bucket_num].head; + count = 0; + while ((pos != NULL) && (count < bucket_size)) + { + if ((bloom != NULL) && + (GNUNET_YES == + GNUNET_CONTAINER_bloomfilter_test (bloom, &pos->id.hashPubKey))) + { + pos = pos->next; + continue; /* Skip already checked entries */ + } + other_bits = GNUNET_CRYPTO_hash_matching_bits (&pos->id.hashPubKey, key); + if (other_bits > bits) + return GNUNET_NO; + if (other_bits == bits) /* We match the same number of bits */ + return GNUNET_YES; + pos = pos->next; + } + /* No peers closer, we are the closest! */ + return GNUNET_YES; +} + + +/** + * Select a peer from the routing table that would be a good routing + * destination for sending a message for "key". The resulting peer + * must not be in the set of blocked peers.<p> + * + * Note that we should not ALWAYS select the closest peer to the + * target, peers further away from the target should be chosen with + * exponentially declining probability. + * + * FIXME: double-check that this is fine + * + * + * @param key the key we are selecting a peer to route to + * @param bloom a bloomfilter containing entries this request has seen already + * @param hops how many hops has this message traversed thus far + * @return Peer to route to, or NULL on error + */ +static struct PeerInfo * +select_peer (const GNUNET_HashCode * key, + const struct GNUNET_CONTAINER_BloomFilter *bloom, uint32_t hops) +{ + unsigned int bc; + unsigned int count; + unsigned int selected; + struct PeerInfo *pos; + unsigned int dist; + unsigned int smallest_distance; + struct PeerInfo *chosen; + + if (hops >= GDS_NSE_get ()) + { + /* greedy selection (closest peer that is not in bloomfilter) */ + smallest_distance = UINT_MAX; + chosen = NULL; + for (bc = 0; bc <= closest_bucket; bc++) + { + pos = k_buckets[bc].head; + count = 0; + while ((pos != NULL) && (count < bucket_size)) + { + if ((bloom == NULL) || + (GNUNET_NO == + GNUNET_CONTAINER_bloomfilter_test (bloom, &pos->id.hashPubKey))) + { + dist = get_distance (key, &pos->id.hashPubKey); + if (dist < smallest_distance) + { + chosen = pos; + smallest_distance = dist; + } + } + else + { +#if DEBUG_DHT + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Excluded peer `%s' due to BF match in greedy routing for %s\n", + GNUNET_i2s (&pos->id), GNUNET_h2s (key)); +#endif + GNUNET_STATISTICS_update (GDS_stats, + gettext_noop + ("# Peers excluded from routing due to Bloomfilter"), + 1, GNUNET_NO); + } + count++; + pos = pos->next; + } + } + if (NULL == chosen) + GNUNET_STATISTICS_update (GDS_stats, + gettext_noop ("# Peer selection failed"), 1, + GNUNET_NO); + return chosen; + } + + /* select "random" peer */ + /* count number of peers that are available and not filtered */ + count = 0; + for (bc = 0; bc <= closest_bucket; bc++) + { + pos = k_buckets[bc].head; + while ((pos != NULL) && (count < bucket_size)) + { + if ((bloom != NULL) && + (GNUNET_YES == + GNUNET_CONTAINER_bloomfilter_test (bloom, &pos->id.hashPubKey))) + { + GNUNET_STATISTICS_update (GDS_stats, + gettext_noop + ("# Peers excluded from routing due to Bloomfilter"), + 1, GNUNET_NO); +#if DEBUG_DHT + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Excluded peer `%s' due to BF match in random routing for %s\n", + GNUNET_i2s (&pos->id), GNUNET_h2s (key)); +#endif + pos = pos->next; + continue; /* Ignore bloomfiltered peers */ + } + count++; + pos = pos->next; + } + } + if (count == 0) /* No peers to select from! */ + { + GNUNET_STATISTICS_update (GDS_stats, + gettext_noop ("# Peer selection failed"), 1, + GNUNET_NO); + return NULL; + } + /* Now actually choose a peer */ + selected = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, count); + count = 0; + for (bc = 0; bc <= closest_bucket; bc++) + { + pos = k_buckets[bc].head; + while ((pos != NULL) && (count < bucket_size)) + { + if ((bloom != NULL) && + (GNUNET_YES == + GNUNET_CONTAINER_bloomfilter_test (bloom, &pos->id.hashPubKey))) + { + pos = pos->next; + continue; /* Ignore bloomfiltered peers */ + } + if (0 == selected--) + return pos; + pos = pos->next; + } + } + GNUNET_break (0); + return NULL; +} + + +/** + * Compute the set of peers that the given request should be + * forwarded to. + * + * @param key routing key + * @param bloom bloom filter excluding peers as targets, all selected + * peers will be added to the bloom filter + * @param hop_count number of hops the request has traversed so far + * @param target_replication desired number of replicas + * @param targets where to store an array of target peers (to be + * free'd by the caller) + * @return number of peers returned in 'targets'. + */ +static unsigned int +get_target_peers (const GNUNET_HashCode * key, + struct GNUNET_CONTAINER_BloomFilter *bloom, + uint32_t hop_count, uint32_t target_replication, + struct PeerInfo ***targets) +{ + unsigned int ret; + unsigned int off; + struct PeerInfo **rtargets; + struct PeerInfo *nxt; + + GNUNET_assert (NULL != bloom); + ret = get_forward_count (hop_count, target_replication); + if (ret == 0) + { + *targets = NULL; + return 0; + } + rtargets = GNUNET_malloc (sizeof (struct PeerInfo *) * ret); + for (off = 0; off < ret; off++) + { + nxt = select_peer (key, bloom, hop_count); + if (nxt == NULL) + break; + rtargets[off] = nxt; + GNUNET_break (GNUNET_NO == + GNUNET_CONTAINER_bloomfilter_test (bloom, + &nxt->id.hashPubKey)); + GNUNET_CONTAINER_bloomfilter_add (bloom, &rtargets[off]->id.hashPubKey); + } +#if DEBUG_DHT + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Selected %u/%u peers at hop %u for %s (target was %u)\n", off, + GNUNET_CONTAINER_multihashmap_size (all_known_peers), + (unsigned int) hop_count, GNUNET_h2s (key), ret); +#endif + if (0 == off) + { + GNUNET_free (rtargets); + *targets = NULL; + return 0; + } + *targets = rtargets; + return off; +} + + +/** + * Perform a PUT operation. Forwards the given request to other + * peers. Does not store the data locally. Does not give the + * data to local clients. May do nothing if this is the only + * peer in the network (or if we are the closest peer in the + * network). + * + * @param type type of the block + * @param options routing options + * @param desired_replication_level desired replication count + * @param expiration_time when does the content expire + * @param hop_count how many hops has this message traversed so far + * @param bf Bloom filter of peers this PUT has already traversed + * @param key key for the content + * @param put_path_length number of entries in put_path + * @param put_path peers this request has traversed so far (if tracked) + * @param data payload to store + * @param data_size number of bytes in data + */ +void +GDS_NEIGHBOURS_handle_put (enum GNUNET_BLOCK_Type type, + enum GNUNET_DHT_RouteOption options, + uint32_t desired_replication_level, + struct GNUNET_TIME_Absolute expiration_time, + uint32_t hop_count, + struct GNUNET_CONTAINER_BloomFilter *bf, + const GNUNET_HashCode * key, + unsigned int put_path_length, + struct GNUNET_PeerIdentity *put_path, + const void *data, size_t data_size) +{ + unsigned int target_count; + unsigned int i; + struct PeerInfo **targets; + struct PeerInfo *target; + struct P2PPendingMessage *pending; + size_t msize; + struct PeerPutMessage *ppm; + struct GNUNET_PeerIdentity *pp; + + GNUNET_assert (NULL != bf); +#if DEBUG_DHT + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Adding myself (%s) to PUT bloomfilter for %s\n", + GNUNET_i2s (&my_identity), GNUNET_h2s (key)); +#endif + GNUNET_CONTAINER_bloomfilter_add (bf, &my_identity.hashPubKey); + GNUNET_STATISTICS_update (GDS_stats, gettext_noop ("# PUT requests routed"), + 1, GNUNET_NO); + target_count = + get_target_peers (key, bf, hop_count, desired_replication_level, + &targets); + if (0 == target_count) + { +#if DEBUG_DHT + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Routing PUT for %s terminates after %u hops at %s\n", + GNUNET_h2s (key), (unsigned int) hop_count, + GNUNET_i2s (&my_identity)); +#endif + return; + } + msize = + put_path_length * sizeof (struct GNUNET_PeerIdentity) + data_size + + sizeof (struct PeerPutMessage); + if (msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE) + { + put_path_length = 0; + msize = data_size + sizeof (struct PeerPutMessage); + } + if (msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE) + { + GNUNET_break (0); + GNUNET_free (targets); + return; + } + GNUNET_STATISTICS_update (GDS_stats, + gettext_noop + ("# PUT messages queued for transmission"), + target_count, GNUNET_NO); + for (i = 0; i < target_count; i++) + { + target = targets[i]; +#if DEBUG_DHT + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Routing PUT for %s after %u hops to %s\n", GNUNET_h2s (key), + (unsigned int) hop_count, GNUNET_i2s (&target->id)); +#endif + pending = GNUNET_malloc (sizeof (struct P2PPendingMessage) + msize); + pending->importance = 0; /* FIXME */ + pending->timeout = expiration_time; + ppm = (struct PeerPutMessage *) &pending[1]; + pending->msg = &ppm->header; + ppm->header.size = htons (msize); + ppm->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_P2P_PUT); + ppm->options = htonl (options); + ppm->type = htonl (type); + ppm->hop_count = htonl (hop_count + 1); + ppm->desired_replication_level = htonl (desired_replication_level); + ppm->put_path_length = htonl (put_path_length); + ppm->expiration_time = GNUNET_TIME_absolute_hton (expiration_time); + GNUNET_break (GNUNET_YES == + GNUNET_CONTAINER_bloomfilter_test (bf, + &target->id.hashPubKey)); + GNUNET_assert (GNUNET_OK == + GNUNET_CONTAINER_bloomfilter_get_raw_data (bf, + ppm->bloomfilter, + DHT_BLOOM_SIZE)); + ppm->key = *key; + pp = (struct GNUNET_PeerIdentity *) &ppm[1]; + memcpy (pp, put_path, + sizeof (struct GNUNET_PeerIdentity) * put_path_length); + memcpy (&pp[put_path_length], data, data_size); + GNUNET_CONTAINER_DLL_insert_tail (target->head, target->tail, pending); + target->pending_count++; + process_peer_queue (target); + } + GNUNET_free (targets); +} + + +/** + * Perform a GET operation. Forwards the given request to other + * peers. Does not lookup the key locally. May do nothing if this is + * the only peer in the network (or if we are the closest peer in the + * network). + * + * @param type type of the block + * @param options routing options + * @param desired_replication_level desired replication count + * @param hop_count how many hops did this request traverse so far? + * @param key key for the content + * @param xquery extended query + * @param xquery_size number of bytes in xquery + * @param reply_bf bloomfilter to filter duplicates + * @param reply_bf_mutator mutator for reply_bf + * @param peer_bf filter for peers not to select (again) + */ +void +GDS_NEIGHBOURS_handle_get (enum GNUNET_BLOCK_Type type, + enum GNUNET_DHT_RouteOption options, + uint32_t desired_replication_level, + uint32_t hop_count, const GNUNET_HashCode * key, + const void *xquery, size_t xquery_size, + const struct GNUNET_CONTAINER_BloomFilter *reply_bf, + uint32_t reply_bf_mutator, + struct GNUNET_CONTAINER_BloomFilter *peer_bf) +{ + unsigned int target_count; + unsigned int i; + struct PeerInfo **targets; + struct PeerInfo *target; + struct P2PPendingMessage *pending; + size_t msize; + struct PeerGetMessage *pgm; + char *xq; + size_t reply_bf_size; + + GNUNET_assert (NULL != peer_bf); + GNUNET_STATISTICS_update (GDS_stats, gettext_noop ("# GET requests routed"), + 1, GNUNET_NO); + target_count = + get_target_peers (key, peer_bf, hop_count, desired_replication_level, + &targets); +#if DEBUG_DHT + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Adding myself (%s) to GET bloomfilter for %s\n", + GNUNET_i2s (&my_identity), GNUNET_h2s (key)); +#endif + GNUNET_CONTAINER_bloomfilter_add (peer_bf, &my_identity.hashPubKey); + if (0 == target_count) + { +#if DEBUG_DHT + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Routing GET for %s terminates after %u hops at %s\n", + GNUNET_h2s (key), (unsigned int) hop_count, + GNUNET_i2s (&my_identity)); +#endif + return; + } + reply_bf_size = GNUNET_CONTAINER_bloomfilter_get_size (reply_bf); + msize = xquery_size + sizeof (struct PeerGetMessage) + reply_bf_size; + if (msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE) + { + GNUNET_break (0); + GNUNET_free (targets); + return; + } + GNUNET_STATISTICS_update (GDS_stats, + gettext_noop + ("# GET messages queued for transmission"), + target_count, GNUNET_NO); + /* forward request */ + for (i = 0; i < target_count; i++) + { + target = targets[i]; +#if DEBUG_DHT + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Routing GET for %s after %u hops to %s\n", GNUNET_h2s (key), + (unsigned int) hop_count, GNUNET_i2s (&target->id)); +#endif + pending = GNUNET_malloc (sizeof (struct P2PPendingMessage) + msize); + pending->importance = 0; /* FIXME */ + pending->timeout = GNUNET_TIME_relative_to_absolute (GET_TIMEOUT); + pgm = (struct PeerGetMessage *) &pending[1]; + pending->msg = &pgm->header; + pgm->header.size = htons (msize); + pgm->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_P2P_GET); + pgm->options = htonl (options); + pgm->type = htonl (type); + pgm->hop_count = htonl (hop_count + 1); + pgm->desired_replication_level = htonl (desired_replication_level); + pgm->xquery_size = htonl (xquery_size); + pgm->bf_mutator = reply_bf_mutator; + GNUNET_break (GNUNET_YES == + GNUNET_CONTAINER_bloomfilter_test (peer_bf, + &target->id.hashPubKey)); + GNUNET_assert (GNUNET_OK == + GNUNET_CONTAINER_bloomfilter_get_raw_data (peer_bf, + pgm->bloomfilter, + DHT_BLOOM_SIZE)); + pgm->key = *key; + xq = (char *) &pgm[1]; + memcpy (xq, xquery, xquery_size); + if (NULL != reply_bf) + GNUNET_assert (GNUNET_OK == + GNUNET_CONTAINER_bloomfilter_get_raw_data (reply_bf, + &xq + [xquery_size], + reply_bf_size)); + GNUNET_CONTAINER_DLL_insert_tail (target->head, target->tail, pending); + target->pending_count++; + process_peer_queue (target); + } + GNUNET_free (targets); +} + + +/** + * Handle a reply (route to origin). Only forwards the reply back to + * the given peer. Does not do local caching or forwarding to local + * clients. + * + * @param target neighbour that should receive the block (if still connected) + * @param type type of the block + * @param expiration_time when does the content expire + * @param key key for the content + * @param put_path_length number of entries in put_path + * @param put_path peers the original PUT traversed (if tracked) + * @param get_path_length number of entries in put_path + * @param get_path peers this reply has traversed so far (if tracked) + * @param data payload of the reply + * @param data_size number of bytes in data + */ +void +GDS_NEIGHBOURS_handle_reply (const struct GNUNET_PeerIdentity *target, + enum GNUNET_BLOCK_Type type, + struct GNUNET_TIME_Absolute expiration_time, + const GNUNET_HashCode * key, + unsigned int put_path_length, + const struct GNUNET_PeerIdentity *put_path, + unsigned int get_path_length, + const struct GNUNET_PeerIdentity *get_path, + const void *data, size_t data_size) +{ + struct PeerInfo *pi; + struct P2PPendingMessage *pending; + size_t msize; + struct PeerResultMessage *prm; + struct GNUNET_PeerIdentity *paths; + + msize = + data_size + sizeof (struct PeerResultMessage) + (get_path_length + + put_path_length) * + sizeof (struct GNUNET_PeerIdentity); + if ((msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE) || + (get_path_length > + GNUNET_SERVER_MAX_MESSAGE_SIZE / sizeof (struct GNUNET_PeerIdentity)) || + (put_path_length > + GNUNET_SERVER_MAX_MESSAGE_SIZE / sizeof (struct GNUNET_PeerIdentity)) || + (data_size > GNUNET_SERVER_MAX_MESSAGE_SIZE)) + { + GNUNET_break (0); + return; + } + pi = GNUNET_CONTAINER_multihashmap_get (all_known_peers, &target->hashPubKey); + if (NULL == pi) + { + /* peer disconnected in the meantime, drop reply */ + return; + } + GNUNET_STATISTICS_update (GDS_stats, + gettext_noop + ("# RESULT messages queued for transmission"), 1, + GNUNET_NO); + pending = GNUNET_malloc (sizeof (struct P2PPendingMessage) + msize); + pending->importance = 0; /* FIXME */ + pending->timeout = expiration_time; + prm = (struct PeerResultMessage *) &pending[1]; + pending->msg = &prm->header; + prm->header.size = htons (msize); + prm->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_P2P_RESULT); + prm->type = htonl (type); + prm->put_path_length = htonl (put_path_length); + prm->get_path_length = htonl (get_path_length); + prm->expiration_time = GNUNET_TIME_absolute_hton (expiration_time); + prm->key = *key; + paths = (struct GNUNET_PeerIdentity *) &prm[1]; + memcpy (paths, put_path, + put_path_length * sizeof (struct GNUNET_PeerIdentity)); + memcpy (&paths[put_path_length], get_path, + get_path_length * sizeof (struct GNUNET_PeerIdentity)); + memcpy (&paths[put_path_length + get_path_length], data, data_size); + GNUNET_CONTAINER_DLL_insert (pi->head, pi->tail, pending); + pi->pending_count++; + process_peer_queue (pi); +} + + +/** + * To be called on core init/fail. + * + * @param cls service closure + * @param server handle to the server for this service + * @param identity the public identity of this peer + */ +static void +core_init (void *cls, struct GNUNET_CORE_Handle *server, + const struct GNUNET_PeerIdentity *identity) +{ + GNUNET_assert (server != NULL); + my_identity = *identity; +} + + +/** + * Core handler for p2p put requests. + * + * @param cls closure + * @param peer sender of the request + * @param message message + * @param peer peer identity this notification is about + * @param atsi performance data + * @param atsi_count number of records in 'atsi' + * @return GNUNET_OK to keep the connection open, + * GNUNET_SYSERR to close it (signal serious error) + */ +static int +handle_dht_p2p_put (void *cls, const struct GNUNET_PeerIdentity *peer, + const struct GNUNET_MessageHeader *message, + const struct GNUNET_ATS_Information *atsi, + unsigned int atsi_count) +{ + const struct PeerPutMessage *put; + const struct GNUNET_PeerIdentity *put_path; + const void *payload; + uint32_t putlen; + uint16_t msize; + size_t payload_size; + enum GNUNET_DHT_RouteOption options; + struct GNUNET_CONTAINER_BloomFilter *bf; + GNUNET_HashCode test_key; + + msize = ntohs (message->size); + if (msize < sizeof (struct PeerPutMessage)) + { + GNUNET_break_op (0); + return GNUNET_YES; + } + put = (const struct PeerPutMessage *) message; + putlen = ntohl (put->put_path_length); + if ((msize < + sizeof (struct PeerPutMessage) + + putlen * sizeof (struct GNUNET_PeerIdentity)) || + (putlen > + GNUNET_SERVER_MAX_MESSAGE_SIZE / sizeof (struct GNUNET_PeerIdentity))) + { + GNUNET_break_op (0); + return GNUNET_YES; + } + GNUNET_STATISTICS_update (GDS_stats, + gettext_noop ("# P2P PUT requests received"), 1, + GNUNET_NO); + put_path = (const struct GNUNET_PeerIdentity *) &put[1]; + payload = &put_path[putlen]; + options = ntohl (put->options); + payload_size = + msize - (sizeof (struct PeerPutMessage) + + putlen * sizeof (struct GNUNET_PeerIdentity)); + switch (GNUNET_BLOCK_get_key + (GDS_block_context, ntohl (put->type), payload, payload_size, + &test_key)) + { + case GNUNET_YES: + if (0 != memcmp (&test_key, &put->key, sizeof (GNUNET_HashCode))) + { + GNUNET_break_op (0); + return GNUNET_YES; + } + break; + case GNUNET_NO: + GNUNET_break_op (0); + return GNUNET_YES; + case GNUNET_SYSERR: + /* cannot verify, good luck */ + break; + } +#if DEBUG_DHT + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "PUT for %s at %s\n", + GNUNET_h2s (&put->key), GNUNET_i2s (&my_identity)); +#endif + bf = GNUNET_CONTAINER_bloomfilter_init (put->bloomfilter, DHT_BLOOM_SIZE, + GNUNET_CONSTANTS_BLOOMFILTER_K); + GNUNET_break_op (GNUNET_YES == + GNUNET_CONTAINER_bloomfilter_test (bf, &peer->hashPubKey)); + { + struct GNUNET_PeerIdentity pp[putlen + 1]; + + /* extend 'put path' by sender */ + if (0 != (options & GNUNET_DHT_RO_RECORD_ROUTE)) + { + memcpy (pp, put_path, putlen * sizeof (struct GNUNET_PeerIdentity)); + pp[putlen] = *peer; + putlen++; + } + else + putlen = 0; + + /* give to local clients */ + GDS_CLIENTS_handle_reply (GNUNET_TIME_absolute_ntoh (put->expiration_time), + &put->key, 0, NULL, putlen, pp, ntohl (put->type), + payload_size, payload); + /* store locally */ + if ((0 != (options & GNUNET_DHT_RO_DEMULTIPLEX_EVERYWHERE)) || + (am_closest_peer (&put->key, bf))) + GDS_DATACACHE_handle_put (GNUNET_TIME_absolute_ntoh + (put->expiration_time), &put->key, putlen, pp, + ntohl (put->type), payload_size, payload); + /* route to other peers */ + GDS_NEIGHBOURS_handle_put (ntohl (put->type), options, + ntohl (put->desired_replication_level), + GNUNET_TIME_absolute_ntoh (put->expiration_time), + ntohl (put->hop_count), bf, &put->key, putlen, + pp, payload, payload_size); + } + GNUNET_CONTAINER_bloomfilter_free (bf); + GDS_CLIENTS_process_monitor (GNUNET_MESSAGE_TYPE_DHT_MONITOR_PUT, + GNUNET_TIME_absolute_ntoh (put->expiration_time), &put->key, + putlen, put_path, 0, NULL, ntohl(put->desired_replication_level), + ntohl (put->type), payload, payload_size); + return GNUNET_YES; +} + + +/** + * We have received a FIND PEER request. Send matching + * HELLOs back. + * + * @param sender sender of the FIND PEER request + * @param key peers close to this key are desired + * @param bf peers matching this bf are excluded + * @param bf_mutator mutator for bf + */ +static void +handle_find_peer (const struct GNUNET_PeerIdentity *sender, + const GNUNET_HashCode * key, + struct GNUNET_CONTAINER_BloomFilter *bf, uint32_t bf_mutator) +{ + int bucket_idx; + struct PeerBucket *bucket; + struct PeerInfo *peer; + unsigned int choice; + GNUNET_HashCode mhash; + const struct GNUNET_HELLO_Message *hello; + + /* first, check about our own HELLO */ + if (NULL != GDS_my_hello) + { + GNUNET_BLOCK_mingle_hash (&my_identity.hashPubKey, bf_mutator, &mhash); + if ((NULL == bf) || + (GNUNET_YES != GNUNET_CONTAINER_bloomfilter_test (bf, &mhash))) + { + GDS_NEIGHBOURS_handle_reply (sender, GNUNET_BLOCK_TYPE_DHT_HELLO, + GNUNET_TIME_relative_to_absolute + (GNUNET_CONSTANTS_HELLO_ADDRESS_EXPIRATION), + key, 0, NULL, 0, NULL, GDS_my_hello, + GNUNET_HELLO_size ((const struct + GNUNET_HELLO_Message *) + GDS_my_hello)); + } + else + { + GNUNET_STATISTICS_update (GDS_stats, + gettext_noop + ("# FIND PEER requests ignored due to Bloomfilter"), + 1, GNUNET_NO); + } + } + else + { + GNUNET_STATISTICS_update (GDS_stats, + gettext_noop + ("# FIND PEER requests ignored due to lack of HELLO"), + 1, GNUNET_NO); + } + + /* then, also consider sending a random HELLO from the closest bucket */ + if (0 == memcmp (&my_identity.hashPubKey, key, sizeof (GNUNET_HashCode))) + bucket_idx = closest_bucket; + else + bucket_idx = GNUNET_MIN (closest_bucket, find_bucket (key)); + if (bucket_idx == GNUNET_SYSERR) + return; + bucket = &k_buckets[bucket_idx]; + if (bucket->peers_size == 0) + return; + choice = + GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, bucket->peers_size); + peer = bucket->head; + while (choice > 0) + { + GNUNET_assert (peer != NULL); + peer = peer->next; + choice--; + } + choice = bucket->peers_size; + do + { + peer = peer->next; + if (choice-- == 0) + return; /* no non-masked peer available */ + if (peer == NULL) + peer = bucket->head; + GNUNET_BLOCK_mingle_hash (&peer->id.hashPubKey, bf_mutator, &mhash); + hello = GDS_HELLO_get (&peer->id); + } + while ((hello == NULL) || + (GNUNET_YES == GNUNET_CONTAINER_bloomfilter_test (bf, &mhash))); + GDS_NEIGHBOURS_handle_reply (sender, GNUNET_BLOCK_TYPE_DHT_HELLO, + GNUNET_TIME_relative_to_absolute + (GNUNET_CONSTANTS_HELLO_ADDRESS_EXPIRATION), key, + 0, NULL, 0, NULL, hello, + GNUNET_HELLO_size (hello)); +} + + +/** + * Core handler for p2p get requests. + * + * @param cls closure + * @param peer sender of the request + * @param message message + * @param peer peer identity this notification is about + * @param atsi performance data + * @param atsi_count number of records in 'atsi' + * @return GNUNET_OK to keep the connection open, + * GNUNET_SYSERR to close it (signal serious error) + */ +static int +handle_dht_p2p_get (void *cls, const struct GNUNET_PeerIdentity *peer, + const struct GNUNET_MessageHeader *message, + const struct GNUNET_ATS_Information *atsi, + unsigned int atsi_count) +{ + struct PeerGetMessage *get; + uint32_t xquery_size; + size_t reply_bf_size; + uint16_t msize; + enum GNUNET_BLOCK_Type type; + enum GNUNET_DHT_RouteOption options; + enum GNUNET_BLOCK_EvaluationResult eval; + struct GNUNET_CONTAINER_BloomFilter *reply_bf; + struct GNUNET_CONTAINER_BloomFilter *peer_bf; + const char *xquery; + + GNUNET_break (0 != + memcmp (peer, &my_identity, + sizeof (struct GNUNET_PeerIdentity))); + /* parse and validate message */ + msize = ntohs (message->size); + if (msize < sizeof (struct PeerGetMessage)) + { + GNUNET_break_op (0); + return GNUNET_YES; + } + get = (struct PeerGetMessage *) message; + xquery_size = ntohl (get->xquery_size); + if (msize < sizeof (struct PeerGetMessage) + xquery_size) + { + GNUNET_break_op (0); + return GNUNET_YES; + } + GNUNET_STATISTICS_update (GDS_stats, + gettext_noop ("# P2P GET requests received"), 1, + GNUNET_NO); + reply_bf_size = msize - (sizeof (struct PeerGetMessage) + xquery_size); + type = ntohl (get->type); + options = ntohl (get->options); + xquery = (const char *) &get[1]; + reply_bf = NULL; + if (reply_bf_size > 0) + reply_bf = + GNUNET_CONTAINER_bloomfilter_init (&xquery[xquery_size], reply_bf_size, + GNUNET_CONSTANTS_BLOOMFILTER_K); + eval = + GNUNET_BLOCK_evaluate (GDS_block_context, type, &get->key, &reply_bf, + get->bf_mutator, xquery, xquery_size, NULL, 0); + if (eval != GNUNET_BLOCK_EVALUATION_REQUEST_VALID) + { + /* request invalid or block type not supported */ + GNUNET_break_op (eval == GNUNET_BLOCK_EVALUATION_TYPE_NOT_SUPPORTED); + if (NULL != reply_bf) + GNUNET_CONTAINER_bloomfilter_free (reply_bf); + return GNUNET_YES; + } + peer_bf = + GNUNET_CONTAINER_bloomfilter_init (get->bloomfilter, DHT_BLOOM_SIZE, + GNUNET_CONSTANTS_BLOOMFILTER_K); + GNUNET_break_op (GNUNET_YES == + GNUNET_CONTAINER_bloomfilter_test (peer_bf, + &peer->hashPubKey)); + /* remember request for routing replies */ + GDS_ROUTING_add (peer, type, options, &get->key, xquery, xquery_size, + reply_bf, get->bf_mutator); +#if DEBUG_DHT + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "GET for %s at %s after %u hops\n", + GNUNET_h2s (&get->key), GNUNET_i2s (&my_identity), + (unsigned int) ntohl (get->hop_count)); +#endif + /* local lookup (this may update the reply_bf) */ + if ((0 != (options & GNUNET_DHT_RO_DEMULTIPLEX_EVERYWHERE)) || + (am_closest_peer (&get->key, peer_bf))) + { + if ((0 != (options & GNUNET_DHT_RO_FIND_PEER))) + { + GNUNET_STATISTICS_update (GDS_stats, + gettext_noop + ("# P2P FIND PEER requests processed"), 1, + GNUNET_NO); + handle_find_peer (peer, &get->key, reply_bf, get->bf_mutator); + } + else + { + eval = + GDS_DATACACHE_handle_get (&get->key, type, xquery, xquery_size, + &reply_bf, get->bf_mutator); + } + } + else + { + GNUNET_STATISTICS_update (GDS_stats, + gettext_noop ("# P2P GET requests ONLY routed"), + 1, GNUNET_NO); + } + + GDS_CLIENTS_process_monitor (GNUNET_MESSAGE_TYPE_DHT_MONITOR_GET, + GNUNET_TIME_UNIT_FOREVER_ABS, &get->key, 0, NULL, 0, NULL, + ntohl (get->desired_replication_level), type, NULL, 0); + + /* P2P forwarding */ + if (eval != GNUNET_BLOCK_EVALUATION_OK_LAST) + GDS_NEIGHBOURS_handle_get (type, options, + ntohl (get->desired_replication_level), + ntohl (get->hop_count), &get->key, xquery, + xquery_size, reply_bf, get->bf_mutator, peer_bf); + /* clean up */ + if (NULL != reply_bf) + GNUNET_CONTAINER_bloomfilter_free (reply_bf); + GNUNET_CONTAINER_bloomfilter_free (peer_bf); + return GNUNET_YES; +} + + +/** + * Core handler for p2p result messages. + * + * @param cls closure + * @param message message + * @param peer peer identity this notification is about + * @param atsi performance data + * @param atsi_count number of records in 'atsi' + * @return GNUNET_YES (do not cut p2p connection) + */ +static int +handle_dht_p2p_result (void *cls, const struct GNUNET_PeerIdentity *peer, + const struct GNUNET_MessageHeader *message, + const struct GNUNET_ATS_Information *atsi, + unsigned int atsi_count) +{ + const struct PeerResultMessage *prm; + const struct GNUNET_PeerIdentity *put_path; + const struct GNUNET_PeerIdentity *get_path; + const void *data; + uint32_t get_path_length; + uint32_t put_path_length; + uint16_t msize; + size_t data_size; + enum GNUNET_BLOCK_Type type; + + /* parse and validate message */ + msize = ntohs (message->size); + if (msize < sizeof (struct PeerResultMessage)) + { + GNUNET_break_op (0); + return GNUNET_YES; + } + prm = (struct PeerResultMessage *) message; + put_path_length = ntohl (prm->put_path_length); + get_path_length = ntohl (prm->get_path_length); + if ((msize < + sizeof (struct PeerResultMessage) + (get_path_length + + put_path_length) * + sizeof (struct GNUNET_PeerIdentity)) || + (get_path_length > + GNUNET_SERVER_MAX_MESSAGE_SIZE / sizeof (struct GNUNET_PeerIdentity)) || + (put_path_length > + GNUNET_SERVER_MAX_MESSAGE_SIZE / sizeof (struct GNUNET_PeerIdentity))) + { + GNUNET_break_op (0); + return GNUNET_YES; + } + GNUNET_STATISTICS_update (GDS_stats, gettext_noop ("# P2P RESULTS received"), + 1, GNUNET_NO); + put_path = (const struct GNUNET_PeerIdentity *) &prm[1]; + get_path = &put_path[put_path_length]; + type = ntohl (prm->type); + data = (const void *) &get_path[get_path_length]; + data_size = + msize - (sizeof (struct PeerResultMessage) + + (get_path_length + + put_path_length) * sizeof (struct GNUNET_PeerIdentity)); + + /* if we got a HELLO, consider it for our own routing table */ + if (type == GNUNET_BLOCK_TYPE_DHT_HELLO) + { + const struct GNUNET_MessageHeader *h; + struct GNUNET_PeerIdentity pid; + int bucket; + + /* Should be a HELLO, validate and consider using it! */ + if (data_size < sizeof (struct GNUNET_MessageHeader)) + { + GNUNET_break_op (0); + return GNUNET_YES; + } + h = data; + if (data_size != ntohs (h->size)) + { + GNUNET_break_op (0); + return GNUNET_YES; + } + if (GNUNET_OK != + GNUNET_HELLO_get_id ((const struct GNUNET_HELLO_Message *) h, &pid)) + { + GNUNET_break_op (0); + return GNUNET_YES; + } + if (0 != memcmp (&my_identity, &pid, sizeof (struct GNUNET_PeerIdentity))) + { + bucket = find_bucket (&pid.hashPubKey); + if ((bucket >= 0) && (k_buckets[bucket].peers_size < bucket_size)) + { + if (NULL != GDS_transport_handle) + { + GNUNET_TRANSPORT_offer_hello (GDS_transport_handle, h, NULL, NULL); + GNUNET_TRANSPORT_try_connect (GDS_transport_handle, &pid); + } + } + } + } + + /* append 'peer' to 'get_path' */ + { + struct GNUNET_PeerIdentity xget_path[get_path_length + 1]; + + memcpy (xget_path, get_path, + get_path_length * sizeof (struct GNUNET_PeerIdentity)); + xget_path[get_path_length] = *peer; + get_path_length++; + + /* forward to local clients */ + GDS_CLIENTS_handle_reply (GNUNET_TIME_absolute_ntoh (prm->expiration_time), + &prm->key, get_path_length, xget_path, + put_path_length, put_path, type, data_size, data); + + /* forward to other peers */ + GDS_ROUTING_process (type, GNUNET_TIME_absolute_ntoh (prm->expiration_time), + &prm->key, put_path_length, put_path, get_path_length, + xget_path, data, data_size); + } + + GDS_CLIENTS_process_monitor (GNUNET_MESSAGE_TYPE_DHT_MONITOR_GET_RESP, + GNUNET_TIME_absolute_ntoh (prm->expiration_time), &prm->key, + put_path_length, put_path, get_path_length, get_path, + 0, type, data, data_size); + + return GNUNET_YES; +} + + +/** + * Initialize neighbours subsystem. + * + * @return GNUNET_OK on success, GNUNET_SYSERR on error + */ +int +GDS_NEIGHBOURS_init () +{ + static struct GNUNET_CORE_MessageHandler core_handlers[] = { + {&handle_dht_p2p_get, GNUNET_MESSAGE_TYPE_DHT_P2P_GET, 0}, + {&handle_dht_p2p_put, GNUNET_MESSAGE_TYPE_DHT_P2P_PUT, 0}, + {&handle_dht_p2p_result, GNUNET_MESSAGE_TYPE_DHT_P2P_RESULT, 0}, + {NULL, 0, 0} + }; + unsigned long long temp_config_num; + + if (GNUNET_OK == + GNUNET_CONFIGURATION_get_value_number (GDS_cfg, "DHT", "bucket_size", + &temp_config_num)) + bucket_size = (unsigned int) temp_config_num; + atsAPI = GNUNET_ATS_performance_init (GDS_cfg, NULL, NULL); + coreAPI = + GNUNET_CORE_connect (GDS_cfg, 1, NULL, &core_init, &handle_core_connect, + &handle_core_disconnect, NULL, GNUNET_NO, NULL, + GNUNET_NO, core_handlers); + if (coreAPI == NULL) + return GNUNET_SYSERR; + all_known_peers = GNUNET_CONTAINER_multihashmap_create (256); + return GNUNET_OK; +} + + +/** + * Shutdown neighbours subsystem. + */ +void +GDS_NEIGHBOURS_done () +{ + if (coreAPI == NULL) + return; + GNUNET_CORE_disconnect (coreAPI); + coreAPI = NULL; + GNUNET_ATS_performance_done (atsAPI); + atsAPI = NULL; + GNUNET_assert (0 == GNUNET_CONTAINER_multihashmap_size (all_known_peers)); + GNUNET_CONTAINER_multihashmap_destroy (all_known_peers); + all_known_peers = NULL; + if (GNUNET_SCHEDULER_NO_TASK != find_peer_task) + { + GNUNET_SCHEDULER_cancel (find_peer_task); + find_peer_task = GNUNET_SCHEDULER_NO_TASK; + } +} + + +/* end of gnunet-service-dht_neighbours.c */ |