diff options
Diffstat (limited to 'src/transport/plugin_transport_http_client.c')
-rw-r--r-- | src/transport/plugin_transport_http_client.c | 199 |
1 files changed, 102 insertions, 97 deletions
diff --git a/src/transport/plugin_transport_http_client.c b/src/transport/plugin_transport_http_client.c index 4679e45..f55311f 100644 --- a/src/transport/plugin_transport_http_client.c +++ b/src/transport/plugin_transport_http_client.c @@ -1,6 +1,6 @@ /* This file is part of GNUnet - (C) 2002, 2003, 2004, 2005, 2006, 2007, 2008, 2009 Christian Grothoff (and other contributing authors) + (C) 2002--2012 Christian Grothoff (and other contributing authors) GNUnet is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published @@ -26,6 +26,8 @@ #include "plugin_transport_http.h" +static struct Plugin * p; + #if VERBOSE_CURL /** * Function to log curl debug messages with GNUNET_log @@ -53,10 +55,10 @@ client_log (CURL * curl, curl_infotype type, char *data, size_t size, void *cls) } #if BUILD_HTTPS GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, "transport-https", - "Client: %X - %s", cls, text); + "Client: %p - %s", cls, text); #else GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, "transport-http", - "Client: %X - %s", cls, text); + "Client: %p - %s", cls, text); #endif } return 0; @@ -71,6 +73,7 @@ client_log (CURL * curl, curl_infotype type, char *data, size_t size, void *cls) static void client_run (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc); + /** * Function setting up file descriptors and scheduling task to run * @@ -97,7 +100,6 @@ client_schedule (struct Plugin *plugin, int now) GNUNET_SCHEDULER_cancel (plugin->client_perform_task); plugin->client_perform_task = GNUNET_SCHEDULER_NO_TASK; } - max = -1; FD_ZERO (&rs); FD_ZERO (&ws); @@ -133,7 +135,7 @@ client_schedule (struct Plugin *plugin, int now) plugin->client_perform_task = GNUNET_SCHEDULER_add_select (GNUNET_SCHEDULER_PRIORITY_DEFAULT, - GNUNET_SCHEDULER_NO_TASK, timeout, grs, gws, + timeout, grs, gws, &client_run, plugin); GNUNET_NETWORK_fdset_destroy (gws); GNUNET_NETWORK_fdset_destroy (grs); @@ -145,27 +147,29 @@ int client_send (struct Session *s, struct HTTP_Message *msg) { GNUNET_assert (s != NULL); - GNUNET_CONTAINER_DLL_insert (s->msg_head, s->msg_tail, msg); + GNUNET_CONTAINER_DLL_insert_tail (s->msg_head, s->msg_tail, msg); + if (GNUNET_YES != exist_session(p, s)) + { + GNUNET_break (0); + return GNUNET_SYSERR; + } if (s->client_put_paused == GNUNET_YES) { -#if VERBOSE_CLIENT GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, s->plugin->name, - "Client: %X was suspended, unpausing\n", s->client_put); -#endif + "Client: %p was suspended, unpausing\n", s->client_put); s->client_put_paused = GNUNET_NO; curl_easy_pause (s->client_put, CURLPAUSE_CONT); } - client_schedule (s->plugin, GNUNET_YES); return GNUNET_OK; } - /** * Task performing curl operations + * * @param cls plugin as closure * @param tc gnunet scheduler task context */ @@ -210,12 +214,19 @@ client_run (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) GNUNET_assert (CURLE_OK == curl_easy_getinfo (easy_h, CURLINFO_PRIVATE, &d)); s = (struct Session *) d; + + if (GNUNET_YES != exist_session(plugin, s)) + { + GNUNET_break (0); + return; + } + GNUNET_assert (s != NULL); if (msg->msg == CURLMSG_DONE) { GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, plugin->name, - "Client: %X connection to '%s' %s ended with reason %i: `%s'\n", + "Client: %p connection to '%s' %s ended with reason %i: `%s'\n", msg->easy_handle, GNUNET_i2s (&s->target), http_plugin_address_to_string (NULL, s->addr, s->addrlen), @@ -223,7 +234,11 @@ client_run (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) curl_easy_strerror (msg->data.result)); client_disconnect (s); - //GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, plugin->name,"Notifying about ended session to peer `%s' `%s'\n", GNUNET_i2s (&s->target), http_plugin_address_to_string (plugin, s->addr, s->addrlen)); + GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, + plugin->name, + "Notifying about ended session to peer `%s' `%s'\n", + GNUNET_i2s (&s->target), + http_plugin_address_to_string (plugin, s->addr, s->addrlen)); notify_session_end (plugin, &s->target, s); } } @@ -232,6 +247,7 @@ client_run (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) client_schedule (plugin, GNUNET_NO); } + int client_disconnect (struct Session *s) { @@ -241,15 +257,17 @@ client_disconnect (struct Session *s) struct HTTP_Message *msg; struct HTTP_Message *t; - + if (GNUNET_YES != exist_session(plugin, s)) + { + GNUNET_break (0); + return GNUNET_SYSERR; + } if (s->client_put != NULL) { -#if DEBUG_HTTP GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, plugin->name, - "Client: %X Deleting outbound PUT session to peer `%s'\n", + "Client: %p Deleting outbound PUT session to peer `%s'\n", s->client_put, GNUNET_i2s (&s->target)); -#endif mret = curl_multi_remove_handle (plugin->client_mh, s->client_put); if (mret != CURLM_OK) @@ -271,11 +289,9 @@ client_disconnect (struct Session *s) if (s->client_get != NULL) { -#if DEBUG_HTTP GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, plugin->name, - "Client: %X Deleting outbound GET session to peer `%s'\n", + "Client: %p Deleting outbound GET session to peer `%s'\n", s->client_get, GNUNET_i2s (&s->target)); -#endif mret = curl_multi_remove_handle (plugin->client_mh, s->client_get); if (mret != CURLM_OK) @@ -300,6 +316,14 @@ client_disconnect (struct Session *s) } plugin->cur_connections -= 2; + + GNUNET_assert (plugin->outbound_sessions > 0); + plugin->outbound_sessions --; + GNUNET_STATISTICS_set (plugin->env->stats, + "# HTTP outbound sessions", + plugin->outbound_sessions, + GNUNET_NO); + /* Re-schedule since handles have changed */ if (plugin->client_perform_task != GNUNET_SCHEDULER_NO_TASK) { @@ -312,84 +336,86 @@ client_disconnect (struct Session *s) return res; } -static void +static int client_receive_mst_cb (void *cls, void *client, const struct GNUNET_MessageHeader *message) { struct Session *s = cls; struct GNUNET_TIME_Relative delay; + if (GNUNET_YES != exist_session(p, s)) + { + GNUNET_break (0); + return GNUNET_OK; + } + delay = http_plugin_receive (s, &s->target, message, s, s->addr, s->addrlen); s->next_receive = GNUNET_TIME_absolute_add (GNUNET_TIME_absolute_get (), delay); if (GNUNET_TIME_absolute_get ().abs_value < s->next_receive.abs_value) { -#if VERBOSE_CLIENT struct Plugin *plugin = s->plugin; GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, plugin->name, "Client: peer `%s' address `%s' next read delayed for %llu ms\n", GNUNET_i2s (&s->target), GNUNET_a2s (s->addr, s->addrlen), delay); -#endif } + return GNUNET_OK; } + static void client_wake_up (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) { struct Session *s = cls; + if (GNUNET_YES != exist_session(p, s)) + { + GNUNET_break (0); + return; + } s->recv_wakeup_task = GNUNET_SCHEDULER_NO_TASK; - if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN)) return; - GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, s->plugin->name, - "Client: %X Waking up receive handle\n", s->client_get); - + "Client: %p Waking up receive handle\n", s->client_get); if (s->client_get != NULL) curl_easy_pause (s->client_get, CURLPAUSE_CONT); - } + /** -* Callback method used with libcurl -* Method is called when libcurl needs to write data during sending -* @param stream pointer where to write data -* @param size size of an individual element -* @param nmemb count of elements that can be written to the buffer -* @param cls destination pointer, passed to the libcurl handle -* @return bytes read from stream -*/ + * Callback method used with libcurl + * Method is called when libcurl needs to write data during sending + * + * @param stream pointer where to write data + * @param size size of an individual element + * @param nmemb count of elements that can be written to the buffer + * @param cls destination pointer, passed to the libcurl handle + * @return bytes read from stream + */ static size_t client_receive (void *stream, size_t size, size_t nmemb, void *cls) { struct Session *s = cls; struct GNUNET_TIME_Absolute now; size_t len = size * nmemb; - - -#if VERBOSE_CLIENT struct Plugin *plugin = s->plugin; GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, plugin->name, "Client: Received %Zu bytes from peer `%s'\n", len, GNUNET_i2s (&s->target)); -#endif - now = GNUNET_TIME_absolute_get (); if (now.abs_value < s->next_receive.abs_value) { struct GNUNET_TIME_Absolute now = GNUNET_TIME_absolute_get (); struct GNUNET_TIME_Relative delta = GNUNET_TIME_absolute_get_difference (now, s->next_receive); -#if DEBUG_CLIENT GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, plugin->name, - "Client: %X No inbound bandwidth available! Next read was delayed for %llu ms\n", + "Client: %p No inbound bandwidth available! Next read was delayed for %llu ms\n", s->client_get, delta.rel_value); -#endif if (s->recv_wakeup_task != GNUNET_SCHEDULER_NO_TASK) { GNUNET_SCHEDULER_cancel (s->recv_wakeup_task); @@ -399,19 +425,17 @@ client_receive (void *stream, size_t size, size_t nmemb, void *cls) GNUNET_SCHEDULER_add_delayed (delta, &client_wake_up, s); return CURLPAUSE_ALL; } - - - if (s->msg_tk == NULL) + if (NULL == s->msg_tk) s->msg_tk = GNUNET_SERVER_mst_create (&client_receive_mst_cb, s); - GNUNET_SERVER_mst_receive (s->msg_tk, s, stream, len, GNUNET_NO, GNUNET_NO); - return len; } + /** * Callback method used with libcurl * Method is called when libcurl needs to read data during sending + * * @param stream pointer where to write data * @param size size of an individual element * @param nmemb count of elements that can be written to the buffer @@ -422,69 +446,45 @@ static size_t client_send_cb (void *stream, size_t size, size_t nmemb, void *cls) { struct Session *s = cls; - -#if VERBOSE_CLIENT struct Plugin *plugin = s->plugin; -#endif - size_t bytes_sent = 0; - size_t len; - struct HTTP_Message *msg = s->msg_head; + size_t len; - if (msg == NULL) + if (GNUNET_YES != exist_session(plugin, s)) + { + GNUNET_break (0); + return 0; + } + if (NULL == msg) { -#if VERBOSE_CLIENT GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, plugin->name, - "Client: %X Nothing to send! Suspending PUT handle!\n", + "Client: %p Nothing to send! Suspending PUT handle!\n", s->client_put); -#endif s->client_put_paused = GNUNET_YES; return CURL_READFUNC_PAUSE; } - - GNUNET_assert (msg != NULL); /* data to send */ - if (msg->pos < msg->size) - { - /* data fit in buffer */ - if ((msg->size - msg->pos) <= (size * nmemb)) - { - len = (msg->size - msg->pos); - memcpy (stream, &msg->buf[msg->pos], len); - msg->pos += len; - bytes_sent = len; - } - else - { - len = size * nmemb; - memcpy (stream, &msg->buf[msg->pos], len); - msg->pos += len; - bytes_sent = len; - } - } - /* no data to send */ - else - { - GNUNET_assert (0); - bytes_sent = 0; - } - + GNUNET_assert (msg->pos < msg->size); + /* calculate how much fits in buffer */ + len = GNUNET_MIN (msg->size - msg->pos, + size * nmemb); + memcpy (stream, &msg->buf[msg->pos], len); + msg->pos += len; if (msg->pos == msg->size) { -#if VERBOSE_CLIENT GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, plugin->name, - "Client: %X Message with %u bytes sent, removing message from queue\n", + "Client: %p Message with %u bytes sent, removing message from queue\n", s->client_put, msg->size, msg->pos); -#endif /* Calling transmit continuation */ + GNUNET_CONTAINER_DLL_remove (s->msg_head, s->msg_tail, msg); if (NULL != msg->transmit_cont) msg->transmit_cont (msg->transmit_cont_cls, &s->target, GNUNET_OK); - GNUNET_CONTAINER_DLL_remove (s->msg_head, s->msg_tail, msg); GNUNET_free (msg); } - return bytes_sent; + return len; } + int client_connect (struct Session *s) { @@ -493,15 +493,10 @@ client_connect (struct Session *s) char *url; CURLMcode mret; -#if VERBOSE_CLIENT -#endif GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, plugin->name, "Initiating outbound session peer `%s'\n", GNUNET_i2s (&s->target)); - - s->inbound = GNUNET_NO; - plugin->last_tag++; /* create url */ GNUNET_asprintf (&url, "%s%s;%u", @@ -595,6 +590,12 @@ client_connect (struct Session *s) /* Perform connect */ plugin->cur_connections += 2; + plugin->outbound_sessions ++; + GNUNET_STATISTICS_set (plugin->env->stats, + "# HTTP outbound sessions", + plugin->outbound_sessions, + GNUNET_NO); + /* Re-schedule since handles have changed */ if (plugin->client_perform_task != GNUNET_SCHEDULER_NO_TASK) { @@ -606,10 +607,12 @@ client_connect (struct Session *s) return res; } + int client_start (struct Plugin *plugin) { int res = GNUNET_OK; + p = plugin; curl_global_init (CURL_GLOBAL_ALL); plugin->client_mh = curl_multi_init (); @@ -619,15 +622,17 @@ client_start (struct Plugin *plugin) GNUNET_log_from (GNUNET_ERROR_TYPE_ERROR, plugin->name, _ ("Could not initialize curl multi handle, failed to start %s plugin!\n"), - plugin->name); + plugin->name); res = GNUNET_SYSERR; } return res; } + void client_stop (struct Plugin *plugin) { + p = NULL; if (plugin->client_perform_task != GNUNET_SCHEDULER_NO_TASK) { GNUNET_SCHEDULER_cancel (plugin->client_perform_task); |