aboutsummaryrefslogtreecommitdiff
path: root/src/fs/gnunet-service-fs_pr.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/fs/gnunet-service-fs_pr.c')
-rw-r--r--src/fs/gnunet-service-fs_pr.c340
1 files changed, 246 insertions, 94 deletions
diff --git a/src/fs/gnunet-service-fs_pr.c b/src/fs/gnunet-service-fs_pr.c
index 8ca4121..76e04f5 100644
--- a/src/fs/gnunet-service-fs_pr.c
+++ b/src/fs/gnunet-service-fs_pr.c
@@ -1,6 +1,6 @@
/*
This file is part of GNUnet.
- (C) 2009, 2010, 2011 Christian Grothoff (and other contributing authors)
+ (C) 2009, 2010, 2011, 2012 Christian Grothoff (and other contributing authors)
GNUnet is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published
@@ -30,6 +30,13 @@
#include "gnunet-service-fs_indexing.h"
#include "gnunet-service-fs_pe.h"
#include "gnunet-service-fs_pr.h"
+#include "gnunet-service-fs_stream.h"
+
+
+/**
+ * Desired replication level for GETs.
+ */
+#define DHT_GET_REPLICATION 5
/**
* Maximum size of the datastore queue for P2P operations. Needs to
@@ -52,6 +59,18 @@
#define MAX_RESULTS (100 * 1024)
/**
+ * Collect an instane number of statistics? May cause excessive IPC.
+ */
+#define INSANE_STATISTICS GNUNET_NO
+
+/**
+ * If obtaining a block via stream fails, how often do we retry it before
+ * giving up for good (and sticking to non-anonymous transfer)?
+ */
+#define STREAM_RETRY_MAX 3
+
+
+/**
* An active request.
*/
struct GSF_PendingRequest
@@ -74,7 +93,7 @@ struct GSF_PendingRequest
/**
* Array of hash codes of replies we've already seen.
*/
- GNUNET_HashCode *replies_seen;
+ struct GNUNET_HashCode *replies_seen;
/**
* Bloomfilter masking replies we've already seen.
@@ -97,6 +116,11 @@ struct GSF_PendingRequest
struct GNUNET_DHT_GetHandle *gh;
/**
+ * Stream request handle for this request (or NULL for none).
+ */
+ struct GSF_StreamRequest *stream_request;
+
+ /**
* Function to call upon completion of the local get
* request, or NULL for none.
*/
@@ -149,6 +173,12 @@ struct GSF_PendingRequest
uint64_t first_uid;
/**
+ * How often have we retried this request via 'stream'?
+ * (used to bound overall retries).
+ */
+ unsigned int stream_retry_count;
+
+ /**
* Number of valid entries in the 'replies_seen' array.
*/
unsigned int replies_seen_count;
@@ -191,12 +221,6 @@ static int active_to_migration;
/**
- * Size of the datastore queue we assume for common requests.
- * Determined based on the network quota.
- */
-static unsigned int datastore_queue_size;
-
-/**
* Heap with the request that will expire next at the top. Contains
* pointers of type "struct PendingRequest*"; these will *also* be
* aliased from the "requests_by_peer" data structures and the
@@ -263,40 +287,52 @@ refresh_bloomfilter (struct GSF_PendingRequest *pr)
struct GSF_PendingRequest *
GSF_pending_request_create_ (enum GSF_PendingRequestOptions options,
enum GNUNET_BLOCK_Type type,
- const GNUNET_HashCode * query,
- const GNUNET_HashCode * namespace,
+ const struct GNUNET_HashCode *query,
+ const struct GNUNET_HashCode *namespace,
const struct GNUNET_PeerIdentity *target,
const char *bf_data, size_t bf_size,
uint32_t mingle, uint32_t anonymity_level,
uint32_t priority, int32_t ttl,
GNUNET_PEER_Id sender_pid,
GNUNET_PEER_Id origin_pid,
- const GNUNET_HashCode * replies_seen,
+ const struct GNUNET_HashCode *replies_seen,
unsigned int replies_seen_count,
GSF_PendingRequestReplyHandler rh, void *rh_cls)
{
struct GSF_PendingRequest *pr;
struct GSF_PendingRequest *dpr;
+ size_t extra;
+ struct GNUNET_HashCode *eptr;
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Creating request handle for `%s' of type %d\n",
GNUNET_h2s (query), type);
+#if INSANE_STATISTICS
GNUNET_STATISTICS_update (GSF_stats,
gettext_noop ("# Pending requests created"), 1,
GNUNET_NO);
- pr = GNUNET_malloc (sizeof (struct GSF_PendingRequest));
+#endif
+ extra = 0;
+ if (GNUNET_BLOCK_TYPE_FS_SBLOCK == type)
+ extra += sizeof (struct GNUNET_HashCode);
+ if (NULL != target)
+ extra += sizeof (struct GNUNET_PeerIdentity);
+ pr = GNUNET_malloc (sizeof (struct GSF_PendingRequest) + extra);
pr->local_result_offset =
GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK, UINT64_MAX);
pr->public_data.query = *query;
+ eptr = (struct GNUNET_HashCode *) &pr[1];
if (GNUNET_BLOCK_TYPE_FS_SBLOCK == type)
{
- GNUNET_assert (NULL != namespace);
- pr->public_data.namespace = *namespace;
+ GNUNET_assert (NULL != namespace);
+ pr->public_data.namespace = eptr;
+ memcpy (eptr, namespace, sizeof (struct GNUNET_HashCode));
+ eptr++;
}
if (NULL != target)
{
- pr->public_data.target = *target;
- pr->public_data.has_target = GNUNET_YES;
+ pr->public_data.target = (struct GNUNET_PeerIdentity *) eptr;
+ memcpy (eptr, target, sizeof (struct GNUNET_PeerIdentity));
}
pr->public_data.anonymity_level = anonymity_level;
pr->public_data.priority = priority;
@@ -324,9 +360,9 @@ GSF_pending_request_create_ (enum GSF_PendingRequestOptions options,
{
pr->replies_seen_size = replies_seen_count;
pr->replies_seen =
- GNUNET_malloc (sizeof (GNUNET_HashCode) * pr->replies_seen_size);
+ GNUNET_malloc (sizeof (struct GNUNET_HashCode) * pr->replies_seen_size);
memcpy (pr->replies_seen, replies_seen,
- replies_seen_count * sizeof (GNUNET_HashCode));
+ replies_seen_count * sizeof (struct GNUNET_HashCode));
pr->replies_seen_count = replies_seen_count;
}
if (NULL != bf_data)
@@ -341,7 +377,8 @@ GSF_pending_request_create_ (enum GSF_PendingRequestOptions options,
{
refresh_bloomfilter (pr);
}
- GNUNET_CONTAINER_multihashmap_put (pr_map, query, pr,
+ GNUNET_CONTAINER_multihashmap_put (pr_map,
+ &pr->public_data.query, pr,
GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
if (0 == (options & GSF_PRO_REQUEST_NEVER_EXPIRES))
{
@@ -398,11 +435,12 @@ GSF_pending_request_is_compatible_ (struct GSF_PendingRequest *pra,
if ((pra->public_data.type != prb->public_data.type) ||
(0 !=
memcmp (&pra->public_data.query, &prb->public_data.query,
- sizeof (GNUNET_HashCode))) ||
+ sizeof (struct GNUNET_HashCode))) ||
((pra->public_data.type == GNUNET_BLOCK_TYPE_FS_SBLOCK) &&
(0 !=
- memcmp (&pra->public_data.namespace, &prb->public_data.namespace,
- sizeof (GNUNET_HashCode)))))
+ memcmp (pra->public_data.namespace,
+ prb->public_data.namespace,
+ sizeof (struct GNUNET_HashCode)))))
return GNUNET_NO;
return GNUNET_OK;
}
@@ -419,11 +457,11 @@ GSF_pending_request_is_compatible_ (struct GSF_PendingRequest *pra,
*/
void
GSF_pending_request_update_ (struct GSF_PendingRequest *pr,
- const GNUNET_HashCode * replies_seen,
+ const struct GNUNET_HashCode * replies_seen,
unsigned int replies_seen_count)
{
unsigned int i;
- GNUNET_HashCode mhash;
+ struct GNUNET_HashCode mhash;
if (replies_seen_count + pr->replies_seen_count < pr->replies_seen_count)
return; /* integer overflow */
@@ -434,7 +472,7 @@ GSF_pending_request_update_ (struct GSF_PendingRequest *pr,
GNUNET_array_grow (pr->replies_seen, pr->replies_seen_size,
replies_seen_count + pr->replies_seen_count);
memcpy (&pr->replies_seen[pr->replies_seen_count], replies_seen,
- sizeof (GNUNET_HashCode) * replies_seen_count);
+ sizeof (struct GNUNET_HashCode) * replies_seen_count);
pr->replies_seen_count += replies_seen_count;
refresh_bloomfilter (pr);
}
@@ -459,6 +497,10 @@ GSF_pending_request_update_ (struct GSF_PendingRequest *pr,
}
}
}
+ if (NULL != pr->gh)
+ GNUNET_DHT_get_filter_known_results (pr->gh,
+ replies_seen_count,
+ replies_seen);
}
@@ -477,7 +519,7 @@ GSF_pending_request_get_message_ (struct GSF_PendingRequest *pr,
{
char lbuf[GNUNET_SERVER_MAX_MESSAGE_SIZE];
struct GetMessage *gm;
- GNUNET_HashCode *ext;
+ struct GNUNET_HashCode *ext;
size_t msize;
unsigned int k;
uint32_t bm;
@@ -509,13 +551,13 @@ GSF_pending_request_get_message_ (struct GSF_PendingRequest *pr,
bm |= GET_MESSAGE_BIT_SKS_NAMESPACE;
k++;
}
- if (GNUNET_YES == pr->public_data.has_target)
+ if (NULL != pr->public_data.target)
{
bm |= GET_MESSAGE_BIT_TRANSMIT_TO;
k++;
}
bf_size = GNUNET_CONTAINER_bloomfilter_get_size (pr->bf);
- msize = sizeof (struct GetMessage) + bf_size + k * sizeof (GNUNET_HashCode);
+ msize = sizeof (struct GetMessage) + bf_size + k * sizeof (struct GNUNET_HashCode);
GNUNET_assert (msize < GNUNET_SERVER_MAX_MESSAGE_SIZE);
if (buf_size < msize)
return msize;
@@ -530,6 +572,8 @@ GSF_pending_request_get_message_ (struct GSF_PendingRequest *pr,
else
prio = 0;
pr->public_data.priority -= prio;
+ pr->public_data.num_transmissions++;
+ pr->public_data.respect_offered += prio;
gm->priority = htonl (prio);
now = GNUNET_TIME_absolute_get ();
ttl = (int64_t) (pr->public_data.ttl.abs_value - now.abs_value);
@@ -537,15 +581,17 @@ GSF_pending_request_get_message_ (struct GSF_PendingRequest *pr,
gm->filter_mutator = htonl (pr->mingle);
gm->hash_bitmap = htonl (bm);
gm->query = pr->public_data.query;
- ext = (GNUNET_HashCode *) & gm[1];
+ ext = (struct GNUNET_HashCode *) & gm[1];
k = 0;
if (!do_route)
GNUNET_PEER_resolve (pr->sender_pid,
(struct GNUNET_PeerIdentity *) &ext[k++]);
if (GNUNET_BLOCK_TYPE_FS_SBLOCK == pr->public_data.type)
- memcpy (&ext[k++], &pr->public_data.namespace, sizeof (GNUNET_HashCode));
- if (GNUNET_YES == pr->public_data.has_target)
- ext[k++] = pr->public_data.target.hashPubKey;
+ memcpy (&ext[k++], pr->public_data.namespace, sizeof (struct GNUNET_HashCode));
+ if (NULL != pr->public_data.target)
+ memcpy (&ext[k++],
+ pr->public_data.target,
+ sizeof (struct GNUNET_PeerIdentity));
if (pr->bf != NULL)
GNUNET_assert (GNUNET_SYSERR !=
GNUNET_CONTAINER_bloomfilter_get_raw_data (pr->bf,
@@ -565,7 +611,7 @@ GSF_pending_request_get_message_ (struct GSF_PendingRequest *pr,
* @return GNUNET_YES (we should continue to iterate)
*/
static int
-clean_request (void *cls, const GNUNET_HashCode * key, void *value)
+clean_request (void *cls, const struct GNUNET_HashCode * key, void *value)
{
struct GSF_PendingRequest *pr = value;
GSF_LocalLookupContinuation cont;
@@ -603,6 +649,11 @@ clean_request (void *cls, const GNUNET_HashCode * key, void *value)
GNUNET_DHT_get_stop (pr->gh);
pr->gh = NULL;
}
+ if (NULL != pr->stream_request)
+ {
+ GSF_stream_query_cancel (pr->stream_request);
+ pr->stream_request = NULL;
+ }
if (GNUNET_SCHEDULER_NO_TASK != pr->warn_task)
{
GNUNET_SCHEDULER_cancel (pr->warn_task);
@@ -655,6 +706,11 @@ GSF_pending_request_cancel_ (struct GSF_PendingRequest *pr, int full_cleanup)
GNUNET_DHT_get_stop (pr->gh);
pr->gh = NULL;
}
+ if (NULL != pr->stream_request)
+ {
+ GSF_stream_query_cancel (pr->stream_request);
+ pr->stream_request = NULL;
+ }
if (GNUNET_SCHEDULER_NO_TASK != pr->warn_task)
{
GNUNET_SCHEDULER_cancel (pr->warn_task);
@@ -763,11 +819,11 @@ update_request_performance_data (struct ProcessReplyClosure *prq,
* @return GNUNET_YES (we should continue to iterate)
*/
static int
-process_reply (void *cls, const GNUNET_HashCode * key, void *value)
+process_reply (void *cls, const struct GNUNET_HashCode * key, void *value)
{
struct ProcessReplyClosure *prq = cls;
struct GSF_PendingRequest *pr = value;
- GNUNET_HashCode chash;
+ struct GNUNET_HashCode chash;
struct GNUNET_TIME_Absolute last_transmission;
if (NULL == pr->rh)
@@ -780,10 +836,10 @@ process_reply (void *cls, const GNUNET_HashCode * key, void *value)
GNUNET_NO);
prq->eval =
GNUNET_BLOCK_evaluate (GSF_block_ctx, prq->type, key, &pr->bf, pr->mingle,
- &pr->public_data.namespace,
+ pr->public_data.namespace,
(prq->type ==
GNUNET_BLOCK_TYPE_FS_SBLOCK) ?
- sizeof (GNUNET_HashCode) : 0, prq->data,
+ sizeof (struct GNUNET_HashCode) : 0, prq->data,
prq->size);
switch (prq->eval)
{
@@ -796,20 +852,33 @@ process_reply (void *cls, const GNUNET_HashCode * key, void *value)
GNUNET_LOAD_update (GSF_rt_entry_lifetime,
GNUNET_TIME_absolute_get_duration (pr->
public_data.start_time).rel_value);
- if (!GSF_request_plan_reference_get_last_transmission_ (pr->public_data.rpr_head, prq->sender, &last_transmission))
+ if (GNUNET_YES !=
+ GSF_request_plan_reference_get_last_transmission_ (pr->public_data.pr_head,
+ prq->sender,
+ &last_transmission))
last_transmission.abs_value = GNUNET_TIME_UNIT_FOREVER_ABS.abs_value;
/* pass on to other peers / local clients */
pr->rh (pr->rh_cls, prq->eval, pr, prq->anonymity_level, prq->expiration,
last_transmission, prq->type, prq->data, prq->size);
return GNUNET_YES;
case GNUNET_BLOCK_EVALUATION_OK_DUPLICATE:
+#if INSANE_STATISTICS
GNUNET_STATISTICS_update (GSF_stats,
gettext_noop
("# duplicate replies discarded (bloomfilter)"),
1, GNUNET_NO);
+#endif
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Duplicate response, discarding.\n");
return GNUNET_YES; /* duplicate */
+ case GNUNET_BLOCK_EVALUATION_RESULT_IRRELEVANT:
+ GNUNET_STATISTICS_update (GSF_stats,
+ gettext_noop
+ ("# irrelevant replies discarded"),
+ 1, GNUNET_NO);
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Irrelevant response, ignoring.\n");
+ return GNUNET_YES;
case GNUNET_BLOCK_EVALUATION_RESULT_INVALID:
return GNUNET_YES; /* wrong namespace */
case GNUNET_BLOCK_EVALUATION_REQUEST_VALID:
@@ -845,9 +914,12 @@ process_reply (void *cls, const GNUNET_HashCode * key, void *value)
pr->public_data.results_found++;
prq->request_found = GNUNET_YES;
/* finally, pass on to other peer / local client */
- if (!GSF_request_plan_reference_get_last_transmission_ (pr->public_data.rpr_head, prq->sender, &last_transmission))
+ if (! GSF_request_plan_reference_get_last_transmission_ (pr->public_data.pr_head,
+ prq->sender,
+ &last_transmission))
last_transmission.abs_value = GNUNET_TIME_UNIT_FOREVER_ABS.abs_value;
- pr->rh (pr->rh_cls, prq->eval, pr, prq->anonymity_level, prq->expiration,
+ pr->rh (pr->rh_cls, prq->eval, pr,
+ prq->anonymity_level, prq->expiration,
last_transmission, prq->type, prq->data, prq->size);
return GNUNET_YES;
}
@@ -929,8 +1001,8 @@ put_migration_continuation (void *cls, int success,
if (min_expiration.abs_value > 0)
{
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Asking to stop migration for %llu ms because datastore is full\n",
- (unsigned long long) GNUNET_TIME_absolute_get_remaining (min_expiration).rel_value);
+ "Asking to stop migration for %s because datastore is full\n",
+ GNUNET_STRINGS_relative_time_to_string (GNUNET_TIME_absolute_get_remaining (min_expiration), GNUNET_YES));
GSF_block_peer_migration_ (cp, min_expiration);
}
else
@@ -943,8 +1015,8 @@ put_migration_continuation (void *cls, int success,
ppd->migration_delay.rel_value);
ppd->migration_delay = GNUNET_TIME_relative_multiply (ppd->migration_delay, 2);
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Replicated content already exists locally, asking to stop migration for %llu ms\n",
- (unsigned long long) mig_pause.rel_value);
+ "Replicated content already exists locally, asking to stop migration for %s\n",
+ GNUNET_STRINGS_relative_time_to_string (mig_pause, GNUNET_YES));
GSF_block_peer_migration_ (cp, GNUNET_TIME_relative_to_absolute (mig_pause));
}
}
@@ -1000,7 +1072,7 @@ test_put_load_too_high (uint32_t priority)
*/
static void
handle_dht_reply (void *cls, struct GNUNET_TIME_Absolute exp,
- const GNUNET_HashCode * key,
+ const struct GNUNET_HashCode * key,
const struct GNUNET_PeerIdentity *get_path,
unsigned int get_path_length,
const struct GNUNET_PeerIdentity *put_path,
@@ -1057,7 +1129,7 @@ GSF_dht_lookup_ (struct GSF_PendingRequest *pr)
const void *xquery;
size_t xquery_size;
struct GNUNET_PeerIdentity pi;
- char buf[sizeof (GNUNET_HashCode) * 2] GNUNET_ALIGN;
+ char buf[sizeof (struct GNUNET_HashCode) * 2] GNUNET_ALIGN;
if (0 != pr->public_data.anonymity_level)
return;
@@ -1071,8 +1143,8 @@ GSF_dht_lookup_ (struct GSF_PendingRequest *pr)
if (GNUNET_BLOCK_TYPE_FS_SBLOCK == pr->public_data.type)
{
xquery = buf;
- memcpy (buf, &pr->public_data.namespace, sizeof (GNUNET_HashCode));
- xquery_size = sizeof (GNUNET_HashCode);
+ memcpy (buf, pr->public_data.namespace, sizeof (struct GNUNET_HashCode));
+ xquery_size = sizeof (struct GNUNET_HashCode);
}
if (0 != (pr->public_data.options & GSF_PRO_FORWARD_ONLY))
{
@@ -1084,10 +1156,105 @@ GSF_dht_lookup_ (struct GSF_PendingRequest *pr)
pr->gh =
GNUNET_DHT_get_start (GSF_dht,
pr->public_data.type, &pr->public_data.query,
- 5 /* DEFAULT_GET_REPLICATION */ ,
+ DHT_GET_REPLICATION,
GNUNET_DHT_RO_DEMULTIPLEX_EVERYWHERE,
- /* FIXME: can no longer pass pr->bf/pr->mingle... */
xquery, xquery_size, &handle_dht_reply, pr);
+ if ( (NULL != pr->gh) &&
+ (0 != pr->replies_seen_count) )
+ GNUNET_DHT_get_filter_known_results (pr->gh,
+ pr->replies_seen_count,
+ pr->replies_seen);
+}
+
+
+/**
+ * Function called with a reply from the stream.
+ *
+ * @param cls the pending request struct
+ * @param type type of the block, ANY on error
+ * @param expiration expiration time for the block
+ * @param data_size number of bytes in 'data', 0 on error
+ * @param data reply block data, NULL on error
+ */
+static void
+stream_reply_proc (void *cls,
+ enum GNUNET_BLOCK_Type type,
+ struct GNUNET_TIME_Absolute expiration,
+ size_t data_size,
+ const void *data)
+{
+ struct GSF_PendingRequest *pr = cls;
+ struct ProcessReplyClosure prq;
+ struct GNUNET_HashCode query;
+
+ pr->stream_request = NULL;
+ if (GNUNET_BLOCK_TYPE_ANY == type)
+ {
+ GNUNET_break (NULL == data);
+ GNUNET_break (0 == data_size);
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Error retrieiving block via stream\n");
+ pr->stream_retry_count++;
+ if (pr->stream_retry_count >= STREAM_RETRY_MAX)
+ return; /* give up on stream */
+ /* retry -- without delay, as this is non-anonymous
+ and mesh/stream connect will take some time anyway */
+ pr->stream_request = GSF_stream_query (pr->public_data.target,
+ &pr->public_data.query,
+ pr->public_data.type,
+ &stream_reply_proc,
+ pr);
+ return;
+ }
+ if (GNUNET_YES !=
+ GNUNET_BLOCK_get_key (GSF_block_ctx,
+ type,
+ data, data_size, &query))
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Failed to derive key for block of type %d\n",
+ (int) type);
+ GNUNET_break_op (0);
+ return;
+ }
+ GNUNET_STATISTICS_update (GSF_stats,
+ gettext_noop ("# Replies received from STREAM"), 1,
+ GNUNET_NO);
+ memset (&prq, 0, sizeof (prq));
+ prq.data = data;
+ prq.expiration = expiration;
+ /* do not allow migrated content to live longer than 1 year */
+ prq.expiration = GNUNET_TIME_absolute_min (GNUNET_TIME_relative_to_absolute (GNUNET_TIME_UNIT_YEARS),
+ prq.expiration);
+ prq.size = data_size;
+ prq.type = type;
+ process_reply (&prq, &query, pr);
+}
+
+
+/**
+ * Consider downloading via stream (if possible)
+ *
+ * @param pr the pending request to process
+ */
+void
+GSF_stream_lookup_ (struct GSF_PendingRequest *pr)
+{
+ if (0 != pr->public_data.anonymity_level)
+ return;
+ if (0 == pr->public_data.target)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Cannot do stream-based download, target peer not known\n");
+ return;
+ }
+ if (NULL != pr->stream_request)
+ return;
+ pr->stream_request = GSF_stream_query (pr->public_data.target,
+ &pr->public_data.query,
+ pr->public_data.type,
+ &stream_reply_proc,
+ pr);
}
@@ -1103,9 +1270,8 @@ warn_delay_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
struct GSF_PendingRequest *pr = cls;
GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
- _("Datastore lookup already took %llu ms!\n"),
- (unsigned long long)
- GNUNET_TIME_absolute_get_duration (pr->qe_start).rel_value);
+ _("Datastore lookup already took %s!\n"),
+ GNUNET_STRINGS_relative_time_to_string (GNUNET_TIME_absolute_get_duration (pr->qe_start), GNUNET_YES));
pr->warn_task =
GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES, &warn_delay_task,
pr);
@@ -1124,9 +1290,8 @@ odc_warn_delay_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
struct GSF_PendingRequest *pr = cls;
GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
- _("On-demand lookup already took %llu ms!\n"),
- (unsigned long long)
- GNUNET_TIME_absolute_get_duration (pr->qe_start).rel_value);
+ _("On-demand lookup already took %s!\n"),
+ GNUNET_STRINGS_relative_time_to_string (GNUNET_TIME_absolute_get_duration (pr->qe_start), GNUNET_YES));
pr->warn_task =
GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES,
&odc_warn_delay_task, pr);
@@ -1151,7 +1316,7 @@ odc_warn_delay_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
* maybe 0 if no unique identifier is available
*/
static void
-process_local_reply (void *cls, const GNUNET_HashCode * key, size_t size,
+process_local_reply (void *cls, const struct GNUNET_HashCode * key, size_t size,
const void *data, enum GNUNET_BLOCK_Type type,
uint32_t priority, uint32_t anonymity,
struct GNUNET_TIME_Absolute expiration, uint64_t uid)
@@ -1159,7 +1324,7 @@ process_local_reply (void *cls, const GNUNET_HashCode * key, size_t size,
struct GSF_PendingRequest *pr = cls;
GSF_LocalLookupContinuation cont;
struct ProcessReplyClosure prq;
- GNUNET_HashCode query;
+ struct GNUNET_HashCode query;
unsigned int old_rf;
GNUNET_SCHEDULER_cancel (pr->warn_task);
@@ -1169,10 +1334,12 @@ process_local_reply (void *cls, const GNUNET_HashCode * key, size_t size,
pr->qe = NULL;
if (NULL == key)
{
+#if INSANE_STATISTICS
GNUNET_STATISTICS_update (GSF_stats,
gettext_noop
("# Datastore lookups concluded (no results)"),
1, GNUNET_NO);
+#endif
}
if (GNUNET_NO == pr->have_first_uid)
{
@@ -1204,12 +1371,14 @@ process_local_reply (void *cls, const GNUNET_HashCode * key, size_t size,
{
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK,
"No further local responses available.\n");
+#if INSANE_STATISTICS
if ((pr->public_data.type == GNUNET_BLOCK_TYPE_FS_DBLOCK) ||
(pr->public_data.type == GNUNET_BLOCK_TYPE_FS_IBLOCK))
GNUNET_STATISTICS_update (GSF_stats,
gettext_noop
("# requested DBLOCK or IBLOCK not found"), 1,
GNUNET_NO);
+#endif
goto check_error_and_continue;
}
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
@@ -1258,7 +1427,7 @@ process_local_reply (void *cls, const GNUNET_HashCode * key, size_t size,
(0 !=
(GSF_PRO_PRIORITY_UNLIMITED &
pr->public_data.options)) ? UINT_MAX :
- datastore_queue_size
+ GSF_datastore_queue_size
/* max queue size */ ,
GNUNET_TIME_UNIT_FOREVER_REL,
&process_local_reply, pr);
@@ -1298,7 +1467,7 @@ process_local_reply (void *cls, const GNUNET_HashCode * key, size_t size,
(0 !=
(GSF_PRO_PRIORITY_UNLIMITED &
pr->public_data.options)) ? UINT_MAX :
- datastore_queue_size
+ GSF_datastore_queue_size
/* max queue size */ ,
GNUNET_TIME_UNIT_FOREVER_REL,
&process_local_reply, pr);
@@ -1356,7 +1525,7 @@ process_local_reply (void *cls, const GNUNET_HashCode * key, size_t size,
(0 !=
(GSF_PRO_PRIORITY_UNLIMITED & pr->
public_data.options)) ? UINT_MAX :
- datastore_queue_size
+ GSF_datastore_queue_size
/* max queue size */ ,
GNUNET_TIME_UNIT_FOREVER_REL,
&process_local_reply, pr);
@@ -1413,6 +1582,7 @@ GSF_local_lookup_ (struct GSF_PendingRequest *pr,
GSF_LocalLookupContinuation cont, void *cont_cls)
{
GNUNET_assert (NULL == pr->gh);
+ GNUNET_assert (NULL == pr->stream_request);
GNUNET_assert (NULL == pr->llc_cont);
pr->llc_cont = cont;
pr->llc_cont_cls = cont_cls;
@@ -1420,9 +1590,11 @@ GSF_local_lookup_ (struct GSF_PendingRequest *pr,
pr->warn_task =
GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES, &warn_delay_task,
pr);
+#if INSANE_STATISTICS
GNUNET_STATISTICS_update (GSF_stats,
gettext_noop ("# Datastore lookups initiated"), 1,
GNUNET_NO);
+#endif
pr->qe =
GNUNET_DATASTORE_get_key (GSF_dsh, pr->local_result_offset++,
&pr->public_data.query,
@@ -1436,7 +1608,7 @@ GSF_local_lookup_ (struct GSF_PendingRequest *pr,
(0 !=
(GSF_PRO_PRIORITY_UNLIMITED & pr->
public_data.options)) ? UINT_MAX :
- datastore_queue_size
+ GSF_datastore_queue_size
/* max queue size */ ,
GNUNET_TIME_UNIT_FOREVER_REL,
&process_local_reply, pr);
@@ -1477,7 +1649,7 @@ GSF_handle_p2p_content_ (struct GSF_ConnectedPeer *cp,
size_t dsize;
enum GNUNET_BLOCK_Type type;
struct GNUNET_TIME_Absolute expiration;
- GNUNET_HashCode query;
+ struct GNUNET_HashCode query;
struct ProcessReplyClosure prq;
struct GNUNET_TIME_Relative block_time;
double putl;
@@ -1509,10 +1681,7 @@ GSF_handle_p2p_content_ (struct GSF_ConnectedPeer *cp,
GNUNET_NO);
/* now, lookup 'query' */
prq.data = (const void *) &put[1];
- if (NULL != cp)
- prq.sender = cp;
- else
- prq.sender = NULL;
+ prq.sender = cp;
prq.size = dsize;
prq.type = type;
prq.expiration = expiration;
@@ -1526,9 +1695,10 @@ GSF_handle_p2p_content_ (struct GSF_ConnectedPeer *cp,
GSF_connected_peer_change_preference_ (cp,
CONTENT_BANDWIDTH_VALUE +
1000 * prq.priority);
- GSF_get_peer_performance_data_ (cp)->trust += prq.priority;
+ GSF_get_peer_performance_data_ (cp)->respect += prq.priority;
}
if ((GNUNET_YES == active_to_migration) &&
+ (NULL != cp) &&
(GNUNET_NO == test_put_load_too_high (prq.priority)))
{
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
@@ -1551,7 +1721,7 @@ GSF_handle_p2p_content_ (struct GSF_ConnectedPeer *cp,
put_migration_continuation (pmc, GNUNET_SYSERR, GNUNET_TIME_UNIT_ZERO_ABS, NULL);
}
}
- else
+ else if (NULL != cp)
{
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Choosing not to keep content `%s' (%d/%d)\n",
@@ -1559,9 +1729,10 @@ GSF_handle_p2p_content_ (struct GSF_ConnectedPeer *cp,
test_put_load_too_high (prq.priority));
}
putl = GNUNET_LOAD_get_load (datastore_put_load);
- if ((NULL != (cp = prq.sender)) && (GNUNET_NO == prq.request_found) &&
- ((GNUNET_YES != active_to_migration) ||
- (putl > 2.5 * (1 + prq.priority))))
+ if ( (NULL != cp) &&
+ (GNUNET_NO == prq.request_found) &&
+ ( (GNUNET_YES != active_to_migration) ||
+ (putl > 2.5 * (1 + prq.priority)) ) )
{
if (GNUNET_YES != active_to_migration)
putl = 1.0 + GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, 5);
@@ -1589,37 +1760,18 @@ GSF_handle_p2p_content_ (struct GSF_ConnectedPeer *cp,
void
GSF_pending_request_init_ ()
{
- unsigned long long bps;
-
if (GNUNET_OK !=
GNUNET_CONFIGURATION_get_value_number (GSF_cfg, "fs",
"MAX_PENDING_REQUESTS",
&max_pending_requests))
{
- GNUNET_log (GNUNET_ERROR_TYPE_INFO,
- _
- ("Configuration fails to specify `%s', assuming default value."),
- "MAX_PENDING_REQUESTS");
+ GNUNET_log_config_missing (GNUNET_ERROR_TYPE_INFO,
+ "fs", "MAX_PENDING_REQUESTS");
}
- if (GNUNET_OK !=
- GNUNET_CONFIGURATION_get_value_size (GSF_cfg, "ats", "WAN_QUOTA_OUT",
- &bps))
- {
- GNUNET_log (GNUNET_ERROR_TYPE_INFO,
- _
- ("Configuration fails to specify `%s', assuming default value."),
- "WAN_QUOTA_OUT");
- bps = 65536;
- }
- /* queue size should be #queries we can have pending and satisfy within
- * a carry interval: */
- datastore_queue_size =
- bps * GNUNET_CONSTANTS_MAX_BANDWIDTH_CARRY_S / DBLOCK_SIZE;
-
active_to_migration =
GNUNET_CONFIGURATION_get_value_yesno (GSF_cfg, "FS", "CONTENT_CACHING");
datastore_put_load = GNUNET_LOAD_value_init (DATASTORE_LOAD_AUTODECLINE);
- pr_map = GNUNET_CONTAINER_multihashmap_create (32 * 1024);
+ pr_map = GNUNET_CONTAINER_multihashmap_create (32 * 1024, GNUNET_YES);
requests_by_expiration_heap =
GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
}