aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2013-03-14 16:03:30 +0000
committerChristian Grothoff <christian@grothoff.org>2013-03-14 16:03:30 +0000
commit20b65432c8df1b2f84fcc0dac3b2f5d689f5c888 (patch)
tree6183cf957cffc5e9b415d35d119f8faa897513f4 /src
parent38144d2375191205912027c97a5af6c7bbe87bfe (diff)
-generate and process ACKs
Diffstat (limited to 'src')
-rw-r--r--src/dv/dv.h2
-rw-r--r--src/dv/dv_api.c91
-rw-r--r--src/dv/gnunet-service-dv.c34
3 files changed, 109 insertions, 18 deletions
diff --git a/src/dv/dv.h b/src/dv/dv.h
index ea5215a10b..bcc586a0e6 100644
--- a/src/dv/dv.h
+++ b/src/dv/dv.h
@@ -121,7 +121,7 @@ struct GNUNET_DV_SendMessage
struct GNUNET_MessageHeader header;
/**
- * Unique ID for this message, for confirm callback.
+ * Unique ID for this message, for confirm callback, must never be zero.
*/
uint32_t uid GNUNET_PACKED;
diff --git a/src/dv/dv_api.c b/src/dv/dv_api.c
index 4b6cc1a5cf..fff0896b3e 100644
--- a/src/dv/dv_api.c
+++ b/src/dv/dv_api.c
@@ -74,6 +74,11 @@ struct GNUNET_DV_TransmitHandle
*/
struct GNUNET_PeerIdentity target;
+ /**
+ * UID of our message, if any.
+ */
+ uint32_t uid;
+
};
@@ -184,10 +189,17 @@ transmit_pending (void *cls, size_t size, void *buf)
th);
memcpy (&cbuf[ret], th->msg, tsize);
ret += tsize;
- (void) GNUNET_CONTAINER_multihashmap_put (sh->send_callbacks,
- &th->target.hashPubKey,
- th,
- GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
+ if (NULL != th->cb)
+ {
+ (void) GNUNET_CONTAINER_multihashmap_put (sh->send_callbacks,
+ &th->target.hashPubKey,
+ th,
+ GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
+ }
+ else
+ {
+ GNUNET_free (th);
+ }
}
return ret;
}
@@ -215,6 +227,54 @@ start_transmit (struct GNUNET_DV_ServiceHandle *sh)
/**
+ * Closure for 'process_ack'.
+ */
+struct AckContext
+{
+ /**
+ * The ACK message.
+ */
+ const struct GNUNET_DV_AckMessage *ack;
+
+ /**
+ * Our service handle.
+ */
+ struct GNUNET_DV_ServiceHandle *sh;
+};
+
+
+/**
+ * We got an ACK. Check if it matches the given transmit handle, and if
+ * so call the continuation.
+ *
+ * @param cls the 'struct AckContext'
+ * @param key peer identity
+ * @param value the 'struct GNUNET_DV_TransmitHandle'
+ * @return GNUNET_OK if the ACK did not match (continue to iterate)
+ */
+static int
+process_ack (void *cls,
+ const struct GNUNET_HashCode *key,
+ void *value)
+{
+ struct AckContext *ctx = cls;
+ struct GNUNET_DV_TransmitHandle *th = value;
+
+ if (th->uid != ntohl (ctx->ack->uid))
+ return GNUNET_OK;
+ GNUNET_assert (GNUNET_YES ==
+ GNUNET_CONTAINER_multihashmap_remove (ctx->sh->send_callbacks,
+ key,
+ th));
+ /* FIXME: should distinguish between success and failure here... */
+ th->cb (th->cb_cls,
+ GNUNET_OK);
+ GNUNET_free (th);
+ return GNUNET_NO;
+}
+
+
+/**
* Handles a message sent from the DV service to us.
* Parse it out and give it to the plugin.
*
@@ -230,7 +290,9 @@ handle_message_receipt (void *cls,
const struct GNUNET_DV_DisconnectMessage *dm;
const struct GNUNET_DV_ReceivedMessage *rm;
const struct GNUNET_MessageHeader *payload;
-
+ const struct GNUNET_DV_AckMessage *ack;
+ struct AckContext ctx;
+
if (NULL == msg)
{
/* Connection closed */
@@ -282,6 +344,21 @@ handle_message_receipt (void *cls,
ntohl (rm->distance),
payload);
break;
+ case GNUNET_MESSAGE_TYPE_DV_SEND_ACK:
+ if (ntohs (msg->size) != sizeof (struct GNUNET_DV_AckMessage))
+ {
+ GNUNET_break (0);
+ reconnect (sh);
+ return;
+ }
+ ack = (const struct GNUNET_DV_AckMessage *) msg;
+ ctx.ack = ack;
+ ctx.sh = sh;
+ GNUNET_CONTAINER_multihashmap_get_multiple (sh->send_callbacks,
+ &ack->target.hashPubKey,
+ &process_ack,
+ &ctx);
+ return;
default:
reconnect (sh);
break;
@@ -495,6 +572,10 @@ GNUNET_DV_send (struct GNUNET_DV_ServiceHandle *sh,
sm->header.type = htons (GNUNET_MESSAGE_TYPE_DV_SEND);
sm->header.size = htons (sizeof (struct GNUNET_DV_SendMessage) +
ntohs (msg->size));
+ if (0 == sh->uid_gen)
+ sh->uid_gen = 1;
+ th->uid = sh->uid_gen;
+ sm->uid = htonl (sh->uid_gen++);
/* use memcpy here as 'target' may not be sufficiently aligned */
memcpy (&sm->target, target, sizeof (struct GNUNET_PeerIdentity));
memcpy (&sm[1], msg, ntohs (msg->size));
diff --git a/src/dv/gnunet-service-dv.c b/src/dv/gnunet-service-dv.c
index c42d9344ee..e1709be3d0 100644
--- a/src/dv/gnunet-service-dv.c
+++ b/src/dv/gnunet-service-dv.c
@@ -28,9 +28,6 @@
* @author Nathan Evans
*
* TODO:
- * - even _local_ flow control (send ACK only after core took our message) is
- * not implemented, but should be (easy fix, but needs adjustments to data
- * structures)
* - distance updates are not properly communicate to US by core,
* and conversely we don't give distance updates properly to the plugin yet
* - we send 'ACK' even if a message was dropped due to no route (may
@@ -420,12 +417,12 @@ transmit_to_plugin (void *cls, size_t size, void *buf)
* Forward a message from another peer to the plugin.
*
* @param message the message to send to the plugin
- * @param distant_neighbor the original sender of the message
+ * @param origin the original sender of the message
* @param distnace distance to the original sender of the message
*/
static void
send_data_to_plugin (const struct GNUNET_MessageHeader *message,
- const struct GNUNET_PeerIdentity *distant_neighbor,
+ const struct GNUNET_PeerIdentity *origin,
uint32_t distance)
{
struct GNUNET_DV_ReceivedMessage *received_msg;
@@ -443,7 +440,7 @@ send_data_to_plugin (const struct GNUNET_MessageHeader *message,
}
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Delivering message from peer `%s'\n",
- GNUNET_i2s (distant_neighbor));
+ GNUNET_i2s (origin));
size = sizeof (struct GNUNET_DV_ReceivedMessage) +
ntohs (message->size);
if (size >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
@@ -456,7 +453,7 @@ send_data_to_plugin (const struct GNUNET_MessageHeader *message,
received_msg->header.size = htons (size);
received_msg->header.type = htons (GNUNET_MESSAGE_TYPE_DV_RECV);
received_msg->distance = htonl (distance);
- received_msg->sender = *distant_neighbor;
+ received_msg->sender = *origin;
memcpy (&received_msg[1], message, ntohs (message->size));
GNUNET_CONTAINER_DLL_insert_tail (plugin_pending_head,
plugin_pending_tail,
@@ -611,8 +608,9 @@ core_transmit_notify (void *cls, size_t size, void *buf)
dn->pm_tail,
pending);
memcpy (&cbuf[off], pending->msg, msize);
- send_ack_to_plugin (&pending->ultimate_target,
- pending->uid);
+ if (0 != pending->uid)
+ send_ack_to_plugin (&pending->ultimate_target,
+ pending->uid);
GNUNET_free (pending);
off += msize;
}
@@ -633,6 +631,8 @@ core_transmit_notify (void *cls, size_t size, void *buf)
* Forward the given payload to the given target.
*
* @param target where to send the message
+ * @param uid unique ID for the message
+ * @param ultimate_target ultimate recipient for the message
* @param distance expected (remaining) distance to the target
* @param sender original sender of the message
* @param payload payload of the message
@@ -640,7 +640,9 @@ core_transmit_notify (void *cls, size_t size, void *buf)
static void
forward_payload (struct DirectNeighbor *target,
uint32_t distance,
+ uint32_t uid,
const struct GNUNET_PeerIdentity *sender,
+ const struct GNUNET_PeerIdentity *ultimate_target,
const struct GNUNET_MessageHeader *payload)
{
struct PendingMessage *pm;
@@ -651,7 +653,10 @@ forward_payload (struct DirectNeighbor *target,
(0 != memcmp (sender,
&my_identity,
sizeof (struct GNUNET_PeerIdentity))) )
+ {
+ GNUNET_break (0 == uid);
return;
+ }
msize = sizeof (struct RouteMessage) + ntohs (payload->size);
if (msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE)
{
@@ -659,6 +664,8 @@ forward_payload (struct DirectNeighbor *target,
return;
}
pm = GNUNET_malloc (sizeof (struct PendingMessage) + msize);
+ pm->ultimate_target = *ultimate_target;
+ pm->uid = uid;
pm->msg = (const struct GNUNET_MessageHeader *) &pm[1];
rm = (struct RouteMessage *) &pm[1];
rm->header.size = htons ((uint16_t) msize);
@@ -1271,6 +1278,8 @@ handle_dv_route_message (void *cls, const struct GNUNET_PeerIdentity *peer,
}
forward_payload (route->next_hop,
ntohl (route->target.distance),
+ 0,
+ &rm->target,
&rm->sender,
payload);
return GNUNET_OK;
@@ -1300,6 +1309,7 @@ handle_dv_send_message (void *cls, struct GNUNET_SERVER_Client *client,
return;
}
msg = (const struct GNUNET_DV_SendMessage *) message;
+ GNUNET_break (0 != ntohl (msg->uid));
payload = (const struct GNUNET_MessageHeader *) &msg[1];
if (ntohs (message->size) != sizeof (struct GNUNET_DV_SendMessage) + ntohs (payload->size))
{
@@ -1316,14 +1326,14 @@ handle_dv_send_message (void *cls, struct GNUNET_SERVER_Client *client,
GNUNET_STATISTICS_update (stats,
"# local messages discarded (no route)",
1, GNUNET_NO);
- send_ack_to_plugin (&msg->target, htonl (msg->uid));
+ send_ack_to_plugin (&msg->target, ntohl (msg->uid));
GNUNET_SERVER_receive_done (client, GNUNET_OK);
return;
}
- // FIXME: flow control (send ACK only once message has left the queue...)
- send_ack_to_plugin (&msg->target, htonl (msg->uid));
forward_payload (route->next_hop,
ntohl (route->target.distance),
+ htonl (msg->uid),
+ &msg->target,
&my_identity,
payload);
GNUNET_SERVER_receive_done (client, GNUNET_OK);