diff options
author | Gabor X Toth <*@tg-x.net> | 2013-10-10 18:08:53 +0000 |
---|---|---|
committer | Gabor X Toth <*@tg-x.net> | 2013-10-10 18:08:53 +0000 |
commit | 1b5d4aba9d8bbcb62c93f96782e3567f6f79d0cb (patch) | |
tree | 3cd28bfee831af0417c2dcbb543c03481517ad00 /src/psyc/psyc_api.c | |
parent | 67a8e21eedb6d35fec76841d4a1a6b4b41b37879 (diff) |
PSYC: master msg transmission
Diffstat (limited to 'src/psyc/psyc_api.c')
-rw-r--r-- | src/psyc/psyc_api.c | 218 |
1 files changed, 166 insertions, 52 deletions
diff --git a/src/psyc/psyc_api.c b/src/psyc/psyc_api.c index abe7bb0288..4178d920ba 100644 --- a/src/psyc/psyc_api.c +++ b/src/psyc/psyc_api.c @@ -106,6 +106,28 @@ struct GNUNET_PSYC_Channel * Are we currently transmitting a message? */ int in_transmit; + + /** + * Is this a master or slave channel? + */ + int is_master; + + /** + * Buffer space available for transmitting the next data fragment. + */ + uint16_t tmit_buf_avail; +}; + + +/** + * Handle for a pending PSYC transmission operation. + */ +struct GNUNET_PSYC_MasterTransmitHandle +{ + struct GNUNET_PSYC_Master *master; + GNUNET_PSYC_MasterTransmitNotify notify; + void *notify_cls; + enum GNUNET_PSYC_DataStatus status; }; @@ -116,6 +138,8 @@ struct GNUNET_PSYC_Master { struct GNUNET_PSYC_Channel ch; + struct GNUNET_PSYC_MasterTransmitHandle *tmit; + GNUNET_PSYC_MasterStartCallback start_cb; uint64_t max_message_id; @@ -146,19 +170,6 @@ struct GNUNET_PSYC_JoinHandle /** * Handle for a pending PSYC transmission operation. */ -struct GNUNET_PSYC_MasterTransmitHandle -{ - struct GNUNET_PSYC_Master *master; - const struct GNUNET_ENV_Environment *env; - GNUNET_PSYC_MasterTransmitNotify notify; - void *notify_cls; - enum GNUNET_PSYC_MasterTransmitFlags flags; -}; - - -/** - * Handle for a pending PSYC transmission operation. - */ struct GNUNET_PSYC_SlaveTransmitHandle { @@ -184,10 +195,10 @@ struct GNUNET_PSYC_StateQuery /** - * Try again to connect to the PSYCstore service. + * Try again to connect to the PSYC service. * - * @param cls handle to the PSYCstore service. - * @param tc scheduler context + * @param cls Handle to the PSYC service. + * @param tc Scheduler context */ static void reconnect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc); @@ -215,7 +226,7 @@ reschedule_connect (struct GNUNET_PSYC_Channel *c) } c->in_receive = GNUNET_NO; LOG (GNUNET_ERROR_TYPE_DEBUG, - "Scheduling task to reconnect to PSYCstore service in %s.\n", + "Scheduling task to reconnect to PSYC service in %s.\n", GNUNET_STRINGS_relative_time_to_string (c->reconnect_delay, GNUNET_YES)); c->reconnect_task = GNUNET_SCHEDULER_add_delayed (c->reconnect_delay, &reconnect, c); @@ -226,12 +237,56 @@ reschedule_connect (struct GNUNET_PSYC_Channel *c) /** * Schedule transmission of the next message from our queue. * - * @param h PSYCstore handle + * @param h PSYC handle */ static void transmit_next (struct GNUNET_PSYC_Channel *c); +void +master_transmit_data (struct GNUNET_PSYC_Master *mst) +{ + struct GNUNET_PSYC_Channel *ch = &mst->ch; + size_t data_size = ch->tmit_buf_avail; + struct GNUNET_PSYC_MessageData *pdata; + struct OperationHandle *op + = GNUNET_malloc (sizeof (*op) + sizeof (*pdata) + data_size); + pdata = (struct GNUNET_PSYC_MessageData *) &op[1]; + op->msg = (struct GNUNET_MessageHeader *) pdata; + pdata->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA); + + switch (mst->tmit->notify (mst->tmit->notify_cls, &data_size, &pdata[1])) + { + case GNUNET_NO: + mst->tmit->status = GNUNET_PSYC_DATA_CONT; + break; + + case GNUNET_YES: + mst->tmit->status = GNUNET_PSYC_DATA_END; + break; + + default: + mst->tmit->status = GNUNET_PSYC_DATA_CANCEL; + data_size = 0; + LOG (GNUNET_ERROR_TYPE_ERROR, "MasterTransmitNotify returned error\n"); + } + + if ((GNUNET_PSYC_DATA_CONT == mst->tmit->status && 0 == data_size)) + { + /* Transmission paused, nothing to send. */ + GNUNET_free (op); + } + else + { + GNUNET_assert (data_size <= ch->tmit_buf_avail); + pdata->header.size = htons (sizeof (*pdata) + data_size); + pdata->status = htons (mst->tmit->status); + GNUNET_CONTAINER_DLL_insert_tail (ch->transmit_head, ch->transmit_tail, op); + transmit_next (ch); + } +} + + /** * Type of a function to call when we receive a message * from the service. @@ -253,8 +308,8 @@ message_handler (void *cls, const struct GNUNET_MessageHeader *msg) } uint16_t size_eq = 0; uint16_t size_min = 0; - const uint16_t size = ntohs (msg->size); - const uint16_t type = ntohs (msg->type); + uint16_t size = ntohs (msg->size); + uint16_t type = ntohs (msg->type); LOG (GNUNET_ERROR_TYPE_DEBUG, "Received message of type %d from PSYC service\n", type); @@ -265,6 +320,9 @@ message_handler (void *cls, const struct GNUNET_MessageHeader *msg) case GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN_ACK: size_eq = sizeof (struct CountersResult); break; + case GNUNET_MESSAGE_TYPE_PSYC_TRANSMIT_ACK: + size_eq = sizeof (struct TransmitAck); + break; } if (! ((0 < size_eq && size == size_eq) @@ -276,6 +334,7 @@ message_handler (void *cls, const struct GNUNET_MessageHeader *msg) } struct CountersResult *cres; + struct TransmitAck *tack; switch (type) { @@ -294,17 +353,39 @@ message_handler (void *cls, const struct GNUNET_MessageHeader *msg) mst->join_ack_cb (ch->cb_cls, mst->max_message_id); #endif break; + + case GNUNET_MESSAGE_TYPE_PSYC_TRANSMIT_ACK: + tack = (struct TransmitAck *) msg; + if (ch->is_master) + { + GNUNET_assert (NULL != mst->tmit); + if (GNUNET_PSYC_DATA_CONT != mst->tmit->status + || NULL == mst->tmit->notify) + { + GNUNET_free (mst->tmit); + mst->tmit = NULL; + } + else + { + ch->tmit_buf_avail = ntohs (tack->buf_avail); + master_transmit_data (mst); + } + } + else + { + /* TODO: slave */ + } + break; } GNUNET_CLIENT_receive (ch->client, &message_handler, ch, GNUNET_TIME_UNIT_FOREVER_REL); } - /** * Transmit next message to service. * - * @param cls The 'struct GNUNET_PSYCSTORE_Handle'. + * @param cls The 'struct GNUNET_PSYC_Channel'. * @param size Number of bytes available in buf. * @param buf Where to copy the message. * @return Number of bytes copied to buf. @@ -326,7 +407,7 @@ send_next_message (void *cls, size_t size, void *buf) return 0; } LOG (GNUNET_ERROR_TYPE_DEBUG, - "Sending message of type %d to PSYCstore service\n", + "Sending message of type %d to PSYC service\n", ntohs (op->msg->type)); memcpy (buf, op->msg, ret); @@ -349,7 +430,7 @@ send_next_message (void *cls, size_t size, void *buf) /** * Schedule transmission of the next message from our queue. * - * @param h PSYCstore handle. + * @param h PSYC handle. */ static void transmit_next (struct GNUNET_PSYC_Channel *ch) @@ -391,14 +472,12 @@ reconnect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) if (NULL == ch->transmit_head || ch->transmit_head->msg->type != ch->reconnect_msg->type) { - struct OperationHandle *op - = GNUNET_malloc (sizeof (struct OperationHandle) - + ntohs (ch->reconnect_msg->size)); - memcpy (&op[1], ch->reconnect_msg, ntohs (ch->reconnect_msg->size)); + uint16_t reconn_size = ntohs (ch->reconnect_msg->size); + struct OperationHandle *op = GNUNET_malloc (sizeof (*op) + reconn_size); + memcpy (&op[1], ch->reconnect_msg, reconn_size); op->msg = (struct GNUNET_MessageHeader *) &op[1]; GNUNET_CONTAINER_DLL_insert (ch->transmit_head, ch->transmit_tail, op); } - transmit_next (ch); } @@ -414,7 +493,12 @@ disconnect (void *c) { struct GNUNET_PSYC_Channel *ch = c; GNUNET_assert (NULL != ch); - GNUNET_assert (ch->transmit_head == ch->transmit_tail); + if (ch->transmit_head != ch->transmit_tail) + { + LOG (GNUNET_ERROR_TYPE_ERROR, + "Disconnecting while there are still outstanding messages!\n"); + GNUNET_break (0); + } if (ch->reconnect_task != GNUNET_SCHEDULER_NO_TASK) { GNUNET_SCHEDULER_cancel (ch->reconnect_task); @@ -431,7 +515,10 @@ disconnect (void *c) ch->client = NULL; } if (NULL != ch->reconnect_msg) + { + GNUNET_free (ch->reconnect_msg); ch->reconnect_msg = NULL; + } } @@ -475,12 +562,13 @@ GNUNET_PSYC_master_start (const struct GNUNET_CONFIGURATION_Handle *cfg, struct GNUNET_PSYC_Channel *ch = &mst->ch; struct MasterStartRequest *req = GNUNET_malloc (sizeof (*req)); - req->header.size = htons (sizeof (*req) + sizeof (*channel_key)); + req->header.size = htons (sizeof (*req)); req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MASTER_START); req->channel_key = *channel_key; req->policy = policy; ch->cfg = cfg; + ch->is_master = GNUNET_YES; ch->reconnect_msg = (struct GNUNET_MessageHeader *) req; ch->reconnect_delay = GNUNET_TIME_UNIT_ZERO; ch->reconnect_task = GNUNET_SCHEDULER_add_now (&reconnect, mst); @@ -532,7 +620,7 @@ GNUNET_PSYC_master_stop (struct GNUNET_PSYC_Master *mst) void GNUNET_PSYC_join_decision (struct GNUNET_PSYC_JoinHandle *jh, int is_admitted, - unsigned int relay_count, + uint32_t relay_count, const struct GNUNET_PeerIdentity *relays, const char *method_name, const struct GNUNET_ENV_Environment *env, @@ -556,13 +644,13 @@ send_modifier (void *cls, struct GNUNET_ENV_Modifier *mod) pmod = (struct GNUNET_PSYC_MessageModifier *) &op[1]; op->msg = (struct GNUNET_MessageHeader *) pmod; - pmod->header.type = GNUNET_MESSAGE_TYPE_PSYC_TRANSMIT_MODIFIER; + pmod->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER); pmod->header.size = htons (sizeof (*pmod) + name_size + mod->value_size); pmod->name_size = htons (name_size); memcpy (&pmod[1], mod->name, name_size); - memcpy ((void *) &pmod[1] + name_size, mod->value, mod->value_size); + memcpy ((char *) &pmod[1] + name_size, mod->value, mod->value_size); - GNUNET_CONTAINER_DLL_insert (ch->transmit_head, ch->transmit_tail, op); + GNUNET_CONTAINER_DLL_insert_tail (ch->transmit_head, ch->transmit_tail, op); return GNUNET_YES; } @@ -594,29 +682,41 @@ GNUNET_PSYC_master_transmit (struct GNUNET_PSYC_Master *mst, return NULL; ch->in_transmit = GNUNET_YES; + size_t size = strlen (method_name) + 1; struct GNUNET_PSYC_MessageMethod *pmeth; - struct OperationHandle *op = GNUNET_malloc (sizeof (*op) + sizeof (*pmeth)); + struct OperationHandle *op + = GNUNET_malloc (sizeof (*op) + sizeof (*pmeth) + size); pmeth = (struct GNUNET_PSYC_MessageMethod *) &op[1]; op->msg = (struct GNUNET_MessageHeader *) pmeth; - pmeth->header.type = GNUNET_MESSAGE_TYPE_PSYC_TRANSMIT_METHOD; - size_t size = strlen (method_name) + 1; + pmeth->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD); pmeth->header.size = htons (sizeof (*pmeth) + size); pmeth->flags = htonl (flags); - pmeth->mod_count - = GNUNET_ntohll (GNUNET_ENV_environment_get_mod_count (env)); + pmeth->mod_count = GNUNET_ntohll (GNUNET_ENV_environment_get_mod_count (env)); memcpy (&pmeth[1], method_name, size); - GNUNET_CONTAINER_DLL_insert (ch->transmit_head, ch->transmit_tail, op); - + GNUNET_CONTAINER_DLL_insert_tail (ch->transmit_head, ch->transmit_tail, op); GNUNET_ENV_environment_iterate (env, send_modifier, mst); + transmit_next (ch); + + mst->tmit = GNUNET_malloc (sizeof (*mst->tmit)); + mst->tmit->master = mst; + mst->tmit->notify = notify; + mst->tmit->notify_cls = notify_cls; + mst->tmit->status = GNUNET_PSYC_DATA_CONT; + return mst->tmit; +} + - struct GNUNET_PSYC_MasterTransmitHandle *th = GNUNET_malloc (sizeof (*th)); - th->master = mst; - th->env = env; - th->notify = notify; - th->notify_cls = notify_cls; - return th; +/** + * Resume transmission to the channel. + * + * @param th Handle of the request that is being resumed. + */ +void +GNUNET_PSYC_master_transmit_resume (struct GNUNET_PSYC_MasterTransmitHandle *th) +{ + master_transmit_data (th->master); } @@ -671,7 +771,7 @@ GNUNET_PSYC_slave_join (const struct GNUNET_CONFIGURATION_Handle *cfg, const struct GNUNET_CRYPTO_EccPublicSignKey *channel_key, const struct GNUNET_CRYPTO_EccPrivateKey *slave_key, const struct GNUNET_PeerIdentity *origin, - size_t relay_count, + uint32_t relay_count, const struct GNUNET_PeerIdentity *relays, GNUNET_PSYC_Method method, GNUNET_PSYC_JoinCallback join_cb, @@ -680,7 +780,7 @@ GNUNET_PSYC_slave_join (const struct GNUNET_CONFIGURATION_Handle *cfg, const char *method_name, const struct GNUNET_ENV_Environment *env, const void *data, - size_t data_size) + uint16_t data_size) { struct GNUNET_PSYC_Slave *slv = GNUNET_malloc (sizeof (*slv)); struct GNUNET_PSYC_Channel *ch = &slv->ch; @@ -692,10 +792,12 @@ GNUNET_PSYC_slave_join (const struct GNUNET_CONFIGURATION_Handle *cfg, req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_SLAVE_JOIN); req->channel_key = *channel_key; req->slave_key = *slave_key; + req->origin = *origin; req->relay_count = relay_count; memcpy (&req[1], relays, relay_count * sizeof (*relays)); ch->cfg = cfg; + ch->is_master = GNUNET_NO; ch->reconnect_msg = (struct GNUNET_MessageHeader *) req; ch->reconnect_delay = GNUNET_TIME_UNIT_ZERO; ch->reconnect_task = GNUNET_SCHEDULER_add_now (&reconnect, slv); @@ -746,6 +848,18 @@ GNUNET_PSYC_slave_transmit (struct GNUNET_PSYC_Slave *slave, /** + * Resume transmission to the master. + * + * @param th Handle of the request that is being resumed. + */ +void +GNUNET_PSYC_slave_transmit_resume (struct GNUNET_PSYC_MasterTransmitHandle *th) +{ + +} + + +/** * Abort transmission request to master. * * @param th Handle of the request that is being aborted. @@ -822,7 +936,7 @@ GNUNET_PSYC_channel_slave_add (struct GNUNET_PSYC_Channel *ch, slvadd->announced_at = GNUNET_htonll (announced_at); slvadd->effective_since = GNUNET_htonll (effective_since); - GNUNET_CONTAINER_DLL_insert (ch->transmit_head, ch->transmit_tail, op); + GNUNET_CONTAINER_DLL_insert_tail (ch->transmit_head, ch->transmit_tail, op); transmit_next (ch); } @@ -863,7 +977,7 @@ GNUNET_PSYC_channel_slave_remove (struct GNUNET_PSYC_Channel *ch, slvrm->header.size = htons (sizeof (*slvrm)); slvrm->announced_at = GNUNET_htonll (announced_at); - GNUNET_CONTAINER_DLL_insert (ch->transmit_head, ch->transmit_tail, op); + GNUNET_CONTAINER_DLL_insert_tail (ch->transmit_head, ch->transmit_tail, op); transmit_next (ch); } |