diff options
Diffstat (limited to 'src/testbed/testbed_api.c')
-rw-r--r-- | src/testbed/testbed_api.c | 2702 |
1 files changed, 2658 insertions, 44 deletions
diff --git a/src/testbed/testbed_api.c b/src/testbed/testbed_api.c index 5168081..054aa3b 100644 --- a/src/testbed/testbed_api.c +++ b/src/testbed/testbed_api.c @@ -24,15 +24,1772 @@ * This library is supposed to make it easier to write * testcases and script large-scale benchmarks. * @author Christian Grothoff + * @author Sree Harsha Totakura */ + + #include "platform.h" #include "gnunet_testbed_service.h" #include "gnunet_core_service.h" #include "gnunet_constants.h" #include "gnunet_transport_service.h" #include "gnunet_hello_lib.h" +#include <zlib.h> + +#include "testbed.h" +#include "testbed_api.h" +#include "testbed_api_hosts.h" +#include "testbed_api_peers.h" +#include "testbed_api_operations.h" + +/** + * Generic logging shorthand + */ +#define LOG(kind, ...) \ + GNUNET_log_from (kind, "testbed-api", __VA_ARGS__); + +/** + * Debug logging + */ +#define LOG_DEBUG(...) \ + LOG (GNUNET_ERROR_TYPE_DEBUG, __VA_ARGS__); + +/** + * Relative time seconds shorthand + */ +#define TIME_REL_SECS(sec) \ + GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, sec) + + +/** + * Default server message sending retry timeout + */ +#define TIMEOUT_REL TIME_REL_SECS(1) + + +/** + * Handle for controller process + */ +struct GNUNET_TESTBED_ControllerProc +{ + /** + * The process handle + */ + struct GNUNET_HELPER_Handle *helper; + + /** + * The arguments used to start the helper + */ + char **helper_argv; + + /** + * The host where the helper is run + */ + struct GNUNET_TESTBED_Host *host; + + /** + * The controller error callback + */ + GNUNET_TESTBED_ControllerStatusCallback cb; + + /** + * The closure for the above callback + */ + void *cls; + + /** + * The send handle for the helper + */ + struct GNUNET_HELPER_SendHandle *shandle; + + /** + * The message corresponding to send handle + */ + struct GNUNET_MessageHeader *msg; + + /** + * The configuration of the running testbed service + */ + struct GNUNET_CONFIGURATION_Handle *cfg; + +}; + + +/** + * The message queue for sending messages to the controller service + */ +struct MessageQueue +{ + /** + * The message to be sent + */ + struct GNUNET_MessageHeader *msg; + + /** + * next pointer for DLL + */ + struct MessageQueue *next; + + /** + * prev pointer for DLL + */ + struct MessageQueue *prev; +}; + + +/** + * Structure for a controller link + */ +struct ControllerLink +{ + /** + * The next ptr for DLL + */ + struct ControllerLink *next; + + /** + * The prev ptr for DLL + */ + struct ControllerLink *prev; + + /** + * The host which will be referred in the peer start request. This is the + * host where the peer should be started + */ + struct GNUNET_TESTBED_Host *delegated_host; + + /** + * The host which will contacted to delegate the peer start request + */ + struct GNUNET_TESTBED_Host *slave_host; + + /** + * The configuration to be used to connect to slave host + */ + const struct GNUNET_CONFIGURATION_Handle *slave_cfg; + + /** + * GNUNET_YES if the slave should be started (and stopped) by us; GNUNET_NO + * if we are just allowed to use the slave via TCP/IP + */ + int is_subordinate; +}; + + +/** + * handle for host registration + */ +struct GNUNET_TESTBED_HostRegistrationHandle +{ + /** + * The host being registered + */ + struct GNUNET_TESTBED_Host *host; + + /** + * The controller at which this host is being registered + */ + struct GNUNET_TESTBED_Controller *c; + + /** + * The Registartion completion callback + */ + GNUNET_TESTBED_HostRegistrationCompletion cc; + + /** + * The closure for above callback + */ + void *cc_cls; +}; + + +/** + * Context data for forwarded Operation + */ +struct ForwardedOperationData +{ + + /** + * The callback to call when reply is available + */ + GNUNET_CLIENT_MessageHandler cc; + + /** + * The closure for the above callback + */ + void *cc_cls; + +}; + + +/** + * Context data for get slave config operations + */ +struct GetSlaveConfigData +{ + /** + * The id of the slave controller + */ + uint32_t slave_id; + +}; + + +/** + * Context data for controller link operations + */ +struct ControllerLinkData +{ + /** + * The controller link message + */ + struct GNUNET_TESTBED_ControllerLinkMessage *msg; + +}; + + +struct SDEntry +{ + /** + * DLL next pointer + */ + struct SDEntry *next; + + /** + * DLL prev pointer + */ + struct SDEntry *prev; + + /** + * The value to store + */ + unsigned int amount; +}; + + +struct SDHandle +{ + /** + * DLL head for storing entries + */ + struct SDEntry *head; + + /** + * DLL tail for storing entries + */ + struct SDEntry *tail; + + /** + * Squared sum of data values + */ + unsigned long long sqsum; + + /** + * Sum of the data values + */ + unsigned long sum; + + /** + * The average of data amounts + */ + float avg; + + /** + * The variance + */ + double vr; + + /** + * Number of data values; also the length of DLL containing SDEntries + */ + unsigned int cnt; + + /** + * max number of entries we can have in the DLL + */ + unsigned int max_cnt; +}; + + +/** + * This variable is set to the operation that has been last marked as done. It + * is used to verify whether the state associated with an operation is valid + * after the first notify callback is called. Such checks are necessary for + * certain operations where we have 2 notify callbacks. Examples are + * OP_PEER_CREATE, OP_PEER_START/STOP, OP_OVERLAY_CONNECT. + * + * This variable should ONLY be used to compare; it is a dangling pointer!! + */ +static const struct GNUNET_TESTBED_Operation *last_finished_operation; + +/** + * Initialize standard deviation calculation handle + * + * @param max_cnt the maximum number of readings to keep + * @return the initialized handle + */ +static struct SDHandle * +SD_init (unsigned int max_cnt) +{ + struct SDHandle *h; + + GNUNET_assert (1 < max_cnt); + h = GNUNET_malloc (sizeof (struct SDHandle)); + h->max_cnt = max_cnt; + return h; +} + + +/** + * Frees the memory allocated to the SD handle + * + * @param h the SD handle + */ +static void +SD_destroy (struct SDHandle *h) +{ + struct SDEntry *entry; + + while (NULL != (entry = h->head)) + { + GNUNET_CONTAINER_DLL_remove (h->head, h->tail, entry); + GNUNET_free (entry); + } + GNUNET_free (h); +} + + +/** + * Add a reading to SD + * + * @param h the SD handle + * @param amount the reading value + */ +static void +SD_add_data (struct SDHandle *h, unsigned int amount) +{ + struct SDEntry *entry; + double sqavg; + double sqsum_avg; + + entry = NULL; + if (h->cnt == h->max_cnt) + { + entry = h->head; + GNUNET_CONTAINER_DLL_remove (h->head, h->tail, entry); + h->sum -= entry->amount; + h->sqsum -= + ((unsigned long) entry->amount) * ((unsigned long) entry->amount); + h->cnt--; + } + GNUNET_assert (h->cnt < h->max_cnt); + if (NULL == entry) + entry = GNUNET_malloc (sizeof (struct SDEntry)); + entry->amount = amount; + GNUNET_CONTAINER_DLL_insert_tail (h->head, h->tail, entry); + h->sum += amount; + h->cnt++; + h->avg = ((float) h->sum) / ((float) h->cnt); + h->sqsum += ((unsigned long) amount) * ((unsigned long) amount); + sqsum_avg = ((double) h->sqsum) / ((double) h->cnt); + sqavg = ((double) h->avg) * ((double) h->avg); + h->vr = sqsum_avg - sqavg; +} + + +/** + * Returns the factor by which the given amount differs from the standard deviation + * + * @param h the SDhandle + * @param amount the value for which the deviation is returned + + * @return the deviation from the average; GNUNET_SYSERR if the deviation cannot + * be calculated OR 0 if the deviation is less than the average; a + * maximum of 4 is returned for deviations equal to or larger than 4 + */ +static int +SD_deviation_factor (struct SDHandle *h, unsigned int amount) +{ + double diff; + unsigned int n; + + if (h->cnt < 2) + return GNUNET_SYSERR; + if (((float) amount) > h->avg) + diff = ((float) amount) - h->avg; + else + return 0; //diff = h->avg - ((float) amount); + diff *= diff; + for (n = 1; n < 4; n++) + if (diff < (((double) (n * n)) * h->vr)) + break; + return n; +} + + +/** + * Returns the operation context with the given id if found in the Operation + * context queues of the controller + * + * @param c the controller whose queues are searched + * @param id the id which has to be checked + * @return the matching operation context; NULL if no match found + */ +static struct OperationContext * +find_opc (const struct GNUNET_TESTBED_Controller *c, const uint64_t id) +{ + struct OperationContext *opc; + + for (opc = c->ocq_head; NULL != opc; opc = opc->next) + { + if (id == opc->id) + return opc; + } + return NULL; +} + + +/** + * Handler for GNUNET_MESSAGE_TYPE_TESTBED_ADDHOSTCONFIRM message from + * controller (testbed service) + * + * @param c the controller handler + * @param msg message received + * @return GNUNET_YES if we can continue receiving from service; GNUNET_NO if + * not + */ +static int +handle_addhostconfirm (struct GNUNET_TESTBED_Controller *c, + const struct GNUNET_TESTBED_HostConfirmedMessage *msg) +{ + struct GNUNET_TESTBED_HostRegistrationHandle *rh; + char *emsg; + uint16_t msg_size; + + rh = c->rh; + if (NULL == rh) + { + return GNUNET_OK; + } + if (GNUNET_TESTBED_host_get_id_ (rh->host) != ntohl (msg->host_id)) + { + LOG_DEBUG ("Mismatch in host id's %u, %u of host confirm msg\n", + GNUNET_TESTBED_host_get_id_ (rh->host), ntohl (msg->host_id)); + return GNUNET_OK; + } + c->rh = NULL; + msg_size = ntohs (msg->header.size); + if (sizeof (struct GNUNET_TESTBED_HostConfirmedMessage) == msg_size) + { + LOG_DEBUG ("Host %u successfully registered\n", ntohl (msg->host_id)); + GNUNET_TESTBED_mark_host_registered_at_ (rh->host, c); + rh->cc (rh->cc_cls, NULL); + GNUNET_free (rh); + return GNUNET_OK; + } + /* We have an error message */ + emsg = (char *) &msg[1]; + if ('\0' != + emsg[msg_size - sizeof (struct GNUNET_TESTBED_HostConfirmedMessage)]) + { + GNUNET_break (0); + GNUNET_free (rh); + return GNUNET_NO; + } + LOG (GNUNET_ERROR_TYPE_ERROR, _("Adding host %u failed with error: %s\n"), + ntohl (msg->host_id), emsg); + rh->cc (rh->cc_cls, emsg); + GNUNET_free (rh); + return GNUNET_OK; +} + + +/** + * Handler for forwarded operations + * + * @param c the controller handle + * @param opc the opearation context + * @param msg the message + */ +static void +handle_forwarded_operation_msg (struct GNUNET_TESTBED_Controller *c, + struct OperationContext *opc, + const struct GNUNET_MessageHeader *msg) +{ + struct ForwardedOperationData *fo_data; + + fo_data = opc->data; + if (NULL != fo_data->cc) + fo_data->cc (fo_data->cc_cls, msg); + GNUNET_CONTAINER_DLL_remove (c->ocq_head, c->ocq_tail, opc); + GNUNET_free (fo_data); + GNUNET_free (opc); +} + + +/** + * Handler for GNUNET_MESSAGE_TYPE_TESTBED_ADDHOSTCONFIRM message from + * controller (testbed service) + * + * @param c the controller handler + * @param msg message received + * @return GNUNET_YES if we can continue receiving from service; GNUNET_NO if + * not + */ +static int +handle_opsuccess (struct GNUNET_TESTBED_Controller *c, + const struct + GNUNET_TESTBED_GenericOperationSuccessEventMessage *msg) +{ + struct OperationContext *opc; + struct GNUNET_TESTBED_EventInformation event; + uint64_t op_id; + + op_id = GNUNET_ntohll (msg->operation_id); + LOG_DEBUG ("Operation %lu successful\n", op_id); + if (NULL == (opc = find_opc (c, op_id))) + { + LOG_DEBUG ("Operation not found\n"); + return GNUNET_YES; + } + event.type = GNUNET_TESTBED_ET_OPERATION_FINISHED; + event.details.operation_finished.operation = opc->op; + event.details.operation_finished.op_cls = opc->op_cls; + event.details.operation_finished.emsg = NULL; + event.details.operation_finished.generic = NULL; + switch (opc->type) + { + case OP_FORWARDED: + { + handle_forwarded_operation_msg (c, opc, + (const struct GNUNET_MessageHeader *) msg); + return GNUNET_YES; + } + break; + case OP_PEER_DESTROY: + { + struct GNUNET_TESTBED_Peer *peer; + + peer = opc->data; + GNUNET_free (peer); + opc->data = NULL; + //PEERDESTROYDATA + } + break; + case OP_LINK_CONTROLLERS: + { + struct ControllerLinkData *data; + + data = opc->data; + GNUNET_assert (NULL != data); + GNUNET_free (data); + opc->data = NULL; + } + break; + default: + GNUNET_assert (0); + } + GNUNET_CONTAINER_DLL_remove (opc->c->ocq_head, opc->c->ocq_tail, opc); + opc->state = OPC_STATE_FINISHED; + if (0 != (c->event_mask & (1L << GNUNET_TESTBED_ET_OPERATION_FINISHED))) + { + if (NULL != c->cc) + c->cc (c->cc_cls, &event); + } + else + LOG_DEBUG ("Not calling callback\n"); + return GNUNET_YES; +} + + +/** + * Handler for GNUNET_MESSAGE_TYPE_TESTBED_PEERCREATESUCCESS message from + * controller (testbed service) + * + * @param c the controller handle + * @param msg message received + * @return GNUNET_YES if we can continue receiving from service; GNUNET_NO if + * not + */ +static int +handle_peer_create_success (struct GNUNET_TESTBED_Controller *c, + const struct + GNUNET_TESTBED_PeerCreateSuccessEventMessage *msg) +{ + struct OperationContext *opc; + struct PeerCreateData *data; + struct GNUNET_TESTBED_Peer *peer; + GNUNET_TESTBED_PeerCreateCallback cb; + void *cls; + uint64_t op_id; + + GNUNET_assert (sizeof (struct GNUNET_TESTBED_PeerCreateSuccessEventMessage) == + ntohs (msg->header.size)); + op_id = GNUNET_ntohll (msg->operation_id); + if (NULL == (opc = find_opc (c, op_id))) + { + LOG_DEBUG ("Operation context for PeerCreateSuccessEvent not found\n"); + return GNUNET_YES; + } + if (OP_FORWARDED == opc->type) + { + handle_forwarded_operation_msg (c, opc, + (const struct GNUNET_MessageHeader *) msg); + return GNUNET_YES; + } + GNUNET_assert (OP_PEER_CREATE == opc->type); + GNUNET_assert (NULL != opc->data); + data = opc->data; + GNUNET_assert (NULL != data->peer); + peer = data->peer; + GNUNET_assert (peer->unique_id == ntohl (msg->peer_id)); + peer->state = PS_CREATED; + cb = data->cb; + cls = data->cls; + GNUNET_free (opc->data); + GNUNET_CONTAINER_DLL_remove (opc->c->ocq_head, opc->c->ocq_tail, opc); + opc->state = OPC_STATE_FINISHED; + if (NULL != cb) + cb (cls, peer, NULL); + return GNUNET_YES; +} + + +/** + * Handler for GNUNET_MESSAGE_TYPE_TESTBED_PEEREVENT message from + * controller (testbed service) + * + * @param c the controller handler + * @param msg message received + * @return GNUNET_YES if we can continue receiving from service; GNUNET_NO if + * not + */ +static int +handle_peer_event (struct GNUNET_TESTBED_Controller *c, + const struct GNUNET_TESTBED_PeerEventMessage *msg) +{ + struct OperationContext *opc; + struct GNUNET_TESTBED_Peer *peer; + struct PeerEventData *data; + GNUNET_TESTBED_PeerChurnCallback pcc; + void *pcc_cls; + struct GNUNET_TESTBED_EventInformation event; + uint64_t op_id; + + GNUNET_assert (sizeof (struct GNUNET_TESTBED_PeerEventMessage) == + ntohs (msg->header.size)); + op_id = GNUNET_ntohll (msg->operation_id); + if (NULL == (opc = find_opc (c, op_id))) + { + LOG_DEBUG ("Operation not found\n"); + return GNUNET_YES; + } + if (OP_FORWARDED == opc->type) + { + handle_forwarded_operation_msg (c, opc, + (const struct GNUNET_MessageHeader *) msg); + return GNUNET_YES; + } + GNUNET_assert ((OP_PEER_START == opc->type) || (OP_PEER_STOP == opc->type)); + data = opc->data; + GNUNET_assert (NULL != data); + peer = data->peer; + GNUNET_assert (NULL != peer); + event.type = (enum GNUNET_TESTBED_EventType) ntohl (msg->event_type); + switch (event.type) + { + case GNUNET_TESTBED_ET_PEER_START: + peer->state = PS_STARTED; + event.details.peer_start.host = peer->host; + event.details.peer_start.peer = peer; + break; + case GNUNET_TESTBED_ET_PEER_STOP: + peer->state = PS_STOPPED; + event.details.peer_stop.peer = peer; + break; + default: + GNUNET_assert (0); /* We should never reach this state */ + } + pcc = data->pcc; + pcc_cls = data->pcc_cls; + GNUNET_free (data); + GNUNET_CONTAINER_DLL_remove (opc->c->ocq_head, opc->c->ocq_tail, opc); + opc->state = OPC_STATE_FINISHED; + if (0 != + ((GNUNET_TESTBED_ET_PEER_START | GNUNET_TESTBED_ET_PEER_STOP) & + c->event_mask)) + { + if (NULL != c->cc) + c->cc (c->cc_cls, &event); + } + if (NULL != pcc) + pcc (pcc_cls, NULL); + return GNUNET_YES; +} + + +/** + * Handler for GNUNET_MESSAGE_TYPE_TESTBED_PEERCONEVENT message from + * controller (testbed service) + * + * @param c the controller handler + * @param msg message received + * @return GNUNET_YES if we can continue receiving from service; GNUNET_NO if + * not + */ +static int +handle_peer_conevent (struct GNUNET_TESTBED_Controller *c, + const struct GNUNET_TESTBED_ConnectionEventMessage *msg) +{ + struct OperationContext *opc; + struct OverlayConnectData *data; + GNUNET_TESTBED_OperationCompletionCallback cb; + void *cb_cls; + struct GNUNET_TESTBED_EventInformation event; + uint64_t op_id; + + op_id = GNUNET_ntohll (msg->operation_id); + if (NULL == (opc = find_opc (c, op_id))) + { + LOG_DEBUG ("Operation not found\n"); + return GNUNET_YES; + } + if (OP_FORWARDED == opc->type) + { + handle_forwarded_operation_msg (c, opc, + (const struct GNUNET_MessageHeader *) msg); + return GNUNET_YES; + } + GNUNET_assert (OP_OVERLAY_CONNECT == opc->type); + data = opc->data; + GNUNET_assert (NULL != data); + GNUNET_assert ((ntohl (msg->peer1) == data->p1->unique_id) && + (ntohl (msg->peer2) == data->p2->unique_id)); + event.type = (enum GNUNET_TESTBED_EventType) ntohl (msg->event_type); + switch (event.type) + { + case GNUNET_TESTBED_ET_CONNECT: + event.details.peer_connect.peer1 = data->p1; + event.details.peer_connect.peer2 = data->p2; + break; + case GNUNET_TESTBED_ET_DISCONNECT: + GNUNET_assert (0); /* FIXME: implement */ + break; + default: + GNUNET_assert (0); /* Should never reach here */ + break; + } + cb = data->cb; + cb_cls = data->cb_cls; + GNUNET_CONTAINER_DLL_remove (opc->c->ocq_head, opc->c->ocq_tail, opc); + opc->state = OPC_STATE_FINISHED; + if (NULL != cb) + cb (cb_cls, opc->op, NULL); + if (0 != + ((GNUNET_TESTBED_ET_CONNECT | GNUNET_TESTBED_ET_DISCONNECT) & + c->event_mask)) + { + if (NULL != c->cc) + c->cc (c->cc_cls, &event); + } + return GNUNET_YES; +} + + +/** + * Handler for GNUNET_MESSAGE_TYPE_TESTBED_PEERCONFIG message from + * controller (testbed service) + * + * @param c the controller handler + * @param msg message received + * @return GNUNET_YES if we can continue receiving from service; GNUNET_NO if + * not + */ +static int +handle_peer_config (struct GNUNET_TESTBED_Controller *c, + const struct + GNUNET_TESTBED_PeerConfigurationInformationMessage *msg) +{ + struct OperationContext *opc; + struct GNUNET_TESTBED_Peer *peer; + struct PeerInfoData *data; + struct GNUNET_TESTBED_PeerInformation *pinfo; + GNUNET_TESTBED_PeerInfoCallback cb; + void *cb_cls; + uint64_t op_id; + + op_id = GNUNET_ntohll (msg->operation_id); + if (NULL == (opc = find_opc (c, op_id))) + { + LOG_DEBUG ("Operation not found\n"); + return GNUNET_YES; + } + if (OP_FORWARDED == opc->type) + { + handle_forwarded_operation_msg (c, opc, + (const struct GNUNET_MessageHeader *) msg); + return GNUNET_YES; + } + data = opc->data; + GNUNET_assert (NULL != data); + peer = data->peer; + GNUNET_assert (NULL != peer); + GNUNET_assert (ntohl (msg->peer_id) == peer->unique_id); + pinfo = GNUNET_malloc (sizeof (struct GNUNET_TESTBED_PeerInformation)); + pinfo->pit = data->pit; + cb = data->cb; + cb_cls = data->cb_cls; + GNUNET_free (data); + opc->data = NULL; + switch (pinfo->pit) + { + case GNUNET_TESTBED_PIT_IDENTITY: + pinfo->result.id = GNUNET_malloc (sizeof (struct GNUNET_PeerIdentity)); + (void) memcpy (pinfo->result.id, &msg->peer_identity, + sizeof (struct GNUNET_PeerIdentity)); + break; + case GNUNET_TESTBED_PIT_CONFIGURATION: + pinfo->result.cfg = /* Freed in oprelease_peer_getinfo */ + GNUNET_TESTBED_extract_config_ (&msg->header); + break; + case GNUNET_TESTBED_PIT_GENERIC: + GNUNET_assert (0); /* never reach here */ + break; + } + opc->data = pinfo; + GNUNET_CONTAINER_DLL_remove (opc->c->ocq_head, opc->c->ocq_tail, opc); + opc->state = OPC_STATE_FINISHED; + if (NULL != cb) + cb (cb_cls, opc->op, pinfo, NULL); + return GNUNET_YES; +} + + +/** + * Handler for GNUNET_MESSAGE_TYPE_TESTBED_OPERATIONFAILEVENT message from + * controller (testbed service) + * + * @param c the controller handler + * @param msg message received + * @return GNUNET_YES if we can continue receiving from service; GNUNET_NO if + * not + */ +static int +handle_op_fail_event (struct GNUNET_TESTBED_Controller *c, + const struct GNUNET_TESTBED_OperationFailureEventMessage + *msg) +{ + struct OperationContext *opc; + const char *emsg; + uint64_t op_id; + struct GNUNET_TESTBED_EventInformation event; + + op_id = GNUNET_ntohll (msg->operation_id); + if (NULL == (opc = find_opc (c, op_id))) + { + LOG_DEBUG ("Operation not found\n"); + return GNUNET_YES; + } + if (OP_FORWARDED == opc->type) + { + handle_forwarded_operation_msg (c, opc, + (const struct GNUNET_MessageHeader *) msg); + return GNUNET_YES; + } + GNUNET_CONTAINER_DLL_remove (opc->c->ocq_head, opc->c->ocq_tail, opc); + opc->state = OPC_STATE_FINISHED; + emsg = GNUNET_TESTBED_parse_error_string_ (msg); + if (NULL == emsg) + emsg = "Unknown error"; + if (OP_PEER_INFO == opc->type) + { + struct PeerInfoData *data; + + data = opc->data; + if (NULL != data->cb) + data->cb (data->cb_cls, opc->op, NULL, emsg); + GNUNET_free (data); + return GNUNET_YES; /* We do not call controller callback for peer info */ + } + if ((0 != (GNUNET_TESTBED_ET_OPERATION_FINISHED & c->event_mask)) && + (NULL != c->cc)) + { + event.type = GNUNET_TESTBED_ET_OPERATION_FINISHED; + event.details.operation_finished.operation = opc->op; + event.details.operation_finished.op_cls = opc->op_cls; + event.details.operation_finished.emsg = emsg; + event.details.operation_finished.generic = NULL; + c->cc (c->cc_cls, &event); + if (event.details.operation_finished.operation == last_finished_operation) + return GNUNET_YES; + } + switch (opc->type) + { + case OP_PEER_CREATE: + { + struct PeerCreateData *data; + + data = opc->data; + GNUNET_free (data->peer); + if (NULL != data->cb) + data->cb (data->cls, NULL, emsg); + GNUNET_free (data); + } + break; + case OP_PEER_START: + case OP_PEER_STOP: + { + struct PeerEventData *data; + + data = opc->data; + if (NULL != data->pcc) + data->pcc (data->pcc_cls, emsg); + GNUNET_free (data); + } + break; + case OP_PEER_DESTROY: + break; + case OP_PEER_INFO: + GNUNET_assert (0); + case OP_OVERLAY_CONNECT: + { + struct OverlayConnectData *data; + + data = opc->data; + data->failed = GNUNET_YES; + if (NULL != data->cb) + data->cb (data->cb_cls, opc->op, emsg); + } + break; + case OP_FORWARDED: + GNUNET_assert (0); + case OP_LINK_CONTROLLERS: /* No secondary callback */ + break; + default: + GNUNET_break (0); + } + return GNUNET_YES; +} + + +/** + * Function to build GET_SLAVE_CONFIG message + * + * @param op_id the id this message should contain in its operation id field + * @param slave_id the id this message should contain in its slave id field + * @return newly allocated SlaveGetConfigurationMessage + */ +static struct GNUNET_TESTBED_SlaveGetConfigurationMessage * +GNUNET_TESTBED_generate_slavegetconfig_msg_ (uint64_t op_id, uint32_t slave_id) +{ + struct GNUNET_TESTBED_SlaveGetConfigurationMessage *msg; + uint16_t msize; + + msize = sizeof (struct GNUNET_TESTBED_SlaveGetConfigurationMessage); + msg = GNUNET_malloc (msize); + msg->header.size = htons (msize); + msg->header.type = + htons (GNUNET_MESSAGE_TYPE_TESTBED_GET_SLAVE_CONFIGURATION); + msg->operation_id = GNUNET_htonll (op_id); + msg->slave_id = htonl (slave_id); + return msg; +} + + +/** + * Handler for GNUNET_MESSAGE_TYPE_TESTBED_SLAVECONFIG message from controller + * (testbed service) + * + * @param c the controller handler + * @param msg message received + * @return GNUNET_YES if we can continue receiving from service; GNUNET_NO if + * not + */ +static int +handle_slave_config (struct GNUNET_TESTBED_Controller *c, + const struct GNUNET_TESTBED_SlaveConfiguration *msg) +{ + struct OperationContext *opc; + uint64_t op_id; + struct GNUNET_TESTBED_EventInformation event; + + op_id = GNUNET_ntohll (msg->operation_id); + if (NULL == (opc = find_opc (c, op_id))) + { + LOG_DEBUG ("Operation not found\n"); + return GNUNET_YES; + } + if (OP_GET_SLAVE_CONFIG != opc->type) + { + GNUNET_break (0); + return GNUNET_YES; + } + GNUNET_free (opc->data); + opc->data = NULL; + opc->state = OPC_STATE_FINISHED; + GNUNET_CONTAINER_DLL_remove (opc->c->ocq_head, opc->c->ocq_tail, opc); + if ((0 != (GNUNET_TESTBED_ET_OPERATION_FINISHED & c->event_mask)) && + (NULL != c->cc)) + { + opc->data = GNUNET_TESTBED_extract_config_ (&msg->header); + event.type = GNUNET_TESTBED_ET_OPERATION_FINISHED; + event.details.operation_finished.generic = opc->data; + event.details.operation_finished.operation = opc->op; + event.details.operation_finished.op_cls = opc->op_cls; + event.details.operation_finished.emsg = NULL; + c->cc (c->cc_cls, &event); + } + return GNUNET_YES; +} + + +/** + * Handler for messages from controller (testbed service) + * + * @param cls the controller handler + * @param msg message received, NULL on timeout or fatal error + */ +static void +message_handler (void *cls, const struct GNUNET_MessageHeader *msg) +{ + struct GNUNET_TESTBED_Controller *c = cls; + int status; + uint16_t msize; + + c->in_receive = GNUNET_NO; + /* FIXME: Add checks for message integrity */ + if (NULL == msg) + { + LOG_DEBUG ("Receive timed out or connection to service dropped\n"); + return; + } + status = GNUNET_OK; + msize = ntohs (msg->size); + switch (ntohs (msg->type)) + { + case GNUNET_MESSAGE_TYPE_TESTBED_ADD_HOST_SUCCESS: + GNUNET_assert (msize >= + sizeof (struct GNUNET_TESTBED_HostConfirmedMessage)); + status = + handle_addhostconfirm (c, + (const struct GNUNET_TESTBED_HostConfirmedMessage + *) msg); + break; + case GNUNET_MESSAGE_TYPE_TESTBED_GENERIC_OPERATION_SUCCESS: + GNUNET_assert (msize == + sizeof (struct + GNUNET_TESTBED_GenericOperationSuccessEventMessage)); + status = + handle_opsuccess (c, + (const struct + GNUNET_TESTBED_GenericOperationSuccessEventMessage *) + msg); + break; + case GNUNET_MESSAGE_TYPE_TESTBED_CREATE_PEER_SUCCESS: + GNUNET_assert (msize == + sizeof (struct + GNUNET_TESTBED_PeerCreateSuccessEventMessage)); + status = + handle_peer_create_success (c, + (const struct + GNUNET_TESTBED_PeerCreateSuccessEventMessage + *) msg); + break; + case GNUNET_MESSAGE_TYPE_TESTBED_PEER_EVENT: + GNUNET_assert (msize == sizeof (struct GNUNET_TESTBED_PeerEventMessage)); + status = + handle_peer_event (c, + (const struct GNUNET_TESTBED_PeerEventMessage *) + msg); + + break; + case GNUNET_MESSAGE_TYPE_TESTBED_PEER_CONFIGURATION: + GNUNET_assert (msize >= + sizeof (struct + GNUNET_TESTBED_PeerConfigurationInformationMessage)); + status = + handle_peer_config (c, + (const struct + GNUNET_TESTBED_PeerConfigurationInformationMessage + *) msg); + break; + case GNUNET_MESSAGE_TYPE_TESTBED_PEER_CONNECT_EVENT: + GNUNET_assert (msize == + sizeof (struct GNUNET_TESTBED_ConnectionEventMessage)); + status = + handle_peer_conevent (c, + (const struct + GNUNET_TESTBED_ConnectionEventMessage *) msg); + break; + case GNUNET_MESSAGE_TYPE_TESTBED_OPERATION_FAIL_EVENT: + GNUNET_assert (msize >= + sizeof (struct GNUNET_TESTBED_OperationFailureEventMessage)); + status = + handle_op_fail_event (c, + (const struct + GNUNET_TESTBED_OperationFailureEventMessage *) + msg); + break; + case GNUNET_MESSAGE_TYPE_TESTBED_SLAVE_CONFIGURATION: + GNUNET_assert (msize > sizeof (struct GNUNET_TESTBED_SlaveConfiguration)); + status = + handle_slave_config (c, + (const struct GNUNET_TESTBED_SlaveConfiguration *) + msg); + break; + default: + GNUNET_assert (0); + } + if ((GNUNET_OK == status) && (GNUNET_NO == c->in_receive)) + { + c->in_receive = GNUNET_YES; + GNUNET_CLIENT_receive (c->client, &message_handler, c, + GNUNET_TIME_UNIT_FOREVER_REL); + } +} + + +/** + * Function called to notify a client about the connection begin ready to queue + * more data. "buf" will be NULL and "size" zero if the connection was closed + * for writing in the meantime. + * + * @param cls closure + * @param size number of bytes available in buf + * @param buf where the callee should write the message + * @return number of bytes written to buf + */ +static size_t +transmit_ready_notify (void *cls, size_t size, void *buf) +{ + struct GNUNET_TESTBED_Controller *c = cls; + struct MessageQueue *mq_entry; + + c->th = NULL; + mq_entry = c->mq_head; + GNUNET_assert (NULL != mq_entry); + if ((0 == size) && (NULL == buf)) /* Timeout */ + { + LOG_DEBUG ("Message sending timed out -- retrying\n"); + c->th = + GNUNET_CLIENT_notify_transmit_ready (c->client, + ntohs (mq_entry->msg->size), + TIMEOUT_REL, GNUNET_YES, + &transmit_ready_notify, c); + return 0; + } + GNUNET_assert (ntohs (mq_entry->msg->size) <= size); + size = ntohs (mq_entry->msg->size); + memcpy (buf, mq_entry->msg, size); + LOG_DEBUG ("Message of type: %u and size: %u sent\n", + ntohs (mq_entry->msg->type), size); + GNUNET_free (mq_entry->msg); + GNUNET_CONTAINER_DLL_remove (c->mq_head, c->mq_tail, mq_entry); + GNUNET_free (mq_entry); + mq_entry = c->mq_head; + if (NULL != mq_entry) + c->th = + GNUNET_CLIENT_notify_transmit_ready (c->client, + ntohs (mq_entry->msg->size), + TIMEOUT_REL, GNUNET_YES, + &transmit_ready_notify, c); + if (GNUNET_NO == c->in_receive) + { + c->in_receive = GNUNET_YES; + GNUNET_CLIENT_receive (c->client, &message_handler, c, + GNUNET_TIME_UNIT_FOREVER_REL); + } + return size; +} + + +/** + * Queues a message in send queue for sending to the service + * + * @param controller the handle to the controller + * @param msg the message to queue + */ +void +GNUNET_TESTBED_queue_message_ (struct GNUNET_TESTBED_Controller *controller, + struct GNUNET_MessageHeader *msg) +{ + struct MessageQueue *mq_entry; + uint16_t type; + uint16_t size; + + type = ntohs (msg->type); + size = ntohs (msg->size); + GNUNET_assert ((GNUNET_MESSAGE_TYPE_TESTBED_INIT <= type) && + (GNUNET_MESSAGE_TYPE_TESTBED_MAX > type)); + mq_entry = GNUNET_malloc (sizeof (struct MessageQueue)); + mq_entry->msg = msg; + LOG (GNUNET_ERROR_TYPE_DEBUG, |