aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorGabor X Toth <*@tg-x.net>2013-10-10 18:08:53 +0000
committerGabor X Toth <*@tg-x.net>2013-10-10 18:08:53 +0000
commit1b5d4aba9d8bbcb62c93f96782e3567f6f79d0cb (patch)
tree3cd28bfee831af0417c2dcbb543c03481517ad00 /src
parent67a8e21eedb6d35fec76841d4a1a6b4b41b37879 (diff)
PSYC: master msg transmission
Diffstat (limited to 'src')
-rw-r--r--src/include/gnunet_protocols.h37
-rw-r--r--src/include/gnunet_psyc_service.h12
-rw-r--r--src/multicast/multicast_api.c22
-rw-r--r--src/psyc/gnunet-service-psyc.c271
-rw-r--r--src/psyc/psyc.h7
-rw-r--r--src/psyc/psyc_api.c218
-rw-r--r--src/psyc/test_psyc.c90
7 files changed, 505 insertions, 152 deletions
diff --git a/src/include/gnunet_protocols.h b/src/include/gnunet_protocols.h
index bfc8f1ab31..84ac9dda1f 100644
--- a/src/include/gnunet_protocols.h
+++ b/src/include/gnunet_protocols.h
@@ -2083,48 +2083,31 @@ extern "C"
#define GNUNET_MESSAGE_TYPE_PSYC_CHANNEL_SLAVE_RM 690
-#define GNUNET_MESSAGE_TYPE_PSYC_TRANSMIT_METHOD 691
+#define GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD 691
-#define GNUNET_MESSAGE_TYPE_PSYC_TRANSMIT_MODIFIER 692
+#define GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER 692
-#define GNUNET_MESSAGE_TYPE_PSYC_TRANSMIT_MOD_CONT 693
+#define GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT 693
-#define GNUNET_MESSAGE_TYPE_PSYC_TRANSMIT_DATA 694
+#define GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA 694
#define GNUNET_MESSAGE_TYPE_PSYC_TRANSMIT_ACK 695
-#define GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD 696
-
-#define GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER 697
-
-#define GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT 698
-
-#define GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA 699
-
-#define GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_ACK 700
-
-
#define GNUNET_MESSAGE_TYPE_PSYC_STORY_REQUEST 701
-#define GNUNET_MESSAGE_TYPE_PSYC_STORY_METHOD 702
-
-#define GNUNET_MESSAGE_TYPE_PSYC_STORY_MODIFIER 703
-
-#define GNUNET_MESSAGE_TYPE_PSYC_STORY_MOD_CONT 704
-
-#define GNUNET_MESSAGE_TYPE_PSYC_STORY_DATA 705
+#define GNUNET_MESSAGE_TYPE_PSYC_STORY_RESPONSE 702
-#define GNUNET_MESSAGE_TYPE_PSYC_STORY_ACK 706
+#define GNUNET_MESSAGE_TYPE_PSYC_STATE_GET 703
-#define GNUNET_MESSAGE_TYPE_PSYC_STATE_GET 707
+#define GNUNET_MESSAGE_TYPE_PSYC_STATE_GET_PREFIX 704
-#define GNUNET_MESSAGE_TYPE_PSYC_STATE_GET_PREFIX 708
+#define GNUNET_MESSAGE_TYPE_PSYC_STATE_RESPONSE 705
-#define GNUNET_MESSAGE_TYPE_PSYC_STATE_MODIFIER 709
+#define GNUNET_MESSAGE_TYPE_PSYC_STATE_MODIFIER 706
-#define GNUNET_MESSAGE_TYPE_PSYC_STATE_MOD_CONT 710
+#define GNUNET_MESSAGE_TYPE_PSYC_STATE_MOD_CONT 707
/*******************************************************************************
diff --git a/src/include/gnunet_psyc_service.h b/src/include/gnunet_psyc_service.h
index 7ac40a4c5d..758b5e3d10 100644
--- a/src/include/gnunet_psyc_service.h
+++ b/src/include/gnunet_psyc_service.h
@@ -207,8 +207,8 @@ struct GNUNET_PSYC_MessageMethod
uint32_t flags GNUNET_PACKED;
/**
- * Sending slave's public key. NULL if the message is from the master, or when
- * transmitting a message.
+ * Sending slave's public key.
+ * NULL if the message is from the master, or when transmitting a message.
*/
struct GNUNET_CRYPTO_EddsaPublicKey slave_key;
@@ -264,7 +264,7 @@ enum GNUNET_PSYC_DataStatus
struct GNUNET_PSYC_MessageData
{
/**
- * Type: GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER
+ * Type: GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA
*/
struct GNUNET_MessageHeader header;
@@ -367,7 +367,7 @@ typedef int
void
GNUNET_PSYC_join_decision (struct GNUNET_PSYC_JoinHandle *jh,
int is_admitted,
- unsigned int relay_count,
+ uint32_t relay_count,
const struct GNUNET_PeerIdentity *relays,
const char *method_name,
const struct GNUNET_ENV_Environment *env,
@@ -582,7 +582,7 @@ GNUNET_PSYC_slave_join (const struct GNUNET_CONFIGURATION_Handle *cfg,
const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key,
const struct GNUNET_CRYPTO_EddsaPrivateKey *slave_key,
const struct GNUNET_PeerIdentity *origin,
- size_t relay_count,
+ uint32_t relay_count,
const struct GNUNET_PeerIdentity *relays,
GNUNET_PSYC_Method method,
GNUNET_PSYC_JoinCallback join_cb,
@@ -591,7 +591,7 @@ GNUNET_PSYC_slave_join (const struct GNUNET_CONFIGURATION_Handle *cfg,
const char *method_name,
const struct GNUNET_ENV_Environment *env,
const void *data,
- size_t data_size);
+ uint16_t data_size);
/**
diff --git a/src/multicast/multicast_api.c b/src/multicast/multicast_api.c
index 3dbc07f5d4..49e648468b 100644
--- a/src/multicast/multicast_api.c
+++ b/src/multicast/multicast_api.c
@@ -30,7 +30,10 @@
#include "gnunet_multicast_service.h"
#include "multicast.h"
-/**
+#define LOG(kind,...) GNUNET_log_from (kind, "multicast-api",__VA_ARGS__)
+
+
+/**
* Handle for a request to send a message to all multicast group members
* (from the origin).
*/
@@ -38,6 +41,7 @@ struct GNUNET_MULTICAST_OriginMessageHandle
{
GNUNET_MULTICAST_OriginTransmitNotify notify;
void *notify_cls;
+ struct GNUNET_MULTICAST_Origin *origin;
uint64_t message_id;
uint64_t group_generation;
@@ -353,6 +357,7 @@ GNUNET_MULTICAST_origin_start (const struct GNUNET_CONFIGURATION_Handle *cfg,
static void
schedule_origin_to_all (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
{
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "schedule_origin_to_all()\n");
struct GNUNET_MULTICAST_Origin *orig = cls;
struct GNUNET_MULTICAST_OriginMessageHandle *mh = &orig->msg_handle;
@@ -361,12 +366,18 @@ schedule_origin_to_all (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc
= GNUNET_malloc (sizeof (*msg) + buf_size);
int ret = mh->notify (mh->notify_cls, &buf_size, &msg[1]);
- if (ret != GNUNET_YES || ret != GNUNET_NO)
+ if (! (GNUNET_YES == ret || GNUNET_NO == ret)
+ || buf_size > GNUNET_MULTICAST_FRAGMENT_MAX_SIZE)
{
+ LOG (GNUNET_ERROR_TYPE_ERROR,
+ "MasterTransmitNotify() returned error or invalid message size.\n");
/* FIXME: handle error */
return;
}
+ if (GNUNET_NO == ret && 0 == buf_size)
+ return; /* Transmission paused. */
+
msg->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE);
msg->header.size = htons (buf_size);
msg->message_id = mh->message_id;
@@ -393,12 +404,12 @@ schedule_origin_to_all (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc
* returned signed message.
* FIXME: Also send to local members in this group.
*/
- orig->message_cb (orig->cls, msg);
+ orig->message_cb (orig->cls, (const struct GNUNET_MessageHeader *) msg);
if (GNUNET_NO == ret)
GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply
(GNUNET_TIME_UNIT_SECONDS, 1),
- schedule_origin_to_all, mh);
+ schedule_origin_to_all, orig);
}
@@ -421,6 +432,7 @@ GNUNET_MULTICAST_origin_to_all (struct GNUNET_MULTICAST_Origin *origin,
void *notify_cls)
{
struct GNUNET_MULTICAST_OriginMessageHandle *mh = &origin->msg_handle;
+ mh->origin = origin;
mh->message_id = message_id;
mh->group_generation = group_generation;
mh->notify = notify;
@@ -441,7 +453,7 @@ GNUNET_MULTICAST_origin_to_all (struct GNUNET_MULTICAST_Origin *origin,
void
GNUNET_MULTICAST_origin_to_all_resume (struct GNUNET_MULTICAST_OriginMessageHandle *mh)
{
-
+ GNUNET_SCHEDULER_add_now (schedule_origin_to_all, mh->origin);
}
diff --git a/src/psyc/gnunet-service-psyc.c b/src/psyc/gnunet-service-psyc.c
index 7f5189ab8f..d3f203ebf9 100644
--- a/src/psyc/gnunet-service-psyc.c
+++ b/src/psyc/gnunet-service-psyc.c
@@ -34,8 +34,6 @@
#include "gnunet_psyc_service.h"
#include "psyc.h"
-#define LOG(kind,...) GNUNET_log_from (kind, "psyc-api",__VA_ARGS__)
-
/**
* Handle to our current configuration.
@@ -58,6 +56,11 @@ static struct GNUNET_SERVER_NotificationContext *nc;
static struct GNUNET_PSYCSTORE_Handle *store;
/**
+ * channel's pub_key_hash -> struct Channel
+ */
+static struct GNUNET_CONTAINER_MultiHashMap *clients;
+
+/**
* Message in the transmission queue.
*/
struct TransmitMessage
@@ -81,6 +84,7 @@ struct Channel
struct TransmitMessage *tmit_tail;
char *tmit_buf;
+ GNUNET_SCHEDULER_TaskIdentifier tmit_task;
uint32_t tmit_mod_count;
uint32_t tmit_mod_recvd;
uint16_t tmit_size;
@@ -96,8 +100,9 @@ struct Channel
struct Master
{
struct Channel channel;
- struct GNUNET_CRYPTO_EccPrivateKey private_key;
- struct GNUNET_CRYPTO_EccPublicSignKey public_key;
+ struct GNUNET_CRYPTO_EccPrivateKey priv_key;
+ struct GNUNET_CRYPTO_EccPublicSignKey pub_key;
+ struct GNUNET_HashCode pub_key_hash;
struct GNUNET_MULTICAST_Origin *origin;
struct GNUNET_MULTICAST_OriginMessageHandle *tmit_handle;
@@ -120,13 +125,20 @@ struct Slave
{
struct Channel channel;
struct GNUNET_CRYPTO_EccPrivateKey slave_key;
- struct GNUNET_CRYPTO_EccPublicSignKey channel_key;
+ struct GNUNET_CRYPTO_EccPublicSignKey chan_key;
+ struct GNUNET_HashCode chan_key_hash;
struct GNUNET_MULTICAST_Member *member;
struct GNUNET_MULTICAST_MemberRequestHandle *tmit_handle;
+ struct GNUNET_PeerIdentity origin;
+ struct GNUNET_PeerIdentity *relays;
+ struct GNUNET_MessageHeader *join_req;
+
uint64_t max_message_id;
uint64_t max_request_id;
+
+ uint32_t relay_count;
};
@@ -166,41 +178,151 @@ client_disconnect (void *cls, struct GNUNET_SERVER_Client *client)
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Client %p disconnected\n", client);
- struct Channel *ch = GNUNET_SERVER_client_get_user_context (client,
- struct Channel);
- GNUNET_assert (NULL != ch);
+ struct Channel *ch
+ = GNUNET_SERVER_client_get_user_context (client, struct Channel);
+ if (NULL == ch)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ "User context is NULL in client_disconnect()\n");
+ GNUNET_break (0);
+ return;
+ }
if (NULL != ch->tmit_buf)
{
GNUNET_free (ch->tmit_buf);
ch->tmit_buf = NULL;
}
+
+ if (ch->is_master)
+ {
+ struct Master *mst = (struct Master *) ch;
+ if (NULL != mst->origin)
+ GNUNET_MULTICAST_origin_stop (mst->origin);
+ }
+ else
+ {
+ struct Slave *slv = (struct Slave *) ch;
+ if (NULL != slv->join_req)
+ GNUNET_free (slv->join_req);
+ if (NULL != slv->relays)
+ GNUNET_free (slv->relays);
+ if (NULL != slv->member)
+ GNUNET_MULTICAST_member_part (slv->member);
+ }
+
GNUNET_free (ch);
}
+void
+join_cb (void *cls, const struct GNUNET_CRYPTO_EccPublicSignKey *member_key,
+ const struct GNUNET_MessageHeader *join_req,
+ struct GNUNET_MULTICAST_JoinHandle *jh)
+{
+
+}
void
-counters_cb (void *cls, uint64_t max_fragment_id, uint64_t max_message_id,
- uint64_t max_group_generation, uint64_t max_state_message_id)
+membership_test_cb (void *cls,
+ const struct GNUNET_CRYPTO_EccPublicSignKey *member_key,
+ uint64_t message_id, uint64_t group_generation,
+ struct GNUNET_MULTICAST_MembershipTestHandle *mth)
{
- struct Channel *ch = cls;
+
+}
+
+void
+replay_fragment_cb (void *cls,
+ const struct GNUNET_CRYPTO_EccPublicSignKey *member_key,
+ uint64_t fragment_id, uint64_t flags,
+ struct GNUNET_MULTICAST_ReplayHandle *rh)
+{
+
+}
+
+void
+replay_message_cb (void *cls,
+ const struct GNUNET_CRYPTO_EccPublicSignKey *member_key,
+ uint64_t message_id,
+ uint64_t fragment_offset,
+ uint64_t flags,
+ struct GNUNET_MULTICAST_ReplayHandle *rh)
+{
+
+}
+
+void
+request_cb (void *cls, const struct GNUNET_CRYPTO_EccPublicSignKey *member_key,
+ const struct GNUNET_MessageHeader *req,
+ enum GNUNET_MULTICAST_MessageFlags flags)
+{
+
+}
+
+void
+message_cb (void *cls, const struct GNUNET_MessageHeader *msg)
+{
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Received message of type %u from multicast.\n",
+ ntohs (msg->type));
+}
+
+void
+master_counters_cb (void *cls, int result, uint64_t max_fragment_id,
+ uint64_t max_message_id, uint64_t max_group_generation,
+ uint64_t max_state_message_id)
+{
+ struct Master *mst = cls;
+ struct Channel *ch = &mst->channel;
struct CountersResult *res = GNUNET_malloc (sizeof (*res));
+ res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK);
res->header.size = htons (sizeof (*res));
+ res->result_code = htonl (result);
res->max_message_id = GNUNET_htonll (max_message_id);
- if (ch->is_master)
+ if (GNUNET_OK == result || GNUNET_NO == result)
{
- struct Master *mst = cls;
mst->max_message_id = max_message_id;
mst->max_state_message_id = max_state_message_id;
mst->max_group_generation = max_group_generation;
- res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MASTER_START_ACK);
+ mst->origin
+ = GNUNET_MULTICAST_origin_start (cfg, &mst->priv_key,
+ max_fragment_id + 1,
+ join_cb, membership_test_cb,
+ replay_fragment_cb, replay_message_cb,
+ request_cb, message_cb, ch);
}
- else
+ GNUNET_SERVER_notification_context_add (nc, ch->client);
+ GNUNET_SERVER_notification_context_unicast (nc, ch->client, &res->header,
+ GNUNET_NO);
+ GNUNET_free (res);
+}
+
+
+void
+slave_counters_cb (void *cls, int result, uint64_t max_fragment_id,
+ uint64_t max_message_id, uint64_t max_group_generation,
+ uint64_t max_state_message_id)
+{
+ struct Slave *slv = cls;
+ struct Channel *ch = &slv->channel;
+ struct CountersResult *res = GNUNET_malloc (sizeof (*res));
+ res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK);
+ res->header.size = htons (sizeof (*res));
+ res->result_code = htonl (result);
+ res->max_message_id = GNUNET_htonll (max_message_id);
+
+ if (GNUNET_OK == result || GNUNET_NO == result)
{
- struct Slave *slv = cls;
slv->max_message_id = max_message_id;
- res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK);
+ slv->member
+ = GNUNET_MULTICAST_member_join (cfg, &slv->chan_key, &slv->slave_key,
+ &slv->origin,
+ slv->relay_count, slv->relays,
+ slv->join_req, join_cb,
+ membership_test_cb,
+ replay_fragment_cb, replay_message_cb,
+ message_cb, ch);
}
GNUNET_SERVER_notification_context_add (nc, ch->client);
@@ -220,14 +342,17 @@ handle_master_start (void *cls, struct GNUNET_SERVER_Client *client,
mst->channel.client = client;
mst->channel.is_master = GNUNET_YES;
mst->policy = ntohl (req->policy);
- mst->private_key = req->channel_key;
- GNUNET_CRYPTO_ecc_key_get_public_for_signature (&mst->private_key,
- &mst->public_key);
+ mst->priv_key = req->channel_key;
+ GNUNET_CRYPTO_ecc_key_get_public_for_signature (&mst->priv_key,
+ &mst->pub_key);
+ GNUNET_CRYPTO_hash (&mst->pub_key, sizeof (mst->pub_key), &mst->pub_key_hash);
- GNUNET_PSYCSTORE_counters_get (store, &mst->public_key,
- counters_cb, mst);
+ GNUNET_PSYCSTORE_counters_get (store, &mst->pub_key,
+ master_counters_cb, mst);
- GNUNET_SERVER_client_set_user_context (client, mst);
+ GNUNET_SERVER_client_set_user_context (client, &mst->channel);
+ GNUNET_CONTAINER_multihashmap_put (clients, &mst->pub_key_hash, mst,
+ GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
GNUNET_SERVER_receive_done (client, GNUNET_OK);
}
@@ -241,13 +366,25 @@ handle_slave_join (void *cls, struct GNUNET_SERVER_Client *client,
struct Slave *slv = GNUNET_new (struct Slave);
slv->channel.client = client;
slv->channel.is_master = GNUNET_NO;
- slv->channel_key = req->channel_key;
slv->slave_key = req->slave_key;
-
- GNUNET_PSYCSTORE_counters_get (store, &slv->channel_key,
- counters_cb, slv);
-
- GNUNET_SERVER_client_set_user_context (client, slv);
+ slv->chan_key = req->channel_key;
+ GNUNET_CRYPTO_hash (&slv->chan_key, sizeof (slv->chan_key),
+ &slv->chan_key_hash);
+ slv->origin = req->origin;
+ slv->relay_count = ntohl (req->relay_count);
+
+ const struct GNUNET_PeerIdentity *relays
+ = (const struct GNUNET_PeerIdentity *) &req[1];
+ slv->relays
+ = GNUNET_malloc (slv->relay_count * sizeof (struct GNUNET_PeerIdentity));
+ uint32_t i;
+ for (i = 0; i < slv->relay_count; i++)
+ memcpy (&slv->relays[i], &relays[i], sizeof (*relays));
+
+ GNUNET_PSYCSTORE_counters_get (store, &slv->chan_key,
+ slave_counters_cb, slv);
+
+ GNUNET_SERVER_client_set_user_context (client, &slv->channel);
GNUNET_SERVER_receive_done (client, GNUNET_OK);
}
@@ -268,34 +405,40 @@ send_transmit_ack (struct Channel *ch)
static int
-transmit_notify (void *cls, uint64_t fragment_id, size_t *data_size, void *data)
+transmit_notify (void *cls, size_t *data_size, void *data)
{
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "transmit_notify()\n");
struct Channel *ch = cls;
struct TransmitMessage *msg = ch->tmit_head;
- if (NULL == msg || *data_size < msg->size)
+ if (NULL == msg || *data_size < ntohs (msg->size))
{
*data_size = 0;
return GNUNET_NO;
}
- memcpy (data, msg->buf, msg->size);
- *data_size = msg->size;
+ *data_size = ntohs (msg->size);
+ memcpy (data, msg->buf, *data_size);
GNUNET_free (ch->tmit_buf);
+ ch->tmit_buf = NULL;
GNUNET_CONTAINER_DLL_remove (ch->tmit_head, ch->tmit_tail, msg);
return (GNUNET_YES == ch->in_transmit) ? GNUNET_NO : GNUNET_YES;
}
-static int
-master_transmit_message (struct Master *mst)
+static void
+master_transmit_message (void *cls,
+ const struct GNUNET_SCHEDULER_TaskContext *tc)
{
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "master_transmit_message()\n");
+ struct Master *mst = cls;
+ mst->channel.tmit_task = 0;
if (NULL == mst->tmit_handle)
{
mst->tmit_handle
- = GNUNET_MULTICAST_origin_to_all (mst->origin, mst->max_message_id,
+ = GNUNET_MULTICAST_origin_to_all (mst->origin, ++mst->max_message_id,
mst->max_group_generation,
transmit_notify, mst);
}
@@ -303,24 +446,25 @@ master_transmit_message (struct Master *mst)
{
GNUNET_MULTICAST_origin_to_all_resume (mst->tmit_handle);
}
- return GNUNET_OK;
}
-static int
-slave_transmit_message (struct Slave *slv)
+static void
+slave_transmit_message (void *cls,
+ const struct GNUNET_SCHEDULER_TaskContext *tc)
{
+ struct Slave *slv = cls;
+ slv->channel.tmit_task = 0;
if (NULL == slv->tmit_handle)
{
slv->tmit_handle
- = GNUNET_MULTICAST_member_to_origin(slv->member, slv->max_request_id,
+ = GNUNET_MULTICAST_member_to_origin(slv->member, ++slv->max_request_id,
transmit_notify, slv);
}
else
{
GNUNET_MULTICAST_member_to_origin_resume (slv->tmit_handle);
}
- return GNUNET_OK;
}
@@ -328,6 +472,7 @@ static int
buffer_message (struct Channel *ch, const struct GNUNET_MessageHeader *msg)
{
uint16_t size = ntohs (msg->size);
+ struct GNUNET_TIME_Relative tmit_delay = GNUNET_TIME_UNIT_ZERO;
if (GNUNET_MULTICAST_FRAGMENT_MAX_SIZE < size)
return GNUNET_SYSERR;
@@ -353,12 +498,17 @@ buffer_message (struct Channel *ch, const struct GNUNET_MessageHeader *msg)
tmit_msg->size = size;
tmit_msg->status = ch->tmit_status;
GNUNET_CONTAINER_DLL_insert_tail (ch->tmit_head, ch->tmit_tail, tmit_msg);
-
- ch->is_master
- ? master_transmit_message ((struct Master *) ch)
- : slave_transmit_message ((struct Slave *) ch);
+ tmit_delay = GNUNET_TIME_UNIT_ZERO;
}
+ if (0 != ch->tmit_task)
+ GNUNET_SCHEDULER_cancel (ch->tmit_task);
+
+ ch->tmit_task
+ = ch->is_master
+ ? GNUNET_SCHEDULER_add_delayed (tmit_delay, master_transmit_message, ch)
+ : GNUNET_SCHEDULER_add_delayed (tmit_delay, slave_transmit_message, ch);
+
return GNUNET_OK;
}
@@ -368,8 +518,8 @@ handle_transmit_method (void *cls, struct GNUNET_SERVER_Client *client,
{
const struct GNUNET_PSYC_MessageMethod *meth
= (const struct GNUNET_PSYC_MessageMethod *) msg;
- struct Channel *ch = GNUNET_SERVER_client_get_user_context (client,
- struct Channel);
+ struct Channel *ch
+ = GNUNET_SERVER_client_get_user_context (client, struct Channel);
GNUNET_assert (NULL != ch);
if (GNUNET_NO != ch->in_transmit)
@@ -378,6 +528,7 @@ handle_transmit_method (void *cls, struct GNUNET_SERVER_Client *client,
return;
}
+ ch->in_transmit = GNUNET_YES;
ch->tmit_buf = NULL;
ch->tmit_size = 0;
ch->tmit_mod_recvd = 0;
@@ -388,6 +539,8 @@ handle_transmit_method (void *cls, struct GNUNET_SERVER_Client *client,
if (0 == ch->tmit_mod_count)
send_transmit_ack (ch);
+
+ GNUNET_SERVER_receive_done (client, GNUNET_OK);
};
@@ -397,8 +550,8 @@ handle_transmit_modifier (void *cls, struct GNUNET_SERVER_Client *client,
{
const struct GNUNET_PSYC_MessageModifier *mod
= (const struct GNUNET_PSYC_MessageModifier *) msg;
- struct Channel *ch = GNUNET_SERVER_client_get_user_context (client,
- struct Channel);
+ struct Channel *ch
+ = GNUNET_SERVER_client_get_user_context (client, struct Channel);
GNUNET_assert (NULL != ch);
ch->tmit_mod_recvd++;
@@ -406,6 +559,8 @@ handle_transmit_modifier (void *cls, struct GNUNET_SERVER_Client *client,
if (ch->tmit_mod_recvd == ch->tmit_mod_count)
send_transmit_ack (ch);
+
+ GNUNET_SERVER_receive_done (client, GNUNET_OK);
};
@@ -415,13 +570,18 @@ handle_transmit_data (void *cls, struct GNUNET_SERVER_Client *client,
{
const struct GNUNET_PSYC_MessageData *data
= (const struct GNUNET_PSYC_MessageData *) msg;
- struct Channel *ch = GNUNET_SERVER_client_get_user_context (client,
- struct Channel);
+ struct Channel *ch
+ = GNUNET_SERVER_client_get_user_context (client, struct Channel);
GNUNET_assert (NULL != ch);
- ch->tmit_status = data->status;
+ ch->tmit_status = ntohs (data->status);
buffer_message (ch, msg);
send_transmit_ack (ch);
+
+ if (GNUNET_PSYC_DATA_CONT != ch->tmit_status)
+ ch->in_transmit = GNUNET_NO;
+
+ GNUNET_SERVER_receive_done (client, GNUNET_OK);
};
@@ -444,13 +604,13 @@ run (void *cls, struct GNUNET_SERVER_Handle *server,
GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN, 0 },
{ &handle_transmit_method, NULL,
- GNUNET_MESSAGE_TYPE_PSYC_TRANSMIT_METHOD, 0 },
+ GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD, 0 },
{ &handle_transmit_modifier, NULL,
- GNUNET_MESSAGE_TYPE_PSYC_TRANSMIT_MODIFIER, 0 },
+ GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER, 0 },
{ &handle_transmit_data, NULL,
- GNUNET_MESSAGE_TYPE_PSYC_TRANSMIT_DATA, 0 },
+ GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA, 0 },
{ NULL, NULL, 0, 0 }
};
@@ -458,6 +618,7 @@ run (void *cls, struct GNUNET_SERVER_Handle *server,
cfg = c;
store = GNUNET_PSYCSTORE_connect (cfg);
stats = GNUNET_STATISTICS_create ("psyc", cfg);
+ clients = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
nc = GNUNET_SERVER_notification_context_create (server, 1);
GNUNET_SERVER_add_handlers (server, handlers);
GNUNET_SERVER_disconnect_notify (server, &client_disconnect, NULL);
diff --git a/src/psyc/psyc.h b/src/psyc/psyc.h
index 35e9ae8006..6a88263375 100644
--- a/src/psyc/psyc.h
+++ b/src/psyc/psyc.h
@@ -65,6 +65,11 @@ struct CountersResult
*/
struct GNUNET_MessageHeader header;
+ /**
+ * Status code for the operation.
+ */
+ int32_t result_code GNUNET_PACKED;
+
uint64_t max_message_id;
};
@@ -121,6 +126,8 @@ struct SlaveJoinRequest
struct GNUNET_CRYPTO_EccPrivateKey slave_key;
+ struct GNUNET_PeerIdentity origin;
+
/* Followed by struct GNUNET_PeerIdentity relays[relay_count] */
};
diff --git a/src/psyc/psyc_api.c b/src/psyc/psyc_api.c
index abe7bb0288..4178d920ba 100644
--- a/src/psyc/psyc_api.c
+++ b/src/psyc/psyc_api.c
@@ -106,6 +106,28 @@ struct GNUNET_PSYC_Channel
* Are we currently transmitting a message?
*/
int in_transmit;
+
+ /**
+ * Is this a master or slave channel?
+ */
+ int is_master;
+
+ /**
+ * Buffer space available for transmitting the next data fragment.
+ */
+ uint16_t tmit_buf_avail;
+};
+
+
+/**
+ * Handle for a pending PSYC transmission operation.
+ */
+struct GNUNET_PSYC_MasterTransmitHandle
+{
+ struct GNUNET_PSYC_Master *master;
+ GNUNET_PSYC_MasterTransmitNotify notify;
+ void *notify_cls;
+ enum GNUNET_PSYC_DataStatus status;
};
@@ -116,6 +138,8 @@ struct GNUNET_PSYC_Master
{
struct GNUNET_PSYC_Channel ch;
+ struct GNUNET_PSYC_MasterTransmitHandle *tmit;
+
GNUNET_PSYC_MasterStartCallback start_cb;
uint64_t max_message_id;
@@ -146,19 +170,6 @@ struct GNUNET_PSYC_JoinHandle
/**
* Handle for a pending PSYC transmission operation.
*/
-struct GNUNET_PSYC_MasterTransmitHandle
-{
- struct GNUNET_PSYC_Master *master;
- const struct GNUNET_ENV_Environment *env;
- GNUNET_PSYC_MasterTransmitNotify notify;
- void *notify_cls;
- enum GNUNET_PSYC_MasterTransmitFlags flags;
-};
-
-
-/**
- * Handle for a pending PSYC transmission operation.
- */
struct GNUNET_PSYC_SlaveTransmitHandle
{
@@ -184,10 +195,10 @@ struct GNUNET_PSYC_StateQuery
/**
- * Try again to connect to the PSYCstore service.
+ * Try again to connect to the PSYC service.
*
- * @param cls handle to the PSYCstore service.
- * @param tc scheduler context
+ * @param cls Handle to the PSYC service.
+ * @param tc Scheduler context
*/
static void
reconnect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
@@ -215,7 +226,7 @@ reschedule_connect (struct GNUNET_PSYC_Channel *c)
}
c->in_receive = GNUNET_NO;
LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Scheduling task to reconnect to PSYCstore service in %s.\n",
+ "Scheduling task to reconnect to PSYC service in %s.\n",
GNUNET_STRINGS_relative_time_to_string (c->reconnect_delay, GNUNET_YES));
c->reconnect_task =
GNUNET_SCHEDULER_add_delayed (c->reconnect_delay, &reconnect, c);
@@ -226,12 +237,56 @@ reschedule_connect (struct GNUNET_PSYC_Channel *c)
/**
* Schedule transmission of the next message from our queue.
*
- * @param h PSYCstore handle
+ * @param h PSYC handle
*/
static void
transmit_next (struct GNUNET_PSYC_Channel *c);
+void
+master_transmit_data (struct GNUNET_PSYC_Master *mst)
+{
+ struct GNUNET_PSYC_Channel *ch = &mst->ch;
+ size_t data_size = ch->tmit_buf_avail;
+ struct GNUNET_PSYC_MessageData *pdata;
+ struct OperationHandle *op
+ = GNUNET_malloc (sizeof (*op) + sizeof (*pdata) + data_size);
+ pdata = (struct GNUNET_PSYC_MessageData *) &op[1];
+ op->msg = (struct GNUNET_MessageHeader *) pdata;
+ pdata->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA);
+
+ switch (mst->tmit->notify (mst->tmit->notify_cls, &data_size, &pdata[1]))
+ {
+ case GNUNET_NO:
+ mst->tmit->status = GNUNET_PSYC_DATA_CONT;
+ break;
+
+ case GNUNET_YES:
+ mst->tmit->status = GNUNET_PSYC_DATA_END;
+ break;
+
+ default:
+ mst->tmit->status = GNUNET_PSYC_DATA_CANCEL;
+ data_size = 0;
+ LOG (GNUNET_ERROR_TYPE_ERROR, "MasterTransmitNotify returned error\n");
+ }
+
+ if ((GNUNET_PSYC_DATA_CONT == mst->tmit->status && 0 == data_size))
+ {
+ /* Transmission paused, nothing to send. */
+ GNUNET_free (op);
+ }
+ else
+ {
+ GNUNET_assert (data_size <= ch->tmit_buf_avail);
+ pdata->header.size = htons (sizeof (*pdata) + data_size);
+ pdata->status = htons (mst->tmit->status);
+ GNUNET_CONTAINER_DLL_insert_tail (ch->transmit_head, ch->transmit_tail, op);
+ transmit_next (ch);
+ }
+}
+
+
/**
* Type of a function to call when we receive a message
* from the service.
@@ -253,8 +308,8 @@ message_handler (void *cls, const struct GNUNET_MessageHeader *msg)
}
uint16_t size_eq = 0;
uint16_t size_min = 0;
- const uint16_t size = ntohs (msg->size);
- const uint16_t type = ntohs (msg->type);
+ uint16_t size = ntohs (msg->size);
+ uint16_t type = ntohs (msg->type);
LOG (GNUNET_ERROR_TYPE_DEBUG,
"Received message of type %d from PSYC service\n", type);
@@ -265,6 +320,9 @@ message_handler (void *cls, const struct GNUNET_MessageHeader *msg)
case GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK:
size_eq = sizeof (struct CountersResult);
break;
+ case GNUNET_MESSAGE_TYPE_PSYC_TRANSMIT_ACK:
+ size_eq = sizeof (struct TransmitAck);
+ break;
}
if (! ((0 < size_eq && size == size_eq)
@@ -276,6 +334,7 @@ message_handler (void *cls, const struct GNUNET_MessageHeader *msg)
}
struct CountersResult *cres;
+ struct TransmitAck *tack;
switch (type)
{
@@ -294,17 +353,39 @@ message_handler (void *cls, const struct GNUNET_MessageHeader *msg)
mst->join_ack_cb (ch->cb_cls, mst->max_message_id);
#endif
break;
+
+ case GNUNET_MESSAGE_TYPE_PSYC_TRANSMIT_ACK:
+ tack = (struct TransmitAck *) msg;
+ if (ch->is_master)
+ {
+ GNUNET_assert (NULL != mst->tmit);
+ if (GNUNET_PSYC_DATA_CONT != mst->tmit->status
+ || NULL == mst->tmit->notify)
+ {
+ GNUNET_free (mst->tmit);
+ mst->tmit = NULL;
+ }
+ else
+ {
+ ch->tmit_buf_avail = ntohs (tack->buf_avail);
+ master_transmit_data (mst);
+ }
+ }
+ else
+ {
+ /* TODO: slave */
+ }
+ break;
}
GNUNET_CLIENT_receive (ch->client, &message_handler, ch,
GNUNET_TIME_UNIT_FOREVER_REL);
}
-
/**
* Transmit next message to service.
*
- * @param cls The 'struct GNUNET_PSYCSTORE_Handle'.
+ * @param cls The 'struct GNUNET_PSYC_Channel'.
* @param size Number of bytes available in buf.
* @param buf Where to copy the message.
* @return Number of bytes copied to buf.
@@ -326,7 +407,7 @@ send_next_message (void *cls, size_t size, void *buf)
return 0;
}
LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Sending message of type %d to PSYCstore service\n",
+ "Sending message of type %d to PSYC service\n",
ntohs (op->msg->type));
memcpy (buf, op->msg, ret);
@@ -349,7 +430,7 @@ send_next_message (void *cls, size_t size, void *buf)
/**
* Schedule transmission of the next message from our queue.
*
- * @param h PSYCstore handle.
+ * @param h PSYC handle.
*/
static void
transmit_next (struct GNUNET_PSYC_Channel *ch)
@@ -391,14 +472,12 @@ reconnect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
if (NULL == ch->transmit_head ||
ch->transmit_head->msg->type != ch->reconnect_msg->type)
{
- struct OperationHandle *op
- = GNUNET_malloc (sizeof (struct OperationHandle)
- + ntohs (ch->reconnect_msg->size));
- memcpy (&op[1], ch->reconnect_msg, ntohs (ch->reconnect_msg->size));
+ uint16_t reconn_size = ntohs (ch->reconnect_msg->size);
+ struct OperationHandle *op = GNUNET_malloc (sizeof (*op) + reconn_size);
+ memcpy (&op[1], ch->reconnect_msg, reconn_size);
op->msg = (struct GNUNET_MessageHeader *) &op[1];
GNUNET_CONTAINER_DLL_insert (ch->transmit_head, ch->transmit_tail, op);
}
-
transmit_next (ch);
}
@@ -414,7 +493,12 @@ disconnect (void *c)
{
struct GNUNET_PSYC_Channel *ch = c;
GNUNET_assert (NULL != ch);
- GNUNET_assert (ch->transmit_head == ch->transmit_tail);
+ if (ch->transmit_head != ch->transmit_tail)
+ {
+ LOG (GNUNET_ERROR_TYPE_ERROR,
+ "Disconnecting while there are still outstanding messages!\n");
+ GNUNET_break (0);
+ }
if (ch->reconnect_task != GNUNET_SCHEDULER_NO_TASK)
{
GNUNET_SCHEDULER_cancel (ch->reconnect_task);
@@ -431,7 +515,10 @@ disconnect (void *c)
ch->client = NULL;
}
if (NULL != ch->reconnect_msg)
+ {
+ GNUNET_free (ch->reconnect_msg);
ch->reconnect_msg = NULL;
+ }
}
@@ -475,12 +562,13 @@ GNUNET_PSYC_master_start (const struct GNUNET_CONFIGURATION_Handle *cfg,
struct GNUNET_PSYC_Channel *ch = &mst->ch;
struct MasterStartRequest *req = GNUNET_malloc (sizeof (*req));
- req->header.size = htons (sizeof (*req) + sizeof (*channel_key));
+ req->header.size = htons (sizeof (*req));
req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MASTER_START);
req->channel_key = *channel_key;
req->policy = policy;
ch->cfg = cfg;
+ ch->is_master = GNUNET_YES;
ch->reconnect_msg = (struct GNUNET_MessageHeader *) req;
ch->reconnect_delay = GNUNET_TIME_UNIT_ZERO;
ch->reconnect_task = GNUNET_SCHEDULER_add_now (&reconnect, mst);
@@ -532,7 +620,7 @@ GNUNET_PSYC_master_stop (struct GNUNET_PSYC_Master *mst)
void
GNUNET_PSYC_join_decision (struct GNUNET_PSYC_JoinHandle *jh,
int is_admitted,
- unsigned int relay_count,
+ uint32_t relay_count,
const struct GNUNET_PeerIdentity *relays,
const char *method_name,
const struct GNUNET_ENV_Environment *env,
@@ -556,13 +644,13 @@ send_modifier (void *cls, struct GNUNET_ENV_Modifier *mod)
pmod = (struct GNUNET_PSYC_MessageModifier *) &op[1];
op->msg = (struct GNUNET_MessageHeader *) pmod;
- pmod->header.type = GNUNET_MESSAGE_TYPE_PSYC_TRANSMIT_MODIFIER;
+ pmod->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER);
pmod->header.size = htons (sizeof (*pmod) + name_size + mod->value_size);
pmod->name_size = htons (name_size);
memcpy (&pmod[1], mod->name, name_size);
- memcpy ((void *) &pmod[1] + name_size, mod->value, mod->value_size);
+ memcpy ((char *) &pmod[1] + name_size, mod->value, mod->value_size);
- GNUNET_CONTAINER_DLL_insert (ch->transmit_head, ch->transmit_tail, op);
+ GNUNET_CONTAINER_DLL_insert_tail (ch->transmit_head, ch->transmit_tail, op);
return GNUNET_YES;
}
@@ -594,29 +682,41 @@ GNUNET_PSYC_master_transmit (struct GNUNET_PSYC_Master *mst,
return NULL;
ch->in_transmit = GNUNET_YES;
+ size_t size = strlen (method_name) + 1;
struct GNUNET_PSYC_MessageMethod *pmeth;
- struct OperationHandle *op = GNUNET_malloc (sizeof (*op) + sizeof (*pmeth));
+ struct OperationHandle *op
+ = GNUNET_malloc (sizeof (*op) + sizeof (*pmeth) + size);
pmeth = (struct GNUNET_PSYC_MessageMethod *) &op[1];
op->msg = (struct GNUNET_MessageHeader *) pmeth;
- pmeth->header.type = GNUNET_MESSAGE_TYPE_PSYC_TRANSMIT_METHOD;
- size_t size = strlen (method_name) + 1;
+ pmeth->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD);
pmeth->header.size = htons (sizeof (*pmeth) + size);
pmeth->flags = htonl (flags);
- pmeth->mod_count
- = GNUNET_ntohll (GNUNET_ENV_environment_get_mod_count (env));
+ pmeth->mod_count = GNUNET_ntohll (GNUNET_ENV_environment_get_mod_count (env));
memcpy (&pmeth[1], method_name, size);
- GNUNET_CONTAINER_DLL_insert (ch->transmit_head, ch->transmit_tail, op);
-
+ GNUNET_CONTAINER_DLL_insert_tail (ch->transmit_head, ch->transmit_tail, op);
GNUNET_ENV_environment_iterate (env, send_modifier, mst);
+ transmit_next (ch);
+
+ mst->tmit = GNUNET_malloc (sizeof (*mst->tmit));
+ mst->tmit->master = mst;
+ mst->tmit->notify = notify;
+ mst->tmit->notify_cls = notify_cls;
+ mst->tmit->status = GNUNET_PSYC_DATA_CONT;
+ return mst->tmit;
+}
+
- struct GNUNET_PSYC_MasterTransmitHandle *th = GNUNET_malloc (sizeof (*th));
- th->master = mst;
- th->env = env;
- th->notify = notify;
- th->notify_cls = notify_cls;
- return th;
+/**
+ * Resume transmission to the channel.
+ *
+ * @param th Handle of the request that is being resumed.
+ */
+void
+GNUNET_PSYC_master_transmit_resume (struct GNUNET_PSYC_MasterTransmitHandle *th)
+{
+ master_transmit_data (th->master);
}
@@ -671,7 +771,7 @@ GNUNET_PSYC_slave_join (const struct GNUNET_CONFIGURATION_Handle *cfg,
const struct GNUNET_CRYPTO_EccPublicSignKey *channel_key,
const struct GNUNET_CRYPTO_EccPrivateKey *slave_key,
const struct GNUNET_PeerIdentity *origin,
- size_t relay_count,
+ uint32_t relay_count,
const struct GNUNET_PeerIdentity *relays,
GNUNET_PSYC_Method method,
GNUNET_PSYC_JoinCallback join_cb,
@@ -680,7 +780,7 @@ GNUNET_PSYC_slave_join (const struct GNUNET_CONFIGURATION_Handle *cfg,
const char *method_name,
const struct GNUNET_ENV_Environment *env,
const void *data,
- size_t data_size)
+ uint16_t data_size)
{
struct GNUNET_PSYC_Slave *slv = GNUNET_malloc (sizeof (*slv));
struct GNUNET_PSYC_Channel *ch = &slv->ch;
@@ -692,10 +792,12 @@ GNUNET_PSYC_slave_join (const struct GNUNET_CONFIGURATION_Handle *cfg,
req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN);
req->channel_key = *channel_key;
req->slave_key = *slave_key;
+ req->origin = *origin;
req->relay_count = relay_count;
memcpy (&req[1], relays, relay_count * sizeof (*relays));
ch->cfg = cfg;
+ ch->is_master = GNUNET_NO;
ch->reconnect_msg = (struct GNUNET_MessageHeader *) req;
ch->reconnect_delay = GNUNET_TIME_UNIT_ZERO;
ch->reconnect_task = GNUNET_SCHEDULER_add_now (&reconnect, slv);
@@ -746,6 +848,18 @@ GNUNET_PSYC_slave_transmit (struct GNUNET_PSYC_Slave *slave,
/**
+ * Resume transmission to the master.
+ *
+ * @param th Handle of the request that is being resumed.
+ */
+void
+GNUNET_PSYC_slave_transmit_resume (struct GNUNET_PSYC_MasterTransmitHandle *th)
+{
+
+}
+
+
+/**
* Abort transmission request to master.
*
* @param th Handle of the request that is being aborted.
@@ -822,7 +936,7 @@ GNUNET_PSYC_channel_slave_add (struct GNUNET_PSYC_Channel *ch,
slvadd->announced_at = GNUNET_htonll (announced_at);
slvadd->effective_since = GNUNET_htonll (effective_since);
- GNUNET_CONTAINER_DLL_insert (ch->transmit_head, ch->transmit_tail, op);
+ GNUNET_CONTAINER_DLL_insert_tail (ch->transmit_head, ch->transmit_tail, op);
transmit_next (ch);
}
@@ -863,7 +977,7 @@ GNUNET_PSYC_channel_slave_remove (struct GNUNET_PSYC_Channel *ch,
slvrm->header.size = htons (sizeof (*slvrm));
slvrm->announced_at = GNUNET_htonll (announced_at);
- GNUNET_CONTAINER_DLL_insert (ch->transmit_head, ch->transmit_tail, op);
+ GNUNET_CONTAINER_DLL_insert_tail (ch->transmit_head, ch->transmit_tail, op);
transmit_next (ch);
}
diff --git a/src/psyc/test_psyc.c b/src/psyc/test_psyc.c
index b5bc6d1355..1d7035a87e 100644
--- a/src/psyc/test_psyc.c
+++ b/src/psyc/test_psyc.c
@@ -19,8 +19,8 @@
*/
/**
- * @file psycstore/test_psycstore.c
- * @brief Test for the PSYCstore service.
+ * @file psyc/test_psyc.c
+ * @brief Test for the PSYC service.
* @author Gabor X Toth
* @author Christian Grothoff
*/
@@ -30,6 +30,7 @@
#include "gnunet_common.h"
#include "gnunet_util_lib.h"
#include "gnunet_testing_lib.h"
+#include "gnunet_env_lib.h"
#include "gnunet_psyc_service.h"
#define TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 10)
@@ -59,6 +60,8 @@ static struct GNUNET_CRYPTO_EccPrivateKey *slave_key;
static struct GNUNET_CRYPTO_EccPublicSignKey channel_pub_key;
static struct GNUNET_CRYPTO_EccPublicSignKey slave_pub_key;
+struct GNUNET_PSYC_MasterTransmitHandle *mth;
+
/**
* Clean up all resources used.
*/
@@ -120,11 +123,14 @@ end ()
static int
method (void *cls, const struct GNUNET_CRYPTO_EccPublicSignKey *slave_key,
- uint64_t message_id, const char *method_name,
+ uint64_t message_id, const char *name,
size_t modifier_count, const struct GNUNET_ENV_Modifier *modifiers,
uint64_t data_offset, const void *data, size_t data_size,
enum GNUNET_PSYC_MessageFlags flags)
{
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "Method: %s, modifiers: %lu, flags: %u\n%.*s\n",
+ name, modifier_count, flags, data_size, data);
return GNUNET_OK;
}
@@ -138,11 +144,72 @@ join (void *cls, const struct GNUNET_CRYPTO_EccPublicSignKey *slave_key,
return GNUNET_OK;
}
+struct TransmitClosure
+{
+ struct GNUNET_PSYC_MasterTransmitHandle *handle;
+ uint8_t n;
+ uint8_t fragment_count;
+ char *fragments[16];
+ uint16_t fragment_sizes[16];
+};
+
+
+static void
+transmit_resume (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Transmit resume\n");
+ struct TransmitClosure *tmit = cls;
+ GNUNET_PSYC_master_transmit_resume (tmit->handle);
+}
+
+
+static int
+transmit_notify (void *cls, size_t *data_size, void *data)
+{
+ struct TransmitClosure *tmit = cls;
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "Transmit notify: %lu bytes\n", *data_size);
+
+ if (tmit->fragment_count <= tmit->n)
+ return GNUNET_YES;
+
+ GNUNET_assert (tmit->fragment_sizes[tmit->n] <= *data_size);
+
+ *data_size = tmit->fragment_sizes[tmit->n];
+ memcpy (data, tmit->fragments[tmit->n], *data_size);
+ tmit->n++;
+
+ if (tmit->n == tmit->fragment_count - 1)
+ {
+ /* Send last fragment later. */
+ GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_SECONDS, &transmit_resume,
+ tmit);
+ *data_size = 0;
+ return GNUNET_NO;
+ }
+ return tmit->n <= tmit->fragment_count ? GNUNET_NO : GNUNET_YES;
+}
void
master_started (void *cls, uint64_t max_message_id)
{
GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Master started: %lu\n", max_message_id);
+
+ struct GNUNET_ENV_Environment *env = GNUNET_ENV_environment_create ();
+ GNUNET_ENV_environment_add_mod (env, GNUNET_ENV_OP_ASSIGN,
+ "_foo", "bar baz", 7);
+ GNUNET_ENV_environment_add_mod (env, GNUNET_ENV_OP_ASSIGN,
+ "_foo_bar", "foo bar baz", 11);
+
+ struct TransmitClosure *tmit = GNUNET_new (struct TransmitClosure);
+ tmit->fragment_count = 2;
+ tmit->fragments[0] = "foo bar";
+ tmit->fragment_sizes[0] = 7;
+ tmit->fragments[1] = "baz!";
+ tmit->fragment_sizes[1] = 4;
+ tmit->handle
+ = GNUNET_PSYC_master_transmit (mst, "_test", env, transmit_notify, tmit,
+ GNUNET_PSYC_MASTER_TRANSMIT_INC_GROUP_GEN);
}
@@ -157,7 +224,7 @@ slave_joined (void *cls, uint64_t max_message_id)
* Main function of the test, run from scheduler.
*
* @param cls NULL
- * @param cfg configuration we use (also to connect to PSYCstore service)
+ * @param cfg configuration we use (also to connect to PSYC service)
* @param peer handle to access more of the peer (not used)
*/
static void
@@ -182,9 +249,18 @@ run (void *cls,
mst = GNUNET_PSYC_master_start (cfg, channel_key,
GNUNET_PSYC_CHANNEL_PRIVATE,
&method, &join, &master_started, NULL);
-
- slv = GNUNET_PSYC_slave_join (cfg, &channel_pub_key, slave_key,
- &method, &join, &slave_joined, NULL);
+ return;
+ struct GNUNET_PeerIdentity origin;
+ struct GNUNET_PeerIdentity relays[16];
+ struct GNUNET_ENV_Environment *env = GNUNET_ENV_environment_create ();
+ GNUNET_ENV_environment_add_mod (env, GNUNET_ENV_OP_ASSIGN,
+ "_foo", "bar baz", 7);
+ GNUNET_ENV_environment_add_mod (env, GNUNET_ENV_OP_ASSIGN,
+ "_foo_bar", "foo bar baz", 11);
+ slv = GNUNET_PSYC_slave_join (cfg, &channel_pub_key, slave_key, &origin,
+ 16, relays, &method, &join, &slave_joined,
+ NULL, "_request_join", env, "some data", 9);
+ GNUNET_ENV_environment_destroy (env);
}