aboutsummaryrefslogtreecommitdiff
path: root/src/multicast/multicast_api.c
diff options
context:
space:
mode:
authorGabor X Toth <*@tg-x.net>2015-09-26 17:09:57 +0000
committerGabor X Toth <*@tg-x.net>2015-09-26 17:09:57 +0000
commit5a042e00e06de726a21d9db05ddeb2ac16ca7c0c (patch)
tree9b2f504796b72a50d061eb812be4aa87efbf6584 /src/multicast/multicast_api.c
parent0c5cea1c26efb8b53f643a6ce6b162d9c9fe2b8f (diff)
multicast: replay
Diffstat (limited to 'src/multicast/multicast_api.c')
-rw-r--r--src/multicast/multicast_api.c236
1 files changed, 204 insertions, 32 deletions
diff --git a/src/multicast/multicast_api.c b/src/multicast/multicast_api.c
index ef4cc73e79..9f0c77f365 100644
--- a/src/multicast/multicast_api.c
+++ b/src/multicast/multicast_api.c
@@ -137,6 +137,11 @@ struct GNUNET_MULTICAST_Member
GNUNET_MULTICAST_JoinDecisionCallback join_dcsn_cb;
+ /**
+ * Replay fragment -> struct GNUNET_MULTICAST_MemberReplayHandle *
+ */
+ struct GNUNET_CONTAINER_MultiHashMap *replay_reqs;
+
uint64_t next_fragment_id;
};
@@ -176,6 +181,8 @@ struct GNUNET_MULTICAST_MembershipTestHandle
*/
struct GNUNET_MULTICAST_ReplayHandle
{
+ struct GNUNET_MULTICAST_Group *grp;
+ struct MulticastReplayRequestMessage req;
};
@@ -184,6 +191,9 @@ struct GNUNET_MULTICAST_ReplayHandle
*/
struct GNUNET_MULTICAST_MemberReplayHandle
{
+
+ GNUNET_MULTICAST_ResultCallback result_cb;
+ void *result_cls;
};
@@ -263,11 +273,14 @@ group_recv_message (void *cls,
struct GNUNET_MULTICAST_MessageHeader *
mmsg = (struct GNUNET_MULTICAST_MessageHeader *) msg;
+ if (GNUNET_YES == grp->is_disconnecting)
+ return;
+
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Calling message callback with a message of size %u.\n",
ntohs (mmsg->header.size));
- if (GNUNET_YES != grp->is_disconnecting && NULL != grp->message_cb)
+ if (NULL != grp->message_cb)
grp->message_cb (grp->cb_cls, mmsg);
}
@@ -297,6 +310,73 @@ origin_recv_request (void *cls,
/**
+ * Receive multicast replay request from service.
+ */
+static void
+group_recv_replay_request (void *cls,
+ struct GNUNET_CLIENT_MANAGER_Connection *client,
+ const struct GNUNET_MessageHeader *msg)
+{
+ struct GNUNET_MULTICAST_Group *
+ grp = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*grp));
+ struct MulticastReplayRequestMessage *
+ rep = (struct MulticastReplayRequestMessage *) msg;
+
+ if (GNUNET_YES == grp->is_disconnecting)
+ return;
+
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Got replay request.\n");
+
+ if (0 != rep->fragment_id)
+ {
+ if (NULL != grp->replay_frag_cb)
+ {
+ struct GNUNET_MULTICAST_ReplayHandle * rh = GNUNET_malloc (sizeof (*rh));
+ rh->grp = grp;
+ rh->req = *rep;
+ grp->replay_frag_cb (grp->cb_cls, &rep->member_key,
+ GNUNET_ntohll (rep->fragment_id),
+ GNUNET_ntohll (rep->flags), rh);
+ }
+ }
+ else if (0 != rep->message_id)
+ {
+ if (NULL != grp->replay_msg_cb)
+ {
+ struct GNUNET_MULTICAST_ReplayHandle * rh = GNUNET_malloc (sizeof (*rh));
+ rh->grp = grp;
+ rh->req = *rep;
+ grp->replay_msg_cb (grp->cb_cls, &rep->member_key,
+ GNUNET_ntohll (rep->message_id),
+ GNUNET_ntohll (rep->fragment_offset),
+ GNUNET_ntohll (rep->flags), rh);
+ }
+ }
+}
+
+
+/**
+ * Receive multicast replay request from service.
+ */
+static void
+member_recv_replay_response (void *cls,
+ struct GNUNET_CLIENT_MANAGER_Connection *client,
+ const struct GNUNET_MessageHeader *msg)
+{
+ struct GNUNET_MULTICAST_Group *grp;
+ struct GNUNET_MULTICAST_Member *
+ mem = GNUNET_CLIENT_MANAGER_get_user_context_ (client, sizeof (*grp));
+ grp = &mem->grp;
+ struct MulticastReplayResponseMessage *
+ res = (struct MulticastReplayResponseMessage *) msg;
+
+ if (GNUNET_YES == grp->is_disconnecting)
+ return;
+
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Got replay response.\n");
+}
+
+/**
* Member receives join decision.
*/
static void
@@ -369,20 +449,24 @@ member_recv_join_decision (void *cls,
*/
static struct GNUNET_CLIENT_MANAGER_MessageHandler origin_handlers[] =
{
- { &group_recv_disconnect, NULL, 0, 0, GNUNET_NO },
+ { group_recv_disconnect, NULL, 0, 0, GNUNET_NO },
- { &group_recv_message, NULL,
+ { group_recv_message, NULL,
GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE,
sizeof (struct GNUNET_MULTICAST_MessageHeader), GNUNET_YES },
- { &origin_recv_request, NULL,
+ { origin_recv_request, NULL,
GNUNET_MESSAGE_TYPE_MULTICAST_REQUEST,
sizeof (struct GNUNET_MULTICAST_RequestHeader), GNUNET_YES },
- { &group_recv_join_request, NULL,
+ { group_recv_join_request, NULL,
GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_REQUEST,
sizeof (struct MulticastJoinRequestMessage), GNUNET_YES },
+ { group_recv_replay_request, NULL,
+ GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST,
+ sizeof (struct MulticastReplayRequestMessage), GNUNET_NO },
+
{ NULL, NULL, 0, 0, GNUNET_NO }
};
@@ -392,20 +476,28 @@ static struct GNUNET_CLIENT_MANAGER_MessageHandler origin_handlers[] =
*/
static struct GNUNET_CLIENT_MANAGER_MessageHandler member_handlers[] =
{
- { &group_recv_disconnect, NULL, 0, 0, GNUNET_NO },
+ { group_recv_disconnect, NULL, 0, 0, GNUNET_NO },
- { &group_recv_message, NULL,
+ { group_recv_message, NULL,
GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE,
sizeof (struct GNUNET_MULTICAST_MessageHeader), GNUNET_YES },
- { &group_recv_join_request, NULL,
+ { group_recv_join_request, NULL,
GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_REQUEST,
sizeof (struct MulticastJoinRequestMessage), GNUNET_YES },
- { &member_recv_join_decision, NULL,
+ { member_recv_join_decision, NULL,
GNUNET_MESSAGE_TYPE_MULTICAST_JOIN_DECISION,
sizeof (struct MulticastJoinDecisionMessage), GNUNET_YES },
+ { group_recv_replay_request, NULL,
+ GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST,
+ sizeof (struct MulticastReplayRequestMessage), GNUNET_NO },
+
+ { member_recv_replay_response, NULL,
+ GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_RESPONSE,
+ sizeof (struct MulticastReplayRequestMessage), GNUNET_NO },
+
{ NULL, NULL, 0, 0, GNUNET_NO }
};
@@ -514,15 +606,45 @@ GNUNET_MULTICAST_membership_test_result (struct GNUNET_MULTICAST_MembershipTestH
/**
* Replay a message fragment for the multicast group.
*
- * @param rh Replay handle identifying which replay operation was requested.
- * @param msg Replayed message fragment, NULL if unknown/error.
- * @param ec Error code.
+ * @param rh
+ * Replay handle identifying which replay operation was requested.
+ * @param msg
+ * Replayed message fragment, NULL if not found / an error occurred.
+ * @param ec
+ * Error code. See enum GNUNET_MULTICAST_ReplayErrorCode
+ * If not #GNUNET_MULTICAST_REC_OK, the replay handle is invalidated.
*/
void
GNUNET_MULTICAST_replay_response (struct GNUNET_MULTICAST_ReplayHandle *rh,
const struct GNUNET_MessageHeader *msg,
enum GNUNET_MULTICAST_ReplayErrorCode ec)
{
+ uint8_t msg_size = (NULL != msg) ? ntohs (msg->size) : 0;
+ struct MulticastReplayResponseMessage *
+ res = GNUNET_malloc (sizeof (*res) + msg_size);
+ *res = (struct MulticastReplayResponseMessage) {
+ .header = {
+ .type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_RESPONSE),
+ .size = htons (sizeof (*res) + msg_size),
+ },
+ .fragment_id = rh->req.fragment_id,
+ .message_id = rh->req.message_id,
+ .fragment_offset = rh->req.fragment_offset,
+ .flags = rh->req.flags,
+ .error_code = htonl (ec),
+ };
+
+ if (GNUNET_MULTICAST_REC_OK == ec)
+ {
+ GNUNET_assert (NULL != msg);
+ memcpy (&res[1], msg, msg_size);
+ }
+
+ GNUNET_CLIENT_MANAGER_transmit (rh->grp->client, &res->header);
+ GNUNET_free (res);
+
+ if (GNUNET_MULTICAST_REC_OK != ec)
+ GNUNET_free (rh);
}
@@ -536,6 +658,19 @@ GNUNET_MULTICAST_replay_response (struct GNUNET_MULTICAST_ReplayHandle *rh,
void
GNUNET_MULTICAST_replay_response_end (struct GNUNET_MULTICAST_ReplayHandle *rh)
{
+ struct MulticastReplayResponseMessage end = {
+ .header = {
+ .type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_RESPONSE_END),
+ .size = htons (sizeof (end)),
+ },
+ .fragment_id = rh->req.fragment_id,
+ .message_id = rh->req.message_id,
+ .fragment_offset = rh->req.fragment_offset,
+ .flags = rh->req.flags,
+ };
+
+ GNUNET_CLIENT_MANAGER_transmit (rh->grp->client, &end.header);
+ GNUNET_free (rh);
}
@@ -827,6 +962,7 @@ GNUNET_MULTICAST_member_join (const struct GNUNET_CONFIGURATION_Handle *cfg,
grp->join_req_cb = join_request_cb;
grp->member_test_cb = member_test_cb;
grp->replay_frag_cb = replay_frag_cb;
+ grp->replay_msg_cb = replay_msg_cb;
grp->message_cb = message_cb;
grp->cb_cls = cls;
@@ -864,26 +1000,55 @@ GNUNET_MULTICAST_member_part (struct GNUNET_MULTICAST_Member *mem,
}
+void
+member_replay_request (struct GNUNET_MULTICAST_Member *mem,
+ uint64_t fragment_id,
+ uint64_t message_id,
+ uint64_t fragment_offset,
+ uint64_t flags)
+{
+ struct MulticastReplayRequestMessage rep = {
+ .header = {
+ .type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_REPLAY_REQUEST),
+ .size = htons (sizeof (rep)),
+ },
+ .fragment_id = GNUNET_htonll (fragment_id),
+ .message_id = GNUNET_htonll (message_id),
+ .fragment_offset = GNUNET_htonll (fragment_offset),
+ .flags = GNUNET_htonll (flags),
+ };
+ GNUNET_CLIENT_MANAGER_transmit (mem->grp.client, &rep.header);
+}
+
+
/**
* Request a fragment to be replayed by fragment ID.
*
* Useful if messages below the @e max_known_fragment_id given when joining are
* needed and not known to the client.
*
- * @param member Membership handle.
- * @param fragment_id ID of a message fragment that this client would like to
- see replayed.
- * @param flags Additional flags for the replay request. It is used and defined
- * by the replay callback. FIXME: which replay callback? FIXME: use enum?
- * FIXME: why not pass reply cb here?
- * @return Replay request handle, NULL on error.
+ * @param member
+ * Membership handle.
+ * @param fragment_id
+ * ID of a message fragment that this client would like to see replayed.
+ * @param flags
+ * Additional flags for the replay request.
+ * It is used and defined by GNUNET_MULTICAST_ReplayFragmentCallback
+ * @param result_cb
+ * Function to call when the replayed message fragment arrives.
+ * @param result_cls
+ * Closure for @a result_cb.
+ *
+ * @return Replay request handle.
*/
struct GNUNET_MULTICAST_MemberReplayHandle *
-GNUNET_MULTICAST_member_replay_fragment (struct GNUNET_MULTICAST_Member *member,
+GNUNET_MULTICAST_member_replay_fragment (struct GNUNET_MULTICAST_Member *mem,
uint64_t fragment_id,
- uint64_t flags)
+ uint64_t flags,
+ GNUNET_MULTICAST_ResultCallback result_cb,
+ void *result_cls)
{
- return NULL;
+ member_replay_request (mem, fragment_id, 0, 0, flags);
}
@@ -893,24 +1058,31 @@ GNUNET_MULTICAST_member_replay_fragment (struct GNUNET_MULTICAST_Member *member,
* Useful if messages below the @e max_known_fragment_id given when joining are
* needed and not known to the client.
*
- * @param member Membership handle.
- * @param message_id ID of the message this client would like to see replayed.
- * @param fragment_offset Offset of the fragment within the message to replay.
- * @param flags Additional flags for the replay request. It is used & defined
- * by the replay callback.
- * @param result_cb Function to be called for the replayed message.
- * @param result_cb_cls Closure for @a result_cb.
+ * @param member
+ * Membership handle.
+ * @param message_id
+ * ID of the message this client would like to see replayed.
+ * @param fragment_offset
+ * Offset of the fragment within the message to replay.
+ * @param flags
+ * Additional flags for the replay request.
+ * It is used & defined by GNUNET_MULTICAST_ReplayMessageCallback
+ * @param result_cb
+ * Function to call for each replayed message fragment.
+ * @param result_cls
+ * Closure for @a result_cb.
+ *
* @return Replay request handle, NULL on error.
*/
struct GNUNET_MULTICAST_MemberReplayHandle *
-GNUNET_MULTICAST_member_replay_message (struct GNUNET_MULTICAST_Member *member,
+GNUNET_MULTICAST_member_replay_message (struct GNUNET_MULTICAST_Member *mem,
uint64_t message_id,
uint64_t fragment_offset,
uint64_t flags,
GNUNET_MULTICAST_ResultCallback result_cb,
- void *result_cb_cls)
+ void *result_cls)
{
- return NULL;
+ member_replay_request (mem, 0, message_id, fragment_offset, flags);
}