aboutsummaryrefslogtreecommitdiff
path: root/src/testbed
diff options
context:
space:
mode:
authorSree Harsha Totakura <totakura@in.tum.de>2014-02-18 15:43:38 +0000
committerSree Harsha Totakura <totakura@in.tum.de>2014-02-18 15:43:38 +0000
commit6b75b4dc34ff61e06844277c7cc9ff38d49fffeb (patch)
tree4b27400b361686d853b12c758f8450e74e4f5887 /src/testbed
parent31c3c7c755dcf707f73b00959970bfd27da4b7ca (diff)
Consider the resources from failed operations as overloaded and not use them
until the parallelism is refreshed. This commit also fixes a bug where the parallelism is set to 0 and hence no progress can be made.
Diffstat (limited to 'src/testbed')
-rw-r--r--src/testbed/testbed_api_operations.c62
1 files changed, 51 insertions, 11 deletions
diff --git a/src/testbed/testbed_api_operations.c b/src/testbed/testbed_api_operations.c
index 2a559a90e4..07fb4940dc 100644
--- a/src/testbed/testbed_api_operations.c
+++ b/src/testbed/testbed_api_operations.c
@@ -29,6 +29,17 @@
#include "testbed_api_operations.h"
#include "testbed_api_sd.h"
+/**
+ * The number of readings containing past operation's timing information that we
+ * keep track of for adaptive queues
+ */
+#define ADAPTIVE_QUEUE_DEFAULT_HISTORY 10
+
+/**
+ * The number of parallel opeartions we start with by default for adaptive
+ * queues
+ */
+#define ADAPTIVE_QUEUE_DEFAULT_MAX_ACTIVE 4
/**
* An entry in the operation queue
@@ -141,7 +152,6 @@ struct FeedbackCtx
* Number of operations that have failed
*/
unsigned int nfailed;
-
};
@@ -216,10 +226,16 @@ struct OperationQueue
* Max number of operations which can be active at any time in this queue.
* This value can be changed either by calling
* GNUNET_TESTBED_operation_queue_reset_max_active_() or by the adaptive
- * algorithm if this operation queue is of type OPERATION_QUEUE_TYPE_ADAPTIVE
+ * algorithm if this operation queue is of type #OPERATION_QUEUE_TYPE_ADAPTIVE
*/
unsigned int max_active;
+ /**
+ * The number of resources occupied by failed operations in the current shot.
+ * This is only relavant if the operation queue is of type
+ * #OPERATION_QUEUE_TYPE_ADAPTIVE
+ */
+ unsigned int overload;
};
@@ -613,6 +629,7 @@ decide_capacity (struct OperationQueue *opq,
unsigned int n_ops;
unsigned int n_evict_entries;
unsigned int need;
+ unsigned int max;
int deficit;
int rval;
@@ -623,14 +640,22 @@ decide_capacity (struct OperationQueue *opq,
evict_entries = NULL;
n_evict_entries = 0;
rval = GNUNET_YES;
- if (opq->active > opq->max_active)
+ if (OPERATION_QUEUE_TYPE_ADAPTIVE == opq->type)
+ {
+ GNUNET_assert (NULL != opq->fctx);
+ GNUNET_assert (opq->max_active >= opq->overload);
+ max = opq->max_active - opq->overload;
+ }
+ else
+ max = opq->max_active;
+ if (opq->active > max)
{
rval = GNUNET_NO;
goto ret;
}
- if ((opq->active + need) <= opq->max_active)
+ if ((opq->active + need) <= max)
goto ret;
- deficit = need - (opq->max_active - opq->active);
+ deficit = need - (max - opq->active);
for (entry = opq->nq_head;
(0 < deficit) && (NULL != entry);
entry = entry->next)
@@ -825,6 +850,7 @@ adaptive_queue_set_max_active (struct OperationQueue *queue, unsigned int n)
n = GNUNET_MIN (n ,fctx->max_active_bound);
fctx->tslots_freeptr = GNUNET_malloc (n * sizeof (struct TimeSlot));
fctx->nfailed = 0;
+ FPRINTF (stderr, "Parallelism: %u\n", n);
for (cnt = 0; cnt < n; cnt++)
{
tslot = &fctx->tslots_freeptr[cnt];
@@ -881,14 +907,19 @@ adapt_parallelism (struct OperationQueue *queue)
adaptive_queue_set_max_active (queue, queue->max_active); /* no change */
return;
}
- if (1 == sd)
- adaptive_queue_set_max_active (queue, queue->max_active - 1);
- if (2 <= sd)
- adaptive_queue_set_max_active (queue, queue->max_active / 2);
if (-1 == sd)
adaptive_queue_set_max_active (queue, queue->max_active + 1);
if (sd <= -2)
adaptive_queue_set_max_active (queue, queue->max_active * 2);
+ if (1 == queue->max_active)
+ {
+ adaptive_queue_set_max_active (queue, 1);
+ return;
+ }
+ if (1 == sd)
+ adaptive_queue_set_max_active (queue, queue->max_active - 1);
+ if (2 <= sd)
+ adaptive_queue_set_max_active (queue, queue->max_active / 2);
#if 0 /* old algorithm */
if (sd < 0)
@@ -934,6 +965,7 @@ update_tslots (struct GNUNET_TESTBED_Operation *op)
struct GNUNET_TIME_Relative t;
struct TimeSlot *tslot;
struct FeedbackCtx *fctx;
+ unsigned int i;
t = GNUNET_TIME_absolute_get_duration (op->tstart);
while (NULL != (tslot = op->tslots_head)) /* update time slots */
@@ -945,7 +977,14 @@ update_tslots (struct GNUNET_TESTBED_Operation *op)
GNUNET_CONTAINER_DLL_insert_tail (fctx->alloc_head, fctx->alloc_tail,
tslot);
if (op->failed)
+ {
fctx->nfailed++;
+ for (i = 0; i < op->nqueues; i++)
+ if (queue == op->queues[i])
+ break;
+ GNUNET_assert (i != op->nqueues);
+ op->queues[i]->overload += op->nres[i];
+ }
tslot->tsum = GNUNET_TIME_relative_add (tslot->tsum, t);
if (0 != tslot->nvals++)
continue;
@@ -1004,9 +1043,9 @@ GNUNET_TESTBED_operation_queue_create_ (enum OperationQueueType type,
{
fctx = GNUNET_new (struct FeedbackCtx);
fctx->max_active_bound = max_active;
- fctx->sd = GNUNET_TESTBED_SD_init_ (10); /* FIXME: Why 10? */
+ fctx->sd = GNUNET_TESTBED_SD_init_ (ADAPTIVE_QUEUE_DEFAULT_HISTORY);
queue->fctx = fctx;
- adaptive_queue_set_max_active (queue, 4); /* start with 4 */
+ adaptive_queue_set_max_active (queue, ADAPTIVE_QUEUE_DEFAULT_MAX_ACTIVE);
}
return queue;
}
@@ -1090,6 +1129,7 @@ GNUNET_TESTBED_operation_queue_reset_max_active_ (struct OperationQueue *queue,
struct QueueEntry *entry;
queue->max_active = max_active;
+ queue->overload = 0;
while ( (queue->active > queue->max_active)
&& (NULL != (entry = queue->rq_head)) )
defer (entry->op);