aboutsummaryrefslogtreecommitdiff
path: root/src/multicast/multicast_api.c
diff options
context:
space:
mode:
authorGabor X Toth <*@tg-x.net>2014-05-29 16:35:55 +0000
committerGabor X Toth <*@tg-x.net>2014-05-29 16:35:55 +0000
commit9955561e1b204ccf23fbf841f409bd3ef79be88c (patch)
tree0271c23ae9f1dad72266a0e6073d696e5afca027 /src/multicast/multicast_api.c
parenta5877668ba805c5e0efe622e6ce4c58ff5609bf9 (diff)
psyc, multicast: reorg code, use new client manager & psyc util lib
Diffstat (limited to 'src/multicast/multicast_api.c')
-rw-r--r--src/multicast/multicast_api.c614
1 files changed, 154 insertions, 460 deletions
diff --git a/src/multicast/multicast_api.c b/src/multicast/multicast_api.c
index 36d564d524..b6d51896d7 100644
--- a/src/multicast/multicast_api.c
+++ b/src/multicast/multicast_api.c
@@ -24,6 +24,7 @@
* @author Christian Grothoff
* @author Gabor X Toth
*/
+
#include "platform.h"
#include "gnunet_util_lib.h"
#include "gnunet_multicast_service.h"
@@ -33,26 +34,6 @@
/**
- * Started origins.
- * Group's pub_key_hash -> struct GNUNET_MULTICAST_Origin
- */
-static struct GNUNET_CONTAINER_MultiHashMap *origins;
-
-/**
- * Joined members.
- * group_key_hash -> struct GNUNET_MULTICAST_Member
- */
-static struct GNUNET_CONTAINER_MultiHashMap *members;
-
-
-struct MessageQueue
-{
- struct MessageQueue *prev;
- struct MessageQueue *next;
-};
-
-
-/**
* Handle for a request to send a message to all multicast group members
* (from the origin).
*/
@@ -90,47 +71,14 @@ struct GNUNET_MULTICAST_Group
const struct GNUNET_CONFIGURATION_Handle *cfg;
/**
- * Socket (if available).
+ * Client connection to the service.
*/
- struct GNUNET_CLIENT_Connection *client;
-
- /**
- * Currently pending transmission request, or NULL for none.
- */
- struct GNUNET_CLIENT_TransmitHandle *th;
-
- /**
- * Head of operations to transmit.
- */
- struct MessageQueue *tmit_head;
-
- /**
- * Tail of operations to transmit.
- */
- struct MessageQueue *tmit_tail;
-
- /**
- * Message being transmitted to the Multicast service.
- */
- struct MessageQueue *tmit_msg;
+ struct GNUNET_CLIENT_MANAGER_Connection *client;
/**
* Message to send on reconnect.
*/
- struct GNUNET_MessageHeader *reconnect_msg;
-
- /**
- * Task doing exponential back-off trying to reconnect.
- */
- GNUNET_SCHEDULER_TaskIdentifier reconnect_task;
-
- /**
- * Time for next connect retry.
- */
- struct GNUNET_TIME_Relative reconnect_delay;
-
- struct GNUNET_CRYPTO_EddsaPublicKey pub_key;
- struct GNUNET_HashCode pub_key_hash;
+ struct GNUNET_MessageHeader *connect_msg;
GNUNET_MULTICAST_JoinRequestCallback join_req_cb;
GNUNET_MULTICAST_MembershipTestCallback member_test_cb;
@@ -140,11 +88,6 @@ struct GNUNET_MULTICAST_Group
void *cb_cls;
/**
- * Are we polling for incoming messages right now?
- */
- uint8_t in_receive;
-
- /**
* Are we currently transmitting a message?
*/
uint8_t in_transmit;
@@ -163,7 +106,6 @@ struct GNUNET_MULTICAST_Origin
{
struct GNUNET_MULTICAST_Group grp;
struct GNUNET_MULTICAST_OriginTransmitHandle tmit;
- struct GNUNET_CRYPTO_EddsaPrivateKey priv_key;
GNUNET_MULTICAST_RequestCallback request_cb;
};
@@ -229,294 +171,125 @@ struct GNUNET_MULTICAST_MemberReplayHandle
};
-static void
-reconnect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
-
-
-static void
-reschedule_connect (struct GNUNET_MULTICAST_Group *grp);
-
-
/**
- * Schedule transmission of the next message from our queue.
- *
- * @param grp PSYC channel handle
+ * Send first message to the service after connecting.
*/
static void
-transmit_next (struct GNUNET_MULTICAST_Group *grp);
-
-
-static void
-message_handler (void *cls, const struct GNUNET_MessageHeader *msg);
-
-
-/**
- * Reschedule a connect attempt to the service.
- *
- * @param c channel to reconnect
- */
-static void
-reschedule_connect (struct GNUNET_MULTICAST_Group *grp)
+group_send_connect_msg (struct GNUNET_MULTICAST_Group *grp)
{
- GNUNET_assert (grp->reconnect_task == GNUNET_SCHEDULER_NO_TASK);
-
- if (NULL != grp->th)
- {
- GNUNET_CLIENT_notify_transmit_ready_cancel (grp->th);
- grp->th = NULL;
- }
- if (NULL != grp->client)
- {
- GNUNET_CLIENT_disconnect (grp->client);
- grp->client = NULL;
- }
- grp->in_receive = GNUNET_NO;
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Scheduling task to reconnect to Multicast service in %s.\n",
- GNUNET_STRINGS_relative_time_to_string (grp->reconnect_delay, GNUNET_YES));
- grp->reconnect_task =
- GNUNET_SCHEDULER_add_delayed (grp->reconnect_delay, &reconnect, grp);
- grp->reconnect_delay = GNUNET_TIME_STD_BACKOFF (grp->reconnect_delay);
+ uint16_t cmsg_size = ntohs (grp->connect_msg->size);
+ struct GNUNET_MessageHeader * cmsg = GNUNET_malloc (cmsg_size);
+ memcpy (cmsg, grp->connect_msg, cmsg_size);
+ GNUNET_CLIENT_MANAGER_transmit_now (grp->client, cmsg);
}
/**
- * Reset stored data related to the last received message.
+ * Got disconnected from service. Reconnect.
*/
static void
-recv_reset (struct GNUNET_MULTICAST_Group *grp)
-{
-}
-
-
-static void
-recv_error (struct GNUNET_MULTICAST_Group *grp)
-{
- if (NULL != grp->message_cb)
- grp->message_cb (grp->cb_cls, NULL);
-
- recv_reset (grp);
-}
-
-
-/**
- * Transmit next message to service.
- *
- * @param cls The struct GNUNET_MULTICAST_Group.
- * @param size Number of bytes available in @a buf.
- * @param buf Where to copy the message.
- *
- * @return Number of bytes copied to @a buf.
- */
-static size_t
-send_next_message (void *cls, size_t size, void *buf)
+group_recv_disconnect (void *cls,
+ struct GNUNET_CLIENT_MANAGER_Connection *client,
+ const struct GNUNET_MessageHeader *msg)
{
- LOG (GNUNET_ERROR_TYPE_DEBUG, "send_next_message()\n");
- struct GNUNET_MULTICAST_Group *grp = cls;
- struct MessageQueue *mq = grp->tmit_head;
- if (NULL == mq)
- return 0;
- struct GNUNET_MessageHeader *qmsg = (struct GNUNET_MessageHeader *) &mq[1];
- size_t ret = ntohs (qmsg->size);
- grp->th = NULL;
- if (ret > size)
- {
- reschedule_connect (grp);
- return 0;
- }
- memcpy (buf, qmsg, ret);
-
- GNUNET_CONTAINER_DLL_remove (grp->tmit_head, grp->tmit_tail, mq);
- GNUNET_free (mq);
-
- if (NULL != grp->tmit_head)
- transmit_next (grp);
-
- if (GNUNET_NO == grp->in_receive)
- {
- grp->in_receive = GNUNET_YES;
- GNUNET_CLIENT_receive (grp->client, &message_handler, grp,
- GNUNET_TIME_UNIT_FOREVER_REL);
- }
- return ret;
+ struct GNUNET_MULTICAST_Group *
+ grp = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*grp));
+ GNUNET_CLIENT_MANAGER_reconnect (client);
+ group_send_connect_msg (grp);
}
/**
- * Schedule transmission of the next message from our queue.
- *
- * @param grp Multicast group handle.
+ * Receive join request from service.
*/
static void
-transmit_next (struct GNUNET_MULTICAST_Group *grp)
+group_recv_join_request (void *cls,
+ struct GNUNET_CLIENT_MANAGER_Connection *client,
+ const struct GNUNET_MessageHeader *msg)
{
- LOG (GNUNET_ERROR_TYPE_DEBUG, "transmit_next()\n");
- if (NULL != grp->th || NULL == grp->client)
- return;
-
- struct MessageQueue *mq = grp->tmit_head;
- if (NULL == mq)
- return;
- struct GNUNET_MessageHeader *qmsg = (struct GNUNET_MessageHeader *) &mq[1];
-
- grp->th = GNUNET_CLIENT_notify_transmit_ready (grp->client,
- ntohs (qmsg->size),
- GNUNET_TIME_UNIT_FOREVER_REL,
- GNUNET_NO,
- &send_next_message,
- grp);
-}
+ struct GNUNET_MULTICAST_Group *
+ grp = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*grp));
+ const struct MulticastJoinRequestMessage *
+ jreq = (const struct MulticastJoinRequestMessage *) msg;
-/**
- * Try again to connect to the Multicast service.
- *
- * @param cls Channel handle.
- * @param tc Scheduler context.
- */
-static void
-reconnect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
-{
- struct GNUNET_MULTICAST_Group *grp = cls;
-
- recv_reset (grp);
- grp->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Connecting to Multicast service.\n");
- GNUNET_assert (NULL == grp->client);
- grp->client = GNUNET_CLIENT_connect ("multicast", grp->cfg);
- GNUNET_assert (NULL != grp->client);
- uint16_t reconn_size = ntohs (grp->reconnect_msg->size);
-
- if (NULL == grp->tmit_head ||
- 0 != memcmp (&grp->tmit_head[1], grp->reconnect_msg, reconn_size))
- {
- struct MessageQueue *mq = GNUNET_malloc (sizeof (*mq) + reconn_size);
- memcpy (&mq[1], grp->reconnect_msg, reconn_size);
- GNUNET_CONTAINER_DLL_insert (grp->tmit_head, grp->tmit_tail, mq);
- }
- transmit_next (grp);
-}
+ struct GNUNET_MULTICAST_JoinHandle *jh = GNUNET_malloc (sizeof (*jh));
+ jh->group = grp;
+ jh->member_key = jreq->member_key;
+ jh->member_peer = jreq->member_peer;
+ const struct GNUNET_MessageHeader *jmsg = NULL;
+ if (sizeof (*jreq) + sizeof (*jmsg) <= ntohs (jreq->header.size))
+ jmsg = (const struct GNUNET_MessageHeader *) &jreq[1];
-/**
- * Disconnect from the Multicast service.
- *
- * @param g Group handle to disconnect.
- */
-static void
-disconnect (void *g)
-{
- struct GNUNET_MULTICAST_Group *grp = g;
-
- GNUNET_assert (NULL != grp);
- if (grp->tmit_head != grp->tmit_tail)
- {
- LOG (GNUNET_ERROR_TYPE_ERROR,
- "Disconnecting while there are still outstanding messages!\n");
- GNUNET_break (0);
- }
- if (grp->reconnect_task != GNUNET_SCHEDULER_NO_TASK)
- {
- GNUNET_SCHEDULER_cancel (grp->reconnect_task);
- grp->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
- }
- if (NULL != grp->th)
- {
- GNUNET_CLIENT_notify_transmit_ready_cancel (grp->th);
- grp->th = NULL;
- }
- if (NULL != grp->client)
- {
- GNUNET_CLIENT_disconnect (grp->client);
- grp->client = NULL;
- }
- if (NULL != grp->reconnect_msg)
- {
- GNUNET_free (grp->reconnect_msg);
- grp->reconnect_msg = NULL;
- }
+ if (NULL != grp->join_req_cb)
+ grp->join_req_cb (grp->cb_cls, &jreq->member_key, jmsg, jh);
}
/**
- * Iterator callback for calling message callbacks for all groups.
+ * Receive multicast message from service.
*/
-static int
-message_cb (void *cls, const struct GNUNET_HashCode *pub_key_hash, void *group)
+static void
+group_recv_message (void *cls,
+ struct GNUNET_CLIENT_MANAGER_Connection *client,
+ const struct GNUNET_MessageHeader *msg)
{
- const struct GNUNET_MessageHeader *msg = cls;
- struct GNUNET_MULTICAST_Group *grp = group;
+ struct GNUNET_MULTICAST_Group *
+ grp = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*grp));
+ struct GNUNET_MULTICAST_MessageHeader *
+ mmsg = (struct GNUNET_MULTICAST_MessageHeader *) msg;
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Calling message callback with a message "
- "of type %u and size %u.\n",
- ntohs (msg->type), ntohs (msg->size));
+ "Calling message callback with a message of size %u.\n",
+ ntohs (mmsg->header.size));
if (NULL != grp->message_cb)
- grp->message_cb (grp->cb_cls, msg);
-
- return GNUNET_YES;
+ grp->message_cb (grp->cb_cls, mmsg);
}
/**
- * Iterator callback for calling request callbacks of origins.
+ * Origin receives uniquest request from a member.
*/
-static int
-request_cb (void *cls, const struct GNUNET_HashCode *pub_key_hash, void *origin)
-{
- const struct GNUNET_MULTICAST_RequestHeader *req = cls;
- struct GNUNET_MULTICAST_Origin *orig = origin;
+static void
+origin_recv_request (void *cls,
+ struct GNUNET_CLIENT_MANAGER_Connection *client,
+ const struct GNUNET_MessageHeader *msg)
+{
+ struct GNUNET_MULTICAST_Group *grp;
+ struct GNUNET_MULTICAST_Origin *
+ orig = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*grp));
+ grp = &orig->grp;
+ struct GNUNET_MULTICAST_RequestHeader *
+ req = (struct GNUNET_MULTICAST_RequestHeader *) msg;
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Calling request callback for a request of type %u and size %u.\n",
- ntohs (req->header.type), ntohs (req->header.size));
+ "Calling request callback with a request of size %u.\n",
+ ntohs (req->header.size));
if (NULL != orig->request_cb)
- orig->request_cb (orig->grp.cb_cls, &req->member_key,
- (const struct GNUNET_MessageHeader *) req, 0);
- return GNUNET_YES;
+ orig->request_cb (grp->cb_cls, req);
}
/**
- * Iterator callback for calling join request callbacks of origins.
+ * Member receives join decision.
*/
-static int
-join_request_cb (void *cls, const struct GNUNET_HashCode *pub_key_hash,
- void *group)
+static void
+member_recv_join_decision (void *cls,
+ struct GNUNET_CLIENT_MANAGER_Connection *client,
+ const struct GNUNET_MessageHeader *msg)
{
- const struct MulticastJoinRequestMessage *req = cls;
- struct GNUNET_MULTICAST_Group *grp = group;
-
- struct GNUNET_MULTICAST_JoinHandle *jh = GNUNET_malloc (sizeof (*jh));
- jh->group = grp;
- jh->member_key = req->member_key;
- jh->member_peer = req->member_peer;
-
- const struct GNUNET_MessageHeader *msg = NULL;
- if (sizeof (*req) + sizeof (*msg) <= ntohs (req->header.size))
- msg = (const struct GNUNET_MessageHeader *) &req[1];
-
- if (NULL != grp->join_req_cb)
- grp->join_req_cb (grp->cb_cls, &req->member_key, msg, jh);
- return GNUNET_YES;
-}
-
+ struct GNUNET_MULTICAST_Group *grp;
+ struct GNUNET_MULTICAST_Member *
+ mem = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*grp));
+ grp = &mem->grp;
-/**
- * Iterator callback for calling join decision callbacks of members.
- */
-static int
-join_decision_cb (void *cls, const struct GNUNET_HashCode *pub_key_hash,
- void *member)
-{
- const struct MulticastJoinDecisionMessageHeader *hdcsn = cls;
+ const struct MulticastJoinDecisionMessageHeader *
+ hdcsn = (const struct MulticastJoinDecisionMessageHeader *) msg;
const struct MulticastJoinDecisionMessage *
dcsn = (const struct MulticastJoinDecisionMessage *) &hdcsn[1];
- struct GNUNET_MULTICAST_Member *mem = member;
- struct GNUNET_MULTICAST_Group *grp = &mem->grp;
uint16_t dcsn_size = ntohs (dcsn->header.size);
int is_admitted = ntohl (dcsn->is_admitted);
@@ -549,116 +322,53 @@ join_decision_cb (void *cls, const struct GNUNET_HashCode *pub_key_hash,
if (GNUNET_YES != is_admitted)
GNUNET_MULTICAST_member_part (mem);
-
- return GNUNET_YES;
}
+
/**
- * Function called when we receive a message from the service.
- *
- * @param cls struct GNUNET_MULTICAST_Group
- * @param msg Message received, NULL on timeout or fatal error.
+ * Message handlers for an origin.
*/
-static void
-message_handler (void *cls, const struct GNUNET_MessageHeader *msg)
+static struct GNUNET_CLIENT_MANAGER_MessageHandler origin_handlers[] =
{
- struct GNUNET_MULTICAST_Group *grp = cls;
+ { &group_recv_disconnect, NULL, 0, 0, GNUNET_NO },
- if (NULL == msg)
- {
- // timeout / disconnected from service, reconnect
- reschedule_connect (grp);
- return;
- }
+ { &group_recv_message, NULL,
+ GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE,
+ sizeof (struct GNUNET_MULTICAST_MessageHeader), GNUNET_YES },
- uint16_t size_eq = 0;
- uint16_t size_min = 0;
- uint16_t size = ntohs (msg->size);
- uint16_t type = ntohs (msg->type);
+ { &origin_recv_request, NULL,
+ GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST,
+ sizeof (struct GNUNET_MULTICAST_RequestHeader), GNUNET_YES },
- LOG (GNUNET_ERROR_TYPE_DEBUG,
- "Received message of type %d and size %u from Multicast service\n",
- type, size);
+ { &group_recv_join_request, NULL,
+ GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_REQUEST,
+ sizeof (struct MulticastJoinRequestMessage), GNUNET_YES },
- switch (type)
- {
- case GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE:
- size_min = sizeof (struct GNUNET_MULTICAST_MessageHeader);
- break;
-
- case GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST:
- size_min = sizeof (struct GNUNET_MULTICAST_RequestHeader);
- break;
+ { NULL, NULL, 0, 0, GNUNET_NO }
+};
- case GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_REQUEST:
- size_min = sizeof (struct MulticastJoinRequestMessage);
- break;
- case GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_DECISION:
- size_min = sizeof (struct MulticastJoinDecisionMessage);
- break;
+/**
+ * Message handlers for a member.
+ */
+static struct GNUNET_CLIENT_MANAGER_MessageHandler member_handlers[] =
+{
+ { &group_recv_disconnect, NULL, 0, 0, GNUNET_NO },
- default:
- GNUNET_break_op (0);
- type = 0;
- }
+ { &group_recv_message, NULL,
+ GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE,
+ sizeof (struct GNUNET_MULTICAST_MessageHeader), GNUNET_YES },
- if (! ((0 < size_eq && size == size_eq)
- || (0 < size_min && size_min <= size)))
- {
- GNUNET_break_op (0);
- type = 0;
- }
+ { &group_recv_join_request, NULL,
+ GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_REQUEST,
+ sizeof (struct MulticastJoinRequestMessage), GNUNET_YES },
- switch (type)
- {
- case GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE:
- if (origins != NULL)
- GNUNET_CONTAINER_multihashmap_get_multiple (origins, &grp->pub_key_hash,
- message_cb, (void *) msg);
- if (members != NULL)
- GNUNET_CONTAINER_multihashmap_get_multiple (members, &grp->pub_key_hash,
- message_cb, (void *) msg);
- break;
-
- case GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST:
- if (GNUNET_YES != grp->is_origin)
- {
- GNUNET_break (0);
- break;
- }
- if (NULL != origins)
- GNUNET_CONTAINER_multihashmap_get_multiple (origins, &grp->pub_key_hash,
- request_cb, (void *) msg);
- break;
-
- case GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_REQUEST:
- if (NULL != origins)
- GNUNET_CONTAINER_multihashmap_get_multiple (origins, &grp->pub_key_hash,
- join_request_cb, (void *) msg);
- if (NULL != members)
- GNUNET_CONTAINER_multihashmap_get_multiple (members, &grp->pub_key_hash,
- join_request_cb, (void *) msg);
- break;
-
- case GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_DECISION:
- if (GNUNET_NO != grp->is_origin)
- {
- GNUNET_break (0);
- break;
- }
- if (NULL != members)
- GNUNET_CONTAINER_multihashmap_get_multiple (members, &grp->pub_key_hash,
- join_decision_cb, (void *) msg);
- break;
- }
+ { &member_recv_join_decision, NULL,
+ GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_DECISION,
+ sizeof (struct MulticastJoinDecisionMessage), GNUNET_YES },
- if (NULL != grp->client)
- {
- GNUNET_CLIENT_receive (grp->client, &message_handler, grp,
- GNUNET_TIME_UNIT_FOREVER_REL);
- }
-}
+ { NULL, NULL, 0, 0, GNUNET_NO }
+};
/**
@@ -667,7 +377,7 @@ message_handler (void *cls, const struct GNUNET_MessageHeader *msg)
* Must be called once and only once in response to an invocation of the
* #GNUNET_MULTICAST_JoinRequestCallback.
*
- * @param jh Join request handle.
+ * @param join Join request handle.
* @param is_admitted #GNUNET_YES if the join is approved,
* #GNUNET_NO if it is disapproved,
* #GNUNET_SYSERR if we cannot answer the request.
@@ -685,27 +395,25 @@ message_handler (void *cls, const struct GNUNET_MessageHeader *msg)
* peer that issued the request even if admission is denied.
*/
struct GNUNET_MULTICAST_ReplayHandle *
-GNUNET_MULTICAST_join_decision (struct GNUNET_MULTICAST_JoinHandle *jh,
+GNUNET_MULTICAST_join_decision (struct GNUNET_MULTICAST_JoinHandle *join,
int is_admitted,
uint16_t relay_count,
const struct GNUNET_PeerIdentity *relays,
const struct GNUNET_MessageHeader *join_resp)
{
- struct GNUNET_MULTICAST_Group *grp = jh->group;
+ struct GNUNET_MULTICAST_Group *grp = join->group;
uint16_t join_resp_size = (NULL != join_resp) ? ntohs (join_resp->size) : 0;
uint16_t relay_size = relay_count * sizeof (*relays);
+
struct MulticastJoinDecisionMessageHeader * hdcsn;
struct MulticastJoinDecisionMessage *dcsn;
- struct MessageQueue *
- mq = GNUNET_malloc (sizeof (*mq) + sizeof (*hdcsn) + sizeof (*dcsn)
- + relay_size + join_resp_size);
-
- hdcsn = (struct MulticastJoinDecisionMessageHeader *) &mq[1];
- hdcsn->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_DECISION);
+ hdcsn = GNUNET_malloc (sizeof (*hdcsn) + sizeof (*dcsn)
+ + relay_size + join_resp_size);
hdcsn->header.size = htons (sizeof (*hdcsn) + sizeof (*dcsn)
- + relay_size + join_resp_size);
- hdcsn->member_key = jh->member_key;
- hdcsn->peer = jh->member_peer;
+ + relay_size + join_resp_size);
+ hdcsn->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_DECISION);
+ hdcsn->member_key = join->member_key;
+ hdcsn->peer = join->member_peer;
dcsn = (struct MulticastJoinDecisionMessage *) &hdcsn[1];
dcsn->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_DECISION);
@@ -717,10 +425,8 @@ GNUNET_MULTICAST_join_decision (struct GNUNET_MULTICAST_JoinHandle *jh,
if (0 < join_resp_size)
memcpy (((char *) &dcsn[1]) + relay_size, join_resp, join_resp_size);
- GNUNET_CONTAINER_DLL_insert_tail (grp->tmit_head, grp->tmit_tail, mq);
- transmit_next (grp);
-
- GNUNET_free (jh);
+ GNUNET_CLIENT_MANAGER_transmit (grp->client, &hdcsn->header);
+ GNUNET_free (join);
return NULL;
}
@@ -832,7 +538,7 @@ GNUNET_MULTICAST_origin_start (const struct GNUNET_CONFIGURATION_Handle *cfg,
start->max_fragment_id = max_fragment_id;
memcpy (&start->group_key, priv_key, sizeof (*priv_key));
- grp->reconnect_msg = (struct GNUNET_MessageHeader *) start;
+ grp->connect_msg = (struct GNUNET_MessageHeader *) start;
grp->is_origin = GNUNET_YES;
grp->cfg = cfg;
@@ -844,20 +550,10 @@ GNUNET_MULTICAST_origin_start (const struct GNUNET_CONFIGURATION_Handle *cfg,
grp->message_cb = message_cb;
orig->request_cb = request_cb;
- orig->priv_key = *priv_key;
-
- GNUNET_CRYPTO_eddsa_key_get_public (&orig->priv_key, &grp->pub_key);
- GNUNET_CRYPTO_hash (&grp->pub_key, sizeof (grp->pub_key),
- &grp->pub_key_hash);
-
- if (NULL == origins)
- origins = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
-
- GNUNET_CONTAINER_multihashmap_put (origins, &grp->pub_key_hash, orig,
- GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
- grp->reconnect_delay = GNUNET_TIME_UNIT_ZERO;
- grp->reconnect_task = GNUNET_SCHEDULER_add_now (&reconnect, grp);
+ grp->client = GNUNET_CLIENT_MANAGER_connect (cfg, "multicast", origin_handlers);
+ GNUNET_CLIENT_MANAGER_set_user_context_ (grp->client, orig, sizeof (*grp));
+ group_send_connect_msg (grp);
return orig;
}
@@ -871,8 +567,7 @@ GNUNET_MULTICAST_origin_start (const struct GNUNET_CONFIGURATION_Handle *cfg,
void
GNUNET_MULTICAST_origin_stop (struct GNUNET_MULTICAST_Origin *orig)
{
- disconnect (&orig->grp);
- GNUNET_CONTAINER_multihashmap_remove (origins, &orig->grp.pub_key_hash, orig);
+ GNUNET_CLIENT_MANAGER_disconnect (orig->grp.client, GNUNET_YES);
GNUNET_free (orig);
}
@@ -885,26 +580,22 @@ origin_to_all (struct GNUNET_MULTICAST_Origin *orig)
struct GNUNET_MULTICAST_OriginTransmitHandle *tmit = &orig->tmit;
size_t buf_size = GNUNET_MULTICAST_FRAGMENT_MAX_SIZE;
- struct MessageQueue *mq = GNUNET_malloc (sizeof (*mq) + buf_size);
- GNUNET_CONTAINER_DLL_insert_tail (grp->tmit_head, grp->tmit_tail, mq);
-
- struct GNUNET_MULTICAST_MessageHeader *
- msg = (struct GNUNET_MULTICAST_MessageHeader *) &mq[1];
+ struct GNUNET_MULTICAST_MessageHeader *msg = GNUNET_malloc (buf_size);
int ret = tmit->notify (tmit->notify_cls, &buf_size, &msg[1]);
if (! (GNUNET_YES == ret || GNUNET_NO == ret)
- || GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < buf_size)
+ || GNUNET_MULTICAST_FRAGMENT_MAX_SIZE < buf_size)
{
LOG (GNUNET_ERROR_TYPE_ERROR,
"OriginTransmitNotify() returned error or invalid message size.\n");
/* FIXME: handle error */
- GNUNET_free (mq);
+ GNUNET_free (msg);
return;
}
if (GNUNET_NO == ret && 0 == buf_size)
{
- GNUNET_free (mq);
+ GNUNET_free (msg);
return; /* Transmission paused. */
}
@@ -915,7 +606,7 @@ origin_to_all (struct GNUNET_MULTICAST_Origin *orig)
msg->fragment_offset = GNUNET_htonll (tmit->fragment_offset);
tmit->fragment_offset += sizeof (*msg) + buf_size;
- transmit_next (grp);
+ GNUNET_CLIENT_MANAGER_transmit (grp->client, &msg->header);
}
@@ -939,6 +630,12 @@ GNUNET_MULTICAST_origin_to_all (struct GNUNET_MULTICAST_Origin *orig,
GNUNET_MULTICAST_OriginTransmitNotify notify,
void *notify_cls)
{
+/* FIXME
+ if (GNUNET_YES == orig->grp.in_transmit)
+ return NULL;
+ orig->grp.in_transmit = GNUNET_YES;
+*/
+
struct GNUNET_MULTICAST_OriginTransmitHandle *tmit = &orig->tmit;
tmit->origin = orig;
tmit->message_id = message_id;
@@ -1047,10 +744,9 @@ GNUNET_MULTICAST_member_join (const struct GNUNET_CONFIGURATION_Handle *cfg,
if (0 < join_msg_size)
memcpy (((char *) &join[1]) + relay_size, join_msg, join_msg_size);
- grp->reconnect_msg = (struct GNUNET_MessageHeader *) join;
+ grp->connect_msg = (struct GNUNET_MessageHeader *) join;
grp->is_origin = GNUNET_NO;
grp->cfg = cfg;
- grp->pub_key = *group_key;
mem->join_dcsn_cb = join_decision_cb;
grp->join_req_cb = join_request_cb;
@@ -1059,17 +755,9 @@ GNUNET_MULTICAST_member_join (const struct GNUNET_CONFIGURATION_Handle *cfg,
grp->message_cb = message_cb;
grp->cb_cls = cls;
- GNUNET_CRYPTO_eddsa_key_get_public (member_key, &grp->pub_key);
- GNUNET_CRYPTO_hash (&grp->pub_key, sizeof (grp->pub_key), &grp->pub_key_hash);
-
- if (NULL == members)
- members = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_YES);
-
- GNUNET_CONTAINER_multihashmap_put (members, &grp->pub_key_hash, mem,
- GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
-
- grp->reconnect_delay = GNUNET_TIME_UNIT_ZERO;
- grp->reconnect_task = GNUNET_SCHEDULER_add_now (&reconnect, grp);
+ grp->client = GNUNET_CLIENT_MANAGER_connect (cfg, "multicast", member_handlers);
+ GNUNET_CLIENT_MANAGER_set_user_context_ (grp->client, mem, sizeof (*grp));
+ group_send_connect_msg (grp);
return mem;
}
@@ -1088,8 +776,7 @@ GNUNET_MULTICAST_member_join (const struct GNUNET_CONFIGURATION_Handle *cfg,
void
GNUNET_MULTICAST_member_part (struct GNUNET_MULTICAST_Member *mem)
{
- disconnect (&mem->grp);
- GNUNET_CONTAINER_multihashmap_remove (members, &mem->grp.pub_key_hash, mem);
+ GNUNET_CLIENT_MANAGER_disconnect (mem->grp.client, GNUNET_YES);
GNUNET_free (mem);
}
@@ -1162,25 +849,26 @@ member_to_origin (struct GNUNET_MULTICAST_Member *mem)
struct GNUNET_MULTICAST_Group *grp = &mem->grp;
struct GNUNET_MULTICAST_MemberTransmitHandle *tmit = &mem->tmit;
- size_t buf_size = GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD;
- struct MessageQueue *mq = GNUNET_malloc (sizeof (*mq) + buf_size);
- GNUNET_CONTAINER_DLL_insert_tail (grp->tmit_head, grp->tmit_tail, mq);
-
- struct GNUNET_MULTICAST_RequestHeader *
- req = (struct GNUNET_MULTICAST_RequestHeader *) &mq[1];
+ size_t buf_size = GNUNET_MULTICAST_FRAGMENT_MAX_SIZE;
+ struct GNUNET_MULTICAST_RequestHeader *req = GNUNET_malloc (buf_size);
int ret = tmit->notify (tmit->notify_cls, &buf_size, &req[1]);
if (! (GNUNET_YES == ret || GNUNET_NO == ret)
- || GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < buf_size)
+ || GNUNET_MULTICAST_FRAGMENT_MAX_SIZE < buf_size)
{
LOG (GNUNET_ERROR_TYPE_ERROR,
"MemberTransmitNotify() returned error or invalid message size.\n");
/* FIXME: handle error */
+ GNUNET_free (req);
return;
}
if (GNUNET_NO == ret && 0 == buf_size)
- return; /* Transmission paused. */
+ {
+ /* Transmission paused. */
+ GNUNET_free (req);
+ return;
+ }
req->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST);
req->header.size = htons (sizeof (*req) + buf_size);
@@ -1188,7 +876,7 @@ member_to_origin (struct GNUNET_MULTICAST_Member *mem)
req->fragment_offset = GNUNET_ntohll (tmit->fragment_offset);
tmit->fragment_offset += sizeof (*req) + buf_size;
- transmit_next (grp);
+ GNUNET_CLIENT_MANAGER_transmit (grp->client, &req->header);
}
@@ -1207,6 +895,12 @@ GNUNET_MULTICAST_member_to_origin (struct GNUNET_MULTICAST_Member *mem,
GNUNET_MULTICAST_MemberTransmitNotify notify,
void *notify_cls)
{
+/* FIXME
+ if (GNUNET_YES == mem->grp.in_transmit)
+ return NULL;
+ mem->grp.in_transmit = GNUNET_YES;
+*/
+
struct GNUNET_MULTICAST_MemberTransmitHandle *tmit = &mem->tmit;
tmit->member = mem;
tmit->request_id = request_id;