diff options
author | Julius Bünger <buenger@mytum.de> | 2015-08-02 14:48:28 +0000 |
---|---|---|
committer | Julius Bünger <buenger@mytum.de> | 2015-08-02 14:48:28 +0000 |
commit | bae0066688e7571b4abdebfb914dba6df0578a6b (patch) | |
tree | 4ee2b2171170d169ff8cd55cfe5ad9adf640d5d2 | |
parent | 847e9575ed85eadb979bc416afec7cf898cf00d1 (diff) |
cancellation of request and according test improvements
-rw-r--r-- | src/include/gnunet_protocols.h | 19 | ||||
-rw-r--r-- | src/include/gnunet_rps_service.h | 7 | ||||
-rw-r--r-- | src/rps/Makefile.am | 6 | ||||
-rw-r--r-- | src/rps/gnunet-service-rps.c | 206 | ||||
-rw-r--r-- | src/rps/gnunet-service-rps_sampler.c | 170 | ||||
-rw-r--r-- | src/rps/gnunet-service-rps_sampler.h | 15 | ||||
-rw-r--r-- | src/rps/rps.h | 16 | ||||
-rw-r--r-- | src/rps/rps_api.c | 25 | ||||
-rw-r--r-- | src/rps/test_rps.c | 606 |
9 files changed, 733 insertions, 337 deletions
diff --git a/src/include/gnunet_protocols.h b/src/include/gnunet_protocols.h index dca1eb5e92..de585c9af8 100644 --- a/src/include/gnunet_protocols.h +++ b/src/include/gnunet_protocols.h @@ -2706,17 +2706,17 @@ extern "C" /** * RPS PUSH message to push own ID to another peer */ -#define GNUNET_MESSAGE_TYPE_RPS_PP_PUSH 950 +#define GNUNET_MESSAGE_TYPE_RPS_PP_PUSH 950 /** * RPS PULL REQUEST message to request the local view of another peer */ -#define GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REQUEST 951 +#define GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REQUEST 951 /** * RPS PULL REPLY message which contains the view of the other peer */ -#define GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REPLY 952 +#define GNUNET_MESSAGE_TYPE_RPS_PP_PULL_REPLY 952 @@ -2725,23 +2725,28 @@ extern "C" /** * RPS CS REQUEST Message for the Client to request (a) random peer(s) */ -#define GNUNET_MESSAGE_TYPE_RPS_CS_REQUEST 953 +#define GNUNET_MESSAGE_TYPE_RPS_CS_REQUEST 953 /** * RPS CS REPLY Message for the Server to send (a) random peer(s) */ -#define GNUNET_MESSAGE_TYPE_RPS_CS_REPLY 954 +#define GNUNET_MESSAGE_TYPE_RPS_CS_REPLY 954 + +/** + * RPS CS REQUEST CANCEL Message for the Client to cancel a request + */ +#define GNUNET_MESSAGE_TYPE_RPS_CS_REQUEST_CANCEL 955 /** * RPS CS SEED Message for the Client to seed peers into rps */ -#define GNUNET_MESSAGE_TYPE_RPS_CS_SEED 955 +#define GNUNET_MESSAGE_TYPE_RPS_CS_SEED 956 #ifdef ENABLE_MALICIOUS /** * Turn RPS service malicious */ -#define GNUNET_MESSAGE_TYPE_RPS_ACT_MALICIOUS 956 +#define GNUNET_MESSAGE_TYPE_RPS_ACT_MALICIOUS 957 #endif /* ENABLE_MALICIOUS */ diff --git a/src/include/gnunet_rps_service.h b/src/include/gnunet_rps_service.h index e99072cbfa..47b153b14e 100644 --- a/src/include/gnunet_rps_service.h +++ b/src/include/gnunet_rps_service.h @@ -56,7 +56,9 @@ struct GNUNET_RPS_Request_Handle; * @param num_peers the number of peers returned * @param peers array with num_peers PeerIDs */ -typedef void (* GNUNET_RPS_NotifyReadyCB) (void *cls, uint64_t num_peers, const struct GNUNET_PeerIdentity *peers); +typedef void (* GNUNET_RPS_NotifyReadyCB) (void *cls, + uint64_t num_peers, + const struct GNUNET_PeerIdentity *peers); /** * Connect to the rps service @@ -125,7 +127,8 @@ GNUNET_RPS_request_cancel (struct GNUNET_RPS_Request_Handle *rh); GNUNET_RPS_act_malicious (struct GNUNET_RPS_Handle *h, uint32_t type, uint32_t num_peers, - const struct GNUNET_PeerIdentity *ids); + const struct GNUNET_PeerIdentity *ids, + const struct GNUNET_PeerIdentity *target_peer); #endif /* ENABLE_MALICIOUS */ diff --git a/src/rps/Makefile.am b/src/rps/Makefile.am index b1ebffee43..c07434a0fa 100644 --- a/src/rps/Makefile.am +++ b/src/rps/Makefile.am @@ -72,7 +72,8 @@ check_PROGRAMS = \ test_rps_malicious_2 \ test_rps_malicious_3 \ test_rps_seed_request \ - test_rps_single_req + test_rps_single_req \ + test_rps_req_cancel endif ld_rps_test_lib = \ @@ -106,6 +107,9 @@ test_rps_single_req_LDADD = $(ld_rps_test_lib) test_rps_seed_request_SOURCES = $(rps_test_src) test_rps_seed_request_LDADD = $(ld_rps_test_lib) +test_rps_req_cancel_SOURCES = $(rps_test_src) +test_rps_req_cancel_LDADD = $(ld_rps_test_lib) + gnunet_rps_profiler_SOURCES = $(rps_test_src) gnunet_rps_profiler_LDADD = $(ld_rps_test_lib) diff --git a/src/rps/gnunet-service-rps.c b/src/rps/gnunet-service-rps.c index 3c98d79da6..8c1a1dc127 100644 --- a/src/rps/gnunet-service-rps.c +++ b/src/rps/gnunet-service-rps.c @@ -75,22 +75,73 @@ get_rand_peer_ignore_list (const struct GNUNET_PeerIdentity *peer_list, unsigned ***********************************************************************/ /** + * Closure used to pass the client and the id to the callback + * that replies to a client's request + */ +struct ReplyCls +{ + /** + * DLL + */ + struct ReplyCls *next; + struct ReplyCls *prev; + + /** + * The identifier of the request + */ + uint32_t id; + + /** + * The handle to the request + */ + struct RPS_SamplerRequestHandle *req_handle; + + /** + * The client handle to send the reply to + */ + struct GNUNET_SERVER_Client *client; +}; + + +/** * Struct used to store the context of a connected client. */ struct ClientContext { /** + * DLL + */ + struct ClientContext *next; + struct ClientContext *prev; + + /** * The message queue to communicate with the client. */ struct GNUNET_MQ_Handle *mq; + + /** + * DLL with handles to single requests from the client + */ + struct ReplyCls *rep_cls_head; + struct ReplyCls *rep_cls_tail; }; /** + * DLL with all clients currently connected to us + */ +struct ClientContext *cli_ctx_head; +struct ClientContext *cli_ctx_tail; + +/** * Used to keep track in what lists single peerIDs are. */ enum PeerFlags { + /** + * If we are waiting for a reply from that peer (sent a pull request). + */ PULL_REPLY_PENDING = 0x01, + IN_OTHER_GOSSIP_LIST = 0x02, // unneeded? IN_OWN_SAMPLER_LIST = 0x04, // unneeded? IN_OWN_GOSSIP_LIST = 0x08, // unneeded? @@ -365,24 +416,6 @@ static struct GNUNET_TIME_Relative request_rate; uint32_t num_hist_update_tasks; -/** - * Closure used to pass the client and the id to the callback - * that replies to a client's request - */ -struct ReplyCls -{ - /** - * The identifier of the request - */ - uint32_t id; - - /** - * The client handle to send the reply to - */ - struct GNUNET_SERVER_Client *client; -}; - - #ifdef ENABLE_MALICIOUS /** * Type of malicious peer @@ -1234,8 +1267,36 @@ new_peer_id (const struct GNUNET_PeerIdentity *peer_id) * /Util functions ***********************************************************************/ +static void +destroy_reply_cls (struct ReplyCls *rep_cls) +{ + struct ClientContext *cli_ctx; + cli_ctx = GNUNET_SERVER_client_get_user_context (rep_cls->client, + struct ClientContext); + GNUNET_assert (NULL != cli_ctx); + GNUNET_CONTAINER_DLL_remove (cli_ctx->rep_cls_head, + cli_ctx->rep_cls_tail, + rep_cls); + GNUNET_free (rep_cls); +} +static void +destroy_cli_ctx (struct ClientContext *cli_ctx) +{ + GNUNET_assert (NULL != cli_ctx); + if (NULL != cli_ctx->rep_cls_head) + { + LOG (GNUNET_ERROR_TYPE_WARNING, + "Trying to destroy the context of a client that still has pending requests. Going to clean those\n"); + while (NULL != cli_ctx->rep_cls_head) + destroy_reply_cls (cli_ctx->rep_cls_head); + } + GNUNET_CONTAINER_DLL_remove (cli_ctx_head, + cli_ctx_tail, + cli_ctx); + GNUNET_free (cli_ctx); +} /** @@ -1316,15 +1377,10 @@ void client_respond (void *cls, num_peers * sizeof (struct GNUNET_PeerIdentity)); GNUNET_free (peer_ids); - cli_ctx = GNUNET_SERVER_client_get_user_context (reply_cls->client, struct ClientContext); - if (NULL == cli_ctx) { - cli_ctx = GNUNET_new (struct ClientContext); - cli_ctx->mq = GNUNET_MQ_queue_for_server_client (reply_cls->client); - GNUNET_SERVER_client_set_user_context (reply_cls->client, cli_ctx); - } - - GNUNET_free (reply_cls); - + cli_ctx = GNUNET_SERVER_client_get_user_context (reply_cls->client, + struct ClientContext); + GNUNET_assert (NULL != cli_ctx); + destroy_reply_cls (reply_cls); GNUNET_MQ_send (cli_ctx->mq, ev); } @@ -1346,6 +1402,7 @@ handle_client_request (void *cls, uint32_t size_needed; struct ReplyCls *reply_cls; uint32_t i; + struct ClientContext *cli_ctx; msg = (struct GNUNET_RPS_CS_RequestMessage *) message; @@ -1371,12 +1428,50 @@ handle_client_request (void *cls, reply_cls = GNUNET_new (struct ReplyCls); reply_cls->id = ntohl (msg->id); reply_cls->client = client; + reply_cls->req_handle = RPS_sampler_get_n_rand_peers (client_sampler, + client_respond, + reply_cls, + num_peers); + + cli_ctx = GNUNET_SERVER_client_get_user_context (client, struct ClientContext); + GNUNET_assert (NULL != cli_ctx); + GNUNET_CONTAINER_DLL_insert (cli_ctx->rep_cls_head, + cli_ctx->rep_cls_tail, + reply_cls); + GNUNET_SERVER_receive_done (client, + GNUNET_OK); +} - RPS_sampler_get_n_rand_peers (client_sampler, - client_respond, - reply_cls, - num_peers); +/** + * @brief Handle a message that requests the cancellation of a request + * + * @param cls unused + * @param client the client that requests the cancellation + * @param message the message containing the id of the request + */ +static void +handle_client_request_cancel (void *cls, + struct GNUNET_SERVER_Client *client, + const struct GNUNET_MessageHeader *message) +{ + struct GNUNET_RPS_CS_RequestCancelMessage *msg = + (struct GNUNET_RPS_CS_RequestCancelMessage *) message; + struct ClientContext *cli_ctx; + struct ReplyCls *rep_cls; + + cli_ctx = GNUNET_SERVER_client_get_user_context (client, struct ClientContext); + GNUNET_assert (NULL != cli_ctx->rep_cls_head); + rep_cls = cli_ctx->rep_cls_head; + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Client cancels request with id %lu\n", + ntohl (msg->id)); + while ( (NULL != rep_cls->next) && + (rep_cls->id != ntohl (msg->id)) ) + rep_cls = rep_cls->next; + GNUNET_assert (rep_cls->id == ntohl (msg->id)); + RPS_sampler_request_cancel (rep_cls->req_handle); + destroy_reply_cls (rep_cls); GNUNET_SERVER_receive_done (client, GNUNET_OK); } @@ -2584,6 +2679,30 @@ shutdown_task (void *cls, /** + * @brief Get informed about a connecting client. + * + * @param cls unused + * @param client the client that connects + */ +static void +handle_client_connect (void *cls, + struct GNUNET_SERVER_Client *client) +{ + struct ClientContext *cli_ctx; + + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Client connected\n"); + if (NULL == client) + return; /* Server was destroyed before a client connected. Shutting down */ + cli_ctx = GNUNET_new (struct ClientContext); + cli_ctx->mq = GNUNET_MQ_queue_for_server_client (client); + GNUNET_SERVER_client_set_user_context (client, cli_ctx); + GNUNET_CONTAINER_DLL_insert (cli_ctx_head, + cli_ctx_tail, + cli_ctx); +} + +/** * A client disconnected. Remove all of its data structure entries. * * @param cls closure, NULL @@ -2591,8 +2710,20 @@ shutdown_task (void *cls, */ static void handle_client_disconnect (void *cls, - struct GNUNET_SERVER_Client * client) + struct GNUNET_SERVER_Client *client) { + struct ClientContext *cli_ctx; + + if (NULL == client) + {/* shutdown task */ + while (NULL != cli_ctx_head) + destroy_cli_ctx (cli_ctx_head); + } + else + { + cli_ctx = GNUNET_SERVER_client_get_user_context (client, struct ClientContext); + destroy_cli_ctx (cli_ctx); + } } @@ -2716,16 +2847,21 @@ cleanup_channel (void *cls, rps_start (struct GNUNET_SERVER_Handle *server) { static const struct GNUNET_SERVER_MessageHandler handlers[] = { - {&handle_client_request, NULL, GNUNET_MESSAGE_TYPE_RPS_CS_REQUEST, + {&handle_client_request, NULL, GNUNET_MESSAGE_TYPE_RPS_CS_REQUEST, sizeof (struct GNUNET_RPS_CS_RequestMessage)}, - {&handle_client_seed, NULL, GNUNET_MESSAGE_TYPE_RPS_CS_SEED, 0}, + {&handle_client_request_cancel, NULL, GNUNET_MESSAGE_TYPE_RPS_CS_REQUEST_CANCEL, + sizeof (struct GNUNET_RPS_CS_RequestCancelMessage)}, + {&handle_client_seed, NULL, GNUNET_MESSAGE_TYPE_RPS_CS_SEED, 0}, #ifdef ENABLE_MALICIOUS - {&handle_client_act_malicious, NULL, GNUNET_MESSAGE_TYPE_RPS_ACT_MALICIOUS , 0}, + {&handle_client_act_malicious, NULL, GNUNET_MESSAGE_TYPE_RPS_ACT_MALICIOUS , 0}, #endif /* ENABLE_MALICIOUS */ {NULL, NULL, 0, 0} }; GNUNET_SERVER_add_handlers (server, handlers); + GNUNET_SERVER_connect_notify (server, + &handle_client_connect, + NULL); GNUNET_SERVER_disconnect_notify (server, &handle_client_disconnect, NULL); diff --git a/src/rps/gnunet-service-rps_sampler.c b/src/rps/gnunet-service-rps_sampler.c index 6857c80ba8..b65dd7c47f 100644 --- a/src/rps/gnunet-service-rps_sampler.c +++ b/src/rps/gnunet-service-rps_sampler.c @@ -73,16 +73,12 @@ struct GetPeerCls * DLL */ struct GetPeerCls *next; - - /** - * DLL - */ struct GetPeerCls *prev; /** - * The sampler this function operates on. + * The #RPS_SamplerRequestHandle this single request belongs to. */ - struct RPS_Sampler *sampler; + struct RPS_SamplerRequestHandle *req_handle; /** * The task for this function. @@ -166,14 +162,10 @@ struct RPS_Sampler RPS_get_peers_type get_peers; /** - * Head for the DLL to store the closures to pending requests. + * Head and tail for the DLL to store the #RPS_SamplerRequestHandle */ - struct GetPeerCls *gpc_head; - - /** - * Tail for the DLL to store the closures to pending requests. - */ - struct GetPeerCls *gpc_tail; + struct RPS_SamplerRequestHandle *req_handle_head; + struct RPS_SamplerRequestHandle *req_handle_tail; #ifdef TO_FILE /** @@ -186,9 +178,15 @@ struct RPS_Sampler /** * Closure to _get_n_rand_peers_ready_cb() */ -struct NRandPeersReadyCls +struct RPS_SamplerRequestHandle { /** + * DLL + */ + struct RPS_SamplerRequestHandle *next; + struct RPS_SamplerRequestHandle *prev; + + /** * Number of peers we are waiting for. */ uint32_t num_peers; @@ -204,6 +202,17 @@ struct NRandPeersReadyCls struct GNUNET_PeerIdentity *ids; /** + * Head and tail for the DLL to store the tasks for single requests + */ + struct GetPeerCls *gpc_head; + struct GetPeerCls *gpc_tail; + + /** + * Sampler. + */ + struct RPS_Sampler *sampler; + + /** * Callback to be called when all ids are available. */ RPS_sampler_n_rand_peers_ready_cb callback; @@ -251,23 +260,23 @@ static void check_n_peers_ready (void *cls, const struct GNUNET_PeerIdentity *id) { - struct NRandPeersReadyCls *n_peers_cls = cls; + struct RPS_SamplerRequestHandle *req_handle = cls; - n_peers_cls->cur_num_peers++; + req_handle->cur_num_peers++; LOG (GNUNET_ERROR_TYPE_DEBUG, "Got %" PRIX32 ". of %" PRIX32 " peers\n", - n_peers_cls->cur_num_peers, n_peers_cls->num_peers); + req_handle->cur_num_peers, req_handle->num_peers); - if (n_peers_cls->num_peers == n_peers_cls->cur_num_peers) + if (req_handle->num_peers == req_handle->cur_num_peers) { /* All peers are ready -- return those to the client */ - GNUNET_assert (NULL != n_peers_cls->callback); + GNUNET_assert (NULL != req_handle->callback); LOG (GNUNET_ERROR_TYPE_DEBUG, "returning %" PRIX32 " peers to the client\n", - n_peers_cls->num_peers); - n_peers_cls->callback (n_peers_cls->cls, n_peers_cls->ids, n_peers_cls->num_peers); + req_handle->num_peers); + req_handle->callback (req_handle->cls, req_handle->ids, req_handle->num_peers); - GNUNET_free (n_peers_cls); + RPS_sampler_request_cancel (req_handle); } } @@ -420,12 +429,8 @@ RPS_sampler_init (size_t init_size, sampler->file_name); #endif /* TO_FILE */ - sampler->sampler_size = 0; - sampler->sampler_elements = NULL; sampler->max_round_interval = max_round_interval; sampler->get_peers = sampler_get_rand_peer; - sampler->gpc_head = NULL; - sampler->gpc_tail = NULL; //sampler->sampler_elements = GNUNET_new_array(init_size, struct GNUNET_PeerIdentity); //GNUNET_array_grow (sampler->sampler_elements, sampler->sampler_size, min_size); RPS_sampler_resize (sampler, init_size); @@ -530,19 +535,21 @@ sampler_get_rand_peer (void *cls, { struct GetPeerCls *gpc = cls; uint32_t r_index; + struct RPS_Sampler *sampler; gpc->get_peer_task = NULL; if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN)) return; + sampler = gpc->req_handle->sampler; /**; * Choose the r_index of the peer we want to return * at random from the interval of the gossip list */ r_index = GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_STRONG, - gpc->sampler->sampler_size); + sampler->sampler_size); - if (EMPTY == gpc->sampler->sampler_elements[r_index]->is_empty) + if (EMPTY == sampler->sampler_elements[r_index]->is_empty) { //LOG (GNUNET_ERROR_TYPE_DEBUG, // "Not returning randomly selected, empty PeerID. - Rescheduling.\n"); @@ -552,20 +559,18 @@ sampler_get_rand_peer (void *cls, * Counter? */ gpc->get_peer_task = - GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply ( - GNUNET_TIME_UNIT_SECONDS, 0.1), - &sampler_get_rand_peer, - cls); + GNUNET_SCHEDULER_add_delayed ( + GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 1), + &sampler_get_rand_peer, + cls); return; } - *gpc->id = gpc->sampler->sampler_elements[r_index]->peer_id; - - gpc->cont (gpc->cont_cls, gpc->id); - - GNUNET_CONTAINER_DLL_remove (gpc->sampler->gpc_head, - gpc->sampler->gpc_tail, + GNUNET_CONTAINER_DLL_remove (gpc->req_handle->gpc_head, + gpc->req_handle->gpc_tail, gpc); + *gpc->id = sampler->sampler_elements[r_index]->peer_id; + gpc->cont (gpc->cont_cls, gpc->id); GNUNET_free (gpc); } @@ -584,17 +589,19 @@ sampler_mod_get_rand_peer (void *cls, struct GetPeerCls *gpc = cls; struct RPS_SamplerElement *s_elem; struct GNUNET_TIME_Relative last_request_diff; + struct RPS_Sampler *sampler; gpc->get_peer_task = NULL; if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN)) return; + sampler = gpc->req_handle->sampler; LOG (GNUNET_ERROR_TYPE_DEBUG, "Single peer was requested\n"); /* Cycle the #client_get_index one step further */ - client_get_index = (client_get_index + 1) % gpc->sampler->sampler_size; + client_get_index = (client_get_index + 1) % sampler->sampler_size; - s_elem = gpc->sampler->sampler_elements[client_get_index]; + s_elem = sampler->sampler_elements[client_get_index]; *gpc->id = s_elem->peer_id; GNUNET_assert (NULL != s_elem); @@ -603,7 +610,7 @@ sampler_mod_get_rand_peer (void *cls, LOG (GNUNET_ERROR_TYPE_DEBUG, "Sampler_mod element empty, rescheduling.\n"); GNUNET_assert (NULL == gpc->get_peer_task); gpc->get_peer_task = - GNUNET_SCHEDULER_add_delayed (gpc->sampler->max_round_interval, + GNUNET_SCHEDULER_add_delayed (sampler->max_round_interval, &sampler_mod_get_rand_peer, cls); return; @@ -617,7 +624,7 @@ sampler_mod_get_rand_peer (void *cls, GNUNET_TIME_absolute_get ()); /* We're not going to give it back now if it was * already requested by a client this round */ - if (last_request_diff.rel_value_us < gpc->sampler->max_round_interval.rel_value_us) + if (last_request_diff.rel_value_us < sampler->max_round_interval.rel_value_us) { LOG (GNUNET_ERROR_TYPE_DEBUG, "Last client request on this sampler was less than max round interval ago -- scheduling for later\n"); @@ -629,7 +636,7 @@ sampler_mod_get_rand_peer (void *cls, /* Schedule it one round later */ GNUNET_assert (NULL == gpc->get_peer_task); gpc->get_peer_task = - GNUNET_SCHEDULER_add_delayed (gpc->sampler->max_round_interval, + GNUNET_SCHEDULER_add_delayed (sampler->max_round_interval, &sampler_mod_get_rand_peer, cls); return; @@ -639,8 +646,8 @@ sampler_mod_get_rand_peer (void *cls, s_elem->last_client_request = GNUNET_TIME_absolute_get (); - GNUNET_CONTAINER_DLL_remove (gpc->sampler->gpc_head, - gpc->sampler->gpc_tail, + GNUNET_CONTAINER_DLL_remove (gpc->req_handle->gpc_head, + gpc->req_handle->gpc_tail, gpc); gpc->cont (gpc->cont_cls, gpc->id); GNUNET_free (gpc); @@ -661,26 +668,30 @@ sampler_mod_get_rand_peer (void *cls, * #GNUNET_NO if used internally * @param num_peers the number of peers requested */ - void +struct RPS_SamplerRequestHandle * RPS_sampler_get_n_rand_peers (struct RPS_Sampler *sampler, RPS_sampler_n_rand_peers_ready_cb cb, void *cls, uint32_t num_peers) { GNUNET_assert (0 != sampler->sampler_size); if (0 == num_peers) - return; + return NULL; // TODO check if we have too much (distinct) sampled peers uint32_t i; - struct NRandPeersReadyCls *cb_cls; + struct RPS_SamplerRequestHandle *req_handle; struct GetPeerCls *gpc; - cb_cls = GNUNET_new (struct NRandPeersReadyCls); - cb_cls->num_peers = num_peers; - cb_cls->cur_num_peers = 0; - cb_cls->ids = GNUNET_new_array (num_peers, struct GNUNET_PeerIdentity); - cb_cls->callback = cb; - cb_cls->cls = cls; + req_handle = GNUNET_new (struct RPS_SamplerRequestHandle); + req_handle->num_peers = num_peers; + req_handle->cur_num_peers = 0; + req_handle->ids = GNUNET_new_array (num_peers, struct GNUNET_PeerIdentity); + req_handle->sampler = sampler; + req_handle->callback = cb; + req_handle->cls = cls; + GNUNET_CONTAINER_DLL_insert (sampler->req_handle_head, + sampler->req_handle_tail, + req_handle); LOG (GNUNET_ERROR_TYPE_DEBUG, "Scheduling requests for %" PRIu32 " peers\n", num_peers); @@ -688,18 +699,43 @@ RPS_sampler_get_n_rand_peers (struct RPS_Sampler *sampler, for (i = 0 ; i < num_peers ; i++) { gpc = GNUNET_new (struct GetPeerCls); - gpc->sampler = sampler; + gpc->req_handle = req_handle; gpc->cont = check_n_peers_ready; - gpc->cont_cls = cb_cls; - gpc->id = &cb_cls->ids[i]; + gpc->cont_cls = req_handle; + gpc->id = &req_handle->ids[i]; + GNUNET_CONTAINER_DLL_insert (req_handle->gpc_head, + req_handle->gpc_tail, + gpc); // maybe add a little delay gpc->get_peer_task = GNUNET_SCHEDULER_add_now (sampler->get_peers, gpc); + } + return req_handle; +} - GNUNET_CONTAINER_DLL_insert (sampler->gpc_head, - sampler->gpc_tail, - gpc); +/** + * Cancle a request issued through #RPS_sampler_n_rand_peers_ready_cb. + * + * @param req_handle the handle to the request + */ +void +RPS_sampler_request_cancel (struct RPS_SamplerRequestHandle *req_handle) +{ + struct GetPeerCls *i; + + while (NULL != (i = req_handle->gpc_head) ) + { + GNUNET_CONTAINER_DLL_remove (req_handle->gpc_head, + req_handle->gpc_tail, + i); + if (NULL != i->get_peer_task) + GNUNET_SCHEDULER_cancel (i->get_peer_task); + GNUNET_free (i); } + GNUNET_CONTAINER_DLL_remove (req_handle->sampler->req_handle_head, + req_handle->sampler->req_handle_tail, + req_handle); + GNUNET_free (req_handle); } @@ -735,17 +771,13 @@ RPS_sampler_count_id (struct RPS_Sampler *sampler, void RPS_sampler_destroy (struct RPS_Sampler *sampler) { - struct GetPeerCls *i; - - for (i = sampler->gpc_head; NULL != i; i = sampler->gpc_head) + if (NULL != sampler->req_handle_head) { - GNUNET_CONTAINER_DLL_remove (sampler->gpc_head, - sampler->gpc_tail, - i); - GNUNET_SCHEDULER_cancel (i->get_peer_task); - GNUNET_free (i); + LOG (GNUNET_ERROR_TYPE_WARNING, + "There are still pending requests. Going to remove them.\n"); + while (NULL != sampler->req_handle_head) + RPS_sampler_request_cancel (sampler->req_handle_head); } - sampler_empty (sampler); GNUNET_free (sampler); } diff --git a/src/rps/gnunet-service-rps_sampler.h b/src/rps/gnunet-service-rps_sampler.h index 83705b0137..f33e7430c3 100644 --- a/src/rps/gnunet-service-rps_sampler.h +++ b/src/rps/gnunet-service-rps_sampler.h @@ -34,6 +34,11 @@ */ struct RPS_Sampler; +/** + * A handle to cancel a request. + */ +struct RPS_SamplerRequestHandle; + /** * Callback that is called from _get_n_rand_peers() when the PeerIDs are ready. @@ -130,11 +135,19 @@ RPS_sampler_reinitialise_by_value (struct RPS_Sampler *sampler, * #GNUNET_NO if used internally * @param num_peers the number of peers requested */ - void +struct RPS_SamplerRequestHandle * RPS_sampler_get_n_rand_peers (struct RPS_Sampler *sampler, RPS_sampler_n_rand_peers_ready_cb cb, void *cls, uint32_t num_peers); +/** + * Cancle a request issued through #RPS_sampler_n_rand_peers_ready_cb. + * + * @param req_handle the handle to the request + */ +void +RPS_sampler_request_cancel (struct RPS_SamplerRequestHandle *req_handle); + /** * Counts how many Samplers currently hold a given PeerID. diff --git a/src/rps/rps.h b/src/rps/rps.h index 9a16e7593f..44b93e3966 100644 --- a/src/rps/rps.h +++ b/src/rps/rps.h @@ -106,6 +106,22 @@ struct GNUNET_RPS_CS_ReplyMessage }; /** + * Message from client to RPS service to cancel request. + */ +struct GNUNET_RPS_CS_RequestCancelMessage +{ + /** + * Header including size and type in NBO + */ + struct GNUNET_MessageHeader header; + + /** + * Identifyer of the message. + */ + uint32_t id GNUNET_PACKED; +}; + +/** * Message from client to service with seed of peers. */ struct GNUNET_RPS_CS_SeedMessage diff --git a/src/rps/rps_api.c b/src/rps/rps_api.c index 854ea25cf2..42cec97613 100644 --- a/src/rps/rps_api.c +++ b/src/rps/rps_api.c @@ -339,7 +339,8 @@ GNUNET_RPS_seed_ids (struct GNUNET_RPS_Handle *h, GNUNET_RPS_act_malicious (struct GNUNET_RPS_Handle *h, uint32_t type, uint32_t num_peers, - const struct GNUNET_PeerIdentity *peer_ids) + const struct GNUNET_PeerIdentity *peer_ids, + const struct GNUNET_PeerIdentity *target_peer) { size_t size_needed; uint32_t num_peers_max; @@ -379,8 +380,8 @@ GNUNET_RPS_act_malicious (struct GNUNET_RPS_Handle *h, GNUNET_MESSAGE_TYPE_RPS_ACT_MALICIOUS); msg->type = htonl (type); msg->num_peers = htonl (num_peers_max); - if (2 == type - || 3 == type) + if ( (2 == type) || + (3 == type) ) msg->attacked_peer = peer_ids[num_peers]; memcpy (&msg[1], tmp_peer_pointer, @@ -400,9 +401,9 @@ GNUNET_RPS_act_malicious (struct GNUNET_RPS_Handle *h, GNUNET_MESSAGE_TYPE_RPS_ACT_MALICIOUS); msg->type = htonl (type); msg->num_peers = htonl (num_peers); - if (2 == type - || 3 == type) - msg->attacked_peer = peer_ids[num_peers]; + if ( (2 == type) || + (3 == type) ) + msg->attacked_peer = *target_peer; memcpy (&msg[1], tmp_peer_pointer, num_peers * sizeof (struct GNUNET_PeerIdentity)); GNUNET_MQ_send (h->mq, ev); @@ -418,7 +419,17 @@ GNUNET_RPS_act_malicious (struct GNUNET_RPS_Handle *h, void GNUNET_RPS_request_cancel (struct GNUNET_RPS_Request_Handle *rh) { - // TODO + struct GNUNET_MQ_Envelope *ev; + struct GNUNET_RPS_CS_RequestCancelMessage*msg; + + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Cancelling request with id %" PRIu32 "\n", + rh->id); + + GNUNET_array_append (req_handlers, req_handlers_size, *rh); + ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_RPS_CS_REQUEST_CANCEL); + msg->id = htonl (rh->id); + GNUNET_MQ_send (rh->rps_handle->mq, ev); } diff --git a/src/rps/test_rps.c b/src/rps/test_rps.c index 01777bd907..4a4a9ee1ae 100644 --- a/src/rps/test_rps.c +++ b/src/rps/test_rps.c @@ -46,6 +46,7 @@ uint32_t num_peers; //#define TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 30) static struct GNUNET_TIME_Relative timeout; + /** * Portion of malicious peers */ @@ -106,6 +107,52 @@ static struct OpListEntry *oplist_tail; /** + * A pending reply: A request was sent and the reply is pending. + */ +struct PendingReply +{ + /** + * DLL next,prev ptr + */ + struct PendingReply *next; + struct PendingReply *prev; + + /** + * Handle to the request we are waiting for + */ + struct GNUNET_RPS_Request_Handle *req_handle; + + /** + * The peer that requested + */ + struct RPSPeer *rps_peer; +}; + + +/** + * A pending request: A request was not made yet but is scheduled for later. + */ +struct PendingRequest +{ + /** + * DLL next,prev ptr + */ + struct PendingRequest *next; + struct PendingRequest *prev; + + /** + * Handle to the request we are waiting for + */ + struct GNUNET_SCHEDULER_Task *request_task; + + /** + * The peer that requested + */ + struct RPSPeer *rps_peer; +}; + + +/** * Information we track for each peer. */ struct RPSPeer @@ -141,6 +188,33 @@ struct RPSPeer int online; /** + * Number of Peer IDs to request + */ + unsigned int num_ids_to_request; + + /** + * Pending requests DLL + */ + struct PendingRequest *pending_req_head; + struct PendingRequest *pending_req_tail; + + /** + * Number of pending requests + */ + unsigned int num_pending_reqs; + + /** + * Pending replies DLL + */ + struct PendingReply *pending_rep_head; + struct PendingReply *pending_rep_tail; + + /** + * Number of pending replies + */ + unsigned int num_pending_reps; + + /** * Received PeerIDs */ struct GNUNET_PeerIdentity *rec_ids; @@ -168,6 +242,16 @@ static struct GNUNET_CONTAINER_MultiPeerMap *peer_map; static struct GNUNET_PeerIdentity *rps_peer_ids; /** + * ID of the targeted peer. + */ +static struct GNUNET_PeerIdentity *target_peer; + +/** + * ID of the peer that requests for the evaluation. + */ +static struct RPSPeer *eval_peer; + +/** * Number of online peers. */ static unsigned int num_peers_online; @@ -185,6 +269,11 @@ static struct GNUNET_SCHEDULER_Task *churn_task; /** + * Called to initialise the given RPSPeer + */ +typedef void (*InitPeer) (struct RPSPeer *rps_peer); + +/** * Called directly after connecting to the service */ typedef void (*PreTest) (void *cls, struct GNUNET_RPS_Handle *h); @@ -224,6 +313,11 @@ struct SingleTestRun char *name; /** + * Called to initialise peer + */ + InitPeer init_peer; + + /** * Called directly after connecting to the service */ PreTest pre_test; @@ -398,80 +492,6 @@ make_oplist_entry () /** - * Callback to be called when RPS service is started or stopped at peers - * - * @param cls NULL - * @param op the operation handle - * @param emsg NULL on success; otherwise an error description - */ -static void -churn_cb (void *cls, - struct GNUNET_TESTBED_Operation *op, - const char *emsg) -{ - // FIXME - struct OpListEntry *entry = cls; - - GNUNET_TESTBED_operation_done (entry->op); - if (NULL != emsg) - { - GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Failed to start/stop RPS at a peer\n"); - GNUNET_SCHEDULER_shutdown (); - return; - } - GNUNET_assert (0 != entry->delta); - - num_peers_online += entry->delta; - - if (0 > entry->delta) - { /* Peer hopefully just went offline */ - if (GNUNET_YES != rps_peers[entry->index].online) - { - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, - "peer %s was expected to go offline but is still marked as online\n", - GNUNET_i2s (rps_peers[entry->index].peer_id)); - GNUNET_break (0); - } - else - { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "peer %s probably went offline as expected\n", - GNUNET_i2s (rps_peers[entry->index].peer_id)); - } - rps_peers[entry->index].online = GNUNET_NO; - } - - else if (0 < entry->delta) - { /* Peer hopefully just went online */ - if (GNUNET_NO != rps_peers[entry->index].online) - { - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, - "peer %s was expected to go online but is still marked as offline\n", - GNUNET_i2s (rps_peers[entry->index].peer_id)); - GNUNET_break (0); - } - else - { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "peer %s probably went online as expected\n", - GNUNET_i2s (rps_peers[entry->index].peer_id)); - if (NULL != cur_test_run.pre_test) - { - cur_test_run.pre_test (&rps_peers[entry->index], - rps_peers[entry->index].rps_handle); - } - } - rps_peers[entry->index].online = GNUNET_YES; - } - - GNUNET_CONTAINER_DLL_remove (oplist_head, oplist_tail, entry); - GNUNET_free (entry); - //if (num_peers_in_round[current_round] == peers_running) - // run_round (); -} - - -/** * Task run on timeout to shut everything down. */ static void @@ -543,14 +563,15 @@ info_cb (void *cb_cls, &rps_peer_ids[entry->index], &rps_peers[entry->index], GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY); - tofile ("/tmp/rps/peer_ids", "%u\t%s\n", entry->index, GNUNET_i2s_full (&rps_peer_ids[entry->index])); - GNUNET_TESTBED_operation_done (entry->op); + if (NULL != cur_test_run.init_peer) + cur_test_run.init_peer (&rps_peers[entry->index]); + GNUNET_TESTBED_operation_done (entry->op); GNUNET_CONTAINER_DLL_remove (oplist_head, oplist_tail, entry); GNUNET_free (entry); } @@ -650,7 +671,15 @@ default_eval_cb (void) static int no_eval (void) { - return 1; + return 0; +} + +/** + * Initialise given RPSPeer + */ +static void default_init_peer (struct RPSPeer *rps_peer) +{ + rps_peer->num_ids_to_request = 1; } /** @@ -665,9 +694,15 @@ default_reply_handle (void *cls, uint64_t n, const struct GNUNET_PeerIdentity *recv_peers) { - struct RPSPeer *rps_peer = (struct RPSPeer *) cls; + struct RPSPeer *rps_peer; + struct PendingReply *pending_rep = (struct PendingReply *) cls; unsigned int i; + rps_peer = pending_rep->rps_peer; + GNUNET_CONTAINER_DLL_remove (rps_peer->pending_rep_head, + rps_peer->pending_rep_tail, + pending_rep); + rps_peer->num_pending_reps--; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "[%s] got %" PRIu64 " peers:\n", GNUNET_i2s (rps_peer->peer_id), @@ -691,20 +726,119 @@ static void request_peers (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) { - struct RPSPeer *rps_peer = (struct RPSPeer *) cls; + struct RPSPeer *rps_peer; + struct PendingRequest *pending_req = (struct PendingRequest *) cls; + struct PendingReply *pending_rep; if (GNUNET_YES == in_shutdown) return; + rps_peer = pending_req->rps_peer; + GNUNET_assert (1 <= rps_peer->num_pending_reqs); + GNUNET_CONTAINER_DLL_remove (rps_peer->pending_req_head, + rps_peer->pending_req_tail, + pending_req); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Requesting one peer\n"); + pending_rep = GNUNET_new (struct PendingReply); + pending_rep->rps_peer = rps_peer; + pending_rep->req_handle = GNUNET_RPS_request_peers (rps_peer->rps_handle, + 1, + cur_test_run.reply_handle, + pending_rep); + GNUNET_CONTAINER_DLL_insert_tail (rps_peer->pending_rep_head, + rps_peer->pending_rep_tail, + pending_rep); + rps_peer->num_pending_reps++; + rps_peer->num_pending_reqs--; +} - GNUNET_free (GNUNET_RPS_request_peers (rps_peer->rps_handle, - 1, - cur_test_run.reply_handle, - rps_peer)); - //rps_peer->req_handle = GNUNET_RPS_request_peers (rps_peer->rps_handle, 1, handle_reply, rps_peer); +static void +cancel_pending_req (struct PendingRequest *pending_req) +{ + struct RPSPeer *rps_peer; + + rps_peer = pending_req->rps_peer; + GNUNET_CONTAINER_DLL_remove (rps_peer->pending_req_head, + rps_peer->pending_req_tail, + pending_req); + rps_peer->num_pending_reqs--; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Cancelling pending request\n"); + GNUNET_SCHEDULER_cancel (pending_req->request_task); + GNUNET_free (pending_req); } +static void +cancel_request (struct PendingReply *pending_rep) +{ + struct RPSPeer *rps_peer; + + rps_peer = pending_rep->rps_peer; + GNUNET_CONTAINER_DLL_remove (rps_peer->pending_rep_head, + rps_peer->pending_rep_tail, + pending_rep); + rps_peer->num_pending_reps--; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Cancelling request\n"); + GNUNET_RPS_request_cancel (pending_rep->req_handle); + GNUNET_free (pending_rep); +} + +/** + * Cancel a request. + */ +static void +cancel_request_cb (void *cls, + const struct GNUNET_SCHEDULER_TaskContext *tc) +{ + struct PendingReply *pending_rep; + struct RPSPeer *rps_peer = (struct RPSPeer *) cls; + + if (GNUNET_YES == in_shutdown) + return; + pending_rep = rps_peer->pending_rep_head; + GNUNET_assert (1 <= rps_peer->num_pending_reps); + cancel_request (pending_rep); +} + + +/** + * Schedule requests for peer @a rps_peer that have neither been scheduled, nor + * issued, nor replied + */ +void +schedule_missing_requests (struct RPSPeer *rps_peer) +{ + unsigned int i; + struct PendingRequest *pending_req; + + for (i = rps_peer->num_pending_reqs + rps_peer->num_pending_reps; + i < rps_peer->num_ids_to_request; i++) + { + pending_req = GNUNET_new (struct PendingRequest); + pending_req->rps_peer = rps_peer; + pending_req->request_task = GNUNET_SCHEDULER_add_delayed ( + GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, + cur_test_run.request_interval * i), + request_peers, + pending_req); + GNUNET_CONTAINER_DLL_insert_tail (rps_peer->pending_req_head, + rps_peer->pending_req_tail, + pending_req); + rps_peer->num_pending_reqs++; + } +} + +void +cancel_pending_req_rep (struct RPSPeer *rps_peer) +{ + while (NULL != rps_peer->pending_req_head) + cancel_pending_req (rps_peer->pending_req_head); + GNUNET_assert (0 == rps_peer->num_pending_reqs); + while (NULL != rps_peer->pending_rep_head) + cancel_request (rps_peer->pending_rep_head); + GNUNET_assert (0 == rps_peer->num_pending_reps); +} /*********************************** * MALICIOUS @@ -716,8 +850,8 @@ mal_pre (void *cls, struct GNUNET_RPS_Handle *h) uint32_t num_mal_peers; struct RPSPeer *rps_peer = (struct RPSPeer *) cls; - GNUNET_assert (1 >= portion - && 0 < portion); + GNUNET_assert ( (1 >= portion) && + (0 < portion) ); num_mal_peers = round (portion * num_peers); if (rps_peer->index < num_mal_peers) @@ -728,7 +862,8 @@ mal_pre (void *cls, struct GNUNET_RPS_Handle *h) GNUNET_i2s (rps_peer->peer_id), num_mal_peers); - GNUNET_RPS_act_malicious (h, mal_type, num_mal_peers, rps_peer_ids); + GNUNET_RPS_act_malicious (h, mal_type, num_mal_peers, + rps_peer_ids, target_peer); } #endif /* ENABLE_MALICIOUS */ } @@ -739,8 +874,8 @@ mal_cb (struct RPSPeer *rps_peer) uint32_t num_mal_peers; #ifdef ENABLE_MALICIOUS - GNUNET_assert (1 >= portion - && 0 < portion); + GNUNET_assert ( (1 >= portion) && + (0 < portion) ); num_mal_peers = round (portion * num_peers); if (rps_peer->index >= num_mal_peers) @@ -748,8 +883,7 @@ mal_cb (struct RPSPeer *rps_peer) it's not sampling */ GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 2), seed_peers, rps_peer); - GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 10), - request_peers, rps_peer); + schedule_missing_requests (rps_peer); } #endif /* ENABLE_MALICIOUS */ } @@ -772,8 +906,7 @@ mal_eval (void) static void single_req_cb (struct RPSPeer *rps_peer) { - GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5), - request_peers, rps_peer); + schedule_missing_requests (rps_peer); } /*********************************** @@ -782,10 +915,7 @@ single_req_cb (struct RPSPeer *rps_peer) static void delay_req_cb (struct RPSPeer *rps_peer) { - GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5), - request_peers, rps_peer); - GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 10), - request_peers, rps_peer); + schedule_missing_requests (rps_peer); } /*********************************** @@ -824,8 +954,7 @@ seed_req_cb (struct RPSPeer *rps_peer) { GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 2), seed_peers, rps_peer); - GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 15), - request_peers, rps_peer); + schedule_missing_requests (rps_peer); } //TODO start big mal @@ -836,16 +965,128 @@ seed_req_cb (struct RPSPeer *rps_peer) static void req_cancel_cb (struct RPSPeer *rps_peer) { - // TODO + schedule_missing_requests (rps_peer); + GNUNET_SCHEDULER_add_delayed ( + GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, + (cur_test_run.request_interval + 1)), + cancel_request_cb, rps_peer); } /*********************************** * PROFILER ***********************************/ + +/** + * Callback to be called when RPS service is started or stopped at peers + * + * @param cls NULL + * @param op the operation handle + * @param emsg NULL on success; otherwise an error description + */ static void -churn (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) +churn_cb (void *cls, + struct GNUNET_TESTBED_Operation *op, + const char *emsg) +{ + // FIXME + struct OpListEntry *entry = cls; + + GNUNET_TESTBED_operation_done (entry->op); + if (NULL != emsg) + { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Failed to start/stop RPS at a peer\n"); + GNUNET_SCHEDULER_shutdown (); + return; + } + GNUNET_assert (0 != entry->delta); + + num_peers_online += entry->delta; + + if (0 > entry->delta) + { /* Peer hopefully just went offline */ + if (GNUNET_YES != rps_peers[entry->index].online) + { + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "peer %s was expected to go offline but is still marked as online\n", + GNUNET_i2s (rps_peers[entry->index].peer_id)); + GNUNET_break (0); + } + else + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "peer %s probably went offline as expected\n", + GNUNET_i2s (rps_peers[entry->index].peer_id)); + } + rps_peers[entry->index].online = GNUNET_NO; + } + + else if (0 < entry->delta) + { /* Peer hopefully just went online */ + if (GNUNET_NO != rps_peers[entry->index].online) + { + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "peer %s was expected to go online but is still marked as offline\n", + GNUNET_i2s (rps_peers[entry->index].peer_id)); + GNUNET_break (0); + } + else + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "peer %s probably went online as expected\n", + GNUNET_i2s (rps_peers[entry->index].peer_id)); + if (NULL != cur_test_run.pre_test) + { + cur_test_run.pre_test (&rps_peers[entry->index], + rps_peers[entry->index].rps_handle); + schedule_missing_requests (&rps_peers[entry->index]); + } + } + rps_peers[entry->index].online = GNUNET_YES; + } + + GNUNET_CONTAINER_DLL_remove (oplist_head, oplist_tail, entry); + GNUNET_free (entry); + //if (num_peers_in_round[current_round] == peers_running) + // run_round (); +} + +static void +manage_service_wrapper (unsigned int i, unsigned int j, int delta, + double prob_go_on_off) { struct OpListEntry *entry; + uint32_t prob; + + prob = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, + UINT32_MAX); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "%u. selected peer (%u: %s) is %s.\n", + i, + j, + GNUNET_i2s (rps_peers[j].peer_id), + (delta < 0)? "online" : "offline"); + if (prob < prob_go_on_off * UINT32_MAX) + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "%s goes %s\n", + GNUNET_i2s (rps_peers[j].peer_id), + (delta < 0) ? "offline" : "online"); + + entry = make_oplist_entry (); + entry->delta = delta; + entry->index = j; + entry->op = GNUNET_TESTBED_peer_manage_service (NULL, + testbed_peers[j], + "rps", + &churn_cb, + entry, + (delta < 0) ? 0 : 1); + } +} + +static void +churn (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) +{ unsigned int i; unsigned int j; double portion_online; @@ -853,7 +1094,6 @@ churn (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) double prob_go_offline; double portion_go_online; double portion_go_offline; - uint32_t prob; /* Compute the probability for an online peer to go offline * this round */ @@ -878,65 +1118,22 @@ churn (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) (unsigned int) num_peers); /* Go over 50% randomly chosen peers */ - for (i = 0 ; i < .5 * num_peers ; i++) + for (i = 0; i < .5 * num_peers; i++) { j = permut[i]; /* If online, shut down with certain probability */ if (GNUNET_YES == rps_peers[j].online) { - prob = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, - UINT32_MAX); - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "%u. selected peer (%u: %s) is online.\n", - i, - j, - GNUNET_i2s (rps_peers[j].peer_id)); - if (prob < prob_go_offline * UINT32_MAX) - { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "%s goes offline\n", - GNUNET_i2s (rps_peers[j].peer_id)); - - entry = make_oplist_entry (); - entry->delta = -1; - entry->index = j; - entry->op = GNUNET_TESTBED_peer_manage_service (NULL, - testbed_peers[j], - "rps", - &churn_cb, - entry, - 0); - } - } - - /* If offline, restart with certain probability */ - else if (GNUNET_NO == rps_peers[j].online) - { - prob = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, - UINT32_MAX); - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "%u. selected peer (%u: %s) is offline.\n", - i, - j, - GNUNET_i2s (rps_peers[j].peer_id)); - if (prob < .66 * UINT32_MAX) - { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "%s goes online\n", - GNUNET_i2s (rps_peers[j].peer_id)); - - entry = make_oplist_entry (); - entry->delta = 1; - entry->index = j; - entry->op = GNUNET_TESTBED_peer_manage_service (NULL, - testbed_peers[j], - "rps", - &churn_cb, - entry, - 1); - } - } + cancel_pending_req_rep (&rps_peers[j]); + manage_service_wrapper (i, j, -1, prob_go_offline); + } + + /* If offline, restart with certain probability */ + else if (GNUNET_NO == rps_peers[j].online) + { + manage_service_wrapper (i, j, 1, 0.66); + } } GNUNET_free (permut); @@ -948,21 +1145,13 @@ churn (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) } -static void -profiler_pre (void *cls, struct GNUNET_RPS_Handle *h) +/** + * Initialise given RPSPeer + */ +static void profiler_init_peer (struct RPSPeer *rps_peer) { - //churn_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, - // 10), - // churn, NULL); - mal_pre (cls, h); - - /* if (NULL == churn_task) - { - churn_task = GNUNET_SCHEDULER_add_delayed ( - GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 10), - churn, - NULL); - } */ + if (num_peers - 1 == rps_peer->index) + rps_peer->num_ids_to_request = cur_test_run.num_requests; } @@ -978,52 +1167,50 @@ profiler_reply_handle (void *cls, uint64_t n, const struct GNUNET_PeerIdentity *recv_peers) { - struct RPSPeer *rps_peer = (struct RPSPeer *) cls; + struct RPSPeer *rps_peer; struct RPSPeer *rcv_rps_peer; char *file_name; char *file_name_dh; unsigned int i; + struct PendingReply *pending_rep = (struct PendingReply *) cls; + rps_peer = pending_rep->rps_peer; file_name = "/tmp/rps/received_ids"; file_name_dh = "/tmp/rps/diehard_input"; - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "[%s] got %" PRIu64 " peers:\n", GNUNET_i2s (rps_peer->peer_id), n); - - for (i = 0 ; i < n ; i++) + for (i = 0; i < n; i++) { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "%u: %s\n", i, GNUNET_i2s (&recv_peers[i])); - - /* GNUNET_array_append (rps_peer->rec_ids, rps_peer->num_rec_ids, recv_peers[i]); */ tofile (file_name, "%s\n", GNUNET_i2s_full (&recv_peers[i])); - rcv_rps_peer = GNUNET_CONTAINER_multipeermap_get (peer_map, &recv_peers[i]); - tofile (file_name_dh, "%" PRIu32 "\n", (uint32_t) rcv_rps_peer->index); } + /* Find #PendingReply holding the request handle */ + GNUNET_CONTAINER_DLL_remove (rps_peer->pending_rep_head, + rps_peer->pending_rep_tail, + pending_rep); + rps_peer->num_pending_reps--; } static void profiler_cb (struct RPSPeer *rps_peer) { - uint32_t i; - - /* Churn only at peers that do not request peers for evaluation */ - if (NULL == churn_task && - rps_peer->index != num_peers - 2) + /* Start churn */ + if (NULL == churn_task) { churn_task = GNUNET_SCHEDULER_add_delayed ( - GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 10), + GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5), churn, NULL); } @@ -1031,17 +1218,8 @@ profiler_cb (struct RPSPeer *rps_peer) /* Only request peer ids at one peer. * (It's the before-last because last one is target of the focussed attack.) */ - if (rps_peer->index == num_peers - 2) - { - for (i = 0 ; i < cur_test_run.num_requests ; i++) - { - GNUNET_SCHEDULER_add_delayed ( - GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, - cur_test_run.request_interval * i), - request_peers, - rps_peer); - } - } + if (eval_peer == rps_peer) + schedule_missing_requests (rps_peer); } /** @@ -1126,7 +1304,6 @@ run (void *cls, testbed_peers = peers; num_peers_online = 0; - for (i = 0 ; i < num_peers ; i++) { entry = make_oplist_entry (); @@ -1137,16 +1314,6 @@ run (void *cls, entry); } - - // This seems not to work - //if (NULL != strstr (cur_test_run.name, "profiler")) - //{ - // churn_task = GNUNET_SCHEDULER_add_delayed ( - // GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5), - // churn, - // NULL); - //} - GNUNET_assert (num_peers == n_peers); for (i = 0 ; i < n_peers ; i++) { @@ -1161,6 +1328,9 @@ run (void *cls, &rps_disconnect_adapter, &rps_peers[i]); } + + if (NULL != churn_task) + GNUNET_SCHEDULER_cancel (churn_task); GNUNET_SCHEDULER_add_delayed (timeout, &shutdown_task, NULL); } @@ -1177,13 +1347,13 @@ main (int argc, char *argv[]) { int ret_value; + num_peers = 5; cur_test_run.name = "test-rps-default"; + cur_test_run.init_peer = default_init_peer; cur_test_run.pre_test = NULL; cur_test_run.reply_handle = default_reply_handle; cur_test_run.eval_cb = default_eval_cb; churn_task = NULL; - - num_peers = 5; timeout = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 30); if (strstr (argv[0], "malicious") != NULL) @@ -1259,31 +1429,39 @@ main (int argc, char *argv[]) { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Test cancelling a request\n"); cur_test_run.name = "test-rps-req-cancel"; + num_peers = 1; cur_test_run.main_test = req_cancel_cb; + cur_test_run.eval_cb = no_eval; } else if (strstr (argv[0], "profiler") != NULL) { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "This is the profiler\n"); cur_test_run.name = "test-rps-profiler"; + num_peers = 10; mal_type = 3; - cur_test_run.pre_test = profiler_pre; + cur_test_run.init_peer = profiler_init_peer; + cur_test_run.pre_test = mal_pre; cur_test_run.main_test = profiler_cb; cur_test_run.reply_handle = profiler_reply_handle; cur_test_run.eval_cb = profiler_eval; cur_test_run.request_interval = 2; - cur_test_run.num_requests = 50; - - num_peers = 50; + cur_test_run.num_requests = 5; + timeout = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 90); + /* 'Clean' directory */ (void) GNUNET_DISK_directory_remove ("/tmp/rps/"); GNUNET_DISK_directory_create ("/tmp/rps/"); - timeout = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 90); } rps_peers = GNUNET_new_array (num_peers, struct RPSPeer); - rps_peer_ids = GNUNET_new_array (num_peers, struct GNUNET_PeerIdentity); peer_map = GNUNET_CONTAINER_multipeermap_create (num_peers, GNUNET_NO); + rps_peer_ids = GNUNET_new_array (num_peers, struct GNUNET_PeerIdentity); + if ( (2 == mal_type) || + (3 == mal_type)) + target_peer = &rps_peer_ids[num_peers - 2]; + if (profiler_eval == cur_test_run.eval_cb) + eval_peer = &rps_peers[num_peers - 1]; ok = 1; (void) GNUNET_TESTBED_test_run (cur_test_run.name, @@ -1293,11 +1471,9 @@ main (int argc, char *argv[]) &run, NULL); ret_value = cur_test_run.eval_cb(); - GNUNET_free (rps_peers ); GNUNET_free (rps_peer_ids); GNUNET_CONTAINER_multipeermap_destroy (peer_map); - return ret_value; } |