diff options
Diffstat (limited to 'src/transport/plugin_transport_tcp.c')
-rw-r--r-- | src/transport/plugin_transport_tcp.c | 2115 |
1 files changed, 2115 insertions, 0 deletions
diff --git a/src/transport/plugin_transport_tcp.c b/src/transport/plugin_transport_tcp.c new file mode 100644 index 0000000..2b2d728 --- /dev/null +++ b/src/transport/plugin_transport_tcp.c @@ -0,0 +1,2115 @@ +/* + This file is part of GNUnet + (C) 2002, 2003, 2004, 2005, 2006, 2007, 2008, 2009, 2010 Christian Grothoff (and other contributing authors) + + GNUnet is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published + by the Free Software Foundation; either version 3, or (at your + option) any later version. + + GNUnet is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + General Public License for more details. + + You should have received a copy of the GNU General Public License + along with GNUnet; see the file COPYING. If not, write to the + Free Software Foundation, Inc., 59 Temple Place - Suite 330, + Boston, MA 02111-1307, USA. +*/ +/** + * @file transport/plugin_transport_tcp.c + * @brief Implementation of the TCP transport service + * @author Christian Grothoff + */ +#include "platform.h" +#include "gnunet_hello_lib.h" +#include "gnunet_constants.h" +#include "gnunet_connection_lib.h" +#include "gnunet_container_lib.h" +#include "gnunet_nat_lib.h" +#include "gnunet_os_lib.h" +#include "gnunet_protocols.h" +#include "gnunet_resolver_service.h" +#include "gnunet_server_lib.h" +#include "gnunet_service_lib.h" +#include "gnunet_signatures.h" +#include "gnunet_statistics_service.h" +#include "gnunet_transport_service.h" +#include "gnunet_transport_plugin.h" +#include "transport.h" + +#define DEBUG_TCP GNUNET_EXTRA_LOGGING + +#define DEBUG_TCP_NAT GNUNET_EXTRA_LOGGING + +GNUNET_NETWORK_STRUCT_BEGIN + +/** + * Initial handshake message for a session. + */ +struct WelcomeMessage +{ + /** + * Type is GNUNET_MESSAGE_TYPE_TRANSPORT_TCP_WELCOME. + */ + struct GNUNET_MessageHeader header; + + /** + * Identity of the node connecting (TCP client) + */ + struct GNUNET_PeerIdentity clientIdentity; + +}; + + +/** + * Basically a WELCOME message, but with the purpose + * of giving the waiting peer a client handle to use + */ +struct TCP_NAT_ProbeMessage +{ + /** + * Type is GNUNET_MESSAGE_TYPE_TRANSPORT_TCP_NAT_PROBE. + */ + struct GNUNET_MessageHeader header; + + /** + * Identity of the sender of the message. + */ + struct GNUNET_PeerIdentity clientIdentity; + +}; +GNUNET_NETWORK_STRUCT_END + +/** + * Context for sending a NAT probe via TCP. + */ +struct TCPProbeContext +{ + + /** + * Active probes are kept in a DLL. + */ + struct TCPProbeContext *next; + + /** + * Active probes are kept in a DLL. + */ + struct TCPProbeContext *prev; + + /** + * Probe connection. + */ + struct GNUNET_CONNECTION_Handle *sock; + + /** + * Message to be sent. + */ + struct TCP_NAT_ProbeMessage message; + + /** + * Handle to the transmission. + */ + struct GNUNET_CONNECTION_TransmitHandle *transmit_handle; + + /** + * Transport plugin handle. + */ + struct Plugin *plugin; +}; + + +GNUNET_NETWORK_STRUCT_BEGIN + +/** + * Network format for IPv4 addresses. + */ +struct IPv4TcpAddress +{ + /** + * IPv4 address, in network byte order. + */ + uint32_t ipv4_addr GNUNET_PACKED; + + /** + * Port number, in network byte order. + */ + uint16_t t4_port GNUNET_PACKED; + +}; + + +/** + * Network format for IPv6 addresses. + */ +struct IPv6TcpAddress +{ + /** + * IPv6 address. + */ + struct in6_addr ipv6_addr GNUNET_PACKED; + + /** + * Port number, in network byte order. + */ + uint16_t t6_port GNUNET_PACKED; + +}; +GNUNET_NETWORK_STRUCT_END + +/** + * Encapsulation of all of the state of the plugin. + */ +struct Plugin; + + +/** + * Information kept for each message that is yet to + * be transmitted. + */ +struct PendingMessage +{ + + /** + * This is a doubly-linked list. + */ + struct PendingMessage *next; + + /** + * This is a doubly-linked list. + */ + struct PendingMessage *prev; + + /** + * The pending message + */ + const char *msg; + + /** + * Continuation function to call once the message + * has been sent. Can be NULL if there is no + * continuation to call. + */ + GNUNET_TRANSPORT_TransmitContinuation transmit_cont; + + /** + * Closure for transmit_cont. + */ + void *transmit_cont_cls; + + /** + * Timeout value for the pending message. + */ + struct GNUNET_TIME_Absolute timeout; + + /** + * So that the gnunet-service-transport can group messages together, + * these pending messages need to accept a message buffer and size + * instead of just a GNUNET_MessageHeader. + */ + size_t message_size; + +}; + + +/** + * Session handle for TCP connections. + */ +struct Session +{ + + /** + * API requirement. + */ + struct SessionHeader header; + + /** + * Stored in a linked list. + */ + struct Session *next; + + /** + * Pointer to the global plugin struct. + */ + struct Plugin *plugin; + + /** + * The client (used to identify this connection) + */ + struct GNUNET_SERVER_Client *client; + + /** + * Messages currently pending for transmission + * to this peer, if any. + */ + struct PendingMessage *pending_messages_head; + + /** + * Messages currently pending for transmission + * to this peer, if any. + */ + struct PendingMessage *pending_messages_tail; + + /** + * Handle for pending transmission request. + */ + struct GNUNET_CONNECTION_TransmitHandle *transmit_handle; + + /** + * To whom are we talking to (set to our identity + * if we are still waiting for the welcome message) + */ + struct GNUNET_PeerIdentity target; + + /** + * ID of task used to delay receiving more to throttle sender. + */ + GNUNET_SCHEDULER_TaskIdentifier receive_delay_task; + + /** + * Address of the other peer (either based on our 'connect' + * call or on our 'accept' call). + * + * struct IPv4TcpAddress or struct IPv6TcpAddress + * + */ + void *addr; + + /** + * Length of connect_addr. + */ + size_t addrlen; + + /** + * Last activity on this connection. Used to select preferred + * connection. + */ + struct GNUNET_TIME_Absolute last_activity; + + /** + * Are we still expecting the welcome message? (GNUNET_YES/GNUNET_NO) + */ + int expecting_welcome; + + /** + * Was this a connection that was inbound (we accepted)? (GNUNET_YES/GNUNET_NO) + */ + int inbound; + + /** + * Was this session created using NAT traversal? + */ + int is_nat; + + /** + * ATS network type in NBO + */ + uint32_t ats_address_network_type; +}; + + +/** + * Encapsulation of all of the state of the plugin. + */ +struct Plugin +{ + /** + * Our environment. + */ + struct GNUNET_TRANSPORT_PluginEnvironment *env; + + /** + * The listen socket. + */ + struct GNUNET_CONNECTION_Handle *lsock; + + /** + * Our handle to the NAT module. + */ + struct GNUNET_NAT_Handle *nat; + + struct GNUNET_CONTAINER_MultiHashMap * sessionmap; + + /** + * Handle to the network service. + */ + struct GNUNET_SERVICE_Context *service; + + /** + * Handle to the server for this service. + */ + struct GNUNET_SERVER_Handle *server; + + /** + * Copy of the handler array where the closures are + * set to this struct's instance. + */ + struct GNUNET_SERVER_MessageHandler *handlers; + + /** + * Map of peers we have tried to contact behind a NAT + */ + struct GNUNET_CONTAINER_MultiHashMap *nat_wait_conns; + + /** + * List of active TCP probes. + */ + struct TCPProbeContext *probe_head; + + /** + * List of active TCP probes. + */ + struct TCPProbeContext *probe_tail; + + /** + * Handle for (DYN)DNS lookup of our external IP. + */ + struct GNUNET_RESOLVER_RequestHandle *ext_dns; + + /** + * How many more TCP sessions are we allowed to open right now? + */ + unsigned long long max_connections; + + /** + * ID of task used to update our addresses when one expires. + */ + GNUNET_SCHEDULER_TaskIdentifier address_update_task; + + /** + * Port that we are actually listening on. + */ + uint16_t open_port; + + /** + * Port that the user said we would have visible to the + * rest of the world. + */ + uint16_t adv_port; + +}; + + +/** + * Function to check if an inbound connection is acceptable. + * Mostly used to limit the total number of open connections + * we can have. + * + * @param cls the 'struct Plugin' + * @param ucred credentials, if available, otherwise NULL + * @param addr address + * @param addrlen length of address + * @return GNUNET_YES to allow, GNUNET_NO to deny, GNUNET_SYSERR + * for unknown address family (will be denied). + */ +static int +plugin_tcp_access_check (void *cls, + const struct GNUNET_CONNECTION_Credentials *ucred, + const struct sockaddr *addr, socklen_t addrlen) +{ + struct Plugin *plugin = cls; + + if (0 == plugin->max_connections) + return GNUNET_NO; + plugin->max_connections--; + return GNUNET_YES; +} + + +/** + * Our external IP address/port mapping has changed. + * + * @param cls closure, the 'struct LocalAddrList' + * @param add_remove GNUNET_YES to mean the new public IP address, GNUNET_NO to mean + * the previous (now invalid) one + * @param addr either the previous or the new public IP address + * @param addrlen actual lenght of the address + */ +static void +tcp_nat_port_map_callback (void *cls, int add_remove, + const struct sockaddr *addr, socklen_t addrlen) +{ + struct Plugin *plugin = cls; + struct IPv4TcpAddress t4; + struct IPv6TcpAddress t6; + void *arg; + size_t args; + + GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, "tcp", + "NPMC called with %d for address `%s'\n", add_remove, + GNUNET_a2s (addr, addrlen)); + /* convert 'addr' to our internal format */ + switch (addr->sa_family) + { + case AF_INET: + GNUNET_assert (addrlen == sizeof (struct sockaddr_in)); + t4.ipv4_addr = ((struct sockaddr_in *) addr)->sin_addr.s_addr; + t4.t4_port = ((struct sockaddr_in *) addr)->sin_port; + arg = &t4; + args = sizeof (t4); + break; + case AF_INET6: + GNUNET_assert (addrlen == sizeof (struct sockaddr_in6)); + memcpy (&t6.ipv6_addr, &((struct sockaddr_in6 *) addr)->sin6_addr, + sizeof (struct in6_addr)); + t6.t6_port = ((struct sockaddr_in6 *) addr)->sin6_port; + arg = &t6; + args = sizeof (t6); + break; + default: + GNUNET_break (0); + return; + } + /* modify our published address list */ + plugin->env->notify_address (plugin->env->cls, add_remove, arg, args); +} + + +/** + * Function called for a quick conversion of the binary address to + * a numeric address. Note that the caller must not free the + * address and that the next call to this function is allowed + * to override the address again. + * + * @param cls closure ('struct Plugin*') + * @param addr binary address + * @param addrlen length of the address + * @return string representing the same address + */ +static const char * +tcp_address_to_string (void *cls, const void *addr, size_t addrlen) +{ + static char rbuf[INET6_ADDRSTRLEN + 12]; + char buf[INET6_ADDRSTRLEN]; + const void *sb; + struct in_addr a4; + struct in6_addr a6; + const struct IPv4TcpAddress *t4; + const struct IPv6TcpAddress *t6; + int af; + uint16_t port; + + if (addrlen == sizeof (struct IPv6TcpAddress)) + { + t6 = addr; + af = AF_INET6; + port = ntohs (t6->t6_port); + memcpy (&a6, &t6->ipv6_addr, sizeof (a6)); + sb = &a6; + } + else if (addrlen == sizeof (struct IPv4TcpAddress)) + { + t4 = addr; + af = AF_INET; + port = ntohs (t4->t4_port); + memcpy (&a4, &t4->ipv4_addr, sizeof (a4)); + sb = &a4; + } + else + { + GNUNET_log_from (GNUNET_ERROR_TYPE_ERROR, "tcp", + _("Unexpected address length: %u bytes\n"), + (unsigned int) addrlen); + GNUNET_break (0); + return NULL; + } + if (NULL == inet_ntop (af, sb, buf, INET6_ADDRSTRLEN)) + { + GNUNET_log_strerror (GNUNET_ERROR_TYPE_WARNING, "inet_ntop"); + return NULL; + } + GNUNET_snprintf (rbuf, sizeof (rbuf), (af == AF_INET6) ? "[%s]:%u" : "%s:%u", + buf, port); + return rbuf; +} + + +struct SessionClientCtx +{ + const struct GNUNET_SERVER_Client *client; + struct Session *ret; +}; + +int session_lookup_by_client_it (void *cls, + const GNUNET_HashCode * key, + void *value) +{ + struct SessionClientCtx *sc_ctx = cls; + struct Session *s = value; + + if (s->client == sc_ctx->client) + { + sc_ctx->ret = s; + return GNUNET_NO; + } + return GNUNET_YES; +} + +/** + * Find the session handle for the given client. + * + * @param plugin the plugin + * @param client which client to find the session handle for + * @return NULL if no matching session exists + */ +static struct Session * +lookup_session_by_client (struct Plugin *plugin, + const struct GNUNET_SERVER_Client *client) +{ + struct SessionClientCtx sc_ctx; + sc_ctx.client = client; + sc_ctx.ret = NULL; + + GNUNET_CONTAINER_multihashmap_iterate (plugin->sessionmap, &session_lookup_by_client_it, &sc_ctx); + + return sc_ctx.ret; +} + + +/** + * Create a new session. Also queues a welcome message. + * + * @param plugin the plugin + * @param target peer to connect to + * @param client client to use + * @param is_nat this a NAT session, we should wait for a client to + * connect to us from an address, then assign that to + * the session + * @return new session object + */ +static struct Session * +create_session (struct Plugin *plugin, const struct GNUNET_PeerIdentity *target, + struct GNUNET_SERVER_Client *client, int is_nat) +{ + struct Session *ret; + struct PendingMessage *pm; + struct WelcomeMessage welcome; + + if (is_nat != GNUNET_YES) + GNUNET_assert (client != NULL); + else + GNUNET_assert (client == NULL); + + GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, "tcp", + "Creating new session for peer `%4s'\n", + GNUNET_i2s (target)); + + ret = GNUNET_malloc (sizeof (struct Session)); + ret->last_activity = GNUNET_TIME_absolute_get (); + ret->plugin = plugin; + ret->is_nat = is_nat; + ret->client = client; + ret->target = *target; + ret->expecting_welcome = GNUNET_YES; + ret->ats_address_network_type = htonl (GNUNET_ATS_NET_UNSPECIFIED); + pm = GNUNET_malloc (sizeof (struct PendingMessage) + + sizeof (struct WelcomeMessage)); + pm->msg = (const char *) &pm[1]; + pm->message_size = sizeof (struct WelcomeMessage); + welcome.header.size = htons (sizeof (struct WelcomeMessage)); + welcome.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_TCP_WELCOME); + welcome.clientIdentity = *plugin->env->my_identity; + memcpy (&pm[1], &welcome, sizeof (welcome)); + pm->timeout = GNUNET_TIME_UNIT_FOREVER_ABS; + GNUNET_STATISTICS_update (plugin->env->stats, + gettext_noop ("# bytes currently in TCP buffers"), + pm->message_size, GNUNET_NO); + GNUNET_CONTAINER_DLL_insert (ret->pending_messages_head, + ret->pending_messages_tail, pm); + if (is_nat != GNUNET_YES) + GNUNET_STATISTICS_update (plugin->env->stats, + gettext_noop ("# TCP sessions active"), 1, + GNUNET_NO); + return ret; +} + + +/** + * If we have pending messages, ask the server to + * transmit them (schedule the respective tasks, etc.) + * + * @param session for which session should we do this + */ +static void +process_pending_messages (struct Session *session); + + +/** + * Function called to notify a client about the socket + * being ready to queue more data. "buf" will be + * NULL and "size" zero if the socket was closed for + * writing in the meantime. + * + * @param cls closure + * @param size number of bytes available in buf + * @param buf where the callee should write the message + * @return number of bytes written to buf + */ +static size_t +do_transmit (void *cls, size_t size, void *buf) +{ + struct Session *session = cls; + struct GNUNET_PeerIdentity pid; + struct Plugin *plugin; + struct PendingMessage *pos; + struct PendingMessage *hd; + struct PendingMessage *tl; + struct GNUNET_TIME_Absolute now; + char *cbuf; + size_t ret; + + GNUNET_assert (session != NULL); + session->transmit_handle = NULL; + plugin = session->plugin; + if (buf == NULL) + { +#if DEBUG_TCP + GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, "tcp", + "Timeout trying to transmit to peer `%4s', discarding message queue.\n", + GNUNET_i2s (&session->target)); +#endif + /* timeout; cancel all messages that have already expired */ + hd = NULL; + tl = NULL; + ret = 0; + now = GNUNET_TIME_absolute_get (); + while ((NULL != (pos = session->pending_messages_head)) && + (pos->timeout.abs_value <= now.abs_value)) + { + GNUNET_CONTAINER_DLL_remove (session->pending_messages_head, + session->pending_messages_tail, pos); +#if DEBUG_TCP + GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, "tcp", + "Failed to transmit %u byte message to `%4s'.\n", + pos->message_size, GNUNET_i2s (&session->target)); +#endif + ret += pos->message_size; + GNUNET_CONTAINER_DLL_insert_after (hd, tl, tl, pos); + } + /* do this call before callbacks (so that if callbacks destroy + * session, they have a chance to cancel actions done by this + * call) */ + process_pending_messages (session); + pid = session->target; + /* no do callbacks and do not use session again since + * the callbacks may abort the session */ + while (NULL != (pos = hd)) + { + GNUNET_CONTAINER_DLL_remove (hd, tl, pos); + if (pos->transmit_cont != NULL) + pos->transmit_cont (pos->transmit_cont_cls, &pid, GNUNET_SYSERR); + GNUNET_free (pos); + } + GNUNET_STATISTICS_update (plugin->env->stats, + gettext_noop ("# bytes currently in TCP buffers"), + -(int64_t) ret, GNUNET_NO); + GNUNET_STATISTICS_update (plugin->env->stats, + gettext_noop + ("# bytes discarded by TCP (timeout)"), ret, + GNUNET_NO); + return 0; + } + /* copy all pending messages that would fit */ + ret = 0; + cbuf = buf; + hd = NULL; + tl = NULL; + while (NULL != (pos = session->pending_messages_head)) + { + if (ret + pos->message_size > size) + break; + GNUNET_CONTAINER_DLL_remove (session->pending_messages_head, + session->pending_messages_tail, pos); + GNUNET_assert (size >= pos->message_size); + GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, "tcp", + "Transmitting message of type %u\n", + ntohs (((struct GNUNET_MessageHeader *) pos->msg)->type)); + /* FIXME: this memcpy can be up to 7% of our total runtime */ + memcpy (cbuf, pos->msg, pos->message_size); + cbuf += pos->message_size; + ret += pos->message_size; + size -= pos->message_size; + GNUNET_CONTAINER_DLL_insert_tail (hd, tl, pos); + } + /* schedule 'continuation' before callbacks so that callbacks that + * cancel everything don't cause us to use a session that no longer + * exists... */ + process_pending_messages (session); + session->last_activity = GNUNET_TIME_absolute_get (); + pid = session->target; + /* we'll now call callbacks that may cancel the session; hence + * we should not use 'session' after this point */ + while (NULL != (pos = hd)) + { + GNUNET_CONTAINER_DLL_remove (hd, tl, pos); + if (pos->transmit_cont != NULL) + pos->transmit_cont (pos->transmit_cont_cls, &pid, GNUNET_OK); + GNUNET_free (pos); + } + GNUNET_assert (hd == NULL); + GNUNET_assert (tl == NULL); +#if DEBUG_TCP > 1 + GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, "tcp", "Transmitting %u bytes\n", + ret); +#endif + GNUNET_STATISTICS_update (plugin->env->stats, + gettext_noop ("# bytes currently in TCP buffers"), + -(int64_t) ret, GNUNET_NO); + GNUNET_STATISTICS_update (plugin->env->stats, + gettext_noop ("# bytes transmitted via TCP"), ret, + GNUNET_NO); + return ret; +} + + +/** + * If we have pending messages, ask the server to + * transmit them (schedule the respective tasks, etc.) + * + * @param session for which session should we do this + */ +static void +process_pending_messages (struct Session *session) +{ + struct PendingMessage *pm; + + GNUNET_assert (session->client != NULL); + if (session->transmit_handle != NULL) + return; + if (NULL == (pm = session->pending_messages_head)) + return; + + session->transmit_handle = + GNUNET_SERVER_notify_transmit_ready (session->client, pm->message_size, + GNUNET_TIME_absolute_get_remaining + (pm->timeout), &do_transmit, + session); +} + + +/** + * Functions with this signature are called whenever we need + * to close a session due to a disconnect or failure to + * establish a connection. + * + * @param session session to close down + */ +static void +disconnect_session (struct Session *session) +{ + struct PendingMessage *pm; + struct Plugin * plugin = session->plugin; + + GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, "tcp", + "Disconnecting session %p for peer `%s' address `%s'\n", + session, + GNUNET_i2s (&session->target), + tcp_address_to_string(NULL, session->addr, session->addrlen)); + + GNUNET_assert (GNUNET_YES == GNUNET_CONTAINER_multihashmap_remove(plugin->sessionmap, &session->target.hashPubKey, session)); + + /* clean up state */ + if (session->transmit_handle != NULL) + { + GNUNET_CONNECTION_notify_transmit_ready_cancel (session->transmit_handle); + session->transmit_handle = NULL; + } + session->plugin->env->session_end (session->plugin->env->cls, + &session->target, session); + while (NULL != (pm = session->pending_messages_head)) + { +#if DEBUG_TCP + GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, "tcp", + pm->transmit_cont != + NULL ? "Could not deliver message to `%4s'.\n" : + "Could not deliver message to `%4s', notifying.\n", + GNUNET_i2s (&session->target)); +#endif + GNUNET_STATISTICS_update (session->plugin->env->stats, + gettext_noop ("# bytes currently in TCP buffers"), + -(int64_t) pm->message_size, GNUNET_NO); + GNUNET_STATISTICS_update (session->plugin->env->stats, + gettext_noop + ("# bytes discarded by TCP (disconnect)"), + pm->message_size, GNUNET_NO); + GNUNET_CONTAINER_DLL_remove (session->pending_messages_head, + session->pending_messages_tail, pm); + if (NULL != pm->transmit_cont) + pm->transmit_cont (pm->transmit_cont_cls, &session->target, + GNUNET_SYSERR); + GNUNET_free (pm); + } + GNUNET_break (session->client != NULL); + if (session->receive_delay_task != GNUNET_SCHEDULER_NO_TASK) + { + GNUNET_SCHEDULER_cancel (session->receive_delay_task); + if (session->client != NULL) + GNUNET_SERVER_receive_done (session->client, GNUNET_SYSERR); + } + if (session->client != NULL) + { + GNUNET_SERVER_client_drop (session->client); + session->client = NULL; + } + GNUNET_STATISTICS_update (session->plugin->env->stats, + gettext_noop ("# TCP sessions active"), -1, + GNUNET_NO); + GNUNET_free_non_null (session->addr); + GNUNET_assert (NULL == session->transmit_handle); + GNUNET_free (session); +} + + +/** + * Function that can be used by the transport service to transmit + * a message using the plugin. Note that in the case of a + * peer disconnecting, the continuation MUST be called + * prior to the disconnect notification itself. This function + * will be called with this peer's HELLO message to initiate + * a fresh connection to another peer. + * + * @param cls closure + * @param session which session must be used + * @param msgbuf the message to transmit + * @param msgbuf_size number of bytes in 'msgbuf' + * @param priority how important is the message (most plugins will + * ignore message priority and just FIFO) + * @param to how long to wait at most for the transmission (does not + * require plugins to discard the message after the timeout, + * just advisory for the desired delay; most plugins will ignore + * this as well) + * @param cont continuation to call once the message has + * been transmitted (or if the transport is ready + * for the next transmission call; or if the + * peer disconnected...); can be NULL + * @param cont_cls closure for cont + * @return number of bytes used (on the physical network, with overheads); + * -1 on hard errors (i.e. address invalid); 0 is a legal value + * and does NOT mean that the message was not transmitted (DV) + */ +static ssize_t +tcp_plugin_send (void *cls, + struct Session *session, + const char *msgbuf, size_t msgbuf_size, + unsigned int priority, + struct GNUNET_TIME_Relative to, + GNUNET_TRANSPORT_TransmitContinuation cont, void *cont_cls) +{ + struct Plugin * plugin = cls; + struct PendingMessage *pm; + + GNUNET_assert (plugin != NULL); + GNUNET_assert (session != NULL); + GNUNET_assert (session->client != NULL); + + GNUNET_SERVER_client_set_timeout (session->client, + GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT); + GNUNET_STATISTICS_update (plugin->env->stats, + gettext_noop ("# bytes currently in TCP buffers"), + msgbuf_size, GNUNET_NO); + /* create new message entry */ + pm = GNUNET_malloc (sizeof (struct PendingMessage) + msgbuf_size); + pm->msg = (const char *) &pm[1]; + memcpy (&pm[1], msgbuf, msgbuf_size); + pm->message_size = msgbuf_size; + pm->timeout = GNUNET_TIME_relative_to_absolute (to); + pm->transmit_cont = cont; + pm->transmit_cont_cls = cont_cls; + + /* append pm to pending_messages list */ + GNUNET_CONTAINER_DLL_insert_tail (session->pending_messages_head, + session->pending_messages_tail, pm); + + GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, "tcp", + "Asked to transmit %u bytes to `%s', added message to list.\n", + msgbuf_size, GNUNET_i2s (&session->target)); + + process_pending_messages (session); + return msgbuf_size; +} + +struct SessionItCtx +{ + void * addr; + size_t addrlen; + struct Session * result; +}; + +int session_lookup_it (void *cls, + const GNUNET_HashCode * key, + void *value) +{ + struct SessionItCtx * si_ctx = cls; + struct Session * session = value; +#if 0 + char * a1 = strdup (tcp_address_to_string(NULL, session->addr, session->addrlen)); + char * a2 = strdup (tcp_address_to_string(NULL, si_ctx->addr, si_ctx->addrlen)); + GNUNET_log_from (GNUNET_ERROR_TYPE_ERROR, "tcp", + "Comparing: %s %u <-> %s %u\n", + a1, + session->addrlen, + a2, + si_ctx->addrlen); + GNUNET_free (a1); + GNUNET_free (a2); +#endif + if (session->addrlen != si_ctx->addrlen) + { + return GNUNET_YES; + } + if (0 != memcmp (session->addr, si_ctx->addr, si_ctx->addrlen)) + { + return GNUNET_YES; + } +#if 0 + a1 = strdup (tcp_address_to_string(NULL, session->addr, session->addrlen)); + a2 = strdup (tcp_address_to_string(NULL, si_ctx->addr, si_ctx->addrlen)); + GNUNET_log_from (GNUNET_ERROR_TYPE_ERROR, "tcp", + "Comparing: %s %u <-> %s %u , OK!\n", + a1, + session->addrlen, + a2, + si_ctx->addrlen); + GNUNET_free (a1); + GNUNET_free (a2); +#endif + /* Found existing session */ + si_ctx->result = session; + return GNUNET_NO; +} + + +/** + * Create a new session to transmit data to the target + * This session will used to send data to this peer and the plugin will + * notify us by calling the env->session_end function + * + * @param cls closure + * @param address pointer to the GNUNET_HELLO_Address + * @return the session if the address is valid, NULL otherwise + */ +static struct Session * +tcp_plugin_get_session (void *cls, + const struct GNUNET_HELLO_Address *address) +{ + struct Plugin * plugin = cls; + struct Session * session = NULL; + + int af; + const void *sb; + size_t sbs; + struct GNUNET_CONNECTION_Handle *sa; + struct sockaddr_in a4; + struct sockaddr_in6 a6; + const struct IPv4TcpAddress *t4; + const struct IPv6TcpAddress *t6; + struct GNUNET_ATS_Information ats; + unsigned int is_natd = GNUNET_NO; + size_t addrlen = 0; + + GNUNET_assert (plugin != NULL); + GNUNET_assert (address != NULL); + + addrlen = address->address_length; + + GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, "tcp", + "Trying to get session for `%s' address length %i\n", + tcp_address_to_string(NULL, address->address, address->address_length), + addrlen); + + /* look for existing session */ + if (GNUNET_CONTAINER_multihashmap_contains(plugin->sessionmap, &address->peer.hashPubKey)) + { + struct SessionItCtx si_ctx; + + si_ctx.addr = (void *) address->address; + si_ctx.addrlen = address->address_length; + + si_ctx.result = NULL; + + GNUNET_CONTAINER_multihashmap_get_multiple(plugin->sessionmap, &address->peer.hashPubKey, &session_lookup_it, &si_ctx); + if (si_ctx.result != NULL) + { + session = si_ctx.result; + GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, "tcp", + "Found exisiting session for `%s' address `%s' session %p\n", + GNUNET_i2s (&address->peer), + tcp_address_to_string(NULL, address->address, address->address_length), + session); + return session; + } + } + + if (addrlen == sizeof (struct IPv6TcpAddress)) + { + GNUNET_assert (NULL != address->address); /* make static analysis happy */ + t6 = address->address; + af = AF_INET6; + memset (&a6, 0, sizeof (a6)); +#if HAVE_SOCKADDR_IN_SIN_LEN + a6.sin6_len = sizeof (a6); +#endif + a6.sin6_family = AF_INET6; + a6.sin6_port = t6->t6_port; + if (t6->t6_port == 0) + is_natd = GNUNET_YES; + memcpy (&a6.sin6_addr, &t6->ipv6_addr, sizeof (struct in6_addr)); + sb = &a6; + sbs = sizeof (a6); + } + else if (addrlen == sizeof (struct IPv4TcpAddress)) + { + GNUNET_assert (NULL != address->address); /* make static analysis happy */ + t4 = address->address; + af = AF_INET; + memset (&a4, 0, sizeof (a4)); +#if HAVE_SOCKADDR_IN_SIN_LEN + a4.sin_len = sizeof (a4); +#endif + a4.sin_family = AF_INET; + a4.sin_port = t4->t4_port; + if (t4->t4_port == 0) + is_natd = GNUNET_YES; + a4.sin_addr.s_addr = t4->ipv4_addr; + sb = &a4; + sbs = sizeof (a4); + } + else + { + GNUNET_log_from (GNUNET_ERROR_TYPE_ERROR, "tcp", + _("Address of unexpected length: %u\n"), addrlen); + GNUNET_break (0); + return NULL; + } + + ats = plugin->env->get_address_type (plugin->env->cls, sb ,sbs); + + if ((is_natd == GNUNET_YES) && (addrlen == sizeof (struct IPv6TcpAddress))) + { + /* NAT client only works with IPv4 addresses */ + return NULL; + } + + if (0 == plugin->max_connections) + { + /* saturated */ + return NULL; + } + + if ((is_natd == GNUNET_YES) && + (GNUNET_YES == + GNUNET_CONTAINER_multihashmap_contains (plugin->nat_wait_conns, + &address->peer.hashPubKey))) + { + /* Only do one NAT punch attempt per peer identity */ + return NULL; + } + + if ((is_natd == GNUNET_YES) && (NULL != plugin->nat) && + (GNUNET_NO == + GNUNET_CONTAINER_multihashmap_contains (plugin->nat_wait_conns, + &address->peer.hashPubKey))) + { +#if DEBUG_TCP_NAT + GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, "tcp", + _("Found valid IPv4 NAT address (creating session)!\n")); +#endif + session = create_session (plugin, &address->peer, NULL, GNUNET_YES); + session->addrlen = 0; + session->addr = NULL; + session->ats_address_network_type = ats.value; + GNUNET_assert (session != NULL); + + GNUNET_assert (GNUNET_CONTAINER_multihashmap_put + (plugin->nat_wait_conns, &address->peer.hashPubKey, session, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY) == GNUNET_OK); +#if DEBUG_TCP_NAT + GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, "tcp", + "Created NAT WAIT connection to `%4s' at `%s'\n", + GNUNET_i2s (&session->target), GNUNET_a2s (sb, sbs)); +#endif + GNUNET_NAT_run_client (plugin->nat, &a4); + return session; + } + + /* create new outbound session */ + GNUNET_assert (0 != plugin->max_connections); + sa = GNUNET_CONNECTION_create_from_sockaddr (af, sb, sbs); + if (sa == NULL) + { +#if DEBUG_TCP + GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, "tcp", + "Failed to create connection to `%4s' at `%s'\n", + GNUNET_i2s (&session->target), GNUNET_a2s (sb, sbs)); +#endif + return NULL; + } + plugin->max_connections--; + |