diff options
author | Sree Harsha Totakura <totakura@in.tum.de> | 2014-02-18 15:43:38 +0000 |
---|---|---|
committer | Sree Harsha Totakura <totakura@in.tum.de> | 2014-02-18 15:43:38 +0000 |
commit | 6b75b4dc34ff61e06844277c7cc9ff38d49fffeb (patch) | |
tree | 4b27400b361686d853b12c758f8450e74e4f5887 /src/testbed | |
parent | 31c3c7c755dcf707f73b00959970bfd27da4b7ca (diff) |
Consider the resources from failed operations as overloaded and not use them
until the parallelism is refreshed.
This commit also fixes a bug where the parallelism is set to 0 and hence no
progress can be made.
Diffstat (limited to 'src/testbed')
-rw-r--r-- | src/testbed/testbed_api_operations.c | 62 |
1 files changed, 51 insertions, 11 deletions
diff --git a/src/testbed/testbed_api_operations.c b/src/testbed/testbed_api_operations.c index 2a559a90e4..07fb4940dc 100644 --- a/src/testbed/testbed_api_operations.c +++ b/src/testbed/testbed_api_operations.c @@ -29,6 +29,17 @@ #include "testbed_api_operations.h" #include "testbed_api_sd.h" +/** + * The number of readings containing past operation's timing information that we + * keep track of for adaptive queues + */ +#define ADAPTIVE_QUEUE_DEFAULT_HISTORY 10 + +/** + * The number of parallel opeartions we start with by default for adaptive + * queues + */ +#define ADAPTIVE_QUEUE_DEFAULT_MAX_ACTIVE 4 /** * An entry in the operation queue @@ -141,7 +152,6 @@ struct FeedbackCtx * Number of operations that have failed */ unsigned int nfailed; - }; @@ -216,10 +226,16 @@ struct OperationQueue * Max number of operations which can be active at any time in this queue. * This value can be changed either by calling * GNUNET_TESTBED_operation_queue_reset_max_active_() or by the adaptive - * algorithm if this operation queue is of type OPERATION_QUEUE_TYPE_ADAPTIVE + * algorithm if this operation queue is of type #OPERATION_QUEUE_TYPE_ADAPTIVE */ unsigned int max_active; + /** + * The number of resources occupied by failed operations in the current shot. + * This is only relavant if the operation queue is of type + * #OPERATION_QUEUE_TYPE_ADAPTIVE + */ + unsigned int overload; }; @@ -613,6 +629,7 @@ decide_capacity (struct OperationQueue *opq, unsigned int n_ops; unsigned int n_evict_entries; unsigned int need; + unsigned int max; int deficit; int rval; @@ -623,14 +640,22 @@ decide_capacity (struct OperationQueue *opq, evict_entries = NULL; n_evict_entries = 0; rval = GNUNET_YES; - if (opq->active > opq->max_active) + if (OPERATION_QUEUE_TYPE_ADAPTIVE == opq->type) + { + GNUNET_assert (NULL != opq->fctx); + GNUNET_assert (opq->max_active >= opq->overload); + max = opq->max_active - opq->overload; + } + else + max = opq->max_active; + if (opq->active > max) { rval = GNUNET_NO; goto ret; } - if ((opq->active + need) <= opq->max_active) + if ((opq->active + need) <= max) goto ret; - deficit = need - (opq->max_active - opq->active); + deficit = need - (max - opq->active); for (entry = opq->nq_head; (0 < deficit) && (NULL != entry); entry = entry->next) @@ -825,6 +850,7 @@ adaptive_queue_set_max_active (struct OperationQueue *queue, unsigned int n) n = GNUNET_MIN (n ,fctx->max_active_bound); fctx->tslots_freeptr = GNUNET_malloc (n * sizeof (struct TimeSlot)); fctx->nfailed = 0; + FPRINTF (stderr, "Parallelism: %u\n", n); for (cnt = 0; cnt < n; cnt++) { tslot = &fctx->tslots_freeptr[cnt]; @@ -881,14 +907,19 @@ adapt_parallelism (struct OperationQueue *queue) adaptive_queue_set_max_active (queue, queue->max_active); /* no change */ return; } - if (1 == sd) - adaptive_queue_set_max_active (queue, queue->max_active - 1); - if (2 <= sd) - adaptive_queue_set_max_active (queue, queue->max_active / 2); if (-1 == sd) adaptive_queue_set_max_active (queue, queue->max_active + 1); if (sd <= -2) adaptive_queue_set_max_active (queue, queue->max_active * 2); + if (1 == queue->max_active) + { + adaptive_queue_set_max_active (queue, 1); + return; + } + if (1 == sd) + adaptive_queue_set_max_active (queue, queue->max_active - 1); + if (2 <= sd) + adaptive_queue_set_max_active (queue, queue->max_active / 2); #if 0 /* old algorithm */ if (sd < 0) @@ -934,6 +965,7 @@ update_tslots (struct GNUNET_TESTBED_Operation *op) struct GNUNET_TIME_Relative t; struct TimeSlot *tslot; struct FeedbackCtx *fctx; + unsigned int i; t = GNUNET_TIME_absolute_get_duration (op->tstart); while (NULL != (tslot = op->tslots_head)) /* update time slots */ @@ -945,7 +977,14 @@ update_tslots (struct GNUNET_TESTBED_Operation *op) GNUNET_CONTAINER_DLL_insert_tail (fctx->alloc_head, fctx->alloc_tail, tslot); if (op->failed) + { fctx->nfailed++; + for (i = 0; i < op->nqueues; i++) + if (queue == op->queues[i]) + break; + GNUNET_assert (i != op->nqueues); + op->queues[i]->overload += op->nres[i]; + } tslot->tsum = GNUNET_TIME_relative_add (tslot->tsum, t); if (0 != tslot->nvals++) continue; @@ -1004,9 +1043,9 @@ GNUNET_TESTBED_operation_queue_create_ (enum OperationQueueType type, { fctx = GNUNET_new (struct FeedbackCtx); fctx->max_active_bound = max_active; - fctx->sd = GNUNET_TESTBED_SD_init_ (10); /* FIXME: Why 10? */ + fctx->sd = GNUNET_TESTBED_SD_init_ (ADAPTIVE_QUEUE_DEFAULT_HISTORY); queue->fctx = fctx; - adaptive_queue_set_max_active (queue, 4); /* start with 4 */ + adaptive_queue_set_max_active (queue, ADAPTIVE_QUEUE_DEFAULT_MAX_ACTIVE); } return queue; } @@ -1090,6 +1129,7 @@ GNUNET_TESTBED_operation_queue_reset_max_active_ (struct OperationQueue *queue, struct QueueEntry *entry; queue->max_active = max_active; + queue->overload = 0; while ( (queue->active > queue->max_active) && (NULL != (entry = queue->rq_head)) ) defer (entry->op); |