aboutsummaryrefslogtreecommitdiff
path: root/src/psycutil/psyc_message.c
diff options
context:
space:
mode:
authorGabor X Toth <*@tg-x.net>2016-08-17 21:26:41 +0000
committerGabor X Toth <*@tg-x.net>2016-08-17 21:26:41 +0000
commit667cc67f8224ccf4ff391b125a614cf90cf5917e (patch)
treeae2048a6525ab2521ad989afa795d7a2f0833af6 /src/psycutil/psyc_message.c
parent720a38ea7519f5a1a820157f056b150ab3d4abd5 (diff)
psyc, social: switch to MQ
Diffstat (limited to 'src/psycutil/psyc_message.c')
-rw-r--r--src/psycutil/psyc_message.c34
1 files changed, 24 insertions, 10 deletions
diff --git a/src/psycutil/psyc_message.c b/src/psycutil/psyc_message.c
index 303ba84668..bc1896b1f9 100644
--- a/src/psycutil/psyc_message.c
+++ b/src/psycutil/psyc_message.c
@@ -39,7 +39,7 @@ struct GNUNET_PSYC_TransmitHandle
/**
* Client connection to service.
*/
- struct GNUNET_CLIENT_MANAGER_Connection *client;
+ struct GNUNET_MQ_Handle *mq;
/**
* Message currently being received from the client.
@@ -47,6 +47,11 @@ struct GNUNET_PSYC_TransmitHandle
struct GNUNET_MessageHeader *msg;
/**
+ * Envelope for @a msg
+ */
+ struct GNUNET_MQ_Envelope *env;
+
+ /**
* Callback to request next modifier from client.
*/
GNUNET_PSYC_TransmitNotifyModifier notify_mod;
@@ -327,11 +332,11 @@ GNUNET_PSYC_log_message (enum GNUNET_ErrorType kind,
* Create a transmission handle.
*/
struct GNUNET_PSYC_TransmitHandle *
-GNUNET_PSYC_transmit_create (struct GNUNET_CLIENT_MANAGER_Connection *client)
+GNUNET_PSYC_transmit_create (struct GNUNET_MQ_Handle *mq)
{
struct GNUNET_PSYC_TransmitHandle *tmit = GNUNET_new (struct GNUNET_PSYC_TransmitHandle);
- tmit->client = client;
+ tmit->mq = mq;
return tmit;
}
@@ -378,16 +383,15 @@ transmit_queue_insert (struct GNUNET_PSYC_TransmitHandle *tmit,
{
/* End of message or buffer is full, add it to transmission queue
* and start with empty buffer */
- tmit->msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
tmit->msg->size = htons (tmit->msg->size);
- GNUNET_CLIENT_MANAGER_transmit (tmit->client, tmit->msg);
+ GNUNET_MQ_send (tmit->mq, tmit->env);
+ tmit->env = NULL;
tmit->msg = NULL;
tmit->acks_pending++;
}
else
{
/* Message fits in current buffer, append */
- tmit->msg = GNUNET_realloc (tmit->msg, tmit->msg->size + size);
GNUNET_memcpy ((char *) tmit->msg + tmit->msg->size, msg, size);
tmit->msg->size += size;
}
@@ -396,8 +400,13 @@ transmit_queue_insert (struct GNUNET_PSYC_TransmitHandle *tmit,
if (NULL == tmit->msg && NULL != msg)
{
/* Empty buffer, copy over message. */
- tmit->msg = GNUNET_malloc (sizeof (*tmit->msg) + size);
+ tmit->env = GNUNET_MQ_msg_extra (tmit->msg,
+ GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD,
+ GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
+ /* store current message size in host byte order
+ * then later switch it to network byte order before sending */
tmit->msg->size = sizeof (*tmit->msg) + size;
+
GNUNET_memcpy (&tmit->msg[1], msg, size);
}
@@ -407,9 +416,9 @@ transmit_queue_insert (struct GNUNET_PSYC_TransmitHandle *tmit,
< tmit->msg->size + sizeof (struct GNUNET_MessageHeader))))
{
/* End of message or buffer is full, add it to transmission queue. */
- tmit->msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
tmit->msg->size = htons (tmit->msg->size);
- GNUNET_CLIENT_MANAGER_transmit (tmit->client, tmit->msg);
+ GNUNET_MQ_send (tmit->mq, tmit->env);
+ tmit->env = NULL;
tmit->msg = NULL;
tmit->acks_pending++;
}
@@ -722,7 +731,12 @@ GNUNET_PSYC_transmit_message (struct GNUNET_PSYC_TransmitHandle *tmit,
size_t size = strlen (method_name) + 1;
struct GNUNET_PSYC_MessageMethod *pmeth;
- tmit->msg = GNUNET_malloc (sizeof (*tmit->msg) + sizeof (*pmeth) + size);
+
+ tmit->env = GNUNET_MQ_msg_extra (tmit->msg,
+ GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD,
+ GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
+ /* store current message size in host byte order
+ * then later switch it to network byte order before sending */
tmit->msg->size = sizeof (*tmit->msg) + sizeof (*pmeth) + size;
if (NULL != notify_mod)