aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/transport/plugin_transport_udp.c90
1 files changed, 60 insertions, 30 deletions
diff --git a/src/transport/plugin_transport_udp.c b/src/transport/plugin_transport_udp.c
index e73712a830..1dcdeee983 100644
--- a/src/transport/plugin_transport_udp.c
+++ b/src/transport/plugin_transport_udp.c
@@ -696,14 +696,15 @@ call_continuation (struct UDP_MessageWrapper *udpw, int result)
"Calling continuation for %u byte message to `%s' with result %s\n",
udpw->payload_size, GNUNET_i2s (&udpw->session->target),
(GNUNET_OK == result) ? "OK" : "SYSERR");
- if (NULL == udpw->cont)
- return;
if (NULL == udpw->frag_ctx)
{
/* Not fragmented message */
if (GNUNET_OK == result)
{
+ GNUNET_STATISTICS_update (plugin->env->stats,
+ "# unfragmented messages transmit with success via UDP",
+ 1, GNUNET_NO);
if (udpw->msg_size >= udpw->payload_size)
{
GNUNET_STATISTICS_update (plugin->env->stats,
@@ -714,7 +715,14 @@ call_continuation (struct UDP_MessageWrapper *udpw, int result)
"# bytes payload transmitted via UDP",
udpw->payload_size, GNUNET_NO);
}
- udpw->cont (udpw->cont_cls, &udpw->session->target, result,
+ else
+ {
+ GNUNET_STATISTICS_update (plugin->env->stats,
+ "# unfragmented messages transmit with failure via UDP",
+ 1, GNUNET_NO);
+ }
+ if (NULL != udpw->cont)
+ udpw->cont (udpw->cont_cls, &udpw->session->target, result,
udpw->payload_size, udpw->msg_size);
}
else
@@ -723,7 +731,8 @@ call_continuation (struct UDP_MessageWrapper *udpw, int result)
if (GNUNET_OK == result)
{
/* Fragmented message: only call next_fragment continuation on success */
- udpw->cont (udpw->cont_cls, &udpw->session->target, result,
+ if (NULL != udpw->cont)
+ udpw->cont (udpw->cont_cls, &udpw->session->target, result,
udpw->payload_size, udpw->msg_size);
}
}
@@ -824,6 +833,23 @@ free_session (struct Session *s)
}
+static void
+dequeue (struct Plugin *plugin, struct UDP_MessageWrapper * udpw)
+{
+ GNUNET_STATISTICS_update (plugin->env->stats,
+ "# bytes currently in UDP buffers",
+ -udpw->msg_size, GNUNET_NO);
+ GNUNET_STATISTICS_update (plugin->env->stats,
+ "# msgs currently in UDP buffers",
+ -1, GNUNET_NO);
+ if (udpw->session->addrlen == sizeof (struct sockaddr_in))
+ GNUNET_CONTAINER_DLL_remove (plugin->ipv4_queue_head,
+ plugin->ipv4_queue_tail, udpw);
+ if (udpw->session->addrlen == sizeof (struct sockaddr_in6))
+ GNUNET_CONTAINER_DLL_remove (plugin->ipv6_queue_head,
+ plugin->ipv6_queue_tail, udpw);
+}
+
/**
* Functions with this signature are called whenever we need
* to close a session due to a disconnect or failure to
@@ -850,7 +876,7 @@ disconnect_session (struct Session *s)
next = udpw->next;
if (udpw->session == s)
{
- GNUNET_CONTAINER_DLL_remove(plugin->ipv4_queue_head, plugin->ipv4_queue_tail, udpw);
+ dequeue (plugin, udpw);
call_continuation(udpw, GNUNET_SYSERR);
GNUNET_free (udpw);
}
@@ -861,7 +887,7 @@ disconnect_session (struct Session *s)
next = udpw->next;
if (udpw->session == s)
{
- GNUNET_CONTAINER_DLL_remove(plugin->ipv6_queue_head, plugin->ipv6_queue_tail, udpw);
+ dequeue (plugin, udpw);
call_continuation(udpw, GNUNET_SYSERR);
GNUNET_free (udpw);
}
@@ -1204,7 +1230,12 @@ udp_plugin_get_session (void *cls,
static void
enqueue (struct Plugin *plugin, struct UDP_MessageWrapper * udpw)
{
-
+ GNUNET_STATISTICS_update (plugin->env->stats,
+ "# bytes currently in UDP buffers",
+ udpw->msg_size, GNUNET_NO);
+ GNUNET_STATISTICS_update (plugin->env->stats,
+ "# msgs currently in UDP buffers",
+ 1, GNUNET_NO);
if (udpw->session->addrlen == sizeof (struct sockaddr_in))
GNUNET_CONTAINER_DLL_insert (plugin->ipv4_queue_head,
plugin->ipv4_queue_tail, udpw);
@@ -1214,6 +1245,7 @@ enqueue (struct Plugin *plugin, struct UDP_MessageWrapper * udpw)
}
+
/**
* Fragment message was transmitted via UDP, let fragmentation know
* to send the next fragment now.
@@ -1333,13 +1365,9 @@ udp_plugin_send (void *cls,
GNUNET_a2s(s->sock_addr, s->addrlen));
GNUNET_STATISTICS_update (plugin->env->stats,
- "# bytes currently in UDP buffers",
- msgbuf_size, GNUNET_NO);
- GNUNET_STATISTICS_update (plugin->env->stats,
"# bytes payload asked to transmit via UDP",
msgbuf_size, GNUNET_NO);
-
/* Message */
udp = (struct UDPMessage *) mbuf;
udp->header.size = htons (udpmlen);
@@ -1351,6 +1379,9 @@ udp_plugin_send (void *cls,
if (udpmlen <= UDP_MTU)
{
/* unfragmented message */
+ GNUNET_STATISTICS_update (plugin->env->stats,
+ "# unfragmented messages asked to transmit via UDP",
+ 1, GNUNET_NO);
udpw = GNUNET_malloc (sizeof (struct UDP_MessageWrapper) + udpmlen);
udpw->session = s;
udpw->msg_buf = (char *) &udpw[1];
@@ -1366,7 +1397,12 @@ udp_plugin_send (void *cls,
}
else
{
+ // FIXME
+ return GNUNET_SYSERR;
/* fragmented message */
+ GNUNET_STATISTICS_update (plugin->env->stats,
+ "# fragmented messages asked to transmit via UDP",
+ 1, GNUNET_NO);
if (s->frag_ctx != NULL)
return GNUNET_SYSERR;
memcpy (&udp[1], msgbuf, msgbuf_size);
@@ -1685,6 +1721,10 @@ ack_proc (void *cls, uint32_t id, const struct GNUNET_MessageHeader *msg)
udp_ack->sender = *rc->plugin->env->my_identity;
memcpy (&udp_ack[1], msg, ntohs (msg->size));
+ GNUNET_STATISTICS_update (plugin->env->stats,
+ "# messages ACKs transmitted via UDP",
+ 1, GNUNET_NO);
+
enqueue (rc->plugin, udpw);
}
@@ -1775,7 +1815,7 @@ read_process_ack (struct Plugin *plugin,
tmp = udpw->next;
if ((udpw->frag_ctx != NULL) && (udpw->frag_ctx == s->frag_ctx))
{
- GNUNET_CONTAINER_DLL_remove(plugin->ipv6_queue_head, plugin->ipv6_queue_tail, udpw);
+ dequeue (plugin, udpw);
GNUNET_free (udpw);
}
udpw = tmp;
@@ -1789,7 +1829,7 @@ read_process_ack (struct Plugin *plugin,
tmp = udpw->next;
if ((udpw->frag_ctx != NULL) && (udpw->frag_ctx == s->frag_ctx))
{
- GNUNET_CONTAINER_DLL_remove(plugin->ipv4_queue_head, plugin->ipv4_queue_tail, udpw);
+ dequeue (plugin, udpw);
GNUNET_free (udpw);
}
udpw = tmp;
@@ -2001,22 +2041,18 @@ remove_timeout_messages_and_select (struct UDP_MessageWrapper *head,
}
GNUNET_STATISTICS_update (plugin->env->stats,
- "# bytes currently in UDP buffers",
- -udpw->msg_size, GNUNET_NO);
-
- GNUNET_STATISTICS_update (plugin->env->stats,
"# messages dismissed due to timeout",
1, GNUNET_NO);
/* Remove message */
if (sock == plugin->sockv4)
{
- GNUNET_CONTAINER_DLL_remove(plugin->ipv4_queue_head, plugin->ipv4_queue_tail, udpw);
+ dequeue (plugin, udpw);
GNUNET_free (udpw);
udpw = plugin->ipv4_queue_head;
}
if (sock == plugin->sockv6)
{
- GNUNET_CONTAINER_DLL_remove(plugin->ipv6_queue_head, plugin->ipv6_queue_tail, udpw);
+ dequeue (plugin, udpw);
GNUNET_free (udpw);
udpw = plugin->ipv6_queue_head;
}
@@ -2135,15 +2171,7 @@ udp_select_send (struct Plugin *plugin, struct GNUNET_NETWORK_Handle *sock)
udpw->frag_ctx->on_wire_size += udpw->msg_size;
call_continuation (udpw, GNUNET_OK);
}
-
- GNUNET_STATISTICS_update (plugin->env->stats,
- "# bytes currently in UDP buffers",
- -udpw->msg_size, GNUNET_NO);
-
- if (sock == plugin->sockv4)
- GNUNET_CONTAINER_DLL_remove(plugin->ipv4_queue_head, plugin->ipv4_queue_tail, udpw);
- else if (sock == plugin->sockv6)
- GNUNET_CONTAINER_DLL_remove(plugin->ipv6_queue_head, plugin->ipv6_queue_tail, udpw);
+ dequeue (plugin, udpw);
GNUNET_free (udpw);
udpw = NULL;
@@ -2610,18 +2638,20 @@ libgnunet_plugin_transport_udp_done (void *cls)
while (udpw != NULL)
{
struct UDP_MessageWrapper *tmp = udpw->next;
- GNUNET_CONTAINER_DLL_remove(plugin->ipv4_queue_head, plugin->ipv4_queue_tail, udpw);
+ dequeue (plugin, udpw);
call_continuation(udpw, GNUNET_SYSERR);
GNUNET_free (udpw);
+
udpw = tmp;
}
udpw = plugin->ipv6_queue_head;
while (udpw != NULL)
{
struct UDP_MessageWrapper *tmp = udpw->next;
- GNUNET_CONTAINER_DLL_remove(plugin->ipv6_queue_head, plugin->ipv6_queue_tail, udpw);
+ dequeue (plugin, udpw);
call_continuation(udpw, GNUNET_SYSERR);
GNUNET_free (udpw);
+
udpw = tmp;
}