diff options
author | Gabor X Toth <*@tg-x.net> | 2013-10-10 18:08:53 +0000 |
---|---|---|
committer | Gabor X Toth <*@tg-x.net> | 2013-10-10 18:08:53 +0000 |
commit | 1b5d4aba9d8bbcb62c93f96782e3567f6f79d0cb (patch) | |
tree | 3cd28bfee831af0417c2dcbb543c03481517ad00 /src | |
parent | 67a8e21eedb6d35fec76841d4a1a6b4b41b37879 (diff) |
PSYC: master msg transmission
Diffstat (limited to 'src')
-rw-r--r-- | src/include/gnunet_protocols.h | 37 | ||||
-rw-r--r-- | src/include/gnunet_psyc_service.h | 12 | ||||
-rw-r--r-- | src/multicast/multicast_api.c | 22 | ||||
-rw-r--r-- | src/psyc/gnunet-service-psyc.c | 271 | ||||
-rw-r--r-- | src/psyc/psyc.h | 7 | ||||
-rw-r--r-- | src/psyc/psyc_api.c | 218 | ||||
-rw-r--r-- | src/psyc/test_psyc.c | 90 |
7 files changed, 505 insertions, 152 deletions
diff --git a/src/include/gnunet_protocols.h b/src/include/gnunet_protocols.h index bfc8f1ab31..84ac9dda1f 100644 --- a/src/include/gnunet_protocols.h +++ b/src/include/gnunet_protocols.h @@ -2083,48 +2083,31 @@ extern "C" #define GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_SLAVE_RM 690 -#define GNUNET_MESSAGE_TYPE_PSYC_TRANSMIT_METHOD 691 +#define GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD 691 -#define GNUNET_MESSAGE_TYPE_PSYC_TRANSMIT_MODIFIER 692 +#define GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER 692 -#define GNUNET_MESSAGE_TYPE_PSYC_TRANSMIT_MOD_CONT 693 +#define GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT 693 -#define GNUNET_MESSAGE_TYPE_PSYC_TRANSMIT_DATA 694 +#define GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA 694 #define GNUNET_MESSAGE_TYPE_PSYC_TRANSMIT_ACK 695 -#define GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD 696 - -#define GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER 697 - -#define GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT 698 - -#define GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA 699 - -#define GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_ACK 700 - - #define GNUNET_MESSAGE_TYPE_PSYC_STORY_REQUEST 701 -#define GNUNET_MESSAGE_TYPE_PSYC_STORY_METHOD 702 - -#define GNUNET_MESSAGE_TYPE_PSYC_STORY_MODIFIER 703 - -#define GNUNET_MESSAGE_TYPE_PSYC_STORY_MOD_CONT 704 - -#define GNUNET_MESSAGE_TYPE_PSYC_STORY_DATA 705 +#define GNUNET_MESSAGE_TYPE_PSYC_STORY_RESPONSE 702 -#define GNUNET_MESSAGE_TYPE_PSYC_STORY_ACK 706 +#define GNUNET_MESSAGE_TYPE_PSYC_STATE_GET 703 -#define GNUNET_MESSAGE_TYPE_PSYC_STATE_GET 707 +#define GNUNET_MESSAGE_TYPE_PSYC_STATE_GET_PREFIX 704 -#define GNUNET_MESSAGE_TYPE_PSYC_STATE_GET_PREFIX 708 +#define GNUNET_MESSAGE_TYPE_PSYC_STATE_RESPONSE 705 -#define GNUNET_MESSAGE_TYPE_PSYC_STATE_MODIFIER 709 +#define GNUNET_MESSAGE_TYPE_PSYC_STATE_MODIFIER 706 -#define GNUNET_MESSAGE_TYPE_PSYC_STATE_MOD_CONT 710 +#define GNUNET_MESSAGE_TYPE_PSYC_STATE_MOD_CONT 707 /******************************************************************************* diff --git a/src/include/gnunet_psyc_service.h b/src/include/gnunet_psyc_service.h index 7ac40a4c5d..758b5e3d10 100644 --- a/src/include/gnunet_psyc_service.h +++ b/src/include/gnunet_psyc_service.h @@ -207,8 +207,8 @@ struct GNUNET_PSYC_MessageMethod uint32_t flags GNUNET_PACKED; /** - * Sending slave's public key. NULL if the message is from the master, or when - * transmitting a message. + * Sending slave's public key. + * NULL if the message is from the master, or when transmitting a message. */ struct GNUNET_CRYPTO_EddsaPublicKey slave_key; @@ -264,7 +264,7 @@ enum GNUNET_PSYC_DataStatus struct GNUNET_PSYC_MessageData { /** - * Type: GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER + * Type: GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA */ struct GNUNET_MessageHeader header; @@ -367,7 +367,7 @@ typedef int void GNUNET_PSYC_join_decision (struct GNUNET_PSYC_JoinHandle *jh, int is_admitted, - unsigned int relay_count, + uint32_t relay_count, const struct GNUNET_PeerIdentity *relays, const char *method_name, const struct GNUNET_ENV_Environment *env, @@ -582,7 +582,7 @@ GNUNET_PSYC_slave_join (const struct GNUNET_CONFIGURATION_Handle *cfg, const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, const struct GNUNET_CRYPTO_EddsaPrivateKey *slave_key, const struct GNUNET_PeerIdentity *origin, - size_t relay_count, + uint32_t relay_count, const struct GNUNET_PeerIdentity *relays, GNUNET_PSYC_Method method, GNUNET_PSYC_JoinCallback join_cb, @@ -591,7 +591,7 @@ GNUNET_PSYC_slave_join (const struct GNUNET_CONFIGURATION_Handle *cfg, const char *method_name, const struct GNUNET_ENV_Environment *env, const void *data, - size_t data_size); + uint16_t data_size); /** diff --git a/src/multicast/multicast_api.c b/src/multicast/multicast_api.c index 3dbc07f5d4..49e648468b 100644 --- a/src/multicast/multicast_api.c +++ b/src/multicast/multicast_api.c @@ -30,7 +30,10 @@ #include "gnunet_multicast_service.h" #include "multicast.h" -/** +#define LOG(kind,...) GNUNET_log_from (kind, "multicast-api",__VA_ARGS__) + + +/** * Handle for a request to send a message to all multicast group members * (from the origin). */ @@ -38,6 +41,7 @@ struct GNUNET_MULTICAST_OriginMessageHandle { GNUNET_MULTICAST_OriginTransmitNotify notify; void *notify_cls; + struct GNUNET_MULTICAST_Origin *origin; uint64_t message_id; uint64_t group_generation; @@ -353,6 +357,7 @@ GNUNET_MULTICAST_origin_start (const struct GNUNET_CONFIGURATION_Handle *cfg, static void schedule_origin_to_all (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) { + LOG (GNUNET_ERROR_TYPE_DEBUG, "schedule_origin_to_all()\n"); struct GNUNET_MULTICAST_Origin *orig = cls; struct GNUNET_MULTICAST_OriginMessageHandle *mh = &orig->msg_handle; @@ -361,12 +366,18 @@ schedule_origin_to_all (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc = GNUNET_malloc (sizeof (*msg) + buf_size); int ret = mh->notify (mh->notify_cls, &buf_size, &msg[1]); - if (ret != GNUNET_YES || ret != GNUNET_NO) + if (! (GNUNET_YES == ret || GNUNET_NO == ret) + || buf_size > GNUNET_MULTICAST_FRAGMENT_MAX_SIZE) { + LOG (GNUNET_ERROR_TYPE_ERROR, + "MasterTransmitNotify() returned error or invalid message size.\n"); /* FIXME: handle error */ return; } + if (GNUNET_NO == ret && 0 == buf_size) + return; /* Transmission paused. */ + msg->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE); msg->header.size = htons (buf_size); msg->message_id = mh->message_id; @@ -393,12 +404,12 @@ schedule_origin_to_all (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc * returned signed message. * FIXME: Also send to local members in this group. */ - orig->message_cb (orig->cls, msg); + orig->message_cb (orig->cls, (const struct GNUNET_MessageHeader *) msg); if (GNUNET_NO == ret) GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 1), - schedule_origin_to_all, mh); + schedule_origin_to_all, orig); } @@ -421,6 +432,7 @@ GNUNET_MULTICAST_origin_to_all (struct GNUNET_MULTICAST_Origin *origin, void *notify_cls) { struct GNUNET_MULTICAST_OriginMessageHandle *mh = &origin->msg_handle; + mh->origin = origin; mh->message_id = message_id; mh->group_generation = group_generation; mh->notify = notify; @@ -441,7 +453,7 @@ GNUNET_MULTICAST_origin_to_all (struct GNUNET_MULTICAST_Origin *origin, void GNUNET_MULTICAST_origin_to_all_resume (struct GNUNET_MULTICAST_OriginMessageHandle *mh) { - + GNUNET_SCHEDULER_add_now (schedule_origin_to_all, mh->origin); } diff --git a/src/psyc/gnunet-service-psyc.c b/src/psyc/gnunet-service-psyc.c index 7f5189ab8f..d3f203ebf9 100644 --- a/src/psyc/gnunet-service-psyc.c +++ b/src/psyc/gnunet-service-psyc.c @@ -34,8 +34,6 @@ #include "gnunet_psyc_service.h" #include "psyc.h" -#define LOG(kind,...) GNUNET_log_from (kind, "psyc-api",__VA_ARGS__) - /** * Handle to our current configuration. @@ -58,6 +56,11 @@ static struct GNUNET_SERVER_NotificationContext *nc; static struct GNUNET_PSYCSTORE_Handle *store; /** + * channel's pub_key_hash -> struct Channel + */ +static struct GNUNET_CONTAINER_MultiHashMap *clients; + +/** * Message in the transmission queue. */ struct TransmitMessage @@ -81,6 +84,7 @@ struct Channel struct TransmitMessage *tmit_tail; char *tmit_buf; + GNUNET_SCHEDULER_TaskIdentifier tmit_task; uint32_t tmit_mod_count; uint32_t tmit_mod_recvd; uint16_t tmit_size; @@ -96,8 +100,9 @@ struct Channel struct Master { struct Channel channel; - struct GNUNET_CRYPTO_EccPrivateKey private_key; - struct GNUNET_CRYPTO_EccPublicSignKey public_key; + struct GNUNET_CRYPTO_EccPrivateKey priv_key; + struct GNUNET_CRYPTO_EccPublicSignKey pub_key; + struct GNUNET_HashCode pub_key_hash; struct GNUNET_MULTICAST_Origin *origin; struct GNUNET_MULTICAST_OriginMessageHandle *tmit_handle; @@ -120,13 +125,20 @@ struct Slave { struct Channel channel; struct GNUNET_CRYPTO_EccPrivateKey slave_key; - struct GNUNET_CRYPTO_EccPublicSignKey channel_key; + struct GNUNET_CRYPTO_EccPublicSignKey chan_key; + struct GNUNET_HashCode chan_key_hash; struct GNUNET_MULTICAST_Member *member; struct GNUNET_MULTICAST_MemberRequestHandle *tmit_handle; + struct GNUNET_PeerIdentity origin; + struct GNUNET_PeerIdentity *relays; + struct GNUNET_MessageHeader *join_req; + uint64_t max_message_id; uint64_t max_request_id; + + uint32_t relay_count; }; @@ -166,41 +178,151 @@ client_disconnect (void *cls, struct GNUNET_SERVER_Client *client) GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Client %p disconnected\n", client); - struct Channel *ch = GNUNET_SERVER_client_get_user_context (client, - struct Channel); - GNUNET_assert (NULL != ch); + struct Channel *ch + = GNUNET_SERVER_client_get_user_context (client, struct Channel); + if (NULL == ch) + { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "User context is NULL in client_disconnect()\n"); + GNUNET_break (0); + return; + } if (NULL != ch->tmit_buf) { GNUNET_free (ch->tmit_buf); ch->tmit_buf = NULL; } + + if (ch->is_master) + { + struct Master *mst = (struct Master *) ch; + if (NULL != mst->origin) + GNUNET_MULTICAST_origin_stop (mst->origin); + } + else + { + struct Slave *slv = (struct Slave *) ch; + if (NULL != slv->join_req) + GNUNET_free (slv->join_req); + if (NULL != slv->relays) + GNUNET_free (slv->relays); + if (NULL != slv->member) + GNUNET_MULTICAST_member_part (slv->member); + } + GNUNET_free (ch); } +void +join_cb (void *cls, const struct GNUNET_CRYPTO_EccPublicSignKey *member_key, + const struct GNUNET_MessageHeader *join_req, + struct GNUNET_MULTICAST_JoinHandle *jh) +{ + +} void -counters_cb (void *cls, uint64_t max_fragment_id, uint64_t max_message_id, - uint64_t max_group_generation, uint64_t max_state_message_id) +membership_test_cb (void *cls, + const struct GNUNET_CRYPTO_EccPublicSignKey *member_key, + uint64_t message_id, uint64_t group_generation, + struct GNUNET_MULTICAST_MembershipTestHandle *mth) { - struct Channel *ch = cls; + +} + +void +replay_fragment_cb (void *cls, + const struct GNUNET_CRYPTO_EccPublicSignKey *member_key, + uint64_t fragment_id, uint64_t flags, + struct GNUNET_MULTICAST_ReplayHandle *rh) +{ + +} + +void +replay_message_cb (void *cls, + const struct GNUNET_CRYPTO_EccPublicSignKey *member_key, + uint64_t message_id, + uint64_t fragment_offset, + uint64_t flags, + struct GNUNET_MULTICAST_ReplayHandle *rh) +{ + +} + +void +request_cb (void *cls, const struct GNUNET_CRYPTO_EccPublicSignKey *member_key, + const struct GNUNET_MessageHeader *req, + enum GNUNET_MULTICAST_MessageFlags flags) +{ + +} + +void +message_cb (void *cls, const struct GNUNET_MessageHeader *msg) +{ + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Received message of type %u from multicast.\n", + ntohs (msg->type)); +} + +void +master_counters_cb (void *cls, int result, uint64_t max_fragment_id, + uint64_t max_message_id, uint64_t max_group_generation, + uint64_t max_state_message_id) +{ + struct Master *mst = cls; + struct Channel *ch = &mst->channel; struct CountersResult *res = GNUNET_malloc (sizeof (*res)); + res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK); res->header.size = htons (sizeof (*res)); + res->result_code = htonl (result); res->max_message_id = GNUNET_htonll (max_message_id); - if (ch->is_master) + if (GNUNET_OK == result || GNUNET_NO == result) { - struct Master *mst = cls; mst->max_message_id = max_message_id; mst->max_state_message_id = max_state_message_id; mst->max_group_generation = max_group_generation; - res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK); + mst->origin + = GNUNET_MULTICAST_origin_start (cfg, &mst->priv_key, + max_fragment_id + 1, + join_cb, membership_test_cb, + replay_fragment_cb, replay_message_cb, + request_cb, message_cb, ch); } - else + GNUNET_SERVER_notification_context_add (nc, ch->client); + GNUNET_SERVER_notification_context_unicast (nc, ch->client, &res->header, + GNUNET_NO); + GNUNET_free (res); +} + + +void +slave_counters_cb (void *cls, int result, uint64_t max_fragment_id, + uint64_t max_message_id, uint64_t max_group_generation, + uint64_t max_state_message_id) +{ + struct Slave *slv = cls; + struct Channel *ch = &slv->channel; + struct CountersResult *res = GNUNET_malloc (sizeof (*res)); + res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK); + res->header.size = htons (sizeof (*res)); + res->result_code = htonl (result); + res->max_message_id = GNUNET_htonll (max_message_id); + + if (GNUNET_OK == result || GNUNET_NO == result) { - struct Slave *slv = cls; slv->max_message_id = max_message_id; - res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK); + slv->member + = GNUNET_MULTICAST_member_join (cfg, &slv->chan_key, &slv->slave_key, + &slv->origin, + slv->relay_count, slv->relays, + slv->join_req, join_cb, + membership_test_cb, + replay_fragment_cb, replay_message_cb, + message_cb, ch); } GNUNET_SERVER_notification_context_add (nc, ch->client); @@ -220,14 +342,17 @@ handle_master_start (void *cls, struct GNUNET_SERVER_Client *client, mst->channel.client = client; mst->channel.is_master = GNUNET_YES; mst->policy = ntohl (req->policy); - mst->private_key = req->channel_key; - GNUNET_CRYPTO_ecc_key_get_public_for_signature (&mst->private_key, - &mst->public_key); + mst->priv_key = req->channel_key; + GNUNET_CRYPTO_ecc_key_get_public_for_signature (&mst->priv_key, + &mst->pub_key); + GNUNET_CRYPTO_hash (&mst->pub_key, sizeof (mst->pub_key), &mst->pub_key_hash); - GNUNET_PSYCSTORE_counters_get (store, &mst->public_key, - counters_cb, mst); + GNUNET_PSYCSTORE_counters_get (store, &mst->pub_key, + master_counters_cb, mst); - GNUNET_SERVER_client_set_user_context (client, mst); + GNUNET_SERVER_client_set_user_context (client, &mst->channel); + GNUNET_CONTAINER_multihashmap_put (clients, &mst->pub_key_hash, mst, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); GNUNET_SERVER_receive_done (client, GNUNET_OK); } @@ -241,13 +366,25 @@ handle_slave_join (void *cls, struct GNUNET_SERVER_Client *client, struct Slave *slv = GNUNET_new (struct Slave); slv->channel.client = client; slv->channel.is_master = GNUNET_NO; - slv->channel_key = req->channel_key; slv->slave_key = req->slave_key; - - GNUNET_PSYCSTORE_counters_get (store, &slv->channel_key, - counters_cb, slv); - - GNUNET_SERVER_client_set_user_context (client, slv); + slv->chan_key = req->channel_key; + GNUNET_CRYPTO_hash (&slv->chan_key, sizeof (slv->chan_key), + &slv->chan_key_hash); + slv->origin = req->origin; + slv->relay_count = ntohl (req->relay_count); + + const struct GNUNET_PeerIdentity *relays + = (const struct GNUNET_PeerIdentity *) &req[1]; + slv->relays + = GNUNET_malloc (slv->relay_count * sizeof (struct GNUNET_PeerIdentity)); + uint32_t i; + for (i = 0; i < slv->relay_count; i++) + memcpy (&slv->relays[i], &relays[i], sizeof (*relays)); + + GNUNET_PSYCSTORE_counters_get (store, &slv->chan_key, + slave_counters_cb, slv); + + GNUNET_SERVER_client_set_user_context (client, &slv->channel); GNUNET_SERVER_receive_done (client, GNUNET_OK); } @@ -268,34 +405,40 @@ send_transmit_ack (struct Channel *ch) static int -transmit_notify (void *cls, uint64_t fragment_id, size_t *data_size, void *data) +transmit_notify (void *cls, size_t *data_size, void *data) { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "transmit_notify()\n"); struct Channel *ch = cls; struct TransmitMessage *msg = ch->tmit_head; - if (NULL == msg || *data_size < msg->size) + if (NULL == msg || *data_size < ntohs (msg->size)) { *data_size = 0; return GNUNET_NO; } - memcpy (data, msg->buf, msg->size); - *data_size = msg->size; + *data_size = ntohs (msg->size); + memcpy (data, msg->buf, *data_size); GNUNET_free (ch->tmit_buf); + ch->tmit_buf = NULL; GNUNET_CONTAINER_DLL_remove (ch->tmit_head, ch->tmit_tail, msg); return (GNUNET_YES == ch->in_transmit) ? GNUNET_NO : GNUNET_YES; } -static int -master_transmit_message (struct Master *mst) +static void +master_transmit_message (void *cls, + const struct GNUNET_SCHEDULER_TaskContext *tc) { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "master_transmit_message()\n"); + struct Master *mst = cls; + mst->channel.tmit_task = 0; if (NULL == mst->tmit_handle) { mst->tmit_handle - = GNUNET_MULTICAST_origin_to_all (mst->origin, mst->max_message_id, + = GNUNET_MULTICAST_origin_to_all (mst->origin, ++mst->max_message_id, mst->max_group_generation, transmit_notify, mst); } @@ -303,24 +446,25 @@ master_transmit_message (struct Master *mst) { GNUNET_MULTICAST_origin_to_all_resume (mst->tmit_handle); } - return GNUNET_OK; } -static int -slave_transmit_message (struct Slave *slv) +static void +slave_transmit_message (void *cls, + const struct GNUNET_SCHEDULER_TaskContext *tc) { + struct Slave *slv = cls; + slv->channel.tmit_task = 0; if (NULL == slv->tmit_handle) { slv->tmit_handle - = GNUNET_MULTICAST_member_to_origin(slv->member, slv->max_request_id, + = GNUNET_MULTICAST_member_to_origin(slv->member, ++slv->max_request_id, transmit_notify, slv); } else { GNUNET_MULTICAST_member_to_origin_resume (slv->tmit_handle); } - return GNUNET_OK; } @@ -328,6 +472,7 @@ static int buffer_message (struct Channel *ch, const struct GNUNET_MessageHeader *msg) { uint16_t size = ntohs (msg->size); + struct GNUNET_TIME_Relative tmit_delay = GNUNET_TIME_UNIT_ZERO; if (GNUNET_MULTICAST_FRAGMENT_MAX_SIZE < size) return GNUNET_SYSERR; @@ -353,12 +498,17 @@ buffer_message (struct Channel *ch, const struct GNUNET_MessageHeader *msg) tmit_msg->size = size; tmit_msg->status = ch->tmit_status; GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, tmit_msg); - - ch->is_master - ? master_transmit_message ((struct Master *) ch) - : slave_transmit_message ((struct Slave *) ch); + tmit_delay = GNUNET_TIME_UNIT_ZERO; } + if (0 != ch->tmit_task) + GNUNET_SCHEDULER_cancel (ch->tmit_task); + + ch->tmit_task + = ch->is_master + ? GNUNET_SCHEDULER_add_delayed (tmit_delay, master_transmit_message, ch) + : GNUNET_SCHEDULER_add_delayed (tmit_delay, slave_transmit_message, ch); + return GNUNET_OK; } @@ -368,8 +518,8 @@ handle_transmit_method (void *cls, struct GNUNET_SERVER_Client *client, { const struct GNUNET_PSYC_MessageMethod *meth = (const struct GNUNET_PSYC_MessageMethod *) msg; - struct Channel *ch = GNUNET_SERVER_client_get_user_context (client, - struct Channel); + struct Channel *ch + = GNUNET_SERVER_client_get_user_context (client, struct Channel); GNUNET_assert (NULL != ch); if (GNUNET_NO != ch->in_transmit) @@ -378,6 +528,7 @@ handle_transmit_method (void *cls, struct GNUNET_SERVER_Client *client, return; } + ch->in_transmit = GNUNET_YES; ch->tmit_buf = NULL; ch->tmit_size = 0; ch->tmit_mod_recvd = 0; @@ -388,6 +539,8 @@ handle_transmit_method (void *cls, struct GNUNET_SERVER_Client *client, if (0 == ch->tmit_mod_count) send_transmit_ack (ch); + + GNUNET_SERVER_receive_done (client, GNUNET_OK); }; @@ -397,8 +550,8 @@ handle_transmit_modifier (void *cls, struct GNUNET_SERVER_Client *client, { const struct GNUNET_PSYC_MessageModifier *mod = (const struct GNUNET_PSYC_MessageModifier *) msg; - struct Channel *ch = GNUNET_SERVER_client_get_user_context (client, - struct Channel); + struct Channel *ch + = GNUNET_SERVER_client_get_user_context (client, struct Channel); GNUNET_assert (NULL != ch); ch->tmit_mod_recvd++; @@ -406,6 +559,8 @@ handle_transmit_modifier (void *cls, struct GNUNET_SERVER_Client *client, if (ch->tmit_mod_recvd == ch->tmit_mod_count) send_transmit_ack (ch); + + GNUNET_SERVER_receive_done (client, GNUNET_OK); }; @@ -415,13 +570,18 @@ handle_transmit_data (void *cls, struct GNUNET_SERVER_Client *client, { const struct GNUNET_PSYC_MessageData *data = (const struct GNUNET_PSYC_MessageData *) msg; - struct Channel *ch = GNUNET_SERVER_client_get_user_context (client, - struct Channel); + struct Channel *ch + = GNUNET_SERVER_client_get_user_context (client, struct Channel); GNUNET_assert (NULL != ch); - ch->tmit_status = data->status; + ch->tmit_status = ntohs (data->status); buffer_message (ch, msg); send_transmit_ack (ch); + + if (GNUNET_PSYC_DATA_CONT != ch->tmit_status) + ch->in_transmit = GNUNET_NO; + + GNUNET_SERVER_receive_done (client, GNUNET_OK); }; @@ -444,13 +604,13 @@ run (void *cls, struct GNUNET_SERVER_Handle *server, GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN, 0 }, { &handle_transmit_method, NULL, - GNUNET_MESSAGE_TYPE_PSYC_TRANSMIT_METHOD, 0 }, + GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD, 0 }, { &handle_transmit_modifier, NULL, - GNUNET_MESSAGE_TYPE_PSYC_TRANSMIT_MODIFIER, 0 }, + GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER, 0 }, { &handle_transmit_data, NULL, - GNUNET_MESSAGE_TYPE_PSYC_TRANSMIT_DATA, 0 }, + GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA, 0 }, { NULL, NULL, 0, 0 } }; @@ -458,6 +618,7 @@ run (void *cls, struct GNUNET_SERVER_Handle *server, cfg = c; store = GNUNET_PSYCSTORE_connect (cfg); stats = GNUNET_STATISTICS_create ("psyc", cfg); + clients = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES); nc = GNUNET_SERVER_notification_context_create (server, 1); GNUNET_SERVER_add_handlers (server, handlers); GNUNET_SERVER_disconnect_notify (server, &client_disconnect, NULL); diff --git a/src/psyc/psyc.h b/src/psyc/psyc.h index 35e9ae8006..6a88263375 100644 --- a/src/psyc/psyc.h +++ b/src/psyc/psyc.h @@ -65,6 +65,11 @@ struct CountersResult */ struct GNUNET_MessageHeader header; + /** + * Status code for the operation. + */ + int32_t result_code GNUNET_PACKED; + uint64_t max_message_id; }; @@ -121,6 +126,8 @@ struct SlaveJoinRequest struct GNUNET_CRYPTO_EccPrivateKey slave_key; + struct GNUNET_PeerIdentity origin; + /* Followed by struct GNUNET_PeerIdentity relays[relay_count] */ }; diff --git a/src/psyc/psyc_api.c b/src/psyc/psyc_api.c index abe7bb0288..4178d920ba 100644 --- a/src/psyc/psyc_api.c +++ b/src/psyc/psyc_api.c @@ -106,6 +106,28 @@ struct GNUNET_PSYC_Channel * Are we currently transmitting a message? */ int in_transmit; + + /** + * Is this a master or slave channel? + */ + int is_master; + + /** + * Buffer space available for transmitting the next data fragment. + */ + uint16_t tmit_buf_avail; +}; + + +/** + * Handle for a pending PSYC transmission operation. + */ +struct GNUNET_PSYC_MasterTransmitHandle +{ + struct GNUNET_PSYC_Master *master; + GNUNET_PSYC_MasterTransmitNotify notify; + void *notify_cls; + enum GNUNET_PSYC_DataStatus status; }; @@ -116,6 +138,8 @@ struct GNUNET_PSYC_Master { struct GNUNET_PSYC_Channel ch; + struct GNUNET_PSYC_MasterTransmitHandle *tmit; + GNUNET_PSYC_MasterStartCallback start_cb; uint64_t max_message_id; @@ -146,19 +170,6 @@ struct GNUNET_PSYC_JoinHandle /** * Handle for a pending PSYC transmission operation. */ -struct GNUNET_PSYC_MasterTransmitHandle -{ - struct GNUNET_PSYC_Master *master; - const struct GNUNET_ENV_Environment *env; - GNUNET_PSYC_MasterTransmitNotify notify; - void *notify_cls; - enum GNUNET_PSYC_MasterTransmitFlags flags; -}; - - -/** - * Handle for a pending PSYC transmission operation. - */ struct GNUNET_PSYC_SlaveTransmitHandle { @@ -184,10 +195,10 @@ struct GNUNET_PSYC_StateQuery /** - * Try again to connect to the PSYCstore service. + * Try again to connect to the PSYC service. * - * @param cls handle to the PSYCstore service. - * @param tc scheduler context + * @param cls Handle to the PSYC service. + * @param tc Scheduler context */ static void reconnect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc); @@ -215,7 +226,7 @@ reschedule_connect (struct GNUNET_PSYC_Channel *c) } c->in_receive = GNUNET_NO; LOG (GNUNET_ERROR_TYPE_DEBUG, - "Scheduling task to reconnect to PSYCstore service in %s.\n", + "Scheduling task to reconnect to PSYC service in %s.\n", GNUNET_STRINGS_relative_time_to_string (c->reconnect_delay, GNUNET_YES)); c->reconnect_task = GNUNET_SCHEDULER_add_delayed (c->reconnect_delay, &reconnect, c); @@ -226,12 +237,56 @@ reschedule_connect (struct GNUNET_PSYC_Channel *c) /** * Schedule transmission of the next message from our queue. * - * @param h PSYCstore handle + * @param h PSYC handle */ static void transmit_next (struct GNUNET_PSYC_Channel *c); +void +master_transmit_data (struct GNUNET_PSYC_Master *mst) +{ + struct GNUNET_PSYC_Channel *ch = &mst->ch; + size_t data_size = ch->tmit_buf_avail; + struct GNUNET_PSYC_MessageData *pdata; + struct OperationHandle *op + = GNUNET_malloc (sizeof (*op) + sizeof (*pdata) + data_size); + pdata = (struct GNUNET_PSYC_MessageData *) &op[1]; + op->msg = (struct GNUNET_MessageHeader *) pdata; + pdata->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA); + + switch (mst->tmit->notify (mst->tmit->notify_cls, &data_size, &pdata[1])) + { + case GNUNET_NO: + mst->tmit->status = GNUNET_PSYC_DATA_CONT; + break; + + case GNUNET_YES: + mst->tmit->status = GNUNET_PSYC_DATA_END; + break; + + default: + mst->tmit->status = GNUNET_PSYC_DATA_CANCEL; + data_size = 0; + LOG (GNUNET_ERROR_TYPE_ERROR, "MasterTransmitNotify returned error\n"); + } + + if ((GNUNET_PSYC_DATA_CONT == mst->tmit->status && 0 == data_size)) + { + /* Transmission paused, nothing to send. */ + GNUNET_free (op); + } + else + { + GNUNET_assert (data_size <= ch->tmit_buf_avail); + pdata->header.size = htons (sizeof (*pdata) + data_size); + pdata->status = htons (mst->tmit->status); + GNUNET_CONTAINER_DLL_insert_tail (ch->transmit_head, ch->transmit_tail, op); + transmit_next (ch); + } +} + + /** * Type of a function to call when we receive a message * from the service. @@ -253,8 +308,8 @@ message_handler (void *cls, const struct GNUNET_MessageHeader *msg) } uint16_t size_eq = 0; uint16_t size_min = 0; - const uint16_t size = ntohs (msg->size); - const uint16_t type = ntohs (msg->type); + uint16_t size = ntohs (msg->size); + uint16_t type = ntohs (msg->type); LOG (GNUNET_ERROR_TYPE_DEBUG, "Received message of type %d from PSYC service\n", type); @@ -265,6 +320,9 @@ message_handler (void *cls, const struct GNUNET_MessageHeader *msg) case GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK: size_eq = sizeof (struct CountersResult); break; + case GNUNET_MESSAGE_TYPE_PSYC_TRANSMIT_ACK: + size_eq = sizeof (struct TransmitAck); + break; } if (! ((0 < size_eq && size == size_eq) @@ -276,6 +334,7 @@ message_handler (void *cls, const struct GNUNET_MessageHeader *msg) } struct CountersResult *cres; + struct TransmitAck *tack; switch (type) { @@ -294,17 +353,39 @@ message_handler (void *cls, const struct GNUNET_MessageHeader *msg) mst->join_ack_cb (ch->cb_cls, mst->max_message_id); #endif break; + + case GNUNET_MESSAGE_TYPE_PSYC_TRANSMIT_ACK: + tack = (struct TransmitAck *) msg; + if (ch->is_master) + { + GNUNET_assert (NULL != mst->tmit); + if (GNUNET_PSYC_DATA_CONT != mst->tmit->status + || NULL == mst->tmit->notify) + { + GNUNET_free (mst->tmit); + mst->tmit = NULL; + } + else + { + ch->tmit_buf_avail = ntohs (tack->buf_avail); + master_transmit_data (mst); + } + } + else + { + /* TODO: slave */ + } + break; } GNUNET_CLIENT_receive (ch->client, &message_handler, ch, GNUNET_TIME_UNIT_FOREVER_REL); } - /** * Transmit next message to service. * - * @param cls The 'struct GNUNET_PSYCSTORE_Handle'. + * @param cls The 'struct GNUNET_PSYC_Channel'. * @param size Number of bytes available in buf. * @param buf Where to copy the message. * @return Number of bytes copied to buf. @@ -326,7 +407,7 @@ send_next_message (void *cls, size_t size, void *buf) return 0; } LOG (GNUNET_ERROR_TYPE_DEBUG, - "Sending message of type %d to PSYCstore service\n", + "Sending message of type %d to PSYC service\n", ntohs (op->msg->type)); memcpy (buf, op->msg, ret); @@ -349,7 +430,7 @@ send_next_message (void *cls, size_t size, void *buf) /** * Schedule transmission of the next message from our queue. * - * @param h PSYCstore handle. + * @param h PSYC handle. */ static void transmit_next (struct GNUNET_PSYC_Channel *ch) @@ -391,14 +472,12 @@ reconnect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) if (NULL == ch->transmit_head || ch->transmit_head->msg->type != ch->reconnect_msg->type) { - struct OperationHandle *op - = GNUNET_malloc (sizeof (struct OperationHandle) - + ntohs (ch->reconnect_msg->size)); - memcpy (&op[1], ch->reconnect_msg, ntohs (ch->reconnect_msg->size)); + uint16_t reconn_size = ntohs (ch->reconnect_msg->size); + struct OperationHandle *op = GNUNET_malloc (sizeof (*op) + reconn_size); + memcpy (&op[1], ch->reconnect_msg, reconn_size); op->msg = (struct GNUNET_MessageHeader *) &op[1]; GNUNET_CONTAINER_DLL_insert (ch->transmit_head, ch->transmit_tail, op); } - transmit_next (ch); } @@ -414,7 +493,12 @@ disconnect (void *c) { struct GNUNET_PSYC_Channel *ch = c; GNUNET_assert (NULL != ch); - GNUNET_assert (ch->transmit_head == ch->transmit_tail); + if (ch->transmit_head != ch->transmit_tail) + { + LOG (GNUNET_ERROR_TYPE_ERROR, + "Disconnecting while there are still outstanding messages!\n"); + GNUNET_break (0); + } if (ch->reconnect_task != GNUNET_SCHEDULER_NO_TASK) { GNUNET_SCHEDULER_cancel (ch->reconnect_task); @@ -431,7 +515,10 @@ disconnect (void *c) ch->client = NULL; } if (NULL != ch->reconnect_msg) + { + GNUNET_free (ch->reconnect_msg); ch->reconnect_msg = NULL; + } } @@ -475,12 +562,13 @@ GNUNET_PSYC_master_start (const struct GNUNET_CONFIGURATION_Handle *cfg, struct GNUNET_PSYC_Channel *ch = &mst->ch; struct MasterStartRequest *req = GNUNET_malloc (sizeof (*req)); - req->header.size = htons (sizeof (*req) + sizeof (*channel_key)); + req->header.size = htons (sizeof (*req)); req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MASTER_START); req->channel_key = *channel_key; req->policy = policy; ch->cfg = cfg; + ch->is_master = GNUNET_YES; ch->reconnect_msg = (struct GNUNET_MessageHeader *) req; ch->reconnect_delay = GNUNET_TIME_UNIT_ZERO; ch->reconnect_task = GNUNET_SCHEDULER_add_now (&reconnect, mst); @@ -532,7 +620,7 @@ GNUNET_PSYC_master_stop (struct GNUNET_PSYC_Master *mst) void GNUNET_PSYC_join_decision (struct GNUNET_PSYC_JoinHandle *jh, int is_admitted, - unsigned int relay_count, + uint32_t relay_count, const struct GNUNET_PeerIdentity *relays, const char *method_name, const struct GNUNET_ENV_Environment *env, @@ -556,13 +644,13 @@ send_modifier (void *cls, struct GNUNET_ENV_Modifier *mod) pmod = (struct GNUNET_PSYC_MessageModifier *) &op[1]; op->msg = (struct GNUNET_MessageHeader *) pmod; - pmod->header.type = GNUNET_MESSAGE_TYPE_PSYC_TRANSMIT_MODIFIER; + pmod->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER); pmod->header.size = htons (sizeof (*pmod) + name_size + mod->value_size); pmod->name_size = htons (name_size); memcpy (&pmod[1], mod->name, name_size); - memcpy ((void *) &pmod[1] + name_size, mod->value, mod->value_size); + memcpy ((char *) &pmod[1] + name_size, mod->value, mod->value_size); - GNUNET_CONTAINER_DLL_insert (ch->transmit_head, ch->transmit_tail, op); + GNUNET_CONTAINER_DLL_insert_tail (ch->transmit_head, ch->transmit_tail, op); return GNUNET_YES; } @@ -594,29 +682,41 @@ GNUNET_PSYC_master_transmit (struct GNUNET_PSYC_Master *mst, return NULL; ch->in_transmit = GNUNET_YES; + size_t size = strlen (method_name) + 1; struct GNUNET_PSYC_MessageMethod *pmeth; - struct OperationHandle *op = GNUNET_malloc (sizeof (*op) + sizeof (*pmeth)); + struct OperationHandle *op + = GNUNET_malloc (sizeof (*op) + sizeof (*pmeth) + size); pmeth = (struct GNUNET_PSYC_MessageMethod *) &op[1]; op->msg = (struct GNUNET_MessageHeader *) pmeth; - pmeth->header.type = GNUNET_MESSAGE_TYPE_PSYC_TRANSMIT_METHOD; - size_t size = strlen (method_name) + 1; + pmeth->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD); pmeth->header.size = htons (sizeof (*pmeth) + size); pmeth->flags = htonl (flags); - pmeth->mod_count - = GNUNET_ntohll (GNUNET_ENV_environment_get_mod_count (env)); + pmeth->mod_count = GNUNET_ntohll (GNUNET_ENV_environment_get_mod_count (env)); memcpy (&pmeth[1], method_name, size); - GNUNET_CONTAINER_DLL_insert (ch->transmit_head, ch->transmit_tail, op); - + GNUNET_CONTAINER_DLL_insert_tail (ch->transmit_head, ch->transmit_tail, op); GNUNET_ENV_environment_iterate (env, send_modifier, mst); + transmit_next (ch); + + mst->tmit = GNUNET_malloc (sizeof (*mst->tmit)); + mst->tmit->master = mst; + mst->tmit->notify = notify; + mst->tmit->notify_cls = notify_cls; + mst->tmit->status = GNUNET_PSYC_DATA_CONT; + return mst->tmit; +} + - struct GNUNET_PSYC_MasterTransmitHandle *th = GNUNET_malloc (sizeof (*th)); - th->master = mst; - th->env = env; - th->notify = notify; - th->notify_cls = notify_cls; - return th; +/** + * Resume transmission to the channel. + * + * @param th Handle of the request that is being resumed. + */ +void +GNUNET_PSYC_master_transmit_resume (struct GNUNET_PSYC_MasterTransmitHandle *th) +{ + master_transmit_data (th->master); } @@ -671,7 +771,7 @@ GNUNET_PSYC_slave_join (const struct GNUNET_CONFIGURATION_Handle *cfg, const struct GNUNET_CRYPTO_EccPublicSignKey *channel_key, const struct GNUNET_CRYPTO_EccPrivateKey *slave_key, const struct GNUNET_PeerIdentity *origin, - size_t relay_count, + uint32_t relay_count, const struct GNUNET_PeerIdentity *relays, GNUNET_PSYC_Method method, GNUNET_PSYC_JoinCallback join_cb, @@ -680,7 +780,7 @@ GNUNET_PSYC_slave_join (const struct GNUNET_CONFIGURATION_Handle *cfg, const char *method_name, const struct GNUNET_ENV_Environment *env, const void *data, - size_t data_size) + uint16_t data_size) { struct GNUNET_PSYC_Slave *slv = GNUNET_malloc (sizeof (*slv)); struct GNUNET_PSYC_Channel *ch = &slv->ch; @@ -692,10 +792,12 @@ GNUNET_PSYC_slave_join (const struct GNUNET_CONFIGURATION_Handle *cfg, req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN); req->channel_key = *channel_key; req->slave_key = *slave_key; + req->origin = *origin; req->relay_count = relay_count; memcpy (&req[1], relays, relay_count * sizeof (*relays)); ch->cfg = cfg; + ch->is_master = GNUNET_NO; ch->reconnect_msg = (struct GNUNET_MessageHeader *) req; ch->reconnect_delay = GNUNET_TIME_UNIT_ZERO; ch->reconnect_task = GNUNET_SCHEDULER_add_now (&reconnect, slv); @@ -746,6 +848,18 @@ GNUNET_PSYC_slave_transmit (struct GNUNET_PSYC_Slave *slave, /** + * Resume transmission to the master. + * + * @param th Handle of the request that is being resumed. + */ +void +GNUNET_PSYC_slave_transmit_resume (struct GNUNET_PSYC_MasterTransmitHandle *th) +{ + +} + + +/** * Abort transmission request to master. * * @param th Handle of the request that is being aborted. @@ -822,7 +936,7 @@ GNUNET_PSYC_channel_slave_add (struct GNUNET_PSYC_Channel *ch, slvadd->announced_at = GNUNET_htonll (announced_at); slvadd->effective_since = GNUNET_htonll (effective_since); - GNUNET_CONTAINER_DLL_insert (ch->transmit_head, ch->transmit_tail, op); + GNUNET_CONTAINER_DLL_insert_tail (ch->transmit_head, ch->transmit_tail, op); transmit_next (ch); } @@ -863,7 +977,7 @@ GNUNET_PSYC_channel_slave_remove (struct GNUNET_PSYC_Channel *ch, slvrm->header.size = htons (sizeof (*slvrm)); slvrm->announced_at = GNUNET_htonll (announced_at); - GNUNET_CONTAINER_DLL_insert (ch->transmit_head, ch->transmit_tail, op); + GNUNET_CONTAINER_DLL_insert_tail (ch->transmit_head, ch->transmit_tail, op); transmit_next (ch); } diff --git a/src/psyc/test_psyc.c b/src/psyc/test_psyc.c index b5bc6d1355..1d7035a87e 100644 --- a/src/psyc/test_psyc.c +++ b/src/psyc/test_psyc.c @@ -19,8 +19,8 @@ */ /** - * @file psycstore/test_psycstore.c - * @brief Test for the PSYCstore service. + * @file psyc/test_psyc.c + * @brief Test for the PSYC service. * @author Gabor X Toth * @author Christian Grothoff */ @@ -30,6 +30,7 @@ #include "gnunet_common.h" #include "gnunet_util_lib.h" #include "gnunet_testing_lib.h" +#include "gnunet_env_lib.h" #include "gnunet_psyc_service.h" #define TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 10) @@ -59,6 +60,8 @@ static struct GNUNET_CRYPTO_EccPrivateKey *slave_key; static struct GNUNET_CRYPTO_EccPublicSignKey channel_pub_key; static struct GNUNET_CRYPTO_EccPublicSignKey slave_pub_key; +struct GNUNET_PSYC_MasterTransmitHandle *mth; + /** * Clean up all resources used. */ @@ -120,11 +123,14 @@ end () static int method (void *cls, const struct GNUNET_CRYPTO_EccPublicSignKey *slave_key, - uint64_t message_id, const char *method_name, + uint64_t message_id, const char *name, size_t modifier_count, const struct GNUNET_ENV_Modifier *modifiers, uint64_t data_offset, const void *data, size_t data_size, enum GNUNET_PSYC_MessageFlags flags) { + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "Method: %s, modifiers: %lu, flags: %u\n%.*s\n", + name, modifier_count, flags, data_size, data); return GNUNET_OK; } @@ -138,11 +144,72 @@ join (void *cls, const struct GNUNET_CRYPTO_EccPublicSignKey *slave_key, return GNUNET_OK; } +struct TransmitClosure +{ + struct GNUNET_PSYC_MasterTransmitHandle *handle; + uint8_t n; + uint8_t fragment_count; + char *fragments[16]; + uint16_t fragment_sizes[16]; +}; + + +static void +transmit_resume (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) +{ + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Transmit resume\n"); + struct TransmitClosure *tmit = cls; + GNUNET_PSYC_master_transmit_resume (tmit->handle); +} + + +static int +transmit_notify (void *cls, size_t *data_size, void *data) +{ + struct TransmitClosure *tmit = cls; + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "Transmit notify: %lu bytes\n", *data_size); + + if (tmit->fragment_count <= tmit->n) + return GNUNET_YES; + + GNUNET_assert (tmit->fragment_sizes[tmit->n] <= *data_size); + + *data_size = tmit->fragment_sizes[tmit->n]; + memcpy (data, tmit->fragments[tmit->n], *data_size); + tmit->n++; + + if (tmit->n == tmit->fragment_count - 1) + { + /* Send last fragment later. */ + GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_SECONDS, &transmit_resume, + tmit); + *data_size = 0; + return GNUNET_NO; + } + return tmit->n <= tmit->fragment_count ? GNUNET_NO : GNUNET_YES; +} void master_started (void *cls, uint64_t max_message_id) { GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Master started: %lu\n", max_message_id); + + struct GNUNET_ENV_Environment *env = GNUNET_ENV_environment_create (); + GNUNET_ENV_environment_add_mod (env, GNUNET_ENV_OP_ASSIGN, + "_foo", "bar baz", 7); + GNUNET_ENV_environment_add_mod (env, GNUNET_ENV_OP_ASSIGN, + "_foo_bar", "foo bar baz", 11); + + struct TransmitClosure *tmit = GNUNET_new (struct TransmitClosure); + tmit->fragment_count = 2; + tmit->fragments[0] = "foo bar"; + tmit->fragment_sizes[0] = 7; + tmit->fragments[1] = "baz!"; + tmit->fragment_sizes[1] = 4; + tmit->handle + = GNUNET_PSYC_master_transmit (mst, "_test", env, transmit_notify, tmit, + GNUNET_PSYC_MASTER_TRANSMIT_INC_GROUP_GEN); } @@ -157,7 +224,7 @@ slave_joined (void *cls, uint64_t max_message_id) * Main function of the test, run from scheduler. * * @param cls NULL - * @param cfg configuration we use (also to connect to PSYCstore service) + * @param cfg configuration we use (also to connect to PSYC service) * @param peer handle to access more of the peer (not used) */ static void @@ -182,9 +249,18 @@ run (void *cls, mst = GNUNET_PSYC_master_start (cfg, channel_key, GNUNET_PSYC_CHANNEL_PRIVATE, &method, &join, &master_started, NULL); - - slv = GNUNET_PSYC_slave_join (cfg, &channel_pub_key, slave_key, - &method, &join, &slave_joined, NULL); + return; + struct GNUNET_PeerIdentity origin; + struct GNUNET_PeerIdentity relays[16]; + struct GNUNET_ENV_Environment *env = GNUNET_ENV_environment_create (); + GNUNET_ENV_environment_add_mod (env, GNUNET_ENV_OP_ASSIGN, + "_foo", "bar baz", 7); + GNUNET_ENV_environment_add_mod (env, GNUNET_ENV_OP_ASSIGN, + "_foo_bar", "foo bar baz", 11); + slv = GNUNET_PSYC_slave_join (cfg, &channel_pub_key, slave_key, &origin, + 16, relays, &method, &join, &slave_joined, + NULL, "_request_join", env, "some data", 9); + GNUNET_ENV_environment_destroy (env); } |