aboutsummaryrefslogtreecommitdiff
path: root/src/psyc/psyc_api.c
diff options
context:
space:
mode:
authorGabor X Toth <*@tg-x.net>2013-10-10 18:08:53 +0000
committerGabor X Toth <*@tg-x.net>2013-10-10 18:08:53 +0000
commit1b5d4aba9d8bbcb62c93f96782e3567f6f79d0cb (patch)
tree3cd28bfee831af0417c2dcbb543c03481517ad00 /src/psyc/psyc_api.c
parent67a8e21eedb6d35fec76841d4a1a6b4b41b37879 (diff)
PSYC: master msg transmission
Diffstat (limited to 'src/psyc/psyc_api.c')
-rw-r--r--src/psyc/psyc_api.c218
1 files changed, 166 insertions, 52 deletions
diff --git a/src/psyc/psyc_api.c b/src/psyc/psyc_api.c
index abe7bb0288..4178d920ba 100644
--- a/src/psyc/psyc_api.c
+++ b/src/psyc/psyc_api.c
@@ -106,6 +106,28 @@ struct GNUNET_PSYC_Channel
* Are we currently transmitting a message?
*/
int in_transmit;
+
+ /**
+ * Is this a master or slave channel?
+ */
+ int is_master;
+
+ /**
+ * Buffer space available for transmitting the next data fragment.
+ */
+ uint16_t tmit_buf_avail;
+};
+
+
+/**
+ * Handle for a pending PSYC transmission operation.
+ */
+struct GNUNET_PSYC_MasterTransmitHandle
+{
+ struct GNUNET_PSYC_Master *master;
+ GNUNET_PSYC_MasterTransmitNotify notify;
+ void *notify_cls;
+ enum GNUNET_PSYC_DataStatus status;
};
@@ -116,6 +138,8 @@ struct GNUNET_PSYC_Master
{
struct GNUNET_PSYC_Channel ch;
+ struct GNUNET_PSYC_MasterTransmitHandle *tmit;
+
GNUNET_PSYC_MasterStartCallback start_cb;
uint64_t max_message_id;
@@ -146,19 +170,6 @@ struct GNUNET_PSYC_JoinHandle
/**
* Handle for a pending PSYC transmission operation.
*/
-struct GNUNET_PSYC_MasterTransmitHandle
-{
- struct GNUNET_PSYC_Master *master;
- const struct GNUNET_ENV_Environment *env;
- GNUNET_PSYC_MasterTransmitNotify notify;
- void *notify_cls;
- enum GNUNET_PSYC_MasterTransmitFlags flags;
-};
-
-
-/**
- * Handle for a pending PSYC transmission operation.
- */
struct GNUNET_PSYC_SlaveTransmitHandle
{
@@ -184,10 +195,10 @@ struct GNUNET_PSYC_StateQuery
/**
- * Try again to connect to the PSYCstore service.
+ * Try again to connect to the PSYC service.
*
- * @param cls handle to the PSYCstore service.
- * @param tc scheduler context
+ * @param cls Handle to the PSYC service.
+ * @param tc Scheduler context
*/
static void
reconnect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
@@ -215,7 +226,7 @@ reschedule_connect (struct GNUNET_PSYC_Channel *c)
}
c->in_receive = GNUNET_NO;
LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Scheduling task to reconnect to PSYCstore service in %s.\n",
+ "Scheduling task to reconnect to PSYC service in %s.\n",
GNUNET_STRINGS_relative_time_to_string (c->reconnect_delay, GNUNET_YES));
c->reconnect_task =
GNUNET_SCHEDULER_add_delayed (c->reconnect_delay, &reconnect, c);
@@ -226,12 +237,56 @@ reschedule_connect (struct GNUNET_PSYC_Channel *c)
/**
* Schedule transmission of the next message from our queue.
*
- * @param h PSYCstore handle
+ * @param h PSYC handle
*/
static void
transmit_next (struct GNUNET_PSYC_Channel *c);
+void
+master_transmit_data (struct GNUNET_PSYC_Master *mst)
+{
+ struct GNUNET_PSYC_Channel *ch = &mst->ch;
+ size_t data_size = ch->tmit_buf_avail;
+ struct GNUNET_PSYC_MessageData *pdata;
+ struct OperationHandle *op
+ = GNUNET_malloc (sizeof (*op) + sizeof (*pdata) + data_size);
+ pdata = (struct GNUNET_PSYC_MessageData *) &op[1];
+ op->msg = (struct GNUNET_MessageHeader *) pdata;
+ pdata->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA);
+
+ switch (mst->tmit->notify (mst->tmit->notify_cls, &data_size, &pdata[1]))
+ {
+ case GNUNET_NO:
+ mst->tmit->status = GNUNET_PSYC_DATA_CONT;
+ break;
+
+ case GNUNET_YES:
+ mst->tmit->status = GNUNET_PSYC_DATA_END;
+ break;
+
+ default:
+ mst->tmit->status = GNUNET_PSYC_DATA_CANCEL;
+ data_size = 0;
+ LOG (GNUNET_ERROR_TYPE_ERROR, "MasterTransmitNotify returned error\n");
+ }
+
+ if ((GNUNET_PSYC_DATA_CONT == mst->tmit->status && 0 == data_size))
+ {
+ /* Transmission paused, nothing to send. */
+ GNUNET_free (op);
+ }
+ else
+ {
+ GNUNET_assert (data_size <= ch->tmit_buf_avail);
+ pdata->header.size = htons (sizeof (*pdata) + data_size);
+ pdata->status = htons (mst->tmit->status);
+ GNUNET_CONTAINER_DLL_insert_tail (ch->transmit_head, ch->transmit_tail, op);
+ transmit_next (ch);
+ }
+}
+
+
/**
* Type of a function to call when we receive a message
* from the service.
@@ -253,8 +308,8 @@ message_handler (void *cls, const struct GNUNET_MessageHeader *msg)
}
uint16_t size_eq = 0;
uint16_t size_min = 0;
- const uint16_t size = ntohs (msg->size);
- const uint16_t type = ntohs (msg->type);
+ uint16_t size = ntohs (msg->size);
+ uint16_t type = ntohs (msg->type);
LOG (GNUNET_ERROR_TYPE_DEBUG,
"Received message of type %d from PSYC service\n", type);
@@ -265,6 +320,9 @@ message_handler (void *cls, const struct GNUNET_MessageHeader *msg)
case GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK:
size_eq = sizeof (struct CountersResult);
break;
+ case GNUNET_MESSAGE_TYPE_PSYC_TRANSMIT_ACK:
+ size_eq = sizeof (struct TransmitAck);
+ break;
}
if (! ((0 < size_eq && size == size_eq)
@@ -276,6 +334,7 @@ message_handler (void *cls, const struct GNUNET_MessageHeader *msg)
}
struct CountersResult *cres;
+ struct TransmitAck *tack;
switch (type)
{
@@ -294,17 +353,39 @@ message_handler (void *cls, const struct GNUNET_MessageHeader *msg)
mst->join_ack_cb (ch->cb_cls, mst->max_message_id);
#endif
break;
+
+ case GNUNET_MESSAGE_TYPE_PSYC_TRANSMIT_ACK:
+ tack = (struct TransmitAck *) msg;
+ if (ch->is_master)
+ {
+ GNUNET_assert (NULL != mst->tmit);
+ if (GNUNET_PSYC_DATA_CONT != mst->tmit->status
+ || NULL == mst->tmit->notify)
+ {
+ GNUNET_free (mst->tmit);
+ mst->tmit = NULL;
+ }
+ else
+ {
+ ch->tmit_buf_avail = ntohs (tack->buf_avail);
+ master_transmit_data (mst);
+ }
+ }
+ else
+ {
+ /* TODO: slave */
+ }
+ break;
}
GNUNET_CLIENT_receive (ch->client, &message_handler, ch,
GNUNET_TIME_UNIT_FOREVER_REL);
}
-
/**
* Transmit next message to service.
*
- * @param cls The 'struct GNUNET_PSYCSTORE_Handle'.
+ * @param cls The 'struct GNUNET_PSYC_Channel'.
* @param size Number of bytes available in buf.
* @param buf Where to copy the message.
* @return Number of bytes copied to buf.
@@ -326,7 +407,7 @@ send_next_message (void *cls, size_t size, void *buf)
return 0;
}
LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Sending message of type %d to PSYCstore service\n",
+ "Sending message of type %d to PSYC service\n",
ntohs (op->msg->type));
memcpy (buf, op->msg, ret);
@@ -349,7 +430,7 @@ send_next_message (void *cls, size_t size, void *buf)
/**
* Schedule transmission of the next message from our queue.
*
- * @param h PSYCstore handle.
+ * @param h PSYC handle.
*/
static void
transmit_next (struct GNUNET_PSYC_Channel *ch)
@@ -391,14 +472,12 @@ reconnect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
if (NULL == ch->transmit_head ||
ch->transmit_head->msg->type != ch->reconnect_msg->type)
{
- struct OperationHandle *op
- = GNUNET_malloc (sizeof (struct OperationHandle)
- + ntohs (ch->reconnect_msg->size));
- memcpy (&op[1], ch->reconnect_msg, ntohs (ch->reconnect_msg->size));
+ uint16_t reconn_size = ntohs (ch->reconnect_msg->size);
+ struct OperationHandle *op = GNUNET_malloc (sizeof (*op) + reconn_size);
+ memcpy (&op[1], ch->reconnect_msg, reconn_size);
op->msg = (struct GNUNET_MessageHeader *) &op[1];
GNUNET_CONTAINER_DLL_insert (ch->transmit_head, ch->transmit_tail, op);
}
-
transmit_next (ch);
}
@@ -414,7 +493,12 @@ disconnect (void *c)
{
struct GNUNET_PSYC_Channel *ch = c;
GNUNET_assert (NULL != ch);
- GNUNET_assert (ch->transmit_head == ch->transmit_tail);
+ if (ch->transmit_head != ch->transmit_tail)
+ {
+ LOG (GNUNET_ERROR_TYPE_ERROR,
+ "Disconnecting while there are still outstanding messages!\n");
+ GNUNET_break (0);
+ }
if (ch->reconnect_task != GNUNET_SCHEDULER_NO_TASK)
{
GNUNET_SCHEDULER_cancel (ch->reconnect_task);
@@ -431,7 +515,10 @@ disconnect (void *c)
ch->client = NULL;
}
if (NULL != ch->reconnect_msg)
+ {
+ GNUNET_free (ch->reconnect_msg);
ch->reconnect_msg = NULL;
+ }
}
@@ -475,12 +562,13 @@ GNUNET_PSYC_master_start (const struct GNUNET_CONFIGURATION_Handle *cfg,
struct GNUNET_PSYC_Channel *ch = &mst->ch;
struct MasterStartRequest *req = GNUNET_malloc (sizeof (*req));
- req->header.size = htons (sizeof (*req) + sizeof (*channel_key));
+ req->header.size = htons (sizeof (*req));
req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MASTER_START);
req->channel_key = *channel_key;
req->policy = policy;
ch->cfg = cfg;
+ ch->is_master = GNUNET_YES;
ch->reconnect_msg = (struct GNUNET_MessageHeader *) req;
ch->reconnect_delay = GNUNET_TIME_UNIT_ZERO;
ch->reconnect_task = GNUNET_SCHEDULER_add_now (&reconnect, mst);
@@ -532,7 +620,7 @@ GNUNET_PSYC_master_stop (struct GNUNET_PSYC_Master *mst)
void
GNUNET_PSYC_join_decision (struct GNUNET_PSYC_JoinHandle *jh,
int is_admitted,
- unsigned int relay_count,
+ uint32_t relay_count,
const struct GNUNET_PeerIdentity *relays,
const char *method_name,
const struct GNUNET_ENV_Environment *env,
@@ -556,13 +644,13 @@ send_modifier (void *cls, struct GNUNET_ENV_Modifier *mod)
pmod = (struct GNUNET_PSYC_MessageModifier *) &op[1];
op->msg = (struct GNUNET_MessageHeader *) pmod;
- pmod->header.type = GNUNET_MESSAGE_TYPE_PSYC_TRANSMIT_MODIFIER;
+ pmod->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER);
pmod->header.size = htons (sizeof (*pmod) + name_size + mod->value_size);
pmod->name_size = htons (name_size);
memcpy (&pmod[1], mod->name, name_size);
- memcpy ((void *) &pmod[1] + name_size, mod->value, mod->value_size);
+ memcpy ((char *) &pmod[1] + name_size, mod->value, mod->value_size);
- GNUNET_CONTAINER_DLL_insert (ch->transmit_head, ch->transmit_tail, op);
+ GNUNET_CONTAINER_DLL_insert_tail (ch->transmit_head, ch->transmit_tail, op);
return GNUNET_YES;
}
@@ -594,29 +682,41 @@ GNUNET_PSYC_master_transmit (struct GNUNET_PSYC_Master *mst,
return NULL;
ch->in_transmit = GNUNET_YES;
+ size_t size = strlen (method_name) + 1;
struct GNUNET_PSYC_MessageMethod *pmeth;
- struct OperationHandle *op = GNUNET_malloc (sizeof (*op) + sizeof (*pmeth));
+ struct OperationHandle *op
+ = GNUNET_malloc (sizeof (*op) + sizeof (*pmeth) + size);
pmeth = (struct GNUNET_PSYC_MessageMethod *) &op[1];
op->msg = (struct GNUNET_MessageHeader *) pmeth;
- pmeth->header.type = GNUNET_MESSAGE_TYPE_PSYC_TRANSMIT_METHOD;
- size_t size = strlen (method_name) + 1;
+ pmeth->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD);
pmeth->header.size = htons (sizeof (*pmeth) + size);
pmeth->flags = htonl (flags);
- pmeth->mod_count
- = GNUNET_ntohll (GNUNET_ENV_environment_get_mod_count (env));
+ pmeth->mod_count = GNUNET_ntohll (GNUNET_ENV_environment_get_mod_count (env));
memcpy (&pmeth[1], method_name, size);
- GNUNET_CONTAINER_DLL_insert (ch->transmit_head, ch->transmit_tail, op);
-
+ GNUNET_CONTAINER_DLL_insert_tail (ch->transmit_head, ch->transmit_tail, op);
GNUNET_ENV_environment_iterate (env, send_modifier, mst);
+ transmit_next (ch);
+
+ mst->tmit = GNUNET_malloc (sizeof (*mst->tmit));
+ mst->tmit->master = mst;
+ mst->tmit->notify = notify;
+ mst->tmit->notify_cls = notify_cls;
+ mst->tmit->status = GNUNET_PSYC_DATA_CONT;
+ return mst->tmit;
+}
+
- struct GNUNET_PSYC_MasterTransmitHandle *th = GNUNET_malloc (sizeof (*th));
- th->master = mst;
- th->env = env;
- th->notify = notify;
- th->notify_cls = notify_cls;
- return th;
+/**
+ * Resume transmission to the channel.
+ *
+ * @param th Handle of the request that is being resumed.
+ */
+void
+GNUNET_PSYC_master_transmit_resume (struct GNUNET_PSYC_MasterTransmitHandle *th)
+{
+ master_transmit_data (th->master);
}
@@ -671,7 +771,7 @@ GNUNET_PSYC_slave_join (const struct GNUNET_CONFIGURATION_Handle *cfg,
const struct GNUNET_CRYPTO_EccPublicSignKey *channel_key,
const struct GNUNET_CRYPTO_EccPrivateKey *slave_key,
const struct GNUNET_PeerIdentity *origin,
- size_t relay_count,
+ uint32_t relay_count,
const struct GNUNET_PeerIdentity *relays,
GNUNET_PSYC_Method method,
GNUNET_PSYC_JoinCallback join_cb,
@@ -680,7 +780,7 @@ GNUNET_PSYC_slave_join (const struct GNUNET_CONFIGURATION_Handle *cfg,
const char *method_name,
const struct GNUNET_ENV_Environment *env,
const void *data,
- size_t data_size)
+ uint16_t data_size)
{
struct GNUNET_PSYC_Slave *slv = GNUNET_malloc (sizeof (*slv));
struct GNUNET_PSYC_Channel *ch = &slv->ch;
@@ -692,10 +792,12 @@ GNUNET_PSYC_slave_join (const struct GNUNET_CONFIGURATION_Handle *cfg,
req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN);
req->channel_key = *channel_key;
req->slave_key = *slave_key;
+ req->origin = *origin;
req->relay_count = relay_count;
memcpy (&req[1], relays, relay_count * sizeof (*relays));
ch->cfg = cfg;
+ ch->is_master = GNUNET_NO;
ch->reconnect_msg = (struct GNUNET_MessageHeader *) req;
ch->reconnect_delay = GNUNET_TIME_UNIT_ZERO;
ch->reconnect_task = GNUNET_SCHEDULER_add_now (&reconnect, slv);
@@ -746,6 +848,18 @@ GNUNET_PSYC_slave_transmit (struct GNUNET_PSYC_Slave *slave,
/**
+ * Resume transmission to the master.
+ *
+ * @param th Handle of the request that is being resumed.
+ */
+void
+GNUNET_PSYC_slave_transmit_resume (struct GNUNET_PSYC_MasterTransmitHandle *th)
+{
+
+}
+
+
+/**
* Abort transmission request to master.
*
* @param th Handle of the request that is being aborted.
@@ -822,7 +936,7 @@ GNUNET_PSYC_channel_slave_add (struct GNUNET_PSYC_Channel *ch,
slvadd->announced_at = GNUNET_htonll (announced_at);
slvadd->effective_since = GNUNET_htonll (effective_since);
- GNUNET_CONTAINER_DLL_insert (ch->transmit_head, ch->transmit_tail, op);
+ GNUNET_CONTAINER_DLL_insert_tail (ch->transmit_head, ch->transmit_tail, op);
transmit_next (ch);
}
@@ -863,7 +977,7 @@ GNUNET_PSYC_channel_slave_remove (struct GNUNET_PSYC_Channel *ch,
slvrm->header.size = htons (sizeof (*slvrm));
slvrm->announced_at = GNUNET_htonll (announced_at);
- GNUNET_CONTAINER_DLL_insert (ch->transmit_head, ch->transmit_tail, op);
+ GNUNET_CONTAINER_DLL_insert_tail (ch->transmit_head, ch->transmit_tail, op);
transmit_next (ch);
}