diff options
author | Christian Grothoff <christian@grothoff.org> | 2013-03-14 16:03:30 +0000 |
---|---|---|
committer | Christian Grothoff <christian@grothoff.org> | 2013-03-14 16:03:30 +0000 |
commit | 20b65432c8df1b2f84fcc0dac3b2f5d689f5c888 (patch) | |
tree | 6183cf957cffc5e9b415d35d119f8faa897513f4 /src | |
parent | 38144d2375191205912027c97a5af6c7bbe87bfe (diff) |
-generate and process ACKs
Diffstat (limited to 'src')
-rw-r--r-- | src/dv/dv.h | 2 | ||||
-rw-r--r-- | src/dv/dv_api.c | 91 | ||||
-rw-r--r-- | src/dv/gnunet-service-dv.c | 34 |
3 files changed, 109 insertions, 18 deletions
diff --git a/src/dv/dv.h b/src/dv/dv.h index ea5215a10b..bcc586a0e6 100644 --- a/src/dv/dv.h +++ b/src/dv/dv.h @@ -121,7 +121,7 @@ struct GNUNET_DV_SendMessage struct GNUNET_MessageHeader header; /** - * Unique ID for this message, for confirm callback. + * Unique ID for this message, for confirm callback, must never be zero. */ uint32_t uid GNUNET_PACKED; diff --git a/src/dv/dv_api.c b/src/dv/dv_api.c index 4b6cc1a5cf..fff0896b3e 100644 --- a/src/dv/dv_api.c +++ b/src/dv/dv_api.c @@ -74,6 +74,11 @@ struct GNUNET_DV_TransmitHandle */ struct GNUNET_PeerIdentity target; + /** + * UID of our message, if any. + */ + uint32_t uid; + }; @@ -184,10 +189,17 @@ transmit_pending (void *cls, size_t size, void *buf) th); memcpy (&cbuf[ret], th->msg, tsize); ret += tsize; - (void) GNUNET_CONTAINER_multihashmap_put (sh->send_callbacks, - &th->target.hashPubKey, - th, - GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); + if (NULL != th->cb) + { + (void) GNUNET_CONTAINER_multihashmap_put (sh->send_callbacks, + &th->target.hashPubKey, + th, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); + } + else + { + GNUNET_free (th); + } } return ret; } @@ -215,6 +227,54 @@ start_transmit (struct GNUNET_DV_ServiceHandle *sh) /** + * Closure for 'process_ack'. + */ +struct AckContext +{ + /** + * The ACK message. + */ + const struct GNUNET_DV_AckMessage *ack; + + /** + * Our service handle. + */ + struct GNUNET_DV_ServiceHandle *sh; +}; + + +/** + * We got an ACK. Check if it matches the given transmit handle, and if + * so call the continuation. + * + * @param cls the 'struct AckContext' + * @param key peer identity + * @param value the 'struct GNUNET_DV_TransmitHandle' + * @return GNUNET_OK if the ACK did not match (continue to iterate) + */ +static int +process_ack (void *cls, + const struct GNUNET_HashCode *key, + void *value) +{ + struct AckContext *ctx = cls; + struct GNUNET_DV_TransmitHandle *th = value; + + if (th->uid != ntohl (ctx->ack->uid)) + return GNUNET_OK; + GNUNET_assert (GNUNET_YES == + GNUNET_CONTAINER_multihashmap_remove (ctx->sh->send_callbacks, + key, + th)); + /* FIXME: should distinguish between success and failure here... */ + th->cb (th->cb_cls, + GNUNET_OK); + GNUNET_free (th); + return GNUNET_NO; +} + + +/** * Handles a message sent from the DV service to us. * Parse it out and give it to the plugin. * @@ -230,7 +290,9 @@ handle_message_receipt (void *cls, const struct GNUNET_DV_DisconnectMessage *dm; const struct GNUNET_DV_ReceivedMessage *rm; const struct GNUNET_MessageHeader *payload; - + const struct GNUNET_DV_AckMessage *ack; + struct AckContext ctx; + if (NULL == msg) { /* Connection closed */ @@ -282,6 +344,21 @@ handle_message_receipt (void *cls, ntohl (rm->distance), payload); break; + case GNUNET_MESSAGE_TYPE_DV_SEND_ACK: + if (ntohs (msg->size) != sizeof (struct GNUNET_DV_AckMessage)) + { + GNUNET_break (0); + reconnect (sh); + return; + } + ack = (const struct GNUNET_DV_AckMessage *) msg; + ctx.ack = ack; + ctx.sh = sh; + GNUNET_CONTAINER_multihashmap_get_multiple (sh->send_callbacks, + &ack->target.hashPubKey, + &process_ack, + &ctx); + return; default: reconnect (sh); break; @@ -495,6 +572,10 @@ GNUNET_DV_send (struct GNUNET_DV_ServiceHandle *sh, sm->header.type = htons (GNUNET_MESSAGE_TYPE_DV_SEND); sm->header.size = htons (sizeof (struct GNUNET_DV_SendMessage) + ntohs (msg->size)); + if (0 == sh->uid_gen) + sh->uid_gen = 1; + th->uid = sh->uid_gen; + sm->uid = htonl (sh->uid_gen++); /* use memcpy here as 'target' may not be sufficiently aligned */ memcpy (&sm->target, target, sizeof (struct GNUNET_PeerIdentity)); memcpy (&sm[1], msg, ntohs (msg->size)); diff --git a/src/dv/gnunet-service-dv.c b/src/dv/gnunet-service-dv.c index c42d9344ee..e1709be3d0 100644 --- a/src/dv/gnunet-service-dv.c +++ b/src/dv/gnunet-service-dv.c @@ -28,9 +28,6 @@ * @author Nathan Evans * * TODO: - * - even _local_ flow control (send ACK only after core took our message) is - * not implemented, but should be (easy fix, but needs adjustments to data - * structures) * - distance updates are not properly communicate to US by core, * and conversely we don't give distance updates properly to the plugin yet * - we send 'ACK' even if a message was dropped due to no route (may @@ -420,12 +417,12 @@ transmit_to_plugin (void *cls, size_t size, void *buf) * Forward a message from another peer to the plugin. * * @param message the message to send to the plugin - * @param distant_neighbor the original sender of the message + * @param origin the original sender of the message * @param distnace distance to the original sender of the message */ static void send_data_to_plugin (const struct GNUNET_MessageHeader *message, - const struct GNUNET_PeerIdentity *distant_neighbor, + const struct GNUNET_PeerIdentity *origin, uint32_t distance) { struct GNUNET_DV_ReceivedMessage *received_msg; @@ -443,7 +440,7 @@ send_data_to_plugin (const struct GNUNET_MessageHeader *message, } GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Delivering message from peer `%s'\n", - GNUNET_i2s (distant_neighbor)); + GNUNET_i2s (origin)); size = sizeof (struct GNUNET_DV_ReceivedMessage) + ntohs (message->size); if (size >= GNUNET_SERVER_MAX_MESSAGE_SIZE) @@ -456,7 +453,7 @@ send_data_to_plugin (const struct GNUNET_MessageHeader *message, received_msg->header.size = htons (size); received_msg->header.type = htons (GNUNET_MESSAGE_TYPE_DV_RECV); received_msg->distance = htonl (distance); - received_msg->sender = *distant_neighbor; + received_msg->sender = *origin; memcpy (&received_msg[1], message, ntohs (message->size)); GNUNET_CONTAINER_DLL_insert_tail (plugin_pending_head, plugin_pending_tail, @@ -611,8 +608,9 @@ core_transmit_notify (void *cls, size_t size, void *buf) dn->pm_tail, pending); memcpy (&cbuf[off], pending->msg, msize); - send_ack_to_plugin (&pending->ultimate_target, - pending->uid); + if (0 != pending->uid) + send_ack_to_plugin (&pending->ultimate_target, + pending->uid); GNUNET_free (pending); off += msize; } @@ -633,6 +631,8 @@ core_transmit_notify (void *cls, size_t size, void *buf) * Forward the given payload to the given target. * * @param target where to send the message + * @param uid unique ID for the message + * @param ultimate_target ultimate recipient for the message * @param distance expected (remaining) distance to the target * @param sender original sender of the message * @param payload payload of the message @@ -640,7 +640,9 @@ core_transmit_notify (void *cls, size_t size, void *buf) static void forward_payload (struct DirectNeighbor *target, uint32_t distance, + uint32_t uid, const struct GNUNET_PeerIdentity *sender, + const struct GNUNET_PeerIdentity *ultimate_target, const struct GNUNET_MessageHeader *payload) { struct PendingMessage *pm; @@ -651,7 +653,10 @@ forward_payload (struct DirectNeighbor *target, (0 != memcmp (sender, &my_identity, sizeof (struct GNUNET_PeerIdentity))) ) + { + GNUNET_break (0 == uid); return; + } msize = sizeof (struct RouteMessage) + ntohs (payload->size); if (msize >= GNUNET_SERVER_MAX_MESSAGE_SIZE) { @@ -659,6 +664,8 @@ forward_payload (struct DirectNeighbor *target, return; } pm = GNUNET_malloc (sizeof (struct PendingMessage) + msize); + pm->ultimate_target = *ultimate_target; + pm->uid = uid; pm->msg = (const struct GNUNET_MessageHeader *) &pm[1]; rm = (struct RouteMessage *) &pm[1]; rm->header.size = htons ((uint16_t) msize); @@ -1271,6 +1278,8 @@ handle_dv_route_message (void *cls, const struct GNUNET_PeerIdentity *peer, } forward_payload (route->next_hop, ntohl (route->target.distance), + 0, + &rm->target, &rm->sender, payload); return GNUNET_OK; @@ -1300,6 +1309,7 @@ handle_dv_send_message (void *cls, struct GNUNET_SERVER_Client *client, return; } msg = (const struct GNUNET_DV_SendMessage *) message; + GNUNET_break (0 != ntohl (msg->uid)); payload = (const struct GNUNET_MessageHeader *) &msg[1]; if (ntohs (message->size) != sizeof (struct GNUNET_DV_SendMessage) + ntohs (payload->size)) { @@ -1316,14 +1326,14 @@ handle_dv_send_message (void *cls, struct GNUNET_SERVER_Client *client, GNUNET_STATISTICS_update (stats, "# local messages discarded (no route)", 1, GNUNET_NO); - send_ack_to_plugin (&msg->target, htonl (msg->uid)); + send_ack_to_plugin (&msg->target, ntohl (msg->uid)); GNUNET_SERVER_receive_done (client, GNUNET_OK); return; } - // FIXME: flow control (send ACK only once message has left the queue...) - send_ack_to_plugin (&msg->target, htonl (msg->uid)); forward_payload (route->next_hop, ntohl (route->target.distance), + htonl (msg->uid), + &msg->target, &my_identity, payload); GNUNET_SERVER_receive_done (client, GNUNET_OK); |