aboutsummaryrefslogtreecommitdiff
path: root/src/transport/plugin_transport_http_client.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/transport/plugin_transport_http_client.c')
-rw-r--r--src/transport/plugin_transport_http_client.c199
1 files changed, 102 insertions, 97 deletions
diff --git a/src/transport/plugin_transport_http_client.c b/src/transport/plugin_transport_http_client.c
index 4679e45..f55311f 100644
--- a/src/transport/plugin_transport_http_client.c
+++ b/src/transport/plugin_transport_http_client.c
@@ -1,6 +1,6 @@
/*
This file is part of GNUnet
- (C) 2002, 2003, 2004, 2005, 2006, 2007, 2008, 2009 Christian Grothoff (and other contributing authors)
+ (C) 2002--2012 Christian Grothoff (and other contributing authors)
GNUnet is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published
@@ -26,6 +26,8 @@
#include "plugin_transport_http.h"
+static struct Plugin * p;
+
#if VERBOSE_CURL
/**
* Function to log curl debug messages with GNUNET_log
@@ -53,10 +55,10 @@ client_log (CURL * curl, curl_infotype type, char *data, size_t size, void *cls)
}
#if BUILD_HTTPS
GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, "transport-https",
- "Client: %X - %s", cls, text);
+ "Client: %p - %s", cls, text);
#else
GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, "transport-http",
- "Client: %X - %s", cls, text);
+ "Client: %p - %s", cls, text);
#endif
}
return 0;
@@ -71,6 +73,7 @@ client_log (CURL * curl, curl_infotype type, char *data, size_t size, void *cls)
static void
client_run (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
+
/**
* Function setting up file descriptors and scheduling task to run
*
@@ -97,7 +100,6 @@ client_schedule (struct Plugin *plugin, int now)
GNUNET_SCHEDULER_cancel (plugin->client_perform_task);
plugin->client_perform_task = GNUNET_SCHEDULER_NO_TASK;
}
-
max = -1;
FD_ZERO (&rs);
FD_ZERO (&ws);
@@ -133,7 +135,7 @@ client_schedule (struct Plugin *plugin, int now)
plugin->client_perform_task =
GNUNET_SCHEDULER_add_select (GNUNET_SCHEDULER_PRIORITY_DEFAULT,
- GNUNET_SCHEDULER_NO_TASK, timeout, grs, gws,
+ timeout, grs, gws,
&client_run, plugin);
GNUNET_NETWORK_fdset_destroy (gws);
GNUNET_NETWORK_fdset_destroy (grs);
@@ -145,27 +147,29 @@ int
client_send (struct Session *s, struct HTTP_Message *msg)
{
GNUNET_assert (s != NULL);
- GNUNET_CONTAINER_DLL_insert (s->msg_head, s->msg_tail, msg);
+ GNUNET_CONTAINER_DLL_insert_tail (s->msg_head, s->msg_tail, msg);
+ if (GNUNET_YES != exist_session(p, s))
+ {
+ GNUNET_break (0);
+ return GNUNET_SYSERR;
+ }
if (s->client_put_paused == GNUNET_YES)
{
-#if VERBOSE_CLIENT
GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, s->plugin->name,
- "Client: %X was suspended, unpausing\n", s->client_put);
-#endif
+ "Client: %p was suspended, unpausing\n", s->client_put);
s->client_put_paused = GNUNET_NO;
curl_easy_pause (s->client_put, CURLPAUSE_CONT);
}
-
client_schedule (s->plugin, GNUNET_YES);
return GNUNET_OK;
}
-
/**
* Task performing curl operations
+ *
* @param cls plugin as closure
* @param tc gnunet scheduler task context
*/
@@ -210,12 +214,19 @@ client_run (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
GNUNET_assert (CURLE_OK ==
curl_easy_getinfo (easy_h, CURLINFO_PRIVATE, &d));
s = (struct Session *) d;
+
+ if (GNUNET_YES != exist_session(plugin, s))
+ {
+ GNUNET_break (0);
+ return;
+ }
+
GNUNET_assert (s != NULL);
if (msg->msg == CURLMSG_DONE)
{
GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, plugin->name,
- "Client: %X connection to '%s' %s ended with reason %i: `%s'\n",
+ "Client: %p connection to '%s' %s ended with reason %i: `%s'\n",
msg->easy_handle, GNUNET_i2s (&s->target),
http_plugin_address_to_string (NULL, s->addr,
s->addrlen),
@@ -223,7 +234,11 @@ client_run (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
curl_easy_strerror (msg->data.result));
client_disconnect (s);
- //GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, plugin->name,"Notifying about ended session to peer `%s' `%s'\n", GNUNET_i2s (&s->target), http_plugin_address_to_string (plugin, s->addr, s->addrlen));
+ GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
+ plugin->name,
+ "Notifying about ended session to peer `%s' `%s'\n",
+ GNUNET_i2s (&s->target),
+ http_plugin_address_to_string (plugin, s->addr, s->addrlen));
notify_session_end (plugin, &s->target, s);
}
}
@@ -232,6 +247,7 @@ client_run (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
client_schedule (plugin, GNUNET_NO);
}
+
int
client_disconnect (struct Session *s)
{
@@ -241,15 +257,17 @@ client_disconnect (struct Session *s)
struct HTTP_Message *msg;
struct HTTP_Message *t;
-
+ if (GNUNET_YES != exist_session(plugin, s))
+ {
+ GNUNET_break (0);
+ return GNUNET_SYSERR;
+ }
if (s->client_put != NULL)
{
-#if DEBUG_HTTP
GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, plugin->name,
- "Client: %X Deleting outbound PUT session to peer `%s'\n",
+ "Client: %p Deleting outbound PUT session to peer `%s'\n",
s->client_put, GNUNET_i2s (&s->target));
-#endif
mret = curl_multi_remove_handle (plugin->client_mh, s->client_put);
if (mret != CURLM_OK)
@@ -271,11 +289,9 @@ client_disconnect (struct Session *s)
if (s->client_get != NULL)
{
-#if DEBUG_HTTP
GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, plugin->name,
- "Client: %X Deleting outbound GET session to peer `%s'\n",
+ "Client: %p Deleting outbound GET session to peer `%s'\n",
s->client_get, GNUNET_i2s (&s->target));
-#endif
mret = curl_multi_remove_handle (plugin->client_mh, s->client_get);
if (mret != CURLM_OK)
@@ -300,6 +316,14 @@ client_disconnect (struct Session *s)
}
plugin->cur_connections -= 2;
+
+ GNUNET_assert (plugin->outbound_sessions > 0);
+ plugin->outbound_sessions --;
+ GNUNET_STATISTICS_set (plugin->env->stats,
+ "# HTTP outbound sessions",
+ plugin->outbound_sessions,
+ GNUNET_NO);
+
/* Re-schedule since handles have changed */
if (plugin->client_perform_task != GNUNET_SCHEDULER_NO_TASK)
{
@@ -312,84 +336,86 @@ client_disconnect (struct Session *s)
return res;
}
-static void
+static int
client_receive_mst_cb (void *cls, void *client,
const struct GNUNET_MessageHeader *message)
{
struct Session *s = cls;
struct GNUNET_TIME_Relative delay;
+ if (GNUNET_YES != exist_session(p, s))
+ {
+ GNUNET_break (0);
+ return GNUNET_OK;
+ }
+
delay = http_plugin_receive (s, &s->target, message, s, s->addr, s->addrlen);
s->next_receive =
GNUNET_TIME_absolute_add (GNUNET_TIME_absolute_get (), delay);
if (GNUNET_TIME_absolute_get ().abs_value < s->next_receive.abs_value)
{
-#if VERBOSE_CLIENT
struct Plugin *plugin = s->plugin;
GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, plugin->name,
"Client: peer `%s' address `%s' next read delayed for %llu ms\n",
GNUNET_i2s (&s->target), GNUNET_a2s (s->addr, s->addrlen),
delay);
-#endif
}
+ return GNUNET_OK;
}
+
static void
client_wake_up (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
{
struct Session *s = cls;
+ if (GNUNET_YES != exist_session(p, s))
+ {
+ GNUNET_break (0);
+ return;
+ }
s->recv_wakeup_task = GNUNET_SCHEDULER_NO_TASK;
-
if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
return;
-
GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, s->plugin->name,
- "Client: %X Waking up receive handle\n", s->client_get);
-
+ "Client: %p Waking up receive handle\n", s->client_get);
if (s->client_get != NULL)
curl_easy_pause (s->client_get, CURLPAUSE_CONT);
-
}
+
/**
-* Callback method used with libcurl
-* Method is called when libcurl needs to write data during sending
-* @param stream pointer where to write data
-* @param size size of an individual element
-* @param nmemb count of elements that can be written to the buffer
-* @param cls destination pointer, passed to the libcurl handle
-* @return bytes read from stream
-*/
+ * Callback method used with libcurl
+ * Method is called when libcurl needs to write data during sending
+ *
+ * @param stream pointer where to write data
+ * @param size size of an individual element
+ * @param nmemb count of elements that can be written to the buffer
+ * @param cls destination pointer, passed to the libcurl handle
+ * @return bytes read from stream
+ */
static size_t
client_receive (void *stream, size_t size, size_t nmemb, void *cls)
{
struct Session *s = cls;
struct GNUNET_TIME_Absolute now;
size_t len = size * nmemb;
-
-
-#if VERBOSE_CLIENT
struct Plugin *plugin = s->plugin;
GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, plugin->name,
"Client: Received %Zu bytes from peer `%s'\n", len,
GNUNET_i2s (&s->target));
-#endif
-
now = GNUNET_TIME_absolute_get ();
if (now.abs_value < s->next_receive.abs_value)
{
struct GNUNET_TIME_Absolute now = GNUNET_TIME_absolute_get ();
struct GNUNET_TIME_Relative delta =
GNUNET_TIME_absolute_get_difference (now, s->next_receive);
-#if DEBUG_CLIENT
GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, plugin->name,
- "Client: %X No inbound bandwidth available! Next read was delayed for %llu ms\n",
+ "Client: %p No inbound bandwidth available! Next read was delayed for %llu ms\n",
s->client_get, delta.rel_value);
-#endif
if (s->recv_wakeup_task != GNUNET_SCHEDULER_NO_TASK)
{
GNUNET_SCHEDULER_cancel (s->recv_wakeup_task);
@@ -399,19 +425,17 @@ client_receive (void *stream, size_t size, size_t nmemb, void *cls)
GNUNET_SCHEDULER_add_delayed (delta, &client_wake_up, s);
return CURLPAUSE_ALL;
}
-
-
- if (s->msg_tk == NULL)
+ if (NULL == s->msg_tk)
s->msg_tk = GNUNET_SERVER_mst_create (&client_receive_mst_cb, s);
-
GNUNET_SERVER_mst_receive (s->msg_tk, s, stream, len, GNUNET_NO, GNUNET_NO);
-
return len;
}
+
/**
* Callback method used with libcurl
* Method is called when libcurl needs to read data during sending
+ *
* @param stream pointer where to write data
* @param size size of an individual element
* @param nmemb count of elements that can be written to the buffer
@@ -422,69 +446,45 @@ static size_t
client_send_cb (void *stream, size_t size, size_t nmemb, void *cls)
{
struct Session *s = cls;
-
-#if VERBOSE_CLIENT
struct Plugin *plugin = s->plugin;
-#endif
- size_t bytes_sent = 0;
- size_t len;
-
struct HTTP_Message *msg = s->msg_head;
+ size_t len;
- if (msg == NULL)
+ if (GNUNET_YES != exist_session(plugin, s))
+ {
+ GNUNET_break (0);
+ return 0;
+ }
+ if (NULL == msg)
{
-#if VERBOSE_CLIENT
GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, plugin->name,
- "Client: %X Nothing to send! Suspending PUT handle!\n",
+ "Client: %p Nothing to send! Suspending PUT handle!\n",
s->client_put);
-#endif
s->client_put_paused = GNUNET_YES;
return CURL_READFUNC_PAUSE;
}
-
- GNUNET_assert (msg != NULL);
/* data to send */
- if (msg->pos < msg->size)
- {
- /* data fit in buffer */
- if ((msg->size - msg->pos) <= (size * nmemb))
- {
- len = (msg->size - msg->pos);
- memcpy (stream, &msg->buf[msg->pos], len);
- msg->pos += len;
- bytes_sent = len;
- }
- else
- {
- len = size * nmemb;
- memcpy (stream, &msg->buf[msg->pos], len);
- msg->pos += len;
- bytes_sent = len;
- }
- }
- /* no data to send */
- else
- {
- GNUNET_assert (0);
- bytes_sent = 0;
- }
-
+ GNUNET_assert (msg->pos < msg->size);
+ /* calculate how much fits in buffer */
+ len = GNUNET_MIN (msg->size - msg->pos,
+ size * nmemb);
+ memcpy (stream, &msg->buf[msg->pos], len);
+ msg->pos += len;
if (msg->pos == msg->size)
{
-#if VERBOSE_CLIENT
GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, plugin->name,
- "Client: %X Message with %u bytes sent, removing message from queue\n",
+ "Client: %p Message with %u bytes sent, removing message from queue\n",
s->client_put, msg->size, msg->pos);
-#endif
/* Calling transmit continuation */
+ GNUNET_CONTAINER_DLL_remove (s->msg_head, s->msg_tail, msg);
if (NULL != msg->transmit_cont)
msg->transmit_cont (msg->transmit_cont_cls, &s->target, GNUNET_OK);
- GNUNET_CONTAINER_DLL_remove (s->msg_head, s->msg_tail, msg);
GNUNET_free (msg);
}
- return bytes_sent;
+ return len;
}
+
int
client_connect (struct Session *s)
{
@@ -493,15 +493,10 @@ client_connect (struct Session *s)
char *url;
CURLMcode mret;
-#if VERBOSE_CLIENT
-#endif
GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, plugin->name,
"Initiating outbound session peer `%s'\n",
GNUNET_i2s (&s->target));
-
-
s->inbound = GNUNET_NO;
-
plugin->last_tag++;
/* create url */
GNUNET_asprintf (&url, "%s%s;%u",
@@ -595,6 +590,12 @@ client_connect (struct Session *s)
/* Perform connect */
plugin->cur_connections += 2;
+ plugin->outbound_sessions ++;
+ GNUNET_STATISTICS_set (plugin->env->stats,
+ "# HTTP outbound sessions",
+ plugin->outbound_sessions,
+ GNUNET_NO);
+
/* Re-schedule since handles have changed */
if (plugin->client_perform_task != GNUNET_SCHEDULER_NO_TASK)
{
@@ -606,10 +607,12 @@ client_connect (struct Session *s)
return res;
}
+
int
client_start (struct Plugin *plugin)
{
int res = GNUNET_OK;
+ p = plugin;
curl_global_init (CURL_GLOBAL_ALL);
plugin->client_mh = curl_multi_init ();
@@ -619,15 +622,17 @@ client_start (struct Plugin *plugin)
GNUNET_log_from (GNUNET_ERROR_TYPE_ERROR, plugin->name,
_
("Could not initialize curl multi handle, failed to start %s plugin!\n"),
- plugin->name);
+ plugin->name);
res = GNUNET_SYSERR;
}
return res;
}
+
void
client_stop (struct Plugin *plugin)
{
+ p = NULL;
if (plugin->client_perform_task != GNUNET_SCHEDULER_NO_TASK)
{
GNUNET_SCHEDULER_cancel (plugin->client_perform_task);