diff options
-rw-r--r-- | src/transport/plugin_transport_udp.c | 90 |
1 files changed, 60 insertions, 30 deletions
diff --git a/src/transport/plugin_transport_udp.c b/src/transport/plugin_transport_udp.c index e73712a830..1dcdeee983 100644 --- a/src/transport/plugin_transport_udp.c +++ b/src/transport/plugin_transport_udp.c @@ -696,14 +696,15 @@ call_continuation (struct UDP_MessageWrapper *udpw, int result) "Calling continuation for %u byte message to `%s' with result %s\n", udpw->payload_size, GNUNET_i2s (&udpw->session->target), (GNUNET_OK == result) ? "OK" : "SYSERR"); - if (NULL == udpw->cont) - return; if (NULL == udpw->frag_ctx) { /* Not fragmented message */ if (GNUNET_OK == result) { + GNUNET_STATISTICS_update (plugin->env->stats, + "# unfragmented messages transmit with success via UDP", + 1, GNUNET_NO); if (udpw->msg_size >= udpw->payload_size) { GNUNET_STATISTICS_update (plugin->env->stats, @@ -714,7 +715,14 @@ call_continuation (struct UDP_MessageWrapper *udpw, int result) "# bytes payload transmitted via UDP", udpw->payload_size, GNUNET_NO); } - udpw->cont (udpw->cont_cls, &udpw->session->target, result, + else + { + GNUNET_STATISTICS_update (plugin->env->stats, + "# unfragmented messages transmit with failure via UDP", + 1, GNUNET_NO); + } + if (NULL != udpw->cont) + udpw->cont (udpw->cont_cls, &udpw->session->target, result, udpw->payload_size, udpw->msg_size); } else @@ -723,7 +731,8 @@ call_continuation (struct UDP_MessageWrapper *udpw, int result) if (GNUNET_OK == result) { /* Fragmented message: only call next_fragment continuation on success */ - udpw->cont (udpw->cont_cls, &udpw->session->target, result, + if (NULL != udpw->cont) + udpw->cont (udpw->cont_cls, &udpw->session->target, result, udpw->payload_size, udpw->msg_size); } } @@ -824,6 +833,23 @@ free_session (struct Session *s) } +static void +dequeue (struct Plugin *plugin, struct UDP_MessageWrapper * udpw) +{ + GNUNET_STATISTICS_update (plugin->env->stats, + "# bytes currently in UDP buffers", + -udpw->msg_size, GNUNET_NO); + GNUNET_STATISTICS_update (plugin->env->stats, + "# msgs currently in UDP buffers", + -1, GNUNET_NO); + if (udpw->session->addrlen == sizeof (struct sockaddr_in)) + GNUNET_CONTAINER_DLL_remove (plugin->ipv4_queue_head, + plugin->ipv4_queue_tail, udpw); + if (udpw->session->addrlen == sizeof (struct sockaddr_in6)) + GNUNET_CONTAINER_DLL_remove (plugin->ipv6_queue_head, + plugin->ipv6_queue_tail, udpw); +} + /** * Functions with this signature are called whenever we need * to close a session due to a disconnect or failure to @@ -850,7 +876,7 @@ disconnect_session (struct Session *s) next = udpw->next; if (udpw->session == s) { - GNUNET_CONTAINER_DLL_remove(plugin->ipv4_queue_head, plugin->ipv4_queue_tail, udpw); + dequeue (plugin, udpw); call_continuation(udpw, GNUNET_SYSERR); GNUNET_free (udpw); } @@ -861,7 +887,7 @@ disconnect_session (struct Session *s) next = udpw->next; if (udpw->session == s) { - GNUNET_CONTAINER_DLL_remove(plugin->ipv6_queue_head, plugin->ipv6_queue_tail, udpw); + dequeue (plugin, udpw); call_continuation(udpw, GNUNET_SYSERR); GNUNET_free (udpw); } @@ -1204,7 +1230,12 @@ udp_plugin_get_session (void *cls, static void enqueue (struct Plugin *plugin, struct UDP_MessageWrapper * udpw) { - + GNUNET_STATISTICS_update (plugin->env->stats, + "# bytes currently in UDP buffers", + udpw->msg_size, GNUNET_NO); + GNUNET_STATISTICS_update (plugin->env->stats, + "# msgs currently in UDP buffers", + 1, GNUNET_NO); if (udpw->session->addrlen == sizeof (struct sockaddr_in)) GNUNET_CONTAINER_DLL_insert (plugin->ipv4_queue_head, plugin->ipv4_queue_tail, udpw); @@ -1214,6 +1245,7 @@ enqueue (struct Plugin *plugin, struct UDP_MessageWrapper * udpw) } + /** * Fragment message was transmitted via UDP, let fragmentation know * to send the next fragment now. @@ -1333,13 +1365,9 @@ udp_plugin_send (void *cls, GNUNET_a2s(s->sock_addr, s->addrlen)); GNUNET_STATISTICS_update (plugin->env->stats, - "# bytes currently in UDP buffers", - msgbuf_size, GNUNET_NO); - GNUNET_STATISTICS_update (plugin->env->stats, "# bytes payload asked to transmit via UDP", msgbuf_size, GNUNET_NO); - /* Message */ udp = (struct UDPMessage *) mbuf; udp->header.size = htons (udpmlen); @@ -1351,6 +1379,9 @@ udp_plugin_send (void *cls, if (udpmlen <= UDP_MTU) { /* unfragmented message */ + GNUNET_STATISTICS_update (plugin->env->stats, + "# unfragmented messages asked to transmit via UDP", + 1, GNUNET_NO); udpw = GNUNET_malloc (sizeof (struct UDP_MessageWrapper) + udpmlen); udpw->session = s; udpw->msg_buf = (char *) &udpw[1]; @@ -1366,7 +1397,12 @@ udp_plugin_send (void *cls, } else { + // FIXME + return GNUNET_SYSERR; /* fragmented message */ + GNUNET_STATISTICS_update (plugin->env->stats, + "# fragmented messages asked to transmit via UDP", + 1, GNUNET_NO); if (s->frag_ctx != NULL) return GNUNET_SYSERR; memcpy (&udp[1], msgbuf, msgbuf_size); @@ -1685,6 +1721,10 @@ ack_proc (void *cls, uint32_t id, const struct GNUNET_MessageHeader *msg) udp_ack->sender = *rc->plugin->env->my_identity; memcpy (&udp_ack[1], msg, ntohs (msg->size)); + GNUNET_STATISTICS_update (plugin->env->stats, + "# messages ACKs transmitted via UDP", + 1, GNUNET_NO); + enqueue (rc->plugin, udpw); } @@ -1775,7 +1815,7 @@ read_process_ack (struct Plugin *plugin, tmp = udpw->next; if ((udpw->frag_ctx != NULL) && (udpw->frag_ctx == s->frag_ctx)) { - GNUNET_CONTAINER_DLL_remove(plugin->ipv6_queue_head, plugin->ipv6_queue_tail, udpw); + dequeue (plugin, udpw); GNUNET_free (udpw); } udpw = tmp; @@ -1789,7 +1829,7 @@ read_process_ack (struct Plugin *plugin, tmp = udpw->next; if ((udpw->frag_ctx != NULL) && (udpw->frag_ctx == s->frag_ctx)) { - GNUNET_CONTAINER_DLL_remove(plugin->ipv4_queue_head, plugin->ipv4_queue_tail, udpw); + dequeue (plugin, udpw); GNUNET_free (udpw); } udpw = tmp; @@ -2001,22 +2041,18 @@ remove_timeout_messages_and_select (struct UDP_MessageWrapper *head, } GNUNET_STATISTICS_update (plugin->env->stats, - "# bytes currently in UDP buffers", - -udpw->msg_size, GNUNET_NO); - - GNUNET_STATISTICS_update (plugin->env->stats, "# messages dismissed due to timeout", 1, GNUNET_NO); /* Remove message */ if (sock == plugin->sockv4) { - GNUNET_CONTAINER_DLL_remove(plugin->ipv4_queue_head, plugin->ipv4_queue_tail, udpw); + dequeue (plugin, udpw); GNUNET_free (udpw); udpw = plugin->ipv4_queue_head; } if (sock == plugin->sockv6) { - GNUNET_CONTAINER_DLL_remove(plugin->ipv6_queue_head, plugin->ipv6_queue_tail, udpw); + dequeue (plugin, udpw); GNUNET_free (udpw); udpw = plugin->ipv6_queue_head; } @@ -2135,15 +2171,7 @@ udp_select_send (struct Plugin *plugin, struct GNUNET_NETWORK_Handle *sock) udpw->frag_ctx->on_wire_size += udpw->msg_size; call_continuation (udpw, GNUNET_OK); } - - GNUNET_STATISTICS_update (plugin->env->stats, - "# bytes currently in UDP buffers", - -udpw->msg_size, GNUNET_NO); - - if (sock == plugin->sockv4) - GNUNET_CONTAINER_DLL_remove(plugin->ipv4_queue_head, plugin->ipv4_queue_tail, udpw); - else if (sock == plugin->sockv6) - GNUNET_CONTAINER_DLL_remove(plugin->ipv6_queue_head, plugin->ipv6_queue_tail, udpw); + dequeue (plugin, udpw); GNUNET_free (udpw); udpw = NULL; @@ -2610,18 +2638,20 @@ libgnunet_plugin_transport_udp_done (void *cls) while (udpw != NULL) { struct UDP_MessageWrapper *tmp = udpw->next; - GNUNET_CONTAINER_DLL_remove(plugin->ipv4_queue_head, plugin->ipv4_queue_tail, udpw); + dequeue (plugin, udpw); call_continuation(udpw, GNUNET_SYSERR); GNUNET_free (udpw); + udpw = tmp; } udpw = plugin->ipv6_queue_head; while (udpw != NULL) { struct UDP_MessageWrapper *tmp = udpw->next; - GNUNET_CONTAINER_DLL_remove(plugin->ipv6_queue_head, plugin->ipv6_queue_tail, udpw); + dequeue (plugin, udpw); call_continuation(udpw, GNUNET_SYSERR); GNUNET_free (udpw); + udpw = tmp; } |