aboutsummaryrefslogtreecommitdiff
path: root/src/testbed/testbed_api.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/testbed/testbed_api.c')
-rw-r--r--src/testbed/testbed_api.c2702
1 files changed, 2658 insertions, 44 deletions
diff --git a/src/testbed/testbed_api.c b/src/testbed/testbed_api.c
index 5168081..054aa3b 100644
--- a/src/testbed/testbed_api.c
+++ b/src/testbed/testbed_api.c
@@ -24,15 +24,1772 @@
* This library is supposed to make it easier to write
* testcases and script large-scale benchmarks.
* @author Christian Grothoff
+ * @author Sree Harsha Totakura
*/
+
+
#include "platform.h"
#include "gnunet_testbed_service.h"
#include "gnunet_core_service.h"
#include "gnunet_constants.h"
#include "gnunet_transport_service.h"
#include "gnunet_hello_lib.h"
+#include <zlib.h>
+
+#include "testbed.h"
+#include "testbed_api.h"
+#include "testbed_api_hosts.h"
+#include "testbed_api_peers.h"
+#include "testbed_api_operations.h"
+
+/**
+ * Generic logging shorthand
+ */
+#define LOG(kind, ...) \
+ GNUNET_log_from (kind, "testbed-api", __VA_ARGS__);
+
+/**
+ * Debug logging
+ */
+#define LOG_DEBUG(...) \
+ LOG (GNUNET_ERROR_TYPE_DEBUG, __VA_ARGS__);
+
+/**
+ * Relative time seconds shorthand
+ */
+#define TIME_REL_SECS(sec) \
+ GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, sec)
+
+
+/**
+ * Default server message sending retry timeout
+ */
+#define TIMEOUT_REL TIME_REL_SECS(1)
+
+
+/**
+ * Handle for controller process
+ */
+struct GNUNET_TESTBED_ControllerProc
+{
+ /**
+ * The process handle
+ */
+ struct GNUNET_HELPER_Handle *helper;
+
+ /**
+ * The arguments used to start the helper
+ */
+ char **helper_argv;
+
+ /**
+ * The host where the helper is run
+ */
+ struct GNUNET_TESTBED_Host *host;
+
+ /**
+ * The controller error callback
+ */
+ GNUNET_TESTBED_ControllerStatusCallback cb;
+
+ /**
+ * The closure for the above callback
+ */
+ void *cls;
+
+ /**
+ * The send handle for the helper
+ */
+ struct GNUNET_HELPER_SendHandle *shandle;
+
+ /**
+ * The message corresponding to send handle
+ */
+ struct GNUNET_MessageHeader *msg;
+
+ /**
+ * The configuration of the running testbed service
+ */
+ struct GNUNET_CONFIGURATION_Handle *cfg;
+
+};
+
+
+/**
+ * The message queue for sending messages to the controller service
+ */
+struct MessageQueue
+{
+ /**
+ * The message to be sent
+ */
+ struct GNUNET_MessageHeader *msg;
+
+ /**
+ * next pointer for DLL
+ */
+ struct MessageQueue *next;
+
+ /**
+ * prev pointer for DLL
+ */
+ struct MessageQueue *prev;
+};
+
+
+/**
+ * Structure for a controller link
+ */
+struct ControllerLink
+{
+ /**
+ * The next ptr for DLL
+ */
+ struct ControllerLink *next;
+
+ /**
+ * The prev ptr for DLL
+ */
+ struct ControllerLink *prev;
+
+ /**
+ * The host which will be referred in the peer start request. This is the
+ * host where the peer should be started
+ */
+ struct GNUNET_TESTBED_Host *delegated_host;
+
+ /**
+ * The host which will contacted to delegate the peer start request
+ */
+ struct GNUNET_TESTBED_Host *slave_host;
+
+ /**
+ * The configuration to be used to connect to slave host
+ */
+ const struct GNUNET_CONFIGURATION_Handle *slave_cfg;
+
+ /**
+ * GNUNET_YES if the slave should be started (and stopped) by us; GNUNET_NO
+ * if we are just allowed to use the slave via TCP/IP
+ */
+ int is_subordinate;
+};
+
+
+/**
+ * handle for host registration
+ */
+struct GNUNET_TESTBED_HostRegistrationHandle
+{
+ /**
+ * The host being registered
+ */
+ struct GNUNET_TESTBED_Host *host;
+
+ /**
+ * The controller at which this host is being registered
+ */
+ struct GNUNET_TESTBED_Controller *c;
+
+ /**
+ * The Registartion completion callback
+ */
+ GNUNET_TESTBED_HostRegistrationCompletion cc;
+
+ /**
+ * The closure for above callback
+ */
+ void *cc_cls;
+};
+
+
+/**
+ * Context data for forwarded Operation
+ */
+struct ForwardedOperationData
+{
+
+ /**
+ * The callback to call when reply is available
+ */
+ GNUNET_CLIENT_MessageHandler cc;
+
+ /**
+ * The closure for the above callback
+ */
+ void *cc_cls;
+
+};
+
+
+/**
+ * Context data for get slave config operations
+ */
+struct GetSlaveConfigData
+{
+ /**
+ * The id of the slave controller
+ */
+ uint32_t slave_id;
+
+};
+
+
+/**
+ * Context data for controller link operations
+ */
+struct ControllerLinkData
+{
+ /**
+ * The controller link message
+ */
+ struct GNUNET_TESTBED_ControllerLinkMessage *msg;
+
+};
+
+
+struct SDEntry
+{
+ /**
+ * DLL next pointer
+ */
+ struct SDEntry *next;
+
+ /**
+ * DLL prev pointer
+ */
+ struct SDEntry *prev;
+
+ /**
+ * The value to store
+ */
+ unsigned int amount;
+};
+
+
+struct SDHandle
+{
+ /**
+ * DLL head for storing entries
+ */
+ struct SDEntry *head;
+
+ /**
+ * DLL tail for storing entries
+ */
+ struct SDEntry *tail;
+
+ /**
+ * Squared sum of data values
+ */
+ unsigned long long sqsum;
+
+ /**
+ * Sum of the data values
+ */
+ unsigned long sum;
+
+ /**
+ * The average of data amounts
+ */
+ float avg;
+
+ /**
+ * The variance
+ */
+ double vr;
+
+ /**
+ * Number of data values; also the length of DLL containing SDEntries
+ */
+ unsigned int cnt;
+
+ /**
+ * max number of entries we can have in the DLL
+ */
+ unsigned int max_cnt;
+};
+
+
+/**
+ * This variable is set to the operation that has been last marked as done. It
+ * is used to verify whether the state associated with an operation is valid
+ * after the first notify callback is called. Such checks are necessary for
+ * certain operations where we have 2 notify callbacks. Examples are
+ * OP_PEER_CREATE, OP_PEER_START/STOP, OP_OVERLAY_CONNECT.
+ *
+ * This variable should ONLY be used to compare; it is a dangling pointer!!
+ */
+static const struct GNUNET_TESTBED_Operation *last_finished_operation;
+
+/**
+ * Initialize standard deviation calculation handle
+ *
+ * @param max_cnt the maximum number of readings to keep
+ * @return the initialized handle
+ */
+static struct SDHandle *
+SD_init (unsigned int max_cnt)
+{
+ struct SDHandle *h;
+
+ GNUNET_assert (1 < max_cnt);
+ h = GNUNET_malloc (sizeof (struct SDHandle));
+ h->max_cnt = max_cnt;
+ return h;
+}
+
+
+/**
+ * Frees the memory allocated to the SD handle
+ *
+ * @param h the SD handle
+ */
+static void
+SD_destroy (struct SDHandle *h)
+{
+ struct SDEntry *entry;
+
+ while (NULL != (entry = h->head))
+ {
+ GNUNET_CONTAINER_DLL_remove (h->head, h->tail, entry);
+ GNUNET_free (entry);
+ }
+ GNUNET_free (h);
+}
+
+
+/**
+ * Add a reading to SD
+ *
+ * @param h the SD handle
+ * @param amount the reading value
+ */
+static void
+SD_add_data (struct SDHandle *h, unsigned int amount)
+{
+ struct SDEntry *entry;
+ double sqavg;
+ double sqsum_avg;
+
+ entry = NULL;
+ if (h->cnt == h->max_cnt)
+ {
+ entry = h->head;
+ GNUNET_CONTAINER_DLL_remove (h->head, h->tail, entry);
+ h->sum -= entry->amount;
+ h->sqsum -=
+ ((unsigned long) entry->amount) * ((unsigned long) entry->amount);
+ h->cnt--;
+ }
+ GNUNET_assert (h->cnt < h->max_cnt);
+ if (NULL == entry)
+ entry = GNUNET_malloc (sizeof (struct SDEntry));
+ entry->amount = amount;
+ GNUNET_CONTAINER_DLL_insert_tail (h->head, h->tail, entry);
+ h->sum += amount;
+ h->cnt++;
+ h->avg = ((float) h->sum) / ((float) h->cnt);
+ h->sqsum += ((unsigned long) amount) * ((unsigned long) amount);
+ sqsum_avg = ((double) h->sqsum) / ((double) h->cnt);
+ sqavg = ((double) h->avg) * ((double) h->avg);
+ h->vr = sqsum_avg - sqavg;
+}
+
+
+/**
+ * Returns the factor by which the given amount differs from the standard deviation
+ *
+ * @param h the SDhandle
+ * @param amount the value for which the deviation is returned
+
+ * @return the deviation from the average; GNUNET_SYSERR if the deviation cannot
+ * be calculated OR 0 if the deviation is less than the average; a
+ * maximum of 4 is returned for deviations equal to or larger than 4
+ */
+static int
+SD_deviation_factor (struct SDHandle *h, unsigned int amount)
+{
+ double diff;
+ unsigned int n;
+
+ if (h->cnt < 2)
+ return GNUNET_SYSERR;
+ if (((float) amount) > h->avg)
+ diff = ((float) amount) - h->avg;
+ else
+ return 0; //diff = h->avg - ((float) amount);
+ diff *= diff;
+ for (n = 1; n < 4; n++)
+ if (diff < (((double) (n * n)) * h->vr))
+ break;
+ return n;
+}
+
+
+/**
+ * Returns the operation context with the given id if found in the Operation
+ * context queues of the controller
+ *
+ * @param c the controller whose queues are searched
+ * @param id the id which has to be checked
+ * @return the matching operation context; NULL if no match found
+ */
+static struct OperationContext *
+find_opc (const struct GNUNET_TESTBED_Controller *c, const uint64_t id)
+{
+ struct OperationContext *opc;
+
+ for (opc = c->ocq_head; NULL != opc; opc = opc->next)
+ {
+ if (id == opc->id)
+ return opc;
+ }
+ return NULL;
+}
+
+
+/**
+ * Handler for GNUNET_MESSAGE_TYPE_TESTBED_ADDHOSTCONFIRM message from
+ * controller (testbed service)
+ *
+ * @param c the controller handler
+ * @param msg message received
+ * @return GNUNET_YES if we can continue receiving from service; GNUNET_NO if
+ * not
+ */
+static int
+handle_addhostconfirm (struct GNUNET_TESTBED_Controller *c,
+ const struct GNUNET_TESTBED_HostConfirmedMessage *msg)
+{
+ struct GNUNET_TESTBED_HostRegistrationHandle *rh;
+ char *emsg;
+ uint16_t msg_size;
+
+ rh = c->rh;
+ if (NULL == rh)
+ {
+ return GNUNET_OK;
+ }
+ if (GNUNET_TESTBED_host_get_id_ (rh->host) != ntohl (msg->host_id))
+ {
+ LOG_DEBUG ("Mismatch in host id's %u, %u of host confirm msg\n",
+ GNUNET_TESTBED_host_get_id_ (rh->host), ntohl (msg->host_id));
+ return GNUNET_OK;
+ }
+ c->rh = NULL;
+ msg_size = ntohs (msg->header.size);
+ if (sizeof (struct GNUNET_TESTBED_HostConfirmedMessage) == msg_size)
+ {
+ LOG_DEBUG ("Host %u successfully registered\n", ntohl (msg->host_id));
+ GNUNET_TESTBED_mark_host_registered_at_ (rh->host, c);
+ rh->cc (rh->cc_cls, NULL);
+ GNUNET_free (rh);
+ return GNUNET_OK;
+ }
+ /* We have an error message */
+ emsg = (char *) &msg[1];
+ if ('\0' !=
+ emsg[msg_size - sizeof (struct GNUNET_TESTBED_HostConfirmedMessage)])
+ {
+ GNUNET_break (0);
+ GNUNET_free (rh);
+ return GNUNET_NO;
+ }
+ LOG (GNUNET_ERROR_TYPE_ERROR, _("Adding host %u failed with error: %s\n"),
+ ntohl (msg->host_id), emsg);
+ rh->cc (rh->cc_cls, emsg);
+ GNUNET_free (rh);
+ return GNUNET_OK;
+}
+
+
+/**
+ * Handler for forwarded operations
+ *
+ * @param c the controller handle
+ * @param opc the opearation context
+ * @param msg the message
+ */
+static void
+handle_forwarded_operation_msg (struct GNUNET_TESTBED_Controller *c,
+ struct OperationContext *opc,
+ const struct GNUNET_MessageHeader *msg)
+{
+ struct ForwardedOperationData *fo_data;
+
+ fo_data = opc->data;
+ if (NULL != fo_data->cc)
+ fo_data->cc (fo_data->cc_cls, msg);
+ GNUNET_CONTAINER_DLL_remove (c->ocq_head, c->ocq_tail, opc);
+ GNUNET_free (fo_data);
+ GNUNET_free (opc);
+}
+
+
+/**
+ * Handler for GNUNET_MESSAGE_TYPE_TESTBED_ADDHOSTCONFIRM message from
+ * controller (testbed service)
+ *
+ * @param c the controller handler
+ * @param msg message received
+ * @return GNUNET_YES if we can continue receiving from service; GNUNET_NO if
+ * not
+ */
+static int
+handle_opsuccess (struct GNUNET_TESTBED_Controller *c,
+ const struct
+ GNUNET_TESTBED_GenericOperationSuccessEventMessage *msg)
+{
+ struct OperationContext *opc;
+ struct GNUNET_TESTBED_EventInformation event;
+ uint64_t op_id;
+
+ op_id = GNUNET_ntohll (msg->operation_id);
+ LOG_DEBUG ("Operation %lu successful\n", op_id);
+ if (NULL == (opc = find_opc (c, op_id)))
+ {
+ LOG_DEBUG ("Operation not found\n");
+ return GNUNET_YES;
+ }
+ event.type = GNUNET_TESTBED_ET_OPERATION_FINISHED;
+ event.details.operation_finished.operation = opc->op;
+ event.details.operation_finished.op_cls = opc->op_cls;
+ event.details.operation_finished.emsg = NULL;
+ event.details.operation_finished.generic = NULL;
+ switch (opc->type)
+ {
+ case OP_FORWARDED:
+ {
+ handle_forwarded_operation_msg (c, opc,
+ (const struct GNUNET_MessageHeader *) msg);
+ return GNUNET_YES;
+ }
+ break;
+ case OP_PEER_DESTROY:
+ {
+ struct GNUNET_TESTBED_Peer *peer;
+
+ peer = opc->data;
+ GNUNET_free (peer);
+ opc->data = NULL;
+ //PEERDESTROYDATA
+ }
+ break;
+ case OP_LINK_CONTROLLERS:
+ {
+ struct ControllerLinkData *data;
+
+ data = opc->data;
+ GNUNET_assert (NULL != data);
+ GNUNET_free (data);
+ opc->data = NULL;
+ }
+ break;
+ default:
+ GNUNET_assert (0);
+ }
+ GNUNET_CONTAINER_DLL_remove (opc->c->ocq_head, opc->c->ocq_tail, opc);
+ opc->state = OPC_STATE_FINISHED;
+ if (0 != (c->event_mask & (1L << GNUNET_TESTBED_ET_OPERATION_FINISHED)))
+ {
+ if (NULL != c->cc)
+ c->cc (c->cc_cls, &event);
+ }
+ else
+ LOG_DEBUG ("Not calling callback\n");
+ return GNUNET_YES;
+}
+
+
+/**
+ * Handler for GNUNET_MESSAGE_TYPE_TESTBED_PEERCREATESUCCESS message from
+ * controller (testbed service)
+ *
+ * @param c the controller handle
+ * @param msg message received
+ * @return GNUNET_YES if we can continue receiving from service; GNUNET_NO if
+ * not
+ */
+static int
+handle_peer_create_success (struct GNUNET_TESTBED_Controller *c,
+ const struct
+ GNUNET_TESTBED_PeerCreateSuccessEventMessage *msg)
+{
+ struct OperationContext *opc;
+ struct PeerCreateData *data;
+ struct GNUNET_TESTBED_Peer *peer;
+ GNUNET_TESTBED_PeerCreateCallback cb;
+ void *cls;
+ uint64_t op_id;
+
+ GNUNET_assert (sizeof (struct GNUNET_TESTBED_PeerCreateSuccessEventMessage) ==
+ ntohs (msg->header.size));
+ op_id = GNUNET_ntohll (msg->operation_id);
+ if (NULL == (opc = find_opc (c, op_id)))
+ {
+ LOG_DEBUG ("Operation context for PeerCreateSuccessEvent not found\n");
+ return GNUNET_YES;
+ }
+ if (OP_FORWARDED == opc->type)
+ {
+ handle_forwarded_operation_msg (c, opc,
+ (const struct GNUNET_MessageHeader *) msg);
+ return GNUNET_YES;
+ }
+ GNUNET_assert (OP_PEER_CREATE == opc->type);
+ GNUNET_assert (NULL != opc->data);
+ data = opc->data;
+ GNUNET_assert (NULL != data->peer);
+ peer = data->peer;
+ GNUNET_assert (peer->unique_id == ntohl (msg->peer_id));
+ peer->state = PS_CREATED;
+ cb = data->cb;
+ cls = data->cls;
+ GNUNET_free (opc->data);
+ GNUNET_CONTAINER_DLL_remove (opc->c->ocq_head, opc->c->ocq_tail, opc);
+ opc->state = OPC_STATE_FINISHED;
+ if (NULL != cb)
+ cb (cls, peer, NULL);
+ return GNUNET_YES;
+}
+
+
+/**
+ * Handler for GNUNET_MESSAGE_TYPE_TESTBED_PEEREVENT message from
+ * controller (testbed service)
+ *
+ * @param c the controller handler
+ * @param msg message received
+ * @return GNUNET_YES if we can continue receiving from service; GNUNET_NO if
+ * not
+ */
+static int
+handle_peer_event (struct GNUNET_TESTBED_Controller *c,
+ const struct GNUNET_TESTBED_PeerEventMessage *msg)
+{
+ struct OperationContext *opc;
+ struct GNUNET_TESTBED_Peer *peer;
+ struct PeerEventData *data;
+ GNUNET_TESTBED_PeerChurnCallback pcc;
+ void *pcc_cls;
+ struct GNUNET_TESTBED_EventInformation event;
+ uint64_t op_id;
+
+ GNUNET_assert (sizeof (struct GNUNET_TESTBED_PeerEventMessage) ==
+ ntohs (msg->header.size));
+ op_id = GNUNET_ntohll (msg->operation_id);
+ if (NULL == (opc = find_opc (c, op_id)))
+ {
+ LOG_DEBUG ("Operation not found\n");
+ return GNUNET_YES;
+ }
+ if (OP_FORWARDED == opc->type)
+ {
+ handle_forwarded_operation_msg (c, opc,
+ (const struct GNUNET_MessageHeader *) msg);
+ return GNUNET_YES;
+ }
+ GNUNET_assert ((OP_PEER_START == opc->type) || (OP_PEER_STOP == opc->type));
+ data = opc->data;
+ GNUNET_assert (NULL != data);
+ peer = data->peer;
+ GNUNET_assert (NULL != peer);
+ event.type = (enum GNUNET_TESTBED_EventType) ntohl (msg->event_type);
+ switch (event.type)
+ {
+ case GNUNET_TESTBED_ET_PEER_START:
+ peer->state = PS_STARTED;
+ event.details.peer_start.host = peer->host;
+ event.details.peer_start.peer = peer;
+ break;
+ case GNUNET_TESTBED_ET_PEER_STOP:
+ peer->state = PS_STOPPED;
+ event.details.peer_stop.peer = peer;
+ break;
+ default:
+ GNUNET_assert (0); /* We should never reach this state */
+ }
+ pcc = data->pcc;
+ pcc_cls = data->pcc_cls;
+ GNUNET_free (data);
+ GNUNET_CONTAINER_DLL_remove (opc->c->ocq_head, opc->c->ocq_tail, opc);
+ opc->state = OPC_STATE_FINISHED;
+ if (0 !=
+ ((GNUNET_TESTBED_ET_PEER_START | GNUNET_TESTBED_ET_PEER_STOP) &
+ c->event_mask))
+ {
+ if (NULL != c->cc)
+ c->cc (c->cc_cls, &event);
+ }
+ if (NULL != pcc)
+ pcc (pcc_cls, NULL);
+ return GNUNET_YES;
+}
+
+
+/**
+ * Handler for GNUNET_MESSAGE_TYPE_TESTBED_PEERCONEVENT message from
+ * controller (testbed service)
+ *
+ * @param c the controller handler
+ * @param msg message received
+ * @return GNUNET_YES if we can continue receiving from service; GNUNET_NO if
+ * not
+ */
+static int
+handle_peer_conevent (struct GNUNET_TESTBED_Controller *c,
+ const struct GNUNET_TESTBED_ConnectionEventMessage *msg)
+{
+ struct OperationContext *opc;
+ struct OverlayConnectData *data;
+ GNUNET_TESTBED_OperationCompletionCallback cb;
+ void *cb_cls;
+ struct GNUNET_TESTBED_EventInformation event;
+ uint64_t op_id;
+
+ op_id = GNUNET_ntohll (msg->operation_id);
+ if (NULL == (opc = find_opc (c, op_id)))
+ {
+ LOG_DEBUG ("Operation not found\n");
+ return GNUNET_YES;
+ }
+ if (OP_FORWARDED == opc->type)
+ {
+ handle_forwarded_operation_msg (c, opc,
+ (const struct GNUNET_MessageHeader *) msg);
+ return GNUNET_YES;
+ }
+ GNUNET_assert (OP_OVERLAY_CONNECT == opc->type);
+ data = opc->data;
+ GNUNET_assert (NULL != data);
+ GNUNET_assert ((ntohl (msg->peer1) == data->p1->unique_id) &&
+ (ntohl (msg->peer2) == data->p2->unique_id));
+ event.type = (enum GNUNET_TESTBED_EventType) ntohl (msg->event_type);
+ switch (event.type)
+ {
+ case GNUNET_TESTBED_ET_CONNECT:
+ event.details.peer_connect.peer1 = data->p1;
+ event.details.peer_connect.peer2 = data->p2;
+ break;
+ case GNUNET_TESTBED_ET_DISCONNECT:
+ GNUNET_assert (0); /* FIXME: implement */
+ break;
+ default:
+ GNUNET_assert (0); /* Should never reach here */
+ break;
+ }
+ cb = data->cb;
+ cb_cls = data->cb_cls;
+ GNUNET_CONTAINER_DLL_remove (opc->c->ocq_head, opc->c->ocq_tail, opc);
+ opc->state = OPC_STATE_FINISHED;
+ if (NULL != cb)
+ cb (cb_cls, opc->op, NULL);
+ if (0 !=
+ ((GNUNET_TESTBED_ET_CONNECT | GNUNET_TESTBED_ET_DISCONNECT) &
+ c->event_mask))
+ {
+ if (NULL != c->cc)
+ c->cc (c->cc_cls, &event);
+ }
+ return GNUNET_YES;
+}
+
+
+/**
+ * Handler for GNUNET_MESSAGE_TYPE_TESTBED_PEERCONFIG message from
+ * controller (testbed service)
+ *
+ * @param c the controller handler
+ * @param msg message received
+ * @return GNUNET_YES if we can continue receiving from service; GNUNET_NO if
+ * not
+ */
+static int
+handle_peer_config (struct GNUNET_TESTBED_Controller *c,
+ const struct
+ GNUNET_TESTBED_PeerConfigurationInformationMessage *msg)
+{
+ struct OperationContext *opc;
+ struct GNUNET_TESTBED_Peer *peer;
+ struct PeerInfoData *data;
+ struct GNUNET_TESTBED_PeerInformation *pinfo;
+ GNUNET_TESTBED_PeerInfoCallback cb;
+ void *cb_cls;
+ uint64_t op_id;
+
+ op_id = GNUNET_ntohll (msg->operation_id);
+ if (NULL == (opc = find_opc (c, op_id)))
+ {
+ LOG_DEBUG ("Operation not found\n");
+ return GNUNET_YES;
+ }
+ if (OP_FORWARDED == opc->type)
+ {
+ handle_forwarded_operation_msg (c, opc,
+ (const struct GNUNET_MessageHeader *) msg);
+ return GNUNET_YES;
+ }
+ data = opc->data;
+ GNUNET_assert (NULL != data);
+ peer = data->peer;
+ GNUNET_assert (NULL != peer);
+ GNUNET_assert (ntohl (msg->peer_id) == peer->unique_id);
+ pinfo = GNUNET_malloc (sizeof (struct GNUNET_TESTBED_PeerInformation));
+ pinfo->pit = data->pit;
+ cb = data->cb;
+ cb_cls = data->cb_cls;
+ GNUNET_free (data);
+ opc->data = NULL;
+ switch (pinfo->pit)
+ {
+ case GNUNET_TESTBED_PIT_IDENTITY:
+ pinfo->result.id = GNUNET_malloc (sizeof (struct GNUNET_PeerIdentity));
+ (void) memcpy (pinfo->result.id, &msg->peer_identity,
+ sizeof (struct GNUNET_PeerIdentity));
+ break;
+ case GNUNET_TESTBED_PIT_CONFIGURATION:
+ pinfo->result.cfg = /* Freed in oprelease_peer_getinfo */
+ GNUNET_TESTBED_extract_config_ (&msg->header);
+ break;
+ case GNUNET_TESTBED_PIT_GENERIC:
+ GNUNET_assert (0); /* never reach here */
+ break;
+ }
+ opc->data = pinfo;
+ GNUNET_CONTAINER_DLL_remove (opc->c->ocq_head, opc->c->ocq_tail, opc);
+ opc->state = OPC_STATE_FINISHED;
+ if (NULL != cb)
+ cb (cb_cls, opc->op, pinfo, NULL);
+ return GNUNET_YES;
+}
+
+
+/**
+ * Handler for GNUNET_MESSAGE_TYPE_TESTBED_OPERATIONFAILEVENT message from
+ * controller (testbed service)
+ *
+ * @param c the controller handler
+ * @param msg message received
+ * @return GNUNET_YES if we can continue receiving from service; GNUNET_NO if
+ * not
+ */
+static int
+handle_op_fail_event (struct GNUNET_TESTBED_Controller *c,
+ const struct GNUNET_TESTBED_OperationFailureEventMessage
+ *msg)
+{
+ struct OperationContext *opc;
+ const char *emsg;
+ uint64_t op_id;
+ struct GNUNET_TESTBED_EventInformation event;
+
+ op_id = GNUNET_ntohll (msg->operation_id);
+ if (NULL == (opc = find_opc (c, op_id)))
+ {
+ LOG_DEBUG ("Operation not found\n");
+ return GNUNET_YES;
+ }
+ if (OP_FORWARDED == opc->type)
+ {
+ handle_forwarded_operation_msg (c, opc,
+ (const struct GNUNET_MessageHeader *) msg);
+ return GNUNET_YES;
+ }
+ GNUNET_CONTAINER_DLL_remove (opc->c->ocq_head, opc->c->ocq_tail, opc);
+ opc->state = OPC_STATE_FINISHED;
+ emsg = GNUNET_TESTBED_parse_error_string_ (msg);
+ if (NULL == emsg)
+ emsg = "Unknown error";
+ if (OP_PEER_INFO == opc->type)
+ {
+ struct PeerInfoData *data;
+
+ data = opc->data;
+ if (NULL != data->cb)
+ data->cb (data->cb_cls, opc->op, NULL, emsg);
+ GNUNET_free (data);
+ return GNUNET_YES; /* We do not call controller callback for peer info */
+ }
+ if ((0 != (GNUNET_TESTBED_ET_OPERATION_FINISHED & c->event_mask)) &&
+ (NULL != c->cc))
+ {
+ event.type = GNUNET_TESTBED_ET_OPERATION_FINISHED;
+ event.details.operation_finished.operation = opc->op;
+ event.details.operation_finished.op_cls = opc->op_cls;
+ event.details.operation_finished.emsg = emsg;
+ event.details.operation_finished.generic = NULL;
+ c->cc (c->cc_cls, &event);
+ if (event.details.operation_finished.operation == last_finished_operation)
+ return GNUNET_YES;
+ }
+ switch (opc->type)
+ {
+ case OP_PEER_CREATE:
+ {
+ struct PeerCreateData *data;
+
+ data = opc->data;
+ GNUNET_free (data->peer);
+ if (NULL != data->cb)
+ data->cb (data->cls, NULL, emsg);
+ GNUNET_free (data);
+ }
+ break;
+ case OP_PEER_START:
+ case OP_PEER_STOP:
+ {
+ struct PeerEventData *data;
+
+ data = opc->data;
+ if (NULL != data->pcc)
+ data->pcc (data->pcc_cls, emsg);
+ GNUNET_free (data);
+ }
+ break;
+ case OP_PEER_DESTROY:
+ break;
+ case OP_PEER_INFO:
+ GNUNET_assert (0);
+ case OP_OVERLAY_CONNECT:
+ {
+ struct OverlayConnectData *data;
+
+ data = opc->data;
+ data->failed = GNUNET_YES;
+ if (NULL != data->cb)
+ data->cb (data->cb_cls, opc->op, emsg);
+ }
+ break;
+ case OP_FORWARDED:
+ GNUNET_assert (0);
+ case OP_LINK_CONTROLLERS: /* No secondary callback */
+ break;
+ default:
+ GNUNET_break (0);
+ }
+ return GNUNET_YES;
+}
+
+
+/**
+ * Function to build GET_SLAVE_CONFIG message
+ *
+ * @param op_id the id this message should contain in its operation id field
+ * @param slave_id the id this message should contain in its slave id field
+ * @return newly allocated SlaveGetConfigurationMessage
+ */
+static struct GNUNET_TESTBED_SlaveGetConfigurationMessage *
+GNUNET_TESTBED_generate_slavegetconfig_msg_ (uint64_t op_id, uint32_t slave_id)
+{
+ struct GNUNET_TESTBED_SlaveGetConfigurationMessage *msg;
+ uint16_t msize;
+
+ msize = sizeof (struct GNUNET_TESTBED_SlaveGetConfigurationMessage);
+ msg = GNUNET_malloc (msize);
+ msg->header.size = htons (msize);
+ msg->header.type =
+ htons (GNUNET_MESSAGE_TYPE_TESTBED_GET_SLAVE_CONFIGURATION);
+ msg->operation_id = GNUNET_htonll (op_id);
+ msg->slave_id = htonl (slave_id);
+ return msg;
+}
+
+
+/**
+ * Handler for GNUNET_MESSAGE_TYPE_TESTBED_SLAVECONFIG message from controller
+ * (testbed service)
+ *
+ * @param c the controller handler
+ * @param msg message received
+ * @return GNUNET_YES if we can continue receiving from service; GNUNET_NO if
+ * not
+ */
+static int
+handle_slave_config (struct GNUNET_TESTBED_Controller *c,
+ const struct GNUNET_TESTBED_SlaveConfiguration *msg)
+{
+ struct OperationContext *opc;
+ uint64_t op_id;
+ struct GNUNET_TESTBED_EventInformation event;
+
+ op_id = GNUNET_ntohll (msg->operation_id);
+ if (NULL == (opc = find_opc (c, op_id)))
+ {
+ LOG_DEBUG ("Operation not found\n");
+ return GNUNET_YES;
+ }
+ if (OP_GET_SLAVE_CONFIG != opc->type)
+ {
+ GNUNET_break (0);
+ return GNUNET_YES;
+ }
+ GNUNET_free (opc->data);
+ opc->data = NULL;
+ opc->state = OPC_STATE_FINISHED;
+ GNUNET_CONTAINER_DLL_remove (opc->c->ocq_head, opc->c->ocq_tail, opc);
+ if ((0 != (GNUNET_TESTBED_ET_OPERATION_FINISHED & c->event_mask)) &&
+ (NULL != c->cc))
+ {
+ opc->data = GNUNET_TESTBED_extract_config_ (&msg->header);
+ event.type = GNUNET_TESTBED_ET_OPERATION_FINISHED;
+ event.details.operation_finished.generic = opc->data;
+ event.details.operation_finished.operation = opc->op;
+ event.details.operation_finished.op_cls = opc->op_cls;
+ event.details.operation_finished.emsg = NULL;
+ c->cc (c->cc_cls, &event);
+ }
+ return GNUNET_YES;
+}
+
+
+/**
+ * Handler for messages from controller (testbed service)
+ *
+ * @param cls the controller handler
+ * @param msg message received, NULL on timeout or fatal error
+ */
+static void
+message_handler (void *cls, const struct GNUNET_MessageHeader *msg)
+{
+ struct GNUNET_TESTBED_Controller *c = cls;
+ int status;
+ uint16_t msize;
+
+ c->in_receive = GNUNET_NO;
+ /* FIXME: Add checks for message integrity */
+ if (NULL == msg)
+ {
+ LOG_DEBUG ("Receive timed out or connection to service dropped\n");
+ return;
+ }
+ status = GNUNET_OK;
+ msize = ntohs (msg->size);
+ switch (ntohs (msg->type))
+ {
+ case GNUNET_MESSAGE_TYPE_TESTBED_ADD_HOST_SUCCESS:
+ GNUNET_assert (msize >=
+ sizeof (struct GNUNET_TESTBED_HostConfirmedMessage));
+ status =
+ handle_addhostconfirm (c,
+ (const struct GNUNET_TESTBED_HostConfirmedMessage
+ *) msg);
+ break;
+ case GNUNET_MESSAGE_TYPE_TESTBED_GENERIC_OPERATION_SUCCESS:
+ GNUNET_assert (msize ==
+ sizeof (struct
+ GNUNET_TESTBED_GenericOperationSuccessEventMessage));
+ status =
+ handle_opsuccess (c,
+ (const struct
+ GNUNET_TESTBED_GenericOperationSuccessEventMessage *)
+ msg);
+ break;
+ case GNUNET_MESSAGE_TYPE_TESTBED_CREATE_PEER_SUCCESS:
+ GNUNET_assert (msize ==
+ sizeof (struct
+ GNUNET_TESTBED_PeerCreateSuccessEventMessage));
+ status =
+ handle_peer_create_success (c,
+ (const struct
+ GNUNET_TESTBED_PeerCreateSuccessEventMessage
+ *) msg);
+ break;
+ case GNUNET_MESSAGE_TYPE_TESTBED_PEER_EVENT:
+ GNUNET_assert (msize == sizeof (struct GNUNET_TESTBED_PeerEventMessage));
+ status =
+ handle_peer_event (c,
+ (const struct GNUNET_TESTBED_PeerEventMessage *)
+ msg);
+
+ break;
+ case GNUNET_MESSAGE_TYPE_TESTBED_PEER_CONFIGURATION:
+ GNUNET_assert (msize >=
+ sizeof (struct
+ GNUNET_TESTBED_PeerConfigurationInformationMessage));
+ status =
+ handle_peer_config (c,
+ (const struct
+ GNUNET_TESTBED_PeerConfigurationInformationMessage
+ *) msg);
+ break;
+ case GNUNET_MESSAGE_TYPE_TESTBED_PEER_CONNECT_EVENT:
+ GNUNET_assert (msize ==
+ sizeof (struct GNUNET_TESTBED_ConnectionEventMessage));
+ status =
+ handle_peer_conevent (c,
+ (const struct
+ GNUNET_TESTBED_ConnectionEventMessage *) msg);
+ break;
+ case GNUNET_MESSAGE_TYPE_TESTBED_OPERATION_FAIL_EVENT:
+ GNUNET_assert (msize >=
+ sizeof (struct GNUNET_TESTBED_OperationFailureEventMessage));
+ status =
+ handle_op_fail_event (c,
+ (const struct
+ GNUNET_TESTBED_OperationFailureEventMessage *)
+ msg);
+ break;
+ case GNUNET_MESSAGE_TYPE_TESTBED_SLAVE_CONFIGURATION:
+ GNUNET_assert (msize > sizeof (struct GNUNET_TESTBED_SlaveConfiguration));
+ status =
+ handle_slave_config (c,
+ (const struct GNUNET_TESTBED_SlaveConfiguration *)
+ msg);
+ break;
+ default:
+ GNUNET_assert (0);
+ }
+ if ((GNUNET_OK == status) && (GNUNET_NO == c->in_receive))
+ {
+ c->in_receive = GNUNET_YES;
+ GNUNET_CLIENT_receive (c->client, &message_handler, c,
+ GNUNET_TIME_UNIT_FOREVER_REL);
+ }
+}
+
+
+/**
+ * Function called to notify a client about the connection begin ready to queue
+ * more data. "buf" will be NULL and "size" zero if the connection was closed
+ * for writing in the meantime.
+ *
+ * @param cls closure
+ * @param size number of bytes available in buf
+ * @param buf where the callee should write the message
+ * @return number of bytes written to buf
+ */
+static size_t
+transmit_ready_notify (void *cls, size_t size, void *buf)
+{
+ struct GNUNET_TESTBED_Controller *c = cls;
+ struct MessageQueue *mq_entry;
+
+ c->th = NULL;
+ mq_entry = c->mq_head;
+ GNUNET_assert (NULL != mq_entry);
+ if ((0 == size) && (NULL == buf)) /* Timeout */
+ {
+ LOG_DEBUG ("Message sending timed out -- retrying\n");
+ c->th =
+ GNUNET_CLIENT_notify_transmit_ready (c->client,
+ ntohs (mq_entry->msg->size),
+ TIMEOUT_REL, GNUNET_YES,
+ &transmit_ready_notify, c);
+ return 0;
+ }
+ GNUNET_assert (ntohs (mq_entry->msg->size) <= size);
+ size = ntohs (mq_entry->msg->size);
+ memcpy (buf, mq_entry->msg, size);
+ LOG_DEBUG ("Message of type: %u and size: %u sent\n",
+ ntohs (mq_entry->msg->type), size);
+ GNUNET_free (mq_entry->msg);
+ GNUNET_CONTAINER_DLL_remove (c->mq_head, c->mq_tail, mq_entry);
+ GNUNET_free (mq_entry);
+ mq_entry = c->mq_head;
+ if (NULL != mq_entry)
+ c->th =
+ GNUNET_CLIENT_notify_transmit_ready (c->client,
+ ntohs (mq_entry->msg->size),
+ TIMEOUT_REL, GNUNET_YES,
+ &transmit_ready_notify, c);
+ if (GNUNET_NO == c->in_receive)
+ {
+ c->in_receive = GNUNET_YES;
+ GNUNET_CLIENT_receive (c->client, &message_handler, c,
+ GNUNET_TIME_UNIT_FOREVER_REL);
+ }
+ return size;
+}
+
+
+/**
+ * Queues a message in send queue for sending to the service
+ *
+ * @param controller the handle to the controller
+ * @param msg the message to queue
+ */
+void
+GNUNET_TESTBED_queue_message_ (struct GNUNET_TESTBED_Controller *controller,
+ struct GNUNET_MessageHeader *msg)
+{
+ struct MessageQueue *mq_entry;
+ uint16_t type;
+ uint16_t size;
+
+ type = ntohs (msg->type);
+ size = ntohs (msg->size);
+ GNUNET_assert ((GNUNET_MESSAGE_TYPE_TESTBED_INIT <= type) &&
+ (GNUNET_MESSAGE_TYPE_TESTBED_MAX > type));
+ mq_entry = GNUNET_malloc (sizeof (struct MessageQueue));
+ mq_entry->msg = msg;
+ LOG (GNUNET_ERROR_TYPE_DEBUG,