diff options
author | Gabor X Toth <*@tg-x.net> | 2016-08-17 21:26:41 +0000 |
---|---|---|
committer | Gabor X Toth <*@tg-x.net> | 2016-08-17 21:26:41 +0000 |
commit | 667cc67f8224ccf4ff391b125a614cf90cf5917e (patch) | |
tree | ae2048a6525ab2521ad989afa795d7a2f0833af6 /src/psycutil/psyc_message.c | |
parent | 720a38ea7519f5a1a820157f056b150ab3d4abd5 (diff) |
psyc, social: switch to MQ
Diffstat (limited to 'src/psycutil/psyc_message.c')
-rw-r--r-- | src/psycutil/psyc_message.c | 34 |
1 files changed, 24 insertions, 10 deletions
diff --git a/src/psycutil/psyc_message.c b/src/psycutil/psyc_message.c index 303ba84668..bc1896b1f9 100644 --- a/src/psycutil/psyc_message.c +++ b/src/psycutil/psyc_message.c @@ -39,7 +39,7 @@ struct GNUNET_PSYC_TransmitHandle /** * Client connection to service. */ - struct GNUNET_CLIENT_MANAGER_Connection *client; + struct GNUNET_MQ_Handle *mq; /** * Message currently being received from the client. @@ -47,6 +47,11 @@ struct GNUNET_PSYC_TransmitHandle struct GNUNET_MessageHeader *msg; /** + * Envelope for @a msg + */ + struct GNUNET_MQ_Envelope *env; + + /** * Callback to request next modifier from client. */ GNUNET_PSYC_TransmitNotifyModifier notify_mod; @@ -327,11 +332,11 @@ GNUNET_PSYC_log_message (enum GNUNET_ErrorType kind, * Create a transmission handle. */ struct GNUNET_PSYC_TransmitHandle * -GNUNET_PSYC_transmit_create (struct GNUNET_CLIENT_MANAGER_Connection *client) +GNUNET_PSYC_transmit_create (struct GNUNET_MQ_Handle *mq) { struct GNUNET_PSYC_TransmitHandle *tmit = GNUNET_new (struct GNUNET_PSYC_TransmitHandle); - tmit->client = client; + tmit->mq = mq; return tmit; } @@ -378,16 +383,15 @@ transmit_queue_insert (struct GNUNET_PSYC_TransmitHandle *tmit, { /* End of message or buffer is full, add it to transmission queue * and start with empty buffer */ - tmit->msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE); tmit->msg->size = htons (tmit->msg->size); - GNUNET_CLIENT_MANAGER_transmit (tmit->client, tmit->msg); + GNUNET_MQ_send (tmit->mq, tmit->env); + tmit->env = NULL; tmit->msg = NULL; tmit->acks_pending++; } else { /* Message fits in current buffer, append */ - tmit->msg = GNUNET_realloc (tmit->msg, tmit->msg->size + size); GNUNET_memcpy ((char *) tmit->msg + tmit->msg->size, msg, size); tmit->msg->size += size; } @@ -396,8 +400,13 @@ transmit_queue_insert (struct GNUNET_PSYC_TransmitHandle *tmit, if (NULL == tmit->msg && NULL != msg) { /* Empty buffer, copy over message. */ - tmit->msg = GNUNET_malloc (sizeof (*tmit->msg) + size); + tmit->env = GNUNET_MQ_msg_extra (tmit->msg, + GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD, + GNUNET_MESSAGE_TYPE_PSYC_MESSAGE); + /* store current message size in host byte order + * then later switch it to network byte order before sending */ tmit->msg->size = sizeof (*tmit->msg) + size; + GNUNET_memcpy (&tmit->msg[1], msg, size); } @@ -407,9 +416,9 @@ transmit_queue_insert (struct GNUNET_PSYC_TransmitHandle *tmit, < tmit->msg->size + sizeof (struct GNUNET_MessageHeader)))) { /* End of message or buffer is full, add it to transmission queue. */ - tmit->msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE); tmit->msg->size = htons (tmit->msg->size); - GNUNET_CLIENT_MANAGER_transmit (tmit->client, tmit->msg); + GNUNET_MQ_send (tmit->mq, tmit->env); + tmit->env = NULL; tmit->msg = NULL; tmit->acks_pending++; } @@ -722,7 +731,12 @@ GNUNET_PSYC_transmit_message (struct GNUNET_PSYC_TransmitHandle *tmit, size_t size = strlen (method_name) + 1; struct GNUNET_PSYC_MessageMethod *pmeth; - tmit->msg = GNUNET_malloc (sizeof (*tmit->msg) + sizeof (*pmeth) + size); + + tmit->env = GNUNET_MQ_msg_extra (tmit->msg, + GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD, + GNUNET_MESSAGE_TYPE_PSYC_MESSAGE); + /* store current message size in host byte order + * then later switch it to network byte order before sending */ tmit->msg->size = sizeof (*tmit->msg) + sizeof (*pmeth) + size; if (NULL != notify_mod) |