diff options
Diffstat (limited to 'src/transport/gnunet-service-transport_neighbours.c')
-rw-r--r-- | src/transport/gnunet-service-transport_neighbours.c | 4102 |
1 files changed, 2264 insertions, 1838 deletions
diff --git a/src/transport/gnunet-service-transport_neighbours.c b/src/transport/gnunet-service-transport_neighbours.c index 3e8ef5a..c580168 100644 --- a/src/transport/gnunet-service-transport_neighbours.c +++ b/src/transport/gnunet-service-transport_neighbours.c @@ -1,6 +1,6 @@ /* This file is part of GNUnet. - (C) 2010,2011 Christian Grothoff (and other contributing authors) + (C) 2010,2011,2012 Christian Grothoff (and other contributing authors) GNUnet is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published @@ -22,6 +22,10 @@ * @file transport/gnunet-service-transport_neighbours.c * @brief neighbour management * @author Christian Grothoff + * + * TODO: + * - "address_change_cb" is NEVER invoked; when should we call this one exactly? + * - TEST, TEST, TEST... */ #include "platform.h" #include "gnunet_ats_service.h" @@ -42,6 +46,12 @@ #define NEIGHBOUR_TABLE_SIZE 256 /** + * Time we give plugin to transmit DISCONNECT message before the + * neighbour entry self-destructs. + */ +#define DISCONNECT_SENT_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS, 100) + +/** * How often must a peer violate bandwidth quotas before we start * to simply drop its messages? */ @@ -50,38 +60,53 @@ /** * How often do we send KEEPALIVE messages to each of our neighbours and measure * the latency with this neighbour? - * (idle timeout is 5 minutes or 300 seconds, so with 30s interval we - * send 10 keepalives in each interval, so 10 messages would need to be + * (idle timeout is 5 minutes or 300 seconds, so with 100s interval we + * send 3 keepalives in each interval, so 3 messages would need to be * lost in a row for a disconnect). */ -#define KEEPALIVE_FREQUENCY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 30) +#define KEEPALIVE_FREQUENCY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 100) +/** + * How long are we willing to wait for a response from ATS before timing out? + */ +#define ATS_RESPONSE_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS, 500) -#define ATS_RESPONSE_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 5) - -#define FAST_RECONNECT_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 1) - +/** + * How long are we willing to wait for an ACK from the other peer before + * giving up on our connect operation? + */ #define SETUP_CONNECTION_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 15) -#define TEST_NEW_CODE GNUNET_NO +/** + * How long are we willing to wait for a successful reconnect if + * an existing connection went down? Much shorter than the + * usual SETUP_CONNECTION_TIMEOUT as we do not inform the + * higher layers about the disconnect during this period. + */ +#define FAST_RECONNECT_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 1) /** - * Entry in neighbours. + * How long are we willing to wait for a response from the blacklist + * subsystem before timing out? */ -struct NeighbourMapEntry; +#define BLACKLIST_RESPONSE_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS, 500) + GNUNET_NETWORK_STRUCT_BEGIN /** - * Message a peer sends to another to indicate its - * preference for communicating via a particular - * session (and the desire to establish a real - * connection). + * Message a peer sends to another to indicate that it intends to + * setup a connection/session for data exchange. A 'SESSION_CONNECT' + * should be answered with a 'SESSION_CONNECT_ACK' with the same body + * to confirm. A 'SESSION_CONNECT_ACK' should then be followed with + * a 'SESSION_ACK'. Once the 'SESSION_ACK' is received, both peers + * should be connected. */ struct SessionConnectMessage { /** * Header of type 'GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_CONNECT' + * or 'GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_CONNECT_ACK' */ struct GNUNET_MessageHeader header; @@ -99,6 +124,12 @@ struct SessionConnectMessage }; +/** + * Message we send to the other peer to notify him that we intentionally + * are disconnecting (to reduce timeouts). This is just a friendly + * notification, peers must not rely on always receiving disconnect + * messages. + */ struct SessionDisconnectMessage { /** @@ -136,8 +167,10 @@ struct SessionDisconnectMessage struct GNUNET_CRYPTO_RsaSignature signature; }; + GNUNET_NETWORK_STRUCT_END + /** * For each neighbour we keep a list of messages * that we still want to transmit to the neighbour. @@ -156,12 +189,6 @@ struct MessageQueue struct MessageQueue *prev; /** - * Once this message is actively being transmitted, which - * neighbour is it associated with? - */ - struct NeighbourMapEntry *n; - - /** * Function to call once we're done. */ GST_NeighbourSendContinuation cont; @@ -190,44 +217,196 @@ struct MessageQueue }; +/** + * Possible state of a neighbour. Initially, we are S_NOT_CONNECTED. + * + * Then, there are two main paths. If we receive a CONNECT message, we + * first run a check against the blacklist and ask ATS for a + * suggestion. (S_CONNECT_RECV_ATS). If the blacklist comes back + * positive, we give the address to ATS. If ATS makes a suggestion, + * we ALSO give that suggestion to the blacklist + * (S_CONNECT_RECV_BLACKLIST). Once the blacklist approves the + * address we got from ATS, we send our CONNECT_ACK and go to + * S_CONNECT_RECV_ACK. If we receive a SESSION_ACK, we go to + * S_CONNECTED (and notify everyone about the new connection). If the + * operation times out, we go to S_DISCONNECT. + * + * The other case is where we transmit a CONNECT message first. We + * start with S_INIT_ATS. If we get an address, we enter + * S_INIT_BLACKLIST and check the blacklist. If the blacklist is OK + * with the connection, we actually send the CONNECT message and go to + * state S_CONNECT_SENT. Once we receive a CONNECT_ACK, we go to + * S_CONNECTED (and notify everyone about the new connection and send + * back a SESSION_ACK). If the operation times out, we go to + * S_DISCONNECT. + * + * If the session is in trouble (i.e. transport-level disconnect or + * timeout), we go to S_RECONNECT_ATS where we ask ATS for a new + * address (we don't notify anyone about the disconnect yet). Once we + * have a new address, we go to S_RECONNECT_BLACKLIST to check the new + * address against the blacklist. If the blacklist approves, we enter + * S_RECONNECT_SENT and send a CONNECT message. If we receive a + * CONNECT_ACK, we go to S_CONNECTED and nobody noticed that we had + * trouble; we also send a SESSION_ACK at this time just in case. If + * the operation times out, we go to S_DISCONNECT (and notify everyone + * about the lost connection). + * + * If ATS decides to switch addresses while we have a normal + * connection, we go to S_CONNECTED_SWITCHING_BLACKLIST to check the + * new address against the blacklist. If the blacklist approves, we + * go to S_CONNECTED_SWITCHING_CONNECT_SENT and send a + * SESSION_CONNECT. If we get a SESSION_ACK back, we switch the + * primary connection to the suggested alternative from ATS, go back + * to S_CONNECTED and send a SESSION_ACK to the other peer just to be + * sure. If the operation times out (or the blacklist disapproves), + * we go to S_CONNECTED (and notify ATS that the given alternative + * address is "invalid"). + * + * Once a session is in S_DISCONNECT, it is cleaned up and then goes + * to (S_DISCONNECT_FINISHED). If we receive an explicit disconnect + * request, we can go from any state to S_DISCONNECT, possibly after + * generating disconnect notifications. + * + * Note that it is quite possible that while we are in any of these + * states, we could receive a 'CONNECT' request from the other peer. + * We then enter a 'weird' state where we pursue our own primary state + * machine (as described above), but with the 'send_connect_ack' flag + * set to 1. If our state machine allows us to send a 'CONNECT_ACK' + * (because we have an acceptable address), we send the 'CONNECT_ACK' + * and set the 'send_connect_ack' to 2. If we then receive a + * 'SESSION_ACK', we go to 'S_CONNECTED' (and reset 'send_connect_ack' + * to 0). + * + */ enum State { /** * fresh peer or completely disconnected */ - S_NOT_CONNECTED, + S_NOT_CONNECTED = 0, + + /** + * Asked to initiate connection, trying to get address from ATS + */ + S_INIT_ATS, + + /** + * Asked to initiate connection, trying to get address approved + * by blacklist. + */ + S_INIT_BLACKLIST, /** - * sent CONNECT message to other peer, waiting for CONNECT_ACK + * Sent CONNECT message to other peer, waiting for CONNECT_ACK */ S_CONNECT_SENT, /** - * received CONNECT message to other peer, sending CONNECT_ACK + * Received a CONNECT, asking ATS about address suggestions. + */ + S_CONNECT_RECV_ATS, + + /** + * Received CONNECT from other peer, got an address, checking with blacklist. + */ + S_CONNECT_RECV_BLACKLIST, + + /** + * CONNECT request from other peer was SESSION_ACK'ed, waiting for + * SESSION_ACK. */ - S_CONNECT_RECV, + S_CONNECT_RECV_ACK, /** - * received ACK or payload + * Got our CONNECT_ACK/SESSION_ACK, connection is up. */ S_CONNECTED, /** - * connection ended, fast reconnect + * Connection got into trouble, rest of the system still believes + * it to be up, but we're getting a new address from ATS. + */ + S_RECONNECT_ATS, + + /** + * Connection got into trouble, rest of the system still believes + * it to be up; we are checking the new address against the blacklist. + */ + S_RECONNECT_BLACKLIST, + + /** + * Sent CONNECT over new address (either by ATS telling us to switch + * addresses or from RECONNECT_ATS); if this fails, we need to tell + * the rest of the system about a disconnect. + */ + S_RECONNECT_SENT, + + /** + * We have some primary connection, but ATS suggested we switch + * to some alternative; we're now checking the alternative against + * the blacklist. + */ + S_CONNECTED_SWITCHING_BLACKLIST, + + /** + * We have some primary connection, but ATS suggested we switch + * to some alternative; we now sent a CONNECT message for the + * alternative session to the other peer and waiting for a + * CONNECT_ACK to make this our primary connection. + */ + S_CONNECTED_SWITCHING_CONNECT_SENT, + + /** + * Disconnect in progress (we're sending the DISCONNECT message to the + * other peer; after that is finished, the state will be cleaned up). */ - S_FAST_RECONNECT, + S_DISCONNECT, /** - * Disconnect in progress + * We're finished with the disconnect; clean up state now! */ - S_DISCONNECT + S_DISCONNECT_FINISHED }; -enum Address_State + +/** + * A possible address we could use to communicate with a neighbour. + */ +struct NeighbourAddress { - USED, - UNUSED, - FRESH, + + /** + * Active session for this address. + */ + struct Session *session; + + /** + * Network-level address information. + */ + struct GNUNET_HELLO_Address *address; + + /** + * Timestamp of the 'SESSION_CONNECT' message we sent to the other + * peer for this address. Use to check that the ACK is in response + * to our most recent 'CONNECT'. + */ + struct GNUNET_TIME_Absolute connect_timestamp; + + /** + * Inbound bandwidth from ATS for this address. + */ + struct GNUNET_BANDWIDTH_Value32NBO bandwidth_in; + + /** + * Outbound bandwidth from ATS for this address. + */ + struct GNUNET_BANDWIDTH_Value32NBO bandwidth_out; + + /** + * Did we tell ATS that this is our 'active' address? + */ + int ats_active; + }; @@ -255,14 +434,15 @@ struct NeighbourMapEntry struct MessageQueue *is_active; /** - * Active session for communicating with the peer. + * Primary address we currently use to communicate with the neighbour. */ - struct Session *session; + struct NeighbourAddress primary_address; /** - * Address we currently use. + * Alternative address currently under consideration for communicating + * with the neighbour. */ - struct GNUNET_HELLO_Address *address; + struct NeighbourAddress alternative_address; /** * Identity of this neighbour. @@ -270,93 +450,133 @@ struct NeighbourMapEntry struct GNUNET_PeerIdentity id; /** - * ID of task scheduled to run when this peer is about to - * time out (will free resources associated with the peer). + * Main task that drives this peer (timeouts, keepalives, etc.). + * Always runs the 'master_task'. */ - GNUNET_SCHEDULER_TaskIdentifier timeout_task; + GNUNET_SCHEDULER_TaskIdentifier task; /** - * ID of task scheduled to send keepalives. + * At what time should we sent the next keep-alive message? */ - GNUNET_SCHEDULER_TaskIdentifier keepalive_task; + struct GNUNET_TIME_Absolute keep_alive_time; /** - * ID of task scheduled to run when we should try transmitting - * the head of the message queue. + * At what time did we sent the last keep-alive message? Used + * to calculate round-trip time ("latency"). */ - GNUNET_SCHEDULER_TaskIdentifier transmission_task; + struct GNUNET_TIME_Absolute last_keep_alive_time; /** - * Tracker for inbound bandwidth. + * Timestamp we should include in our next CONNECT_ACK message. + * (only valid if 'send_connect_ack' is GNUNET_YES). Used to build + * our CONNECT_ACK message. */ - struct GNUNET_BANDWIDTH_Tracker in_tracker; + struct GNUNET_TIME_Absolute connect_ack_timestamp; /** - * Inbound bandwidth from ATS, activated when connection is up + * Time where we should cut the connection (timeout) if we don't + * make progress in the state machine (or get a KEEPALIVE_RESPONSE + * if we are in S_CONNECTED). */ - struct GNUNET_BANDWIDTH_Value32NBO bandwidth_in; + struct GNUNET_TIME_Absolute timeout; /** - * Inbound bandwidth from ATS, activated when connection is up + * Latest calculated latency value */ - struct GNUNET_BANDWIDTH_Value32NBO bandwidth_out; + struct GNUNET_TIME_Relative latency; /** - * Timestamp of the 'SESSION_CONNECT' message we got from the other peer + * Tracker for inbound bandwidth. */ - struct GNUNET_TIME_Absolute connect_ts; + struct GNUNET_BANDWIDTH_Tracker in_tracker; /** - * When did we sent the last keep-alive message? + * How often has the other peer (recently) violated the inbound + * traffic limit? Incremented by 10 per violation, decremented by 1 + * per non-violation (for each time interval). */ - struct GNUNET_TIME_Absolute keep_alive_sent; + unsigned int quota_violation_count; /** - * Latest calculated latency value + * The current state of the peer. */ - struct GNUNET_TIME_Relative latency; + enum State state; /** - * Timeout for ATS - * We asked ATS for a new address for this peer + * Did we sent an KEEP_ALIVE message and are we expecting a response? */ - GNUNET_SCHEDULER_TaskIdentifier ats_suggest; + int expect_latency_response; /** - * Task the resets the peer state after due to an pending - * unsuccessful connection setup + * Flag to set if we still need to send a CONNECT_ACK message to the other peer + * (once we have an address to use and the peer has been allowed by our + * blacklist). Set to 1 if we need to send a CONNECT_ACK. Set to 2 if we + * did send a CONNECT_ACK and should go to 'S_CONNECTED' upon receiving + * a 'SESSION_ACK' (regardless of what our own state machine might say). */ - GNUNET_SCHEDULER_TaskIdentifier state_reset; + int send_connect_ack; + +}; +/** + * Context for blacklist checks and the 'handle_test_blacklist_cont' + * function. Stores information about ongoing blacklist checks. + */ +struct BlackListCheckContext +{ + /** - * How often has the other peer (recently) violated the inbound - * traffic limit? Incremented by 10 per violation, decremented by 1 - * per non-violation (for each time interval). + * We keep blacklist checks in a DLL. */ - unsigned int quota_violation_count; + struct BlackListCheckContext *next; + /** + * We keep blacklist checks in a DLL. + */ + struct BlackListCheckContext *prev; /** - * The current state of the peer - * Element of enum State + * Address that is being checked. */ - int state; + struct NeighbourAddress na; + + /** + * ATS information about the address. + */ + struct GNUNET_ATS_Information *ats; /** - * Did we sent an KEEP_ALIVE message and are we expecting a response? + * Handle to the ongoing blacklist check. */ - int expect_latency_response; - int address_state; + struct GST_BlacklistCheck *bc; + + /** + * Size of the 'ats' array. + */ + uint32_t ats_count; + }; /** - * All known neighbours and their HELLOs. + * Hash map from peer identities to the respective 'struct NeighbourMapEntry'. */ static struct GNUNET_CONTAINER_MultiHashMap *neighbours; /** + * We keep blacklist checks in a DLL so that we can find + * the 'sessions' in their 'struct NeighbourAddress' if + * a session goes down. + */ +static struct BlackListCheckContext *bc_head; + +/** + * We keep blacklist checks in a DLL. + */ +static struct BlackListCheckContext *bc_tail; + +/** * Closure for connect_notify_cb, disconnect_notify_cb and address_change_cb */ static void *callback_cls; @@ -379,7 +599,13 @@ static GNUNET_TRANSPORT_PeerIterateCallback address_change_cb; /** * counter for connected neighbours */ -static int neighbours_connected; +static unsigned int neighbours_connected; + +/** + * Number of bytes we have currently queued for transmission. + */ +static unsigned long long bytes_in_send_queue; + /** * Lookup a neighbour entry in the neighbours hash map. @@ -390,468 +616,395 @@ static int neighbours_connected; static struct NeighbourMapEntry * lookup_neighbour (const struct GNUNET_PeerIdentity *pid) { + if (NULL == neighbours) + return NULL; return GNUNET_CONTAINER_multihashmap_get (neighbours, &pid->hashPubKey); } -/** - * Disconnect from the given neighbour, clean up the record. - * - * @param n neighbour to disconnect from - */ -static void -disconnect_neighbour (struct NeighbourMapEntry *n); - -#define change_state(n, state, ...) change (n, state, __LINE__) - -static int -is_connecting (struct NeighbourMapEntry *n) -{ - if ((n->state > S_NOT_CONNECTED) && (n->state < S_CONNECTED)) - return GNUNET_YES; - return GNUNET_NO; -} - -static int -is_connected (struct NeighbourMapEntry *n) -{ - if (n->state == S_CONNECTED) - return GNUNET_YES; - return GNUNET_NO; -} - -static int -is_disconnecting (struct NeighbourMapEntry *n) -{ - if (n->state == S_DISCONNECT) - return GNUNET_YES; - return GNUNET_NO; -} - static const char * print_state (int state) { + switch (state) { - case S_CONNECTED: - return "S_CONNECTED"; + case S_NOT_CONNECTED: + return "S_NOT_CONNECTED"; + break; + case S_INIT_ATS: + return "S_INIT_ATS"; break; - case S_CONNECT_RECV: - return "S_CONNECT_RECV"; + case S_INIT_BLACKLIST: + return "S_INIT_BLACKLIST"; break; case S_CONNECT_SENT: return "S_CONNECT_SENT"; break; - case S_DISCONNECT: - return "S_DISCONNECT"; + case S_CONNECT_RECV_ATS: + return "S_CONNECT_RECV_ATS"; break; - case S_NOT_CONNECTED: - return "S_NOT_CONNECTED"; + case S_CONNECT_RECV_BLACKLIST: + return "S_CONNECT_RECV_BLACKLIST"; break; - case S_FAST_RECONNECT: - return "S_FAST_RECONNECT"; + case S_CONNECT_RECV_ACK: + return "S_CONNECT_RECV_ACK"; break; - default: - GNUNET_break (0); + case S_CONNECTED: + return "S_CONNECTED"; break; - } - return NULL; -} - -static int -change (struct NeighbourMapEntry *n, int state, int line); - -static void -ats_suggest_cancel (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc); - - -static void -reset_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) -{ - struct NeighbourMapEntry *n = cls; - - if (n == NULL) - return; - - n->state_reset = GNUNET_SCHEDULER_NO_TASK; - if (n->state == S_CONNECTED) - return; - -#if DEBUG_TRANSPORT - GNUNET_STATISTICS_update (GST_stats, - gettext_noop - ("# failed connection attempts due to timeout"), 1, - GNUNET_NO); -#endif - - /* resetting state */ - - if (n->state == S_FAST_RECONNECT) - { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Fast reconnect time out, disconnecting peer `%s'\n", - GNUNET_i2s (&n->id)); - disconnect_neighbour(n); - return; - } - - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "State for neighbour `%s' %X changed from `%s' to `%s' in line %u\n", - GNUNET_i2s (&n->id), n, print_state(n->state), "S_NOT_CONNECTED", __LINE__); - - n->state = S_NOT_CONNECTED; - - /* destroying address */ - if (n->address != NULL) - { - GNUNET_assert (strlen (n->address->transport_name) > 0); - GNUNET_ATS_address_destroyed (GST_ats, n->address, n->session); - } - - /* request new address */ - if (n->ats_suggest != GNUNET_SCHEDULER_NO_TASK) - GNUNET_SCHEDULER_cancel (n->ats_suggest); - n->ats_suggest = - GNUNET_SCHEDULER_add_delayed (ATS_RESPONSE_TIMEOUT, ats_suggest_cancel, - n); - GNUNET_ATS_suggest_address (GST_ats, &n->id); -} - -static int -change (struct NeighbourMapEntry *n, int state, int line) -{ - int previous_state; - /* allowed transitions */ - int allowed = GNUNET_NO; - - previous_state = n->state; - - switch (n->state) - { - case S_NOT_CONNECTED: - if ((state == S_CONNECT_RECV) || (state == S_CONNECT_SENT) || - (state == S_DISCONNECT)) - allowed = GNUNET_YES; + case S_RECONNECT_ATS: + return "S_RECONNECT_ATS"; break; - case S_CONNECT_RECV: - allowed = GNUNET_YES; + case S_RECONNECT_BLACKLIST: + return "S_RECONNECT_BLACKLIST"; break; - case S_CONNECT_SENT: - allowed = GNUNET_YES; + case S_RECONNECT_SENT: + return "S_RECONNECT_SENT"; break; - case S_CONNECTED: - if ((state == S_DISCONNECT) || (state == S_FAST_RECONNECT)) - allowed = GNUNET_YES; + case S_CONNECTED_SWITCHING_BLACKLIST: + return "S_CONNECTED_SWITCHING_BLACKLIST"; + break; + case S_CONNECTED_SWITCHING_CONNECT_SENT: + return "S_CONNECTED_SWITCHING_CONNECT_SENT"; break; case S_DISCONNECT: + return "S_DISCONNECT"; break; - case S_FAST_RECONNECT: - if ((state == S_CONNECTED) || (state == S_DISCONNECT)) - allowed = GNUNET_YES; + case S_DISCONNECT_FINISHED: + return "S_DISCONNECT_FINISHED"; break; default: + return "UNDEFINED"; GNUNET_break (0); break; } - if (allowed == GNUNET_NO) - { - char *old = GNUNET_strdup (print_state (n->state)); - char *new = GNUNET_strdup (print_state (state)); - - GNUNET_log (GNUNET_ERROR_TYPE_ERROR, - "Illegal state transition from `%s' to `%s' in line %u \n", old, - new, line); - GNUNET_break (0); - GNUNET_free (old); - GNUNET_free (new); - return GNUNET_SYSERR; - } - { - char *old = GNUNET_strdup (print_state (n->state)); - char *new = GNUNET_strdup (print_state (state)); - - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "State for neighbour `%s' %X changed from `%s' to `%s' in line %u\n", - GNUNET_i2s (&n->id), n, old, new, line); - GNUNET_free (old); - GNUNET_free (new); - } - n->state = state; + GNUNET_break (0); + return "UNDEFINED"; +} +/** + * Test if we're connected to the given peer. + * + * @param n neighbour entry of peer to test + * @return GNUNET_YES if we are connected, GNUNET_NO if not + */ +static int +test_connected (struct NeighbourMapEntry *n) +{ + if (NULL == n) + return GNUNET_NO; switch (n->state) { - case S_FAST_RECONNECT: - case S_CONNECT_RECV: + case S_NOT_CONNECTED: + case S_INIT_ATS: + case S_INIT_BLACKLIST: case S_CONNECT_SENT: - if (n->state_reset != GNUNET_SCHEDULER_NO_TASK) - GNUNET_SCHEDULER_cancel (n->state_reset); - n->state_reset = - GNUNET_SCHEDULER_add_delayed (SETUP_CONNECTION_TIMEOUT, &reset_task, n); - break; + case S_CONNECT_RECV_ATS: + case S_CONNECT_RECV_BLACKLIST: + case S_CONNECT_RECV_ACK: + return GNUNET_NO; case S_CONNECTED: - case S_NOT_CONNECTED: + case S_RECONNECT_ATS: + case S_RECONNECT_BLACKLIST: + case S_RECONNECT_SENT: + case S_CONNECTED_SWITCHING_BLACKLIST: + case S_CONNECTED_SWITCHING_CONNECT_SENT: + return GNUNET_YES; case S_DISCONNECT: - if (GNUNET_SCHEDULER_NO_TASK != n->state_reset) - { -#if DEBUG_TRANSPORT - char *old = GNUNET_strdup (print_state (n->state)); - char *new = GNUNET_strdup (print_state (state)); - - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Removed reset task for peer `%s' %s failed in state transition `%s' -> `%s' \n", - GNUNET_i2s (&n->id), GST_plugins_a2s (n->address), old, new); - GNUNET_free (old); - GNUNET_free (new); -#endif - GNUNET_assert (n->state_reset != GNUNET_SCHEDULER_NO_TASK); - GNUNET_SCHEDULER_cancel (n->state_reset); - n->state_reset = GNUNET_SCHEDULER_NO_TASK; - } - break; - + case S_DISCONNECT_FINISHED: + return GNUNET_NO; default: - GNUNET_assert (0); - } - - if (NULL != address_change_cb) - { - if (n->state == S_CONNECTED) - address_change_cb (callback_cls, &n->id, n->address); - else if (previous_state == S_CONNECTED) - address_change_cb (callback_cls, &n->id, NULL); + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Unhandled state `%s' \n",print_state (n->state)); + GNUNET_break (0); + break; } - - return GNUNET_OK; + return GNUNET_SYSERR; } -static ssize_t -send_with_session (struct NeighbourMapEntry *n, - const char *msgbuf, size_t msgbuf_size, - uint32_t priority, - struct GNUNET_TIME_Relative timeout, - GNUNET_TRANSPORT_TransmitContinuation cont, void *cont_cls) +/** + * Send information about a new outbound quota to our clients. + * + * @param target affected peer + * @param quota new quota + */ +static void +send_outbound_quota (const struct GNUNET_PeerIdentity *target, + struct GNUNET_BANDWIDTH_Value32NBO quota) { - struct GNUNET_TRANSPORT_PluginFunctions *papi; - size_t ret = GNUNET_SYSERR; - - GNUNET_assert (n != NULL); - GNUNET_assert (n->session != NULL); - - papi = GST_plugins_find (n->address->transport_name); - if (papi == NULL) - { - if (cont != NULL) - cont (cont_cls, &n->id, GNUNET_SYSERR); - return GNUNET_SYSERR; - } - - ret = papi->send (papi->cls, - n->session, - msgbuf, msgbuf_size, - 0, - timeout, - cont, cont_cls); + struct QuotaSetMessage q_msg; - if ((ret == -1) && (cont != NULL)) - cont (cont_cls, &n->id, GNUNET_SYSERR); - return ret; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Sending outbound quota of %u Bps for peer `%s' to all clients\n", + ntohl (quota.value__), GNUNET_i2s (target)); + q_msg.header.size = htons (sizeof (struct QuotaSetMessage)); + q_msg.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SET_QUOTA); + q_msg.quota = quota; + q_msg.peer = (*target); + GST_clients_broadcast (&q_msg.header, GNUNET_NO); } + /** - * Task invoked to start a transmission to another peer. + * We don't need a given neighbour address any more. + * Release its resources and give appropriate notifications + * to ATS and other subsystems. * - * @param cls the 'struct NeighbourMapEntry' - * @param tc scheduler context + * @param na address we are done with; 'na' itself must NOT be 'free'd, only the contents! */ static void -transmission_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc); +free_address (struct NeighbourAddress *na) +{ + if (GNUNET_YES == na->ats_active) + { + GST_validation_set_address_use (na->address, na->session, GNUNET_NO, __LINE__); + GNUNET_ATS_address_in_use (GST_ats, na->address, na->session, GNUNET_NO); + } + na->ats_active = GNUNET_NO; + if (NULL != na->address) + { + GNUNET_HELLO_address_free (na->address); + na->address = NULL; + } + na->session = NULL; +} /** - * We're done with our transmission attempt, continue processing. + * Initialize the 'struct NeighbourAddress'. * - * @param cls the 'struct MessageQueue' of the message - * @param receiver intended receiver - * @param success whether it worked or not + * @param na neighbour address to initialize + * @param address address of the other peer, NULL if other peer + * connected to us + * @param session session to use (or NULL, in which case an + * address must be setup) + * @param bandwidth_in inbound quota to be used when connection is up + * @param bandwidth_out outbound quota to be used when connection is up + * @param is_active GNUNET_YES to mark this as the active address with ATS */ static void -transmit_send_continuation (void *cls, - const struct GNUNET_PeerIdentity *receiver, - int success) +set_address (struct NeighbourAddress *na, + const struct GNUNET_HELLO_Address *address, + struct Session *session, + struct GNUNET_BANDWIDTH_Value32NBO bandwidth_in, + struct GNUNET_BANDWIDTH_Value32NBO bandwidth_out, + int is_active) { - struct MessageQueue *mq = cls; - struct NeighbourMapEntry *n; - struct NeighbourMapEntry *tmp; + struct GNUNET_TRANSPORT_PluginFunctions *papi; - tmp = lookup_neighbour (receiver); - n = mq->n; - if ((NULL != n) && (tmp != NULL) && (tmp == n)) + if (NULL == (papi = GST_plugins_find (address->transport_name))) { - GNUNET_assert (n->is_active == mq); - n->is_active = NULL; - if (success == GNUNET_YES) + GNUNET_break (0); + return; + } + if (session == na->session) + { + na->bandwidth_in = bandwidth_in; + na->bandwidth_out = bandwidth_out; + if (is_active != na->ats_active) { - GNUNET_assert (n->transmission_task == GNUNET_SCHEDULER_NO_TASK); - n->transmission_task = GNUNET_SCHEDULER_add_now (&transmission_task, n); + na->ats_active = is_active; + GNUNET_ATS_address_in_use (GST_ats, na->address, na->session, is_active); + GST_validation_set_address_use (na->address, na->session, is_active, __LINE__); } + if (GNUNET_YES == is_active) + { + /* FIXME: is this the right place to set quotas? */ + GST_neighbours_set_incoming_quota (&address->peer, bandwidth_in); + send_outbound_quota (&address->peer, bandwidth_out); + } + return; + } + free_address (na); + if (NULL == session) + session = papi->get_session (papi->cls, address); + if (NULL == session) + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Failed to obtain new session for peer `%s' and address '%s'\n", + GNUNET_i2s (&address->peer), GST_plugins_a2s (address)); + GNUNET_ATS_address_destroyed (GST_ats, address, NULL); + return; + } + na->address = GNUNET_HELLO_address_copy (address); + na->bandwidth_in = bandwidth_in; + na->bandwidth_out = bandwidth_out; + na->session = session; + na->ats_active = is_active; + if (GNUNET_YES == is_active) + { + /* Telling ATS about new session */ + GNUNET_ATS_address_update (GST_ats, na->address, na->session, NULL, 0); + GNUNET_ATS_address_in_use (GST_ats, na->address, na->session, GNUNET_YES); + GST_validation_set_address_use (na->address, na->session, GNUNET_YES, __LINE__); + + /* FIXME: is this the right place to set quotas? */ + GST_neighbours_set_incoming_quota (&address->peer, bandwidth_in); + send_outbound_quota (&address->peer, bandwidth_out); } -#if DEBUG_TRANSPORT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Sending message of type %u was %s\n", - ntohs (((struct GNUNET_MessageHeader *) mq->message_buf)->type), - (success == GNUNET_OK) ? "successful" : "FAILED"); -#endif - if (NULL != mq->cont) - mq->cont (mq->cont_cls, success); - GNUNET_free (mq); } /** - * Check the ready list for the given neighbour and if a plugin is - * ready for transmission (and if we have a message), do so! + * Free a neighbour map entry. * - * @param n target peer for which to transmit + * @param n entry to free + * @param keep_sessions GNUNET_NO to tell plugin to terminate sessions, + * GNUNET_YES to keep all sessions */ static void -try_transmission_to_peer (struct NeighbourMapEntry *n) +free_neighbour (struct NeighbourMapEntry *n, int keep_sessions) { struct MessageQueue *mq; - struct GNUNET_TIME_Relative timeout; - ssize_t ret; + struct GNUNET_TRANSPORT_PluginFunctions *papi; - if (n->is_active != NULL) - { - GNUNET_break (0); - return; /* transmission already pending */ - } - if (n->transmission_task != GNUNET_SCHEDULER_NO_TASK) - { - GNUNET_break (0); - return; /* currently waiting for bandwidth */ - } + n->is_active = NULL; /* always free'd by its own continuation! */ + + /* fail messages currently in the queue */ while (NULL != (mq = n->messages_head)) { - timeout = GNUNET_TIME_absolute_get_remaining (mq->timeout); - if (timeout.rel_value > 0) - break; GNUNET_CONTAINER_DLL_remove (n->messages_head, n->messages_tail, mq); - n->is_active = mq; - mq->n = n; - transmit_send_continuation (mq, &n->id, GNUNET_SYSERR); /* timeout */ + if (NULL != mq->cont) + mq->cont (mq->cont_cls, GNUNET_SYSERR); + GNUNET_free (mq); } - if (NULL == mq) - return; /* no more messages */ - - if (n->address == NULL) + /* It is too late to send other peer disconnect notifications, but at + least internally we need to get clean... */ + if (GNUNET_YES == test_connected (n)) { -#if DEBUG_TRANSPORT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "No address for peer `%s'\n", - GNUNET_i2s (&n->id)); -#endif - GNUNET_CONTAINER_DLL_remove (n->messages_head, n->messages_tail, mq); - transmit_send_continuation (mq, &n->id, GNUNET_SYSERR); - GNUNET_assert (n->transmission_task == GNUNET_SCHEDULER_NO_TASK); - n->transmission_task = GNUNET_SCHEDULER_add_now (&transmission_task, n); - return; + GNUNET_STATISTICS_set (GST_stats, + gettext_noop ("# peers connected"), + --neighbours_connected, + GNUNET_NO); + disconnect_notify_cb (callback_cls, &n->id); } - if (GST_plugins_find (n->address->transport_name) == NULL) - { - GNUNET_break (0); - return; - } - GNUNET_CONTAINER_DLL_remove (n->messages_head, n->messages_tail, mq); - n->is_active = mq; - mq->n = n; + /* FIXME-PLUGIN-API: This does not seem to guarantee that all + transport sessions eventually get killed due to inactivity; they + MUST have their own timeout logic (but at least TCP doesn't have + one yet). Are we sure that EVERY 'session' of a plugin is + actually cleaned up this way!? Note that if we are switching + between two TCP sessions to the same peer, the existing plugin + API gives us not even the means to selectively kill only one of + them! Killing all sessions like this seems to be very, very + wrong. */ + if ((GNUNET_NO == keep_sessions) && + (NULL != n->primary_address.address) && + (NULL != (papi = GST_plugins_find (n->primary_address.address->transport_name)))) + papi->disconnect (papi->cls, &n->id); - if ((n->address->address_length == 0) && (n->session == NULL)) - { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "No address for peer `%s'\n", - GNUNET_i2s (&n->id)); - transmit_send_continuation (mq, &n->id, GNUNET_SYSERR); - GNUNET_assert (n->transmission_task == GNUNET_SCHEDULER_NO_TASK); - n->transmission_task = GNUNET_SCHEDULER_add_now (&transmission_task, n); - return; - } + n->state = S_DISCONNECT_FINISHED; - ret = send_with_session(n, - mq->message_buf, mq->message_buf_size, - 0, timeout, - &transmit_send_continuation, mq); + GNUNET_assert (GNUNET_YES == + GNUNET_CONTAINER_multihashmap_remove (neighbours, + &n->id.hashPubKey, n)); + + /* cut transport-level connection */ + free_address (&n->primary_address); + free_address (&n->alternative_address); - if (ret == -1) + // FIXME-ATS-API: we might want to be more specific about + // which states we do this from in the future (ATS should + // have given us a 'suggest_address' handle, and if we have + // such a handle, we should cancel the operation here! + GNUNET_ATS_suggest_address_cancel (GST_ats, &n->id); + + if (GNUNET_SCHEDULER_NO_TASK != n->task) { - /* failure, but 'send' would not call continuation in this case, - * so we need to do it here! */ - transmit_send_continuation (mq, &n->id, GNUNET_SYSERR); + GNUNET_SCHEDULER_cancel (n->task); + n->task = GNUNET_SCHEDULER_NO_TASK; } - + /* free rest of memory */ + GNUNET_free (n); } /** - * Task invoked to start a transmission to another peer. + * Transmit a message using the current session of the given + * neighbour. * - * @param cls the 'struct NeighbourMapEntry' - * @param tc scheduler context + * @param n entry for the recipient + * @param msgbuf buffer to transmit + * @param msgbuf_size number of bytes in buffer + * @param priority transmission priority + * @param timeout transmission timeout + * @param cont continuation to call when finished (can be NULL) + * @param cont_cls closure for cont */ static void -transmission_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) +send_with_session (struct NeighbourMapEntry *n, + const char *msgbuf, size_t msgbuf_size, + uint32_t priority, + struct GNUNET_TIME_Relative timeout, + GNUNET_TRANSPORT_TransmitContinuation cont, + void *cont_cls) { - struct NeighbourMapEntry *n = cls; + struct GNUNET_TRANSPORT_PluginFunctions *papi; - GNUNET_assert (NULL != lookup_neighbour (&n->id)); - n->transmission_task = GNUNET_SCHEDULER_NO_TASK; - try_transmission_to_peer (n); + GNUNET_assert (n->primary_address.session != NULL); + if ( ( (NULL == (papi = GST_plugins_find (n->primary_address.address->transport_name))) || + (-1 == papi->send (papi->cls, + n->primary_address.session, + msgbuf, msgbuf_size, + priority, + timeout, + cont, cont_cls))) && + (NULL != cont) ) + cont (cont_cls, &n->id, GNUNET_SYSERR); + GNUNET_break (NULL != papi); } /** - * Initialize the neighbours subsystem. + * Master task run for every neighbour. Performs all of the time-related + * activities (keep alive, send next message, disconnect if idle, finish + * clean up after disconnect). * - * @param cls closure for callbacks - * @param connect_cb function to call if we connect to a peer - * @param disconnect_cb function to call if we disconnect from a peer - * @param peer_address_cb function to call if we change an active address - * of a neighbour + * @param cls the 'struct NeighbourMapEntry' for which we are running + * @param tc scheduler context (unused) */ -void -GST_neighbours_start (void *cls, - GNUNET_TRANSPORT_NotifyConnect connect_cb, - GNUNET_TRANSPORT_NotifyDisconnect disconnect_cb, - GNUNET_TRANSPORT_PeerIterateCallback peer_address_cb) -{ - callback_cls = cls; - connect_notify_cb = connect_cb; - disconnect_notify_cb = disconnect_cb; - address_change_cb = peer_address_cb; - neighbours = GNUNET_CONTAINER_multihashmap_create (NEIGHBOUR_TABLE_SIZE); -} +static void +master_task (void *cls, + const struct GNUNET_SCHEDULER_TaskContext *tc); +/** + * Function called when the 'DISCONNECT' message has been sent by the + * plugin. Frees the neighbour --- if the entry still exists. + * + * @param cls NULL + * @param target identity of the neighbour that was disconnected + * @param result GNUNET_OK if the disconnect got out successfully + */ static void send_disconnect_cont (void *cls, const struct GNUNET_PeerIdentity *target, int result) { -#if DEBUG_TRANSPORT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Sending DISCONNECT message to peer `%4s': %i\n", - GNUNET_i2s (target), result); -#endif + struct NeighbourMapEntry *n; + + n = lookup_neighbour (target); + if (NULL == n) + return; /* already gone */ + if (S_DISCONNECT != n->state) + return; /* have created a fresh entry since */ + n->state = S_DISCONNECT; + if (GNUNET_SCHEDULER_NO_TASK != n->task) + GNUNET_SCHEDULER_cancel (n->task); + n->task = GNUNET_SCHEDULER_add_now (&master_task, n); } -static int -send_disconnect (struct NeighbourMapEntry * n) +/** + * Transmit a DISCONNECT message to the other peer. + * + * @param n neighbour to send DISCONNECT message. + */ +static void +send_disconnect (struct NeighbourMapEntry *n) { - size_t ret; struct SessionDisconnectMessage disconnect_msg; -#if DEBUG_TRANSPORT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Sending DISCONNECT message to peer `%4s'\n", GNUNET_i2s (&n->id)); -#endif - disconnect_msg.header.size = htons (sizeof (struct SessionDisconnectMessage)); disconnect_msg.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_DISCONNECT); @@ -870,19 +1023,14 @@ send_disconnect (struct NeighbourMapEntry * n) &disconnect_msg.purpose, &disconnect_msg.signature)); - ret = send_with_session (n, - (const char *) &disconnect_msg, sizeof (disconnect_msg), - UINT32_MAX, GNUNET_TIME_UNIT_FOREVER_REL, - &send_disconnect_cont, NULL); - - if (ret == GNUNET_SYSERR) - return GNUNET_SYSERR; - + send_with_session (n, + (const char *) &disconnect_msg, sizeof (disconnect_msg), + UINT32_MAX, GNUNET_TIME_UNIT_FOREVER_REL, + &send_disconnect_cont, NULL); GNUNET_STATISTICS_update (GST_stats, gettext_noop - ("# peers disconnected due to external request"), 1, + ("# DISCONNECT messages sent"), 1, GNUNET_NO); - return GNUNET_OK; } @@ -894,780 +1042,541 @@ send_disconnect (struct NeighbourMapEntry * n) static void disconnect_neighbour (struct NeighbourMapEntry *n) { - struct MessageQueue *mq; - int previous_state; - - previous_state = n->state; - - if (is_disconnecting (n)) - return; - - /* send DISCONNECT MESSAGE */ - if (previous_state == S_CONNECTED) - { - if (GNUNET_OK == send_disconnect (n)) - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Sent DISCONNECT_MSG to `%s'\n", - GNUNET_i2s (&n->id)); - else - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Could not send DISCONNECT_MSG to `%s'\n", - GNUNET_i2s (&n->id)); - } - - change_state (n, S_DISCONNECT); - - if (previous_state == S_CONNECTED) - { - GNUNET_assert (NULL != n->address); - if (n->address_state == USED) - { - GST_validation_set_address_use (n->address, n->session, GNUNET_NO); - GNUNET_ATS_address_in_use (GST_ats, n->address, n->session, GNUNET_NO); - n->address_state = UNUSED; - } - } - - if (n->address != NULL) - { - struct GNUNET_TRANSPORT_PluginFunctions *papi; - - papi = GST_plugins_find (n->address->transport_name); - if (papi != NULL) - papi->disconnect (papi->cls, &n->id); - } - while (NULL != (mq = n->messages_head)) - { - GNUNET_CONTAINER_DLL_remove (n->messages_head, n->messages_tail, mq); - if (NULL != mq->cont) - mq->cont (mq->cont_cls, GNUNET_SYSERR); - GNUNET_free (mq); - } - if (NULL != n->is_active) - { - n->is_active->n = NULL; - n->is_active = NULL; - } - - switch (previous_state) + /* depending on state, notify neighbour and/or upper layers of this peer + about disconnect */ + switch (n->state) { + case S_NOT_CONNECTED: + case S_INIT_ATS: + case S_INIT_BLACKLIST: + /* other peer is completely unaware of us, no need to send DISCONNECT */ + n->state = S_DISCONNECT_FINISHED; + free_neighbour (n, GNUNET_NO); + return; + case S_CONNECT_SENT: + send_disconnect (n); + n->state = S_DISCONNECT; + break; + case S_CONNECT_RECV_ATS: + case S_CONNECT_RECV_BLACKLIST: + /* we never ACK'ed the other peer's request, no need to send DISCONNECT */ + n->state = S_DISCONNECT_FINISHED; + free_neighbour (n, GNUNET_NO); + return; + case S_CONNECT_RECV_ACK: + /* we DID ACK the other peer's request, must send DISCONNECT */ + send_disconnect (n); + n->state = S_DISCONNECT; + break; case S_CONNECTED: - GNUNET_assert (neighbours_connected > 0); - neighbours_connected--; - GNUNET_assert (GNUNET_SCHEDULER_NO_TASK != n->keepalive_task); - GNUNET_SCHEDULER_cancel (n->keepalive_task); - n->keepalive_task = GNUNET_SCHEDULER_NO_TASK; - n->expect_latency_response = GNUNET_NO; - GNUNET_STATISTICS_update (GST_stats, gettext_noop ("# peers connected"), -1, - GNUNET_NO); + case S_RECONNECT_BLACKLIST: + case S_RECONNECT_SENT: + case S_CONNECTED_SWITCHING_BLACKLIST: + case S_CONNECTED_SWITCHING_CONNECT_SENT: + /* we are currently connected, need to send disconnect and do + internal notifications and update statistics */ + send_disconnect (n); + GNUNET_STATISTICS_set (GST_stats, + gettext_noop ("# peers connected"), + --neighbours_connected, + GNUNET_NO); disconnect_notify_cb (callback_cls, &n->id); + n->state = S_DISCONNECT; break; - case S_FAST_RECONNECT: - GNUNET_STATISTICS_update (GST_stats, - gettext_noop ("# fast reconnects failed"), 1, - GNUNET_NO); + case S_RECONNECT_ATS: + /* ATS address request timeout, disconnect without sending disconnect message */ + GNUNET_STATISTICS_set (GST_stats, + gettext_noop ("# peers connected"), + --neighbours_connected, + GNUNET_NO); disconnect_notify_cb (callback_cls, &n->id); + n->state = S_DISCONNECT; + break; + case S_DISCONNECT: + /* already disconnected, ignore */ + break; + case S_DISCONNECT_FINISHED: + /* already cleaned up, how did we get here!? */ + GNUNET_assert (0); break; default: + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Unhandled state `%s' \n",print_state (n->state)); + GNUNET_break (0); break; } + /* schedule timeout to clean up */ + if (GNUNET_SCHEDULER_NO_TASK != n->task) + GNUNET_SCHEDULER_cancel (n->task); + n->task = GNUNET_SCHEDULER_add_delayed (DISCONNECT_SENT_TIMEOUT, + &master_task, n); +} - GNUNET_ATS_suggest_address_cancel (GST_ats, &n->id); - GNUNET_assert (GNUNET_YES == - GNUNET_CONTAINER_multihashmap_remove (neighbours, - &n->id.hashPubKey, n)); - if (GNUNET_SCHEDULER_NO_TASK != n->ats_suggest) - { - GNUNET_SCHEDULER_cancel (n->ats_suggest); - n->ats_suggest = GNUNET_SCHEDULER_NO_TASK; - } - if (GNUNET_SCHEDULER_NO_TASK != n->timeout_task) - { - GNUNET_SCHEDULER_cancel (n->timeout_task); - n->timeout_task = GNUNET_SCHEDULER_NO_TASK; - } - if (GNUNET_SCHEDULER_NO_TASK != n->transmission_task) +/** + * We're done with our transmission attempt, continue processing. + * + * @param cls the 'struct MessageQueue' of the message + * @param receiver intended receiver + * @param success whether it worked or not + */ +static void +transmit_send_continuation (void *cls, + const struct GNUNET_PeerIdentity *receiver, + int success) +{ + struct MessageQueue *mq = cls; + struct NeighbourMapEntry *n; + + if (NULL == (n = lookup_neighbour (receiver))) { - GNUNET_SCHEDULER_cancel (n->transmission_task); - n->transmission_task = GNUNET_SCHEDULER_NO_TASK; + GNUNET_free (mq); + return; /* disconnect or other error while transmitting, can happen */ } - if (NULL != n->address) + if (n->is_active == mq) { - GNUNET_HELLO_address_free (n->address); - n->address = NULL; - } - n->session = NULL; - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Deleting peer `%4s', %X\n", - GNUNET_i2s (&n->id), n); - GNUNET_free (n); + /* this is still "our" neighbour, remove us from its queue + and allow it to send the next message now */ + n->is_active = NULL; + if (GNUNET_SCHEDULER_NO_TASK != n->task) + GNUNET_SCHEDULER_cancel (n->task); + n->task = GNUNET_SCHEDULER_add_now (&master_task, n); + } + GNUNET_assert (bytes_in_send_queue >= mq->message_buf_size); + bytes_in_send_queue -= mq->message_buf_size; + GNUNET_STATISTICS_set (GST_stats, + gettext_noop + ("# bytes in message queue for other peers"), + bytes_in_send_queue, GNUNET_NO); + if (GNUNET_OK == success) + GNUNET_STATISTICS_update (GST_stats, + gettext_noop + ("# messages transmitted to other peers"), + 1, GNUNET_NO); + else + GNUNET_STATISTICS_update (GST_stats, + gettext_noop + ("# transmission failures for messages to other peers"), + 1, GNUNET_NO); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Sending message to `%s' of type %u was a %s\n", + GNUNET_i2s (receiver), + ntohs (((struct GNUNET_MessageHeader *) mq->message_buf)->type), + (success == GNUNET_OK) ? "success" : "FAILURE"); + if (NULL != mq->cont) + mq->cont (mq->cont_cls, success); + GNUNET_free (mq); } /** - * Peer has been idle for too long. Disconnect. + * Check the message list for the given neighbour and if we can + * send a message, do so. This function should only be called + * if the connection is at least generally ready for transmission. + * While we will only send one message at a time, no bandwidth + * quota management is performed here. If a message was given to + * the plugin, the continuation will automatically re-schedule + * the 'master' task once the next message might be transmitted. * - * @param cls the 'struct NeighbourMapEntry' of the neighbour that went idle - * @param tc scheduler context + * @param n target peer for which to transmit */ static void -neighbour_timeout_task (void *cls, - const struct GNUNET_SCHEDULER_TaskContext *tc) +try_transmission_to_peer (struct NeighbourMapEntry *n) { - struct NeighbourMapEntry *n = cls; + struct MessageQueue *mq; + struct GNUNET_TIME_Relative timeout; - n->timeout_task = GNUNET_SCHEDULER_NO_TASK; + if (NULL == n->primary_address.address) + { + /* no address, why are we here? */ + GNUNET_break (0); + return; + } + if ((0 == n->primary_address.address->address_length) && + (NULL == n->primary_address.session)) + { + /* no address, why are we here? */ + GNUNET_break (0); + return; + } + if (NULL != n->is_active) + { + /* transmission already pending */ + return; + } - GNUNET_STATISTICS_update (GST_stats, - gettext_noop - ("# peers disconnected due to timeout"), 1, - GNUNET_NO); - disconnect_neighbour (n); + /* timeout messages from the queue that are past their due date */ + while (NULL != (mq = n->messages_head)) + { + timeout = GNUNET_TIME_absolute_get_remaining (mq->timeout); + if (timeout.rel_value > 0) + break; + GNUNET_STATISTICS_update (GST_stats, + gettext_noop + ("# messages timed out while in transport queue"), + 1, GNUNET_NO); + GNUNET_CONTAINER_DLL_remove (n->messages_head, n->messages_tail, mq); + n->is_active = mq; + transmit_send_continuation (mq, &n->id, GNUNET_SYSERR); /* timeout */ + } + if (NULL == mq) + return; /* no more messages */ + GNUNET_CONTAINER_DLL_remove (n->messages_head, n->messages_tail, mq); + n->is_active = mq; + send_with_session (n, + mq->message_buf, mq->message_buf_size, + 0 /* priority */, timeout, + &transmit_send_continuation, mq); } /** - * Send another keepalive message. + * Send keepalive message to the neighbour. Must only be called + * if we are on 'connected' state. Will internally determine + * if a keepalive is truly needed (so can always be called). * - * @param cls the 'struct NeighbourMapEntry' of the neighbour that went idle - * @param tc scheduler context + * @param n neighbour that went idle and needs a keepalive */ static void -neighbour_keepalive_task (void *cls, - const struct GNUNET_SCHEDULER_TaskContext *tc) +send_keepalive (struct NeighbourMapEntry *n) { - struct NeighbourMapEntry *n = cls; struct GNUNET_MessageHeader m; - int ret; GNUNET_assert (S_CONNECTED == n->state); - n->keepalive_task = - GNUNET_SCHEDULER_add_delayed (KEEPALIVE_FREQUENCY, - &neighbour_keepalive_task, n); - - GNUNET_STATISTICS_update (GST_stats, gettext_noop ("# keepalives sent"), 1, - GNUNET_NO); + if (GNUNET_TIME_absolute_get_remaining (n->keep_alive_time).rel_value > 0) + return; /* no keepalive needed at this time */ m.size = htons (sizeof (struct GNUNET_MessageHeader)); m.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_KEEPALIVE); - - ret = send_with_session (n, - (const void *) &m, sizeof (m), - UINT32_MAX /* priority */ , - GNUNET_TIME_UNIT_FOREVER_REL, - NULL, NULL); - - n->expect_latency_response = GNUNET_NO; - n->keep_alive_sent = GNUNET_TIME_absolute_get_zero (); - if (ret != GNUNET_SYSERR) - { - n->expect_latency_response = GNUNET_YES; - n->keep_alive_sent = GNUNET_TIME_absolute_get (); - } - + send_with_session (n, + (const void *) &m, sizeof (m), + UINT32_MAX /* priority */, + KEEPALIVE_FREQUENCY, + NULL, NULL); + GNUNET_STATISTICS_update (GST_stats, gettext_noop ("# keepalives sent"), 1, + GNUNET_NO); + n->expect_latency_response = GNUNET_YES; + n->last_keep_alive_time = GNUNET_TIME_absolute_get (); + n->keep_alive_time = GNUNET_TIME_relative_to_absolute (KEEPALIVE_FREQUENCY); } /** - * Disconnect from the given neighbour. + * Keep the connection to the given neighbour alive longer, + * we received a KEEPALIVE (or equivalent); send a response. * - * @param cls unused - * @param key hash of neighbour's public key (not used) - * @param value the 'struct NeighbourMapEntry' of the neighbour + * @param neighbour neighbour to keep alive (by sending keep alive response) */ -static int -disconnect_all_neighbours (void *cls, const GNUNET_HashCode * key, void *value) +void +GST_neighbours_keepalive (const struct GNUNET_PeerIdentity *neighbour) { - struct NeighbourMapEntry *n = value; + struct NeighbourMapEntry *n; + struct GNUNET_MessageHeader m; -#if DEBUG_TRANSPORT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Disconnecting peer `%4s', %s\n", - GNUNET_i2s (&n->id), "SHUTDOWN_TASK"); -#endif - if (S_CONNECTED == n->state) + if (NULL == (n = lookup_neighbour (neighbour))) + { GNUNET_STATISTICS_update (GST_stats, gettext_noop - ("# peers disconnected due to global disconnect"), + ("# KEEPALIVE messages discarded (peer unknown)"), 1, GNUNET_NO); - disconnect_neighbour (n); - return GNUNET_OK; -} - - -static void -ats_suggest_cancel (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) -{ - struct NeighbourMapEntry *n = cls; - - n->ats_suggest = GNUNET_SCHEDULER_NO_TASK; - - GNUNET_log (GNUNET_ERROR_TYPE_ERROR, - "ATS did not suggested address to connect to peer `%s'\n", - GNUNET_i2s (&n->id)); - - disconnect_neighbour (n); -} - -/** - * Cleanup the neighbours subsystem. - */ -void -GST_neighbours_stop () -{ - // This can happen during shutdown - if (neighbours == NULL) + return; + } + if (NULL == n->primary_address.session) { + GNUNET_STATISTICS_update (GST_stats, + gettext_noop + ("# KEEPALIVE messages discarded (no session)"), + 1, GNUNET_NO); return; } - - GNUNET_CONTAINER_multihashmap_iterate (neighbours, &disconnect_all_neighbours, - NULL); - GNUNET_CONTAINER_multihashmap_destroy (neighbours); -// GNUNET_assert (neighbours_connected == 0); - neighbours = NULL; - callback_cls = NULL; - connect_notify_cb = NULL; - disconnect_notify_cb = NULL; - address_change_cb = NULL; + /* send reply to allow neighbour to measure latency */ + m.size = htons (sizeof (struct GNUNET_MessageHeader)); + m.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_KEEPALIVE_RESPONSE); + send_with_session(n, + (const void *) &m, sizeof (m), + UINT32_MAX /* priority */, + KEEPALIVE_FREQUENCY, + NULL, NULL); } -struct ContinutionContext -{ - struct GNUNET_HELLO_Address *address; - - struct Session *session; -}; - -static void -send_outbound_quota (const struct GNUNET_PeerIdentity *target, - struct GNUNET_BANDWIDTH_Value32NBO quota) -{ - struct QuotaSetMessage q_msg; - -#if DEBUG_TRANSPORT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Sending outbound quota of %u Bps for peer `%s' to all clients\n", - ntohl (quota.value__), GNUNET_i2s (target)); -#endif - q_msg.header.size = htons (sizeof (struct QuotaSetMessage)); - q_msg.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SET_QUOTA); - q_msg.quota = quota; - q_msg.peer = (*target); - GST_clients_broadcast (&q_msg.header, GNUNET_NO); -} /** - * We tried to send a SESSION_CONNECT message to another peer. If this - * succeeded, we change the state. If it failed, we should tell - * ATS to not use this address anymore (until it is re-validated). + * We received a KEEP_ALIVE_RESPONSE message and use this to calculate + * latency to this peer. Pass the updated information (existing ats + * plus calculated latency) to ATS. * - * @param cls the 'struct GNUNET_HELLO_Address' of the address that was tried - * @param target peer to send the message to - * @param success GNUNET_OK on success + * @param neighbour neighbour to keep alive + * @param ats performance data + * @param ats_count number of entries in ats */ -static void -send_connect_continuation (void *cls, const struct GNUNET_PeerIdentity *target, - int success) +void +GST_neighbours_keepalive_response (const struct GNUNET_PeerIdentity *neighbour, + const struct GNUNET_ATS_Information *ats, + uint32_t ats_count) { - struct ContinutionContext *cc = cls; - struct NeighbourMapEntry *n = lookup_neighbour (&cc->address->peer); + struct NeighbourMapEntry *n; + uint32_t latency; + struct GNUNET_ATS_Information ats_new[ats_count + 1]; - if (GNUNET_YES != success) - { - GNUNET_assert (strlen (cc->address->transport_name) > 0); - GNUNET_ATS_address_destroyed (GST_ats, cc->address, cc->session); - } - if ((NULL == neighbours) || (NULL == n) || (n->state == S_DISCONNECT)) + if (NULL == (n = lookup_neighbour (neighbour))) { - GNUNET_HELLO_address_free (cc->address); - GNUNET_free (cc); + GNUNET_STATISTICS_update (GST_stats, + gettext_noop + ("# KEEPALIVE_RESPONSE messages discarded (not connected)"), + 1, GNUNET_NO); return; } - - if ((GNUNET_YES == success) && - ((n->state == S_NOT_CONNECTED) || (n->state == S_CONNECT_SENT))) + if ( (S_CONNECTED != n->state) || + (GNUNET_YES != n->expect_latency_response) ) { - change_state (n, S_CONNECT_SENT); - GNUNET_HELLO_address_free (cc->address); - GNUNET_free (cc); + GNUNET_STATISTICS_update (GST_stats, + gettext_noop + ("# KEEPALIVE_RESPONSE messages discarded (not expected)"), + 1, GNUNET_NO); return; } - - if ((GNUNET_NO == success) && - ((n->state == S_NOT_CONNECTED) || (n->state == S_CONNECT_SENT))) - { -#if DEBUG_TRANSPORT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Failed to send CONNECT_MSG to peer `%4s' with address '%s' session %p, asking ATS for new address \n", - GNUNET_i2s (&n->id), GST_plugins_a2s (n->address), n->session); -#endif - change_state (n, S_NOT_CONNECTED); - if (n->ats_suggest != GNUNET_SCHEDULER_NO_TASK) - GNUNET_SCHEDULER_cancel (n->ats_suggest); - n->ats_suggest = - GNUNET_SCHEDULER_add_delayed (ATS_RESPONSE_TIMEOUT, &ats_suggest_cancel, - n); - GNUNET_ATS_suggest_address (GST_ats, &n->id); - } - GNUNET_HELLO_address_free (cc->address); - GNUNET_free (cc); + n->expect_latency_response = GNUNET_NO; + n->latency = GNUNET_TIME_absolute_get_duration (n->last_keep_alive_time); + n->timeout = GNUNET_TIME_relative_to_absolute (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Latency for peer `%s' is %llu ms\n", + GNUNET_i2s (&n->id), n->latency.rel_value); + memcpy (ats_new, ats, sizeof (struct GNUNET_ATS_Information) * ats_count); + /* append latency */ + ats_new[ats_count].type = htonl (GNUNET_ATS_QUALITY_NET_DELAY); + if (n->latency.rel_value > UINT32_MAX) + latency = UINT32_MAX; + else + latency = n->latency.rel_value; + ats_new[ats_count].value = htonl (latency); + GNUNET_ATS_address_update (GST_ats, + n->primary_address.address, + n->primary_address.session, ats_new, + ats_count + 1); } /** - * We tried to switch addresses with an peer already connected. If it failed, - * we should tell ATS to not use this address anymore (until it is re-validated). + * We have received a message from the given sender. How long should + * we delay before receiving more? (Also used to keep the peer marked + * as live). * - * @param cls the 'struct NeighbourMapEntry' - * @param target peer to send the message to - * @param success GNUNET_OK on success + * @param sender sender of the message + * @param size size of the message + * @param do_forward set to GNUNET_YES if the message should be forwarded to clients + * GNUNET_NO if the neighbour is not connected or violates the quota, + * GNUNET_SYSERR if the connection is not fully up yet + * @return how long to wait before reading more from this sender */ -static void -send_switch_address_continuation (void *cls, - const struct GNUNET_PeerIdentity *target, - int success) +struct GNUNET_TIME_Relative +GST_neighbours_calculate_receive_delay (const struct GNUNET_PeerIdentity + *sender, ssize_t size, int *do_forward) { - struct ContinutionContext *cc = cls; struct NeighbourMapEntry *n; - - if (neighbours == NULL) + struct GNUNET_TIME_Relative ret; + + if (NULL == neighbours) { - GNUNET_HELLO_address_free (cc->address); - GNUNET_free (cc); - return; /* neighbour is going away */ + *do_forward = GNUNET_NO; + return GNUNET_TIME_UNIT_FOREVER_REL; /* This can happen during shutdown */ } - - n = lookup_neighbour (&cc->address->peer); - if ((n == NULL) || (is_disconnecting (n))) + if (NULL == (n = lookup_neighbour (sender))) + { + GST_neighbours_try_connect (sender); + if (NULL == (n = lookup_neighbour (sender))) + { + GNUNET_STATISTICS_update (GST_stats, + gettext_noop + ("# messages discarded due to lack of neighbour record"), + 1, GNUNET_NO); + *do_forward = GNUNET_NO; + return GNUNET_TIME_UNIT_ZERO; + } + } + if (! test_connected (n)) { - GNUNET_HELLO_address_free (cc->address); - GNUNET_free (cc); - return; /* neighbour is going away */ + *do_forward = GNUNET_SYSERR; + return GNUNET_TIME_UNIT_ZERO; } - - GNUNET_assert ((n->state == S_CONNECTED) || (n->state == S_FAST_RECONNECT)); - if (GNUNET_YES != success) + if (GNUNET_YES == GNUNET_BANDWIDTH_tracker_consume (&n->in_tracker, size)) { -#if DEBUG_TRANSPORT + n->quota_violation_count++; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Failed to switch connected peer `%s' to address '%s' session %X, asking ATS for new address \n", - GNUNET_i2s (&n->id), GST_plugins_a2s (cc->address), cc->session); -#endif - GNUNET_assert (strlen (cc->address->transport_name) > 0); - GNUNET_ATS_address_destroyed (GST_ats, cc->address, cc->session); - - if (n->ats_suggest != GNUNET_SCHEDULER_NO_TASK) - GNUNET_SCHEDULER_cancel (n->ats_suggest); - n->ats_suggest = - GNUNET_SCHEDULER_add_delayed (ATS_RESPONSE_TIMEOUT, ats_suggest_cancel, - n); - GNUNET_ATS_suggest_address (GST_ats, &n->id); - GNUNET_HELLO_address_free (cc->address); - GNUNET_free (cc); - return; + "Bandwidth quota (%u b/s) violation detected (total of %u).\n", + n->in_tracker.available_bytes_per_s__, + n->quota_violation_count); + /* Discount 32k per violation */ + GNUNET_BANDWIDTH_tracker_consume (&n->in_tracker, -32 * 1024); } - /* Tell ATS that switching addresses was successful */ - switch (n->state) + else { - case S_CONNECTED: - if (n->address_state == FRESH) + if (n->quota_violation_count > 0) { - GST_validation_set_address_use (cc->address, cc->session, GNUNET_YES); - GNUNET_ATS_address_update (GST_ats, cc->address, cc->session, NULL, 0); - if (cc->session != n->session) - GNUNET_break (0); - GNUNET_ATS_address_in_use (GST_ats, cc->address, cc->session, GNUNET_YES); - n->address_state = USED; + /* try to add 32k back */ + GNUNET_BANDWIDTH_tracker_consume (&n->in_tracker, 32 * 1024); + n->quota_violation_count--; } - break; - case S_FAST_RECONNECT: -#if DEBUG_TRANSPORT + } + if (n->quota_violation_count > QUOTA_VIOLATION_DROP_THRESHOLD) + { + GNUNET_STATISTICS_update (GST_stats, + gettext_noop + ("# bandwidth quota violations by other peers"), + 1, GNUNET_NO); + *do_forward = GNUNET_NO; + return GNUNET_CONSTANTS_QUOTA_VIOLATION_TIMEOUT; + } + *do_forward = GNUNET_YES; + ret = GNUNET_BANDWIDTH_tracker_get_delay (&n->in_tracker, 32 * 1024); + if (ret.rel_value > 0) + { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Successful fast reconnect to peer `%s'\n", - GNUNET_i2s (&n->id)); -#endif - change_state (n, S_CONNECTED); - neighbours_connected++; - GNUNET_STATISTICS_update (GST_stats, gettext_noop ("# peers connected"), 1, - GNUNET_NO); - - if (n->address_state == FRESH) - { - GST_validation_set_address_use (cc->address, cc->session, GNUNET_YES); - GNUNET_ATS_address_update (GST_ats, cc->address, cc->session, NULL, 0); - GNUNET_ATS_address_in_use (GST_ats, cc->address, cc->session, GNUNET_YES); - n->address_state = USED; - } - - if (n->keepalive_task == GNUNET_SCHEDULER_NO_TASK) - n->keepalive_task = - GNUNET_SCHEDULER_add_now (&neighbour_keepalive_task, n); - - /* Updating quotas */ - GST_neighbours_set_incoming_quota (&n->id, n->bandwidth_in); - send_outbound_quota (target, n->bandwidth_out); - - default: - break; + "Throttling read (%llu bytes excess at %u b/s), waiting %llu ms before reading more.\n", + (unsigned long long) n->in_tracker. + consumption_since_last_update__, + (unsigned int) n->in_tracker.available_bytes_per_s__, + (unsigned long long) ret.rel_value); + GNUNET_STATISTICS_update (GST_stats, + gettext_noop ("# ms throttling suggested"), + (int64_t) ret.rel_value, GNUNET_NO); } - GNUNET_HELLO_address_free (cc->address); - GNUNET_free (cc); + return ret; } /** - * We tried to send a SESSION_CONNECT message to another peer. If this - * succeeded, we change the state. If it failed, we should tell - * ATS to not use this address anymore (until it is re-validated). + * Transmit a message to the given target using the active connection. * - * @param cls the 'struct NeighbourMapEntry' - * @param target peer to send the message to - * @param success GNUNET_OK on success + * @param target destination + * @param msg message to send + * @param msg_size number of bytes in msg + * @param timeout when to fail with timeout + * @param cont function to call when done + * @param cont_cls closure for 'cont' */ -static void -send_connect_ack_continuation (void *cls, - const struct GNUNET_PeerIdentity *target, - int success) +void +GST_neighbours_send (const struct GNUNET_PeerIdentity *target, const void *msg, + size_t msg_size, struct GNUNET_TIME_Relative timeout, + GST_NeighbourSendContinuation cont, void *cont_cls) { - struct ContinutionContext *cc = cls; struct NeighbourMapEntry *n; + struct MessageQueue *mq; - if (neighbours == NULL) - { - GNUNET_HELLO_address_free (cc->address); - GNUNET_free (cc); - return; /* neighbour is going away */ - } - - n = lookup_neighbour (&cc->address->peer); - if ((n == NULL) || (is_disconnecting (n))) + /* All ove these cases should never happen; they are all API violations. + But we check anyway, just to be sure. */ + if (NULL == (n = lookup_neighbour (target))) { - GNUNET_HELLO_address_free (cc->address); - GNUNET_free (cc); - return; /* neighbour is going away */ + GNUNET_break (0); + if (NULL != cont) + cont (cont_cls, GNUNET_SYSERR); + return; } - - if (GNUNET_YES == success) + if (GNUNET_YES != test_connected (n)) { - GNUNET_HELLO_address_free (cc->address); - GNUNET_free (cc); - return; /* sending successful */ + GNUNET_break (0); + if (NULL != cont) + cont (cont_cls, GNUNET_SYSERR); + return; } - - /* sending failed, ask for next address */ -#if DEBUG_TRANSPORT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Failed to send CONNECT_MSG to peer `%4s' with address '%s' session %X, asking ATS for new address \n", - GNUNET_i2s (&n->id), GST_plugins_a2s (n->address), n->session); -#endif - change_state (n, S_NOT_CONNECTED); - GNUNET_assert (strlen (cc->address->transport_name) > 0); - GNUNET_ATS_address_destroyed (GST_ats, cc->address, cc->session); - - if (n->ats_suggest != GNUNET_SCHEDULER_NO_TASK) - GNUNET_SCHEDULER_cancel (n->ats_suggest); - n->ats_suggest = - GNUNET_SCHEDULER_add_delayed (ATS_RESPONSE_TIMEOUT, ats_suggest_cancel, - n); - GNUNET_ATS_suggest_address (GST_ats, &n->id); - GNUNET_HELLO_address_free (cc->address); - GNUNET_free (cc); + bytes_in_send_queue += msg_size; + GNUNET_STATISTICS_set (GST_stats, + gettext_noop + ("# bytes in message queue for other peers"), + bytes_in_send_queue, GNUNET_NO); + mq = GNUNET_malloc (sizeof (struct MessageQueue) + msg_size); + mq->cont = cont; + mq->cont_cls = cont_cls; + memcpy (&mq[1], msg, msg_size); + mq->message_buf = (const char *) &mq[1]; + mq->message_buf_size = msg_size; + mq->timeout = GNUNET_TIME_relative_to_absolute (timeout); + GNUNET_CONTAINER_DLL_insert_tail (n->messages_head, n->messages_tail, mq); + if ( (NULL != n->is_active) || + ( (NULL == n->primary_address.session) && (NULL == n->primary_address.address)) ) + return; + if (GNUNET_SCHEDULER_NO_TASK != n->task) + GNUNET_SCHEDULER_cancel (n->task); + n->task = GNUNET_SCHEDULER_add_now (&master_task, n); } /** - * For an existing neighbour record, set the active connection to - * use the given address. + * Send a SESSION_CONNECT message via the given address. * - * @param peer identity of the peer to switch the address for - * @param address address of the other peer, NULL if other peer - * connected to us - * @param session session to use (or NULL) - * @param ats performance data - * @param ats_count number of entries in ats - * @param bandwidth_in inbound quota to be used when connection is up - * @param bandwidth_out outbound quota to be used when connection is up - * @return GNUNET_YES if we are currently connected, GNUNET_NO if the - * connection is not up (yet) + * @param na address to use */ -int -GST_neighbours_switch_to_address (const struct GNUNET_PeerIdentity *peer, - const struct GNUNET_HELLO_Address - *address, - struct Session *session, - const struct GNUNET_ATS_Information *ats, - uint32_t ats_count, - struct GNUNET_BANDWIDTH_Value32NBO - bandwidth_in, - struct GNUNET_BANDWIDTH_Value32NBO - bandwidth_out) +static void +send_session_connect (struct NeighbourAddress *na) { - struct NeighbourMapEntry *n; - struct SessionConnectMessage connect_msg; - struct ContinutionContext *cc; - size_t msg_len; - size_t ret; - - if (neighbours == NULL) - { - /* This can happen during shutdown */ - return GNUNET_NO; - } - n = lookup_neighbour (peer); - if (NULL == n) - return GNUNET_NO; - if (n->state == S_DISCONNECT) - { - /* We are disconnecting, nothing to do here */ - return GNUNET_NO; - } - GNUNET_assert (address->transport_name != NULL); - if ((session == NULL) && (0 == address->address_length)) - { - GNUNET_break_op (0); - /* FIXME: is this actually possible? When does this happen? */ - if (strlen (address->transport_name) > 0) - GNUNET_ATS_address_destroyed (GST_ats, address, session); - GNUNET_ATS_suggest_address (GST_ats, peer); - return GNUNET_NO; - } - - /* checks successful and neighbour != NULL */ - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "ATS tells us to switch to address '%s' session %p for peer `%s' in state `%s'\n", - (address->address_length != 0) ? GST_plugins_a2s (address): "<inbound>", - session, - GNUNET_i2s (peer), - print_state (n->state)); - - if (n->ats_suggest != GNUNET_SCHEDULER_NO_TASK) - { - GNUNET_SCHEDULER_cancel (n->ats_suggest); - n->ats_suggest = GNUNET_SCHEDULER_NO_TASK; - } - /* do not switch addresses just update quotas */ -/* - if (n->state == S_FAST_RECONNECT) - { - if (0 == GNUNET_HELLO_address_cmp(address, n->address)) - { - GNUNET_log (GNUNET_ERROR_TYPE_ERROR, - "FAST RECONNECT to peer `%s' and address '%s' with identical ADDRESS\n", - GNUNET_i2s (&n->id), GST_plugins_a2s (n->address)); - } - } -*/ - if ((n->state == S_CONNECTED) && (NULL != n->address) && - (0 == GNUNET_HELLO_address_cmp (address, n->address)) && - (n->session == session)) - { - n->bandwidth_in = bandwidth_in; - n->bandwidth_out = bandwidth_out; - GST_neighbours_set_incoming_quota (&n->id, n->bandwidth_in); - send_outbound_quota (peer, n->bandwidth_out); - return GNUNET_NO; - } - if (n->state == S_CONNECTED) - { - /* mark old address as no longer used */ - GNUNET_assert (NULL != n->address); - if (n->address_state == USED) - { - GST_validation_set_address_use (n->address, n->session, GNUNET_NO); - GNUNET_ATS_address_in_use (GST_ats, n->address, n->session, GNUNET_NO); - n->address_state = UNUSED; - } - } - - /* set new address */ - if (NULL != n->address) - GNUNET_HELLO_address_free (n->address); - n->address = GNUNET_HELLO_address_copy (address); - n->address_state = FRESH; - n->bandwidth_in = bandwidth_in; - n->bandwidth_out = bandwidth_out; - GNUNET_SCHEDULER_cancel (n->timeout_task); - n->timeout_task = - GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT, - &neighbour_timeout_task, n); - - if (NULL != address_change_cb && n->state == S_CONNECTED) - address_change_cb (callback_cls, &n->id, n->address); - - /* Obtain an session for this address from plugin */ struct GNUNET_TRANSPORT_PluginFunctions *papi; - papi = GST_plugins_find (address->transport_name); - - if (papi == NULL) - { - /* we don't have the plugin for this address */ - GNUNET_ATS_address_destroyed (GST_ats, n->address, NULL); - - if (n->ats_suggest != GNUNET_SCHEDULER_NO_TASK) - GNUNET_SCHEDULER_cancel (n->ats_suggest); - n->ats_suggest = GNUNET_SCHEDULER_add_delayed (ATS_RESPONSE_TIMEOUT, - ats_suggest_cancel, - n); - GNUNET_ATS_suggest_address (GST_ats, &n->id); - GNUNET_HELLO_address_free (n->address); - n->address = NULL; - n->session = NULL; - return GNUNET_NO; - } - - if (session == NULL) - { - n->session = papi->get_session (papi->cls, address); - /* Session could not be initiated */ - if (n->session == NULL) - { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Failed to obtain new session %p for peer `%s' and address '%s'\n", - n->session, GNUNET_i2s (&n->id), GST_plugins_a2s (n->address)); - - GNUNET_ATS_address_destroyed (GST_ats, n->address, NULL); - - if (n->ats_suggest != GNUNET_SCHEDULER_NO_TASK) - GNUNET_SCHEDULER_cancel (n->ats_suggest); - n->ats_suggest = GNUNET_SCHEDULER_add_delayed (ATS_RESPONSE_TIMEOUT, - ats_suggest_cancel, - n); - GNUNET_ATS_suggest_address (GST_ats, &n->id); - GNUNET_HELLO_address_free (n->address); - n->address = NULL; - n->session = NULL; - return GNUNET_NO; - } - - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Obtained new session %p for peer `%s' and address '%s'\n", - n->session, GNUNET_i2s (&n->id), GST_plugins_a2s (n->address)); - /* Telling ATS about new session */ - GNUNET_ATS_address_update (GST_ats, n->address, n->session, NULL, 0); - } - else + struct SessionConnectMessage connect_msg; + + if (NULL == (papi = GST_plugins_find (na->address->transport_name))) { - n->session = session; - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Using existing session %p for peer `%s' and address '%s'\n", - n->session, - GNUNET_i2s (&n->id), - (address->address_length != 0) ? GST_plugins_a2s (address): "<inbound>"); + GNUNET_break (0); + return; } - - switch (n->state) + if (NULL == na->session) + na->session = papi->get_session (papi->cls, na->address); + if (NULL == na->session) { - case S_NOT_CONNECTED: - case S_CONNECT_SENT: - msg_len = sizeof (struct SessionConnectMessage); - connect_msg.header.size = htons (msg_len); - connect_msg.header.type = - htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_CONNECT); - connect_msg.reserved = htonl (0); - connect_msg.timestamp = - GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get ()); - - cc = GNUNET_malloc (sizeof (struct ContinutionContext)); - cc->session = n->session; - cc->address = GNUNET_HELLO_address_copy (address); - - ret = send_with_session (n, - (const char *) &connect_msg, msg_len, - UINT32_MAX, GNUNET_TIME_UNIT_FOREVER_REL, - &send_connect_continuation, cc); - - return GNUNET_NO; - case S_CONNECT_RECV: - /* We received a CONNECT message and asked ATS for an address */ - msg_len = sizeof (struct SessionConnectMessage); - connect_msg.header.size = htons (msg_len); - connect_msg.header.type = - htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_CONNECT_ACK); - connect_msg.reserved = htonl (0); - connect_msg.timestamp = - GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get ()); - cc = GNUNET_malloc (sizeof (struct ContinutionContext)); - cc->session = n->session; - cc->address = GNUNET_HELLO_address_copy (address); - - ret = send_with_session(n, - (const void *) &connect_msg, msg_len, - UINT32_MAX, GNUNET_TIME_UNIT_FOREVER_REL, - &send_connect_ack_continuation, - cc); - return GNUNET_NO; - case S_CONNECTED: - case S_FAST_RECONNECT: - /* connected peer is switching addresses or tries fast reconnect */ - msg_len = sizeof (struct SessionConnectMessage); - connect_msg.header.size = htons (msg_len); - connect_msg.header.type = - htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_CONNECT); - connect_msg.reserved = htonl (0); - connect_msg.timestamp = - GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get ()); - cc = GNUNET_malloc (sizeof (struct ContinutionContext)); - cc->session = n->session; - cc->address = GNUNET_HELLO_address_copy (address); - ret = send_with_session(n, - (const void *) &connect_msg, msg_len, - UINT32_MAX, GNUNET_TIME_UNIT_FOREVER_REL, - &send_switch_address_continuation, cc); - if (ret == GNUNET_SYSERR) - { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Failed to send CONNECT_MESSAGE to `%4s' using address '%s' session %X\n", - GNUNET_i2s (peer), GST_plugins_a2s (address), session); - } - return GNUNET_NO; - default: - GNUNET_log (GNUNET_ERROR_TYPE_ERROR, - "Invalid connection state to switch addresses %u \n", n->state); - GNUNET_break_op (0); - return GNUNET_NO; + GNUNET_break (0); + return; } + na->connect_timestamp = GNUNET_TIME_absolute_get (); + connect_msg.header.size = htons (sizeof (struct SessionConnectMessage)); + connect_msg.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_CONNECT); + connect_msg.reserved = htonl (0); + connect_msg.timestamp = GNUNET_TIME_absolute_hton (na->connect_timestamp); + (void) papi->send (papi->cls, + na->session, + (const char *) &connect_msg, sizeof (struct SessionConnectMessage), + UINT_MAX, + GNUNET_TIME_UNIT_FOREVER_REL, + NULL, NULL); } /** - * Obtain current latency information for the given neighbour. + * Send a SESSION_CONNECT_ACK message via the given address. * - * @param peer - * @return observed latency of the address, FOREVER if the address was - * never successfully validated + * @param address address to use + * @param session session to use + * @param timestamp timestamp to use for the ACK message */ -struct GNUNET_TIME_Relative -GST_neighbour_get_latency (const struct GNUNET_PeerIdentity *peer) -{ - struct NeighbourMapEntry *n; - - n = lookup_neighbour (peer); - if ((NULL == n) || ((n->address == NULL) && (n->session == NULL))) - return GNUNET_TIME_UNIT_FOREVER_REL; - - return n->latency; -} - -/** - * Obtain current address information for the given neighbour. - * - * @param peer - * @return address currently used - */ -struct GNUNET_HELLO_Address * -GST_neighbour_get_current_address (const struct GNUNET_PeerIdentity *peer) +static void +send_session_connect_ack_message (const struct GNUNET_HELLO_Address *address, + struct Session *session, + struct GNUNET_TIME_Absolute timestamp) { - struct NeighbourMapEntry *n; - - n = lookup_neighbour (peer); - if ((NULL == n) || ((n->address == NULL) && (n->session == NULL))) - return NULL; - - return n->address; + struct GNUNET_TRANSPORT_PluginFunctions *papi; + struct SessionConnectMessage connect_msg; + + if (NULL == (papi = GST_plugins_find (address->transport_name))) + { + GNUNET_break (0); + return; + } + if (NULL == session) + session = papi->get_session (papi->cls, address); + if (NULL == session) + { + GNUNET_break (0); + return; + } + connect_msg.header.size = htons (sizeof (struct SessionConnectMessage)); + connect_msg.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_CONNECT_ACK); + connect_msg.reserved = htonl (0); + connect_msg.timestamp = GNUNET_TIME_absolute_hton (timestamp); + (void) papi->send (papi->cls, + session, + (const char *) &connect_msg, sizeof (struct SessionConnectMessage), + UINT_MAX, + GNUNET_TIME_UNIT_FOREVER_REL, + NULL, NULL); } - /** - * Create an entry in the neighbour map for the given peer + * Create a fresh entry in the neighbour map for the given peer * * @param peer peer to create an entry for * @return new neighbour map entry @@ -1677,20 +1586,17 @@ setup_neighbour (const struct GNUNET_PeerIdentity *peer) { struct NeighbourMapEntry *n; -#if DEBUG_TRANSPORT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Unknown peer `%s', creating new neighbour\n", GNUNET_i2s (peer)); -#endif + "Creating new neighbour entry for `%s'\n", + GNUNET_i2s (peer)); n = GNUNET_malloc (sizeof (struct NeighbourMapEntry)); n->id = *peer; n->state = S_NOT_CONNECTED; - n->latency = GNUNET_TIME_relative_get_forever (); + n->latency = GNUNET_TIME_UNIT_FOREVER_REL; GNUNET_BANDWIDTH_tracker_init (&n->in_tracker, GNUNET_CONSTANTS_DEFAULT_BW_IN_OUT, MAX_BANDWIDTH_CARRY_S); - n->timeout_task = - GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT, - &neighbour_timeout_task, n); + n->task = GNUNET_SCHEDULER_add_now (&master_task, n); GNUNET_assert (GNUNET_OK == GNUNET_CONTAINER_multihashmap_put (neighbours, &n->id.hashPubKey, n, @@ -1700,6 +1606,32 @@ setup_neighbour (const struct GNUNET_PeerIdentity *peer) /** + * Check if the two given addresses are the same. + * Actually only checks if the sessions are non-NULL + * (which they should be) and then if they are identical; + * the actual addresses don't matter if the session + * pointers match anyway, and we must have session pointers + * at this time. + * + * @param a1 first address to compare + * @param a2 other address to compare + * @return GNUNET_NO if the addresses do not match, GNUNET_YES if they do match + */ +static int +address_matches (const struct NeighbourAddress *a1, + const struct NeighbourAddress *a2) +{ + if ( (NULL == a1->session) || + (NULL == a2->session) ) + { + GNUNET_break (0); + return 0; + } + return (a1->session == a2->session) ? GNUNET_YES : GNUNET_NO; +} + + +/** * Try to create a connection to the given target (eventually). * * @param target peer to try to connect to @@ -1709,585 +1641,1234 @@ GST_neighbours_try_connect (const struct GNUNET_PeerIdentity *target) { struct NeighbourMapEntry *n; - // This can happen during shutdown - if (neighbours == NULL) - { - return; - } -#if DEBUG_TRANSPORT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Trying to connect to peer `%s'\n", + if (NULL == neighbours) + return; /* during shutdown, do nothing */ + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Asked to connect to peer `%s'\n", GNUNET_i2s (target)); -#endif if (0 == memcmp (target, &GST_my_identity, sizeof (struct GNUNET_PeerIdentity))) { - /* my own hello */ + /* refuse to connect to myself */ + /* FIXME: can this happen? Is this not an API violation? */ + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Refusing to try to connect to myself.\n"); return; } n = lookup_neighbour (target); - if (NULL != n) { - if ((S_CONNECTED == n->state) || (is_connecting (n))) - return; /* already connecting or connected */ - if (is_disconnecting (n)) - change_state (n, S_NOT_CONNECTED); + switch (n->state) + { + case S_NOT_CONNECTED: + /* this should not be possible */ + GNUNET_break (0); + free_neighbour (n, GNUNET_NO); + break; + case S_INIT_ATS: + case S_INIT_BLACKLIST: + case S_CONNECT_SENT: + case S_CONNECT_RECV_ATS: + case S_CONNECT_RECV_BLACKLIST: + case S_CONNECT_RECV_ACK: + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Ignoring request to try to connect to `%s', already trying!\n", + GNUNET_i2s (target)); + return; /* already trying */ + case S_CONNECTED: + case S_RECONNECT_ATS: + case S_RECONNECT_BLACKLIST: + case S_RECONNECT_SENT: + case S_CONNECTED_SWITCHING_BLACKLIST: + case S_CONNECTED_SWITCHING_CONNECT_SENT: + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Ignoring request to try to connect, already connected to `%s'!\n", + GNUNET_i2s (target)); + return; /* already connected */ + case S_DISCONNECT: + /* get rid of remains, ready to re-try immediately */ + free_neighbour (n, GNUNET_NO); + break; + case S_DISCONNECT_FINISHED: + /* should not be possible */ + GNUNET_assert (0); + default: + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Unhandled state `%s' \n",print_state (n->state)); + GNUNET_break (0); + free_neighbour (n, GNUNET_NO); + break; + } } + n = setup_neighbour (target); + n->state = S_INIT_ATS; + n->timeout = GNUNET_TIME_relative_to_absolute (ATS_RESPONSE_TIMEOUT); - - if (n == NULL) - n = setup_neighbour (target); -#if DEBUG_TRANSPORT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Asking ATS for suggested address to connect to peer `%s'\n", - GNUNET_i2s (&n->id)); -#endif - - GNUNET_ATS_suggest_address (GST_ats, &n->id); + GNUNET_ATS_reset_backoff (GST_ats, target); + GNUNET_ATS_suggest_address (GST_ats, target); } + /** - * Test if we're connected to the given peer. + * Function called with the result of a blacklist check. * - * @param target peer to test - * @return GNUNET_YES if we are connected, GNUNET_NO if not + * @param cls closure with the 'struct BlackListCheckContext' + * @param peer peer this check affects + * @param result GNUNET_OK if the address is allowed */ -int -GST_neighbours_test_connected (const struct GNUNET_PeerIdentity *target) +static void +handle_test_blacklist_cont (void *cls, + const struct GNUNET_PeerIdentity *peer, + int result) { + struct BlackListCheckContext *bcc = cls; struct NeighbourMapEntry *n; - // This can happen during shutdown - if (neighbours == NULL) + bcc->bc = NULL; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Connection to new address of peer `%s' based on blacklist is `%s'\n", + GNUNET_i2s (peer), + (GNUNET_OK == result) ? "allowed" : "FORBIDDEN"); + if (GNUNET_OK == result) { - return GNUNET_NO; + /* valid new address, let ATS know! */ + GNUNET_ATS_address_update (GST_ats, + bcc->na.address, + bcc->na.session, + bcc->ats, bcc->ats_count); + } + if (NULL == (n = lookup_neighbour (peer))) + goto cleanup; /* nobody left to care about new address */ + switch (n->state) + { + case S_NOT_CONNECTED: + /* this should not be possible */ + GNUNET_break (0); + free_neighbour (n, GNUNET_NO); + break; + case S_INIT_ATS: + /* still waiting on ATS suggestion */ + break; + case S_INIT_BLACKLIST: + /* check if the address the blacklist was fine with matches + ATS suggestion, if so, we can move on! */ + if ( (GNUNET_OK == result) && + (1 == n->send_connect_ack) ) + { + n->send_connect_ack = 2; + send_session_connect_ack_message (bcc->na.address, + bcc->na.session, + n->connect_ack_timestamp); + } + if (GNUNET_YES != address_matches (&bcc->na, &n->primary_address)) + break; /* result for an address we currently don't care about */ + if (GNUNET_OK == result) + { + n->timeout = GNUNET_TIME_relative_to_absolute (SETUP_CONNECTION_TIMEOUT); + n->state = S_CONNECT_SENT; + send_session_connect (&n->primary_address); + } + else + { + // FIXME: should also possibly destroy session with plugin!? + GNUNET_ATS_address_destroyed (GST_ats, + bcc->na.address, + NULL); + free_address (&n->primary_address); + n->state = S_INIT_ATS; + n->timeout = GNUNET_TIME_relative_to_absolute (ATS_RESPONSE_TIMEOUT); + // FIXME: do we need to ask ATS again for suggestions? + GNUNET_ATS_suggest_address (GST_ats, &n->id); + } + break; + case S_CONNECT_SENT: + /* waiting on CONNECT_ACK, send ACK if one is pending */ + if ( (GNUNET_OK == result) && + (1 == n->send_connect_ack) ) + { + n->send_connect_ack = 2; + send_session_connect_ack_message (n->primary_address.address, + n->primary_address.session, + n->connect_ack_timestamp); + } + break; + case S_CONNECT_RECV_ATS: + /* still waiting on ATS suggestion, don't care about blacklist */ + break; + case S_CONNECT_RECV_BLACKLIST: + if (GNUNET_YES != address_matches (&bcc->na, &n->primary_address)) + break; /* result for an address we currently don't care about */ + if (GNUNET_OK == result) + { + n->timeout = GNUNET_TIME_relative_to_absolute (SETUP_CONNECTION_TIMEOUT); + n->state = S_CONNECT_RECV_ACK; + send_session_connect_ack_message (bcc->na.address, + bcc->na.session, + n->connect_ack_timestamp); + if (1 == n->send_connect_ack) + n->send_connect_ack = 2; + } + else + { + // FIXME: should also possibly destroy session with plugin!? + GNUNET_ATS_address_destroyed (GST_ats, + bcc->na.address, + NULL); + free_address (&n->primary_address); + n->state = S_INIT_ATS; + n->timeout = GNUNET_TIME_relative_to_absolute (ATS_RESPONSE_TIMEOUT); + // FIXME: do we need to ask ATS again for suggestions? + GNUNET_ATS_reset_backoff (GST_ats, peer); + GNUNET_ATS_suggest_address (GST_ats, &n->id); + } + break; + case S_CONNECT_RECV_ACK: + /* waiting on SESSION_ACK, send ACK if one is pending */ + if ( (GNUNET_OK == result) && + (1 == n->send_connect_ack) ) + { + n->send_connect_ack = 2; + send_session_connect_ack_message (n->primary_address.address, + n->primary_address.session, + n->connect_ack_timestamp); + } + break; + case S_CONNECTED: + /* already connected, don't care about blacklist */ + break; + case S_RECONNECT_ATS: + /* still waiting on ATS suggestion, don't care about blacklist */ + break; + case S_RECONNECT_BLACKLIST: + if ( (GNUNET_OK == result) && + (1 == n->send_connect_ack) ) + { + n->send_connect_ack = 2; + send_session_connect_ack_message (bcc->na.address, + bcc->na.session, + n->connect_ack_timestamp); + } + if (GNUNET_YES != address_matches (&bcc->na, &n->primary_address)) + break; /* result for an address we currently don't care about */ + if (GNUNET_OK == result) + { + send_session_connect (&n->primary_address); + n->timeout = GNUNET_TIME_relative_to_absolute (FAST_RECONNECT_TIMEOUT); + n->state = S_RECONNECT_SENT; + } + else + { + GNUNET_ATS_address_destroyed (GST_ats, + bcc->na.address, + NULL); + n->state = S_RECONNECT_ATS; + n->timeout = GNUNET_TIME_relative_to_absolute (ATS_RESPONSE_TIMEOUT); + // FIXME: do we need to ask ATS again for suggestions? + GNUNET_ATS_suggest_address (GST_ats, &n->id); + } + break; + case S_RECONNECT_SENT: + /* waiting on CONNECT_ACK, don't care about blacklist */ + if ( (GNUNET_OK == result) && + (1 == n->send_connect_ack) ) + { + n->send_connect_ack = 2; + send_session_connect_ack_message (n->primary_address.address, + n->primary_address.session, + n->connect_ack_timestamp); + } + break; + case S_CONNECTED_SWITCHING_BLACKLIST: + if (GNUNET_YES != address_matches (&bcc->na, &n->alternative_address)) + break; /* result for an address we currently don't care about */ + if (GNUNET_OK == result) + { + send_session_connect (&n->alternative_address); + n->state = S_CONNECTED_SWITCHING_CONNECT_SENT; + } + else + { + GNUNET_ATS_address_destroyed (GST_ats, + bcc->na.address, + NULL); + free_address (&n->alternative_address); + n->state = S_CONNECTED; + } + break; + case S_CONNECTED_SWITCHING_CONNECT_SENT: + /* waiting on CONNECT_ACK, don't care about blacklist */ + if ( (GNUNET_OK == result) && + (1 == n->send_connect_ack) ) + { + n->send_connect_ack = 2; + send_session_connect_ack_message (n->primary_address.address, + n->primary_address.session, + n->connect_ack_timestamp); + } + break; + case S_DISCONNECT: + /* Nothing to do here, ATS will already do what can be done */ + break; + case S_DISCONNECT_FINISHED: + /* should not be possible */ + GNUNET_assert (0); + break; + default: + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Unhandled state `%s' \n",print_state (n->state)); + GNUNET_break (0); + free_neighbour (n, GNUNET_NO); + break; } + cleanup: + GNUNET_HELLO_address_free (bcc->na.address); + GNUNET_CONTAINER_DLL_remove (bc_head, + bc_tail, + bcc); + GNUNET_free (bcc); +} - n = lookup_neighbour (target); - if ((NULL == n) || (S_CONNECTED != n->state)) - return GNUNET_NO; /* not connected */ - return GNUNET_YES; +/** + * We want to know if connecting to a particular peer via + * a particular address is allowed. Check it! + * + * @param peer identity of the peer to switch the address for + * @param ts time at which the check was initiated + * @param address address of the other peer, NULL if other peer + * connected to us + * @param session session to use (or NULL) + * @param ats performance data + * @param ats_count number of entries in ats (excluding 0-termination) + */ +static void +check_blacklist (const struct GNUNET_PeerIdentity *peer, + struct GNUNET_TIME_Absolute ts, + const struct GNUNET_HELLO_Address *address, + struct Session *session, + const struct GNUNET_ATS_Information *ats, + uint32_t ats_count) +{ + struct BlackListCheckContext *bcc; + struct GST_BlacklistCheck *bc; + + bcc = + GNUNET_malloc (sizeof (struct BlackListCheckContext) + + sizeof (struct GNUNET_ATS_Information) * ats_count); + bcc->ats_count = ats_count; + bcc->na.address = GNUNET_HELLO_address_copy (address); + bcc->na.session = session; + bcc->na.connect_timestamp = ts; + bcc->ats = (struct GNUNET_ATS_Information *) &bcc[1]; + memcpy (bcc->ats, ats, sizeof (struct GNUNET_ATS_Information) * ats_count); + GNUNET_CONTAINER_DLL_insert (bc_head, + bc_tail, + bcc); + if (NULL != (bc = GST_blacklist_test_allowed (peer, + address->transport_name, + &handle_test_blacklist_cont, bcc))) + bcc->bc = bc; + /* if NULL == bc, 'cont' was already called and 'bcc' already free'd, so + we must only store 'bc' if 'bc' is non-NULL... */ } + /** - * A session was terminated. Take note. + * We received a 'SESSION_CONNECT' message from the other peer. + * Consider switching to it. * - * @param peer identity of the peer where the session died - * @param session session that is gone + * @param message possibly a 'struct SessionConnectMessage' (check format) + * @param peer identity of the peer to switch the address for + * @param address address of the other peer, NULL if other peer + * connected to us + * @param session session to use (or NULL) + * @param ats performance data + * @param ats_count number of entries in ats (excluding 0-termination) */ void -GST_neighbours_session_terminated (const struct GNUNET_PeerIdentity *peer, - struct Session *session) +GST_neighbours_handle_connect (const struct GNUNET_MessageHeader *message, + const struct GNUNET_PeerIdentity *peer, + const struct GNUNET_HELLO_Address *address, + struct Session *session, + const struct GNUNET_ATS_Information *ats, + uint32_t ats_count) { + const struct SessionConnectMessage *scm; struct NeighbourMapEntry *n; + struct GNUNET_TIME_Absolute ts; - if (neighbours == NULL) + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Received CONNECT message from peer `%s'\n", + GNUNET_i2s (peer)); + if (ntohs (message->size) != sizeof (struct SessionConnectMessage)) { - /* This can happen during shutdown */ + GNUNET_break_op (0); return; } - -#if DEBUG_TRANSPORT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Session %X to peer `%s' ended \n", - session, GNUNET_i2s (peer)); -#endif - + if (NULL == neighbours) + return; /* we're shutting down */ + scm = (const struct SessionConnectMessage *) message; + GNUNET_break_op (0 == ntohl (scm->reserved)); + ts = GNUNET_TIME_absolute_ntoh (scm->timestamp); n = lookup_neighbour (peer); if (NULL == n) - return; - if (session != n->session) - return; /* doesn't affect us */ - if (n->state == S_CONNECTED) - { - if (n->address_state == USED) - { - GST_validation_set_address_use (n->address, n->session, GNUNET_NO); - GNUNET_ATS_address_in_use (GST_ats, n->address, n->session, GNUNET_NO); - n->address_state = UNUSED; - } - } - - if (NULL != n->address) - { - GNUNET_HELLO_address_free (n->address); - n->address = NULL; - } - n->session = NULL; - - /* not connected anymore anyway, shouldn't matter */ - if (S_CONNECTED != n->state) - return; - - if (n->keepalive_task != GNUNET_SCHEDULER_NO_TASK) - { - GNUNET_SCHEDULER_cancel (n->keepalive_task); - n->keepalive_task = GNUNET_SCHEDULER_NO_TASK; - n->expect_latency_response = GNUNET_NO; + n = setup_neighbour (peer); + n->send_connect_ack = 1; + n->connect_ack_timestamp = ts; + switch (n->state) + { + case S_NOT_CONNECTED: + n->state = S_CONNECT_RECV_ATS; + n->timeout = GNUNET_TIME_relative_to_absolute (ATS_RESPONSE_TIMEOUT); + GNUNET_ATS_reset_backoff (GST_ats, peer); + GNUNET_ATS_suggest_address (GST_ats, peer); + check_blacklist (peer, ts, address, session, ats, ats_count); + break; + case S_INIT_ATS: + case S_INIT_BLACKLIST: + case S_CONNECT_SENT: + case S_CONNECT_RECV_ATS: + case S_CONNECT_RECV_BLACKLIST: + case S_CONNECT_RECV_ACK: + /* It can never hurt to have an alternative address in the above cases, + see if it is allowed */ + check_blacklist (peer, ts, address, session, ats, ats_count); + break; + case S_CONNECTED: + /* we are already connected and can thus send the ACK immediately; + still, it can never hurt to have an alternative address, so also + tell ATS about it */ + GNUNET_assert (NULL != n->primary_address.address); + GNUNET_assert (NULL != n->primary_address.session); + n->send_connect_ack = 0; + send_session_connect_ack_message (n->primary_address.address, + n->primary_address.session, ts); + check_blacklist (peer, ts, address, session, ats, ats_count); + break; + case S_RECONNECT_ATS: + case S_RECONNECT_BLACKLIST: + case S_RECONNECT_SENT: + /* It can never hurt to have an alternative address in the above cases, + see if it is allowed */ + check_blacklist (peer, ts, address, session, ats, ats_count); + break; + case S_CONNECTED_SWITCHING_BLACKLIST: + case S_CONNECTED_SWITCHING_CONNECT_SENT: + /* we are already connected and can thus send the ACK immediately; + still, it can never hurt to have an alternative address, so also + tell ATS about it */ + GNUNET_assert (NULL != n->primary_address.address); + GNUNET_assert (NULL != n->primary_address.session); + n->send_connect_ack = 0; + send_session_connect_ack_message (n->primary_address.address, + n->primary_address.session, ts); + check_blacklist (peer, ts, address, session, ats, ats_count); + break; + case S_DISCONNECT: + /* get rid of remains without terminating sessions, ready to re-try */ + free_neighbour (n, GNUNET_YES); + n = setup_neighbour (peer); + n->state = S_CONNECT_RECV_ATS; + GNUNET_ATS_reset_backoff (GST_ats, peer); + GNUNET_ATS_suggest_address (GST_ats, peer); + break; + case S_DISCONNECT_FINISHED: + /* should not be possible */ + GNUNET_assert (0); + break; + default: + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Unhandled state `%s' \n",print_state (n->state)); + GNUNET_break (0); + free_neighbour (n, GNUNET_NO); + break; } - - /* connected, try fast reconnect */ - /* statistics "transport" : "# peers connected" -= 1 - * neighbours_connected -= 1 - * BUT: no disconnect_cb to notify clients about disconnect - */ - - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Trying fast reconnect to peer `%s'\n", - GNUNET_i2s (peer)); - - GNUNET_assert (neighbours_connected > 0); - change_state (n, S_FAST_RECONNECT); - neighbours_connected--; - GNUNET_STATISTICS_update (GST_stats, gettext_noop ("# peers connected"), -1, - GNUNET_NO); - - - /* We are connected, so ask ATS to switch addresses */ - GNUNET_SCHEDULER_cancel (n->timeout_task); - n->timeout_task = GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_DISCONNECT_SESSION_TIMEOUT, - &neighbour_timeout_task, n); - /* try QUICKLY to re-establish a connection, reduce timeout! */ - if (n->ats_suggest != GNUNET_SCHEDULER_NO_TASK) - GNUNET_SCHEDULER_cancel (n->ats_suggest); - n->ats_suggest = GNUNET_SCHEDULER_add_delayed (ATS_RESPONSE_TIMEOUT, - &ats_suggest_cancel, - n); - GNUNET_ATS_suggest_address (GST_ats, peer); } /** - * Transmit a message to the given target using the active connection. + * For an existing neighbour record, set the active connection to + * use the given address. * - * @param target destination - * @param msg message to send - * @param msg_size number of bytes in msg - * @param timeout when to fail with timeout - * @param cont function to call when done - * @param cont_cls closure for 'cont' + * @param peer identity of the peer to switch the address for + * @param address address of the other peer, NULL if other peer + * connected to us + * @param session session to use (or NULL) + * @param ats performance data + * @param ats_count number of entries in ats + * @param bandwidth_in inbound quota to be used when connection is up + * @param bandwidth_out outbound quota to be used when connection is up */ void -GST_neighbours_send (const struct GNUNET_PeerIdentity *target, const void *msg, - size_t msg_size, struct GNUNET_TIME_Relative timeout, - GST_NeighbourSendContinuation cont, void *cont_cls) +GST_neighbours_switch_to_address (const struct GNUNET_PeerIdentity *peer, + const struct GNUNET_HELLO_Address *address, + struct Session *session, + const struct GNUNET_ATS_Information *ats, + uint32_t ats_count, + struct GNUNET_BANDWIDTH_Value32NBO + bandwidth_in, + struct GNUNET_BANDWIDTH_Value32NBO + bandwidth_out) { struct NeighbourMapEntry *n; - struct MessageQueue *mq; + struct GNUNET_TRANSPORT_PluginFunctions *papi; + + GNUNET_assert (address->transport_name != NULL); + if (NULL == (n = lookup_neighbour (peer))) + return; - // This can happen during shutdown - if (neighbours == NULL) + /* Obtain an session for this address from plugin */ + if (NULL == (papi = GST_plugins_find (address->transport_name))) { + /* we don't have the plugin for this address */ + GNUNET_ATS_address_destroyed (GST_ats, address, NULL); return; } - - n = lookup_neighbour (target); - if ((n == NULL) || (!is_connected (n))) + if ((NULL == session) && (0 == address->address_length)) { - GNUNET_STATISTICS_update (GST_stats, - gettext_noop - ("# messages not sent (no such peer or not connected)"), - 1, GNUNET_NO); -#if DEBUG_TRANSPORT - if (n == NULL) - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Could not send message to peer `%s': unknown neighbour", - GNUNET_i2s (target)); - else if (!is_connected (n)) - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Could not send message to peer `%s': not connected\n", - GNUNET_i2s (target)); -#endif - if (NULL != cont) - cont (cont_cls, GNUNET_SYSERR); + GNUNET_break (0); + if (strlen (address->transport_name) > 0) + GNUNET_ATS_address_destroyed (GST_ats, address, session); return; } - - if ((n->session == NULL) && (n->address == NULL)) + if (NULL == session) + session = papi->get_session (papi->cls, address); + if (NULL == session) { - GNUNET_STATISTICS_update (GST_stats, - gettext_noop - ("# messages not sent (no such peer or not connected)"), - 1, GNUNET_NO); -#if DEBUG_TRANSPORT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Could not send message to peer `%s': no address available\n", - GNUNET_i2s (target)); -#endif - - if (NULL != cont) - cont (cont_cls, GNUNET_SYSERR); + "Failed to obtain new session for peer `%s' and address '%s'\n", + GNUNET_i2s (&address->peer), GST_plugins_a2s (address)); + GNUNET_ATS_address_destroyed (GST_ats, address, NULL); return; } - - GNUNET_assert (msg_size >= sizeof (struct GNUNET_MessageHeader)); - GNUNET_STATISTICS_update (GST_stats, - gettext_noop - ("# bytes in message queue for other peers"), - msg_size, GNUNET_NO); - mq = GNUNET_malloc (sizeof (struct MessageQueue) + msg_size); - mq->cont = cont; - mq->cont_cls = cont_cls; - /* FIXME: this memcpy can be up to 7% of our total runtime! */ - memcpy (&mq[1], msg, msg_size); - mq->message_buf = (const char *) &mq[1]; - mq->message_buf_size = msg_size; - mq->timeout = GNUNET_TIME_relative_to_absolute (timeout); - GNUNET_CONTAINER_DLL_insert_tail (n->messages_head, n->messages_tail, mq); - - if ((GNUNET_SCHEDULER_NO_TASK == n->transmission_task) && - (NULL == n->is_active)) - n->transmission_task = GNUNET_SCHEDULER_add_now (&transmission_task, n); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "ATS tells us to switch to address '%s' for peer `%s'\n", + (address->address_length != 0) ? GST_plugins_a2s (address): "<inbound>", + GNUNET_i2s (peer)); + switch (n->state) + { + case S_NOT_CONNECTED: + GNUNET_break (0); + free_neighbour (n, GNUNET_NO); + return; + case S_INIT_ATS: + set_address (&n->primary_address, + address, session, bandwidth_in, bandwidth_out, GNUNET_NO); + n->state = S_INIT_BLACKLIST; + n->timeout = GNUNET_TIME_relative_to_absolute (BLACKLIST_RESPONSE_TIMEOUT); + check_blacklist (&n->id, + n->connect_ack_timestamp, + address, session, ats, ats_count); + break; + case S_INIT_BLACKLIST: + /* ATS suggests a different address, switch again */ + set_address (&n->primary_address, + address, session, bandwidth_in, bandwidth_out, GNUNET_NO); + n->timeout = GNUNET_TIME_relative_to_absolute (BLACKLIST_RESPONSE_TIMEOUT); + check_blacklist (&n->id, + n->connect_ack_timestamp, + address, session, ats, ats_count); + break; + case S_CONNECT_SENT: + /* ATS suggests a different address, switch again */ + set_address (&n->primary_address, + address, session, bandwidth_in, bandwidth_out, GNUNET_NO); + n->state = S_INIT_BLACKLIST; + n->timeout = GNUNET_TIME_relative_to_absolute (BLACKLIST_RESPONSE_TIMEOUT); + check_blacklist (&n->id, + n->connect_ack_timestamp, + address, session, ats, ats_count); + break; + case S_CONNECT_RECV_ATS: + set_address (&n->primary_address, + address, session, bandwidth_in, bandwidth_out, GNUNET_NO); + n->state = S_CONNECT_RECV_BLACKLIST; + n->timeout = GNUNET_TIME_relative_to_absolute (BLACKLIST_RESPONSE_TIMEOUT); + check_blacklist (&n->id, + n->connect_ack_timestamp, + address, session, ats, ats_count); + break; + case S_CONNECT_RECV_BLACKLIST: + case S_CONNECT_RECV_ACK: + /* ATS asks us to switch while we were trying to connect; switch to new + address and check blacklist again */ + set_address (&n->primary_address, + address, session, bandwidth_in, bandwidth_out, GNUNET_NO); + n->timeout = GNUNET_TIME_relative_to_absolute (BLACKLIST_RESPONSE_TIMEOUT); + check_blacklist (&n->id, + n->connect_ack_timestamp, + address, session, ats, ats_count); + break; + case S_CONNECTED: + GNUNET_assert (NULL != n->primary_address.address); + GNUNET_assert (NULL != n->primary_address.session); + if (n->primary_address.session == session) + { + /* not an address change, just a quota change */ + set_address (&n->primary_address, + address, session, bandwidth_in, bandwidth_out, GNUNET_YES); + break; + } + /* ATS asks us to switch a life connection; see if we can get + a CONNECT_ACK on it before we actually do this! */ + set_address (&n->alternative_address, + address, session, bandwidth_in, bandwidth_out, GNUNET_NO); + n->state = S_CONNECTED_SWITCHING_BLACKLIST; + check_blacklist (&n->id, + GNUNET_TIME_absolute_get (), + address, session, ats, ats_count); + break; + case S_RECONNECT_ATS: + set_address (&n->primary_address, + address, session, bandwidth_in, bandwidth_out, GNUNET_NO); + n->state = S_RECONNECT_BLACKLIST; + n->timeout = GNUNET_TIME_relative_to_absolute (BLACKLIST_RESPONSE_TIMEOUT); + check_blacklist (&n->id, + n->connect_ack_timestamp, + address, session, ats, ats_count); + break; + case S_RECONNECT_BLACKLIST: + /* ATS asks us to switch while we were trying to reconnect; switch to new + address and check blacklist again */ + set_address (&n->primary_address, + address, session, bandwidth_in, bandwidth_out, GNUNET_NO); + n->timeout = GNUNET_TIME_relative_to_absolute (BLACKLIST_RESPONSE_TIMEOUT); + check_blacklist (&n->id, + n->connect_ack_timestamp, + address, session, ats, ats_count); + break; + case S_RECONNECT_SENT: + /* ATS asks us to switch while we were trying to reconnect; switch to new + address and check blacklist again */ + set_address (&n->primary_address, + address, session, bandwidth_in, bandwidth_out, GNUNET_NO); + n->state = S_RECONNECT_BLACKLIST; + n->timeout = GNUNET_TIME_relative_to_absolute (BLACKLIST_RESPONSE_TIMEOUT); + check_blacklist (&n->id, + n->connect_ack_timestamp, + address, session, ats, ats_count); + break; + case S_CONNECTED_SWITCHING_BLACKLIST: + if (n->primary_address.session == session) + { + /* ATS switches back to still-active session */ + free_address (&n->alternative_address); + n->state = S_CONNECTED; + break; + } + /* ATS asks us to switch a life connection, update blacklist check */ + set_address (&n->alternative_address, + address, session, bandwidth_in, bandwidth_out, GNUNET_NO); + check_blacklist (&n->id, + GNUNET_TIME_absolute_get (), + address, session, ats, ats_count); + break; + case S_CONNECTED_SWITCHING_CONNECT_SENT: + if (n->primary_address.session == session) + { + /* ATS switches back to still-active session */ + free_address (&n->alternative_address); + n->state = S_CONNECTED; + break; + } + /* ATS asks us to switch a life connection, update blacklist check */ + set_address (&n->alternative_address, + address, session, bandwidth_in, bandwidth_out, GNUNET_NO); + n->state = S_CONNECTED_SWITCHING_BLACKLIST; + check_blacklist (&n->id, + GNUNET_TIME_absolute_get (), + address, session, ats, ats_count); + break; + case S_DISCONNECT: + /* not going to switch addresses while disconnecting */ + return; + case S_DISCONNECT_FINISHED: + GNUNET_assert (0); + break; + default: + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Unhandled state `%s' \n",print_state (n->state)); + GNUNET_break (0); + break; + } } /** - * We have received a message from the given sender. How long should - * we delay before receiving more? (Also used to keep the peer marked - * as live). + * Master task run for every neighbour. Performs all of the time-related + * activities (keep alive, send next message, disconnect if idle, finish + * clean up after disconnect). * - * @param sender sender of the message - * @param size size of the message - * @param do_forward set to GNUNET_YES if the message should be forwarded to clients - * GNUNET_NO if the neighbour is not connected or violates the quota, - * GNUNET_SYSERR if the connection is not fully up yet - * @return how long to wait before reading more from this sender + * @param cls the 'struct NeighbourMapEntry' for which we are running + * @param tc scheduler context (unused) */ -struct GNUNET_TIME_Relative -GST_neighbours_calculate_receive_delay (const struct GNUNET_PeerIdentity - *sender, ssize_t size, int *do_forward) +static void +master_task (void *cls, + const struct GNUNET_SCHEDULER_TaskContext *tc) { - struct NeighbourMapEntry *n; - struct GNUNET_TIME_Relative ret; - - // This can happen during shutdown - if (neighbours == NULL) - { - return GNUNET_TIME_UNIT_FOREVER_REL; - } + struct NeighbourMapEntry *n = cls; + struct GNUNET_TIME_Relative delay; - n = lookup_neighbour (sender); - if (n == NULL) + n->task = GNUNET_SCHEDULER_NO_TASK; + delay = GNUNET_TIME_absolute_get_remaining (n->timeout); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "master task runs for neighbour `%s' in state %d with timeout in %llu ms\n", + GNUNET_i2s (&n->id), + n->state, + (unsigned long long) delay.rel_value); + switch (n->state) { - GST_neighbours_try_connect (sender); - n = lookup_neighbour (sender); - if (NULL == n) + case S_NOT_CONNECTED: + /* invalid state for master task, clean up */ + GNUNET_break (0); + n->state = S_DISCONNECT_FINISHED; + free_neighbour (n, GNUNET_NO); + return; + case S_INIT_ATS: + if (0 == delay.rel_value) { - GNUNET_STATISTICS_update (GST_stats, - gettext_noop - ("# messages discarded due to lack of neighbour record"), - 1, GNUNET_NO); - *do_forward = GNUNET_NO; - return GNUNET_TIME_UNIT_ZERO; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Connection to `%s' timed out waiting for ATS to provide address\n", + GNUNET_i2s (&n->id)); + n->state = S_DISCONNECT_FINISHED; + free_neighbour (n, GNUNET_NO); + return; } - } - if (!is_connected (n)) - { - *do_forward = GNUNET_SYSERR; - return GNUNET_TIME_UNIT_ZERO; - } - if (GNUNET_YES == GNUNET_BANDWIDTH_tracker_consume (&n->in_tracker, size)) - { - n->quota_violation_count++; -#if DEBUG_TRANSPORT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Bandwidth quota (%u b/s) violation detected (total of %u).\n", - n->in_tracker.available_bytes_per_s__, - n->quota_violation_count); -#endif - /* Discount 32k per violation */ - GNUNET_BANDWIDTH_tracker_consume (&n->in_tracker, -32 * 1024); - } - else - { - if (n->quota_violation_count > 0) + break; + case S_INIT_BLACKLIST: + if (0 == delay.rel_value) { - /* try to add 32k back */ - GNUNET_BANDWIDTH_tracker_consume (&n->in_tracker, 32 * 1024); - n->quota_violation_count--; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Connection to `%s' timed out waiting for BLACKLIST to approve address\n", + GNUNET_i2s (&n->id)); + n->state = S_DISCONNECT_FINISHED; + free_neighbour (n, GNUNET_NO); + return; } - } - if (n->quota_violation_count > QUOTA_VIOLATION_DROP_THRESHOLD) - { - GNUNET_STATISTICS_update (GST_stats, - gettext_noop - ("# bandwidth quota violations by other peers"), - 1, GNUNET_NO); - *do_forward = GNUNET_NO; - return GNUNET_CONSTANTS_QUOTA_VIOLATION_TIMEOUT; - } - *do_forward = GNUNET_YES; - ret = GNUNET_BANDWIDTH_tracker_get_delay (&n->in_tracker, 32 * 1024); - if (ret.rel_value > 0) - { -#if DEBUG_TRANSPORT + break; + case S_CONNECT_SENT: + if (0 == delay.rel_value) + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Connection to `%s' timed out waiting for other peer to send CONNECT_ACK\n", + GNUNET_i2s (&n->id)); + disconnect_neighbour (n); + return; + } + break; + case S_CONNECT_RECV_ATS: + if (0 == delay.rel_value) + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Connection to `%s' timed out waiting ATS to provide address to use for CONNECT_ACK\n", + GNUNET_i2s (&n->id)); + n->state = S_DISCONNECT_FINISHED; + free_neighbour (n, GNUNET_NO); + return; + } + break; + case S_CONNECT_RECV_BLACKLIST: + if (0 == delay.rel_value) + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Connection to `%s' timed out waiting BLACKLIST to approve address to use for CONNECT_ACK\n", + GNUNET_i2s (&n->id)); + n->state = S_DISCONNECT_FINISHED; + free_neighbour (n, GNUNET_NO); + return; + } + break; + case S_CONNECT_RECV_ACK: + if (0 == delay.rel_value) + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Connection to `%s' timed out waiting for other peer to send SESSION_ACK\n", + GNUNET_i2s (&n->id)); + disconnect_neighbour (n); + return; + } + break; + case S_CONNECTED: + if (0 == delay.rel_value) + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Connection to `%s' timed out, missing KEEPALIVE_RESPONSEs\n", + GNUNET_i2s (&n->id)); + disconnect_neighbour (n); + return; + } + try_transmission_to_peer (n); + send_keepalive (n); + break; + case S_RECONNECT_ATS: + if (0 == delay.rel_value) + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Connection to `%s' timed out, waiting for ATS replacement address\n", + GNUNET_i2s (&n->id)); + disconnect_neighbour (n); + return; + } + break; + case S_RECONNECT_BLACKLIST: + if (0 == delay.rel_value) + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Connection to `%s' timed out, waiting for BLACKLIST to approve replacement address\n", + GNUNET_i2s (&n->id)); + disconnect_neighbour (n); + return; + } + break; + case S_RECONNECT_SENT: + if (0 == delay.rel_value) + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Connection to `%s' timed out, waiting for other peer to CONNECT_ACK replacement address\n", + GNUNET_i2s (&n->id)); + disconnect_neighbour (n); + return; + } + break; + case S_CONNECTED_SWITCHING_BLACKLIST: + if (0 == delay.rel_value) + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Connection to `%s' timed out, missing KEEPALIVE_RESPONSEs\n", + GNUNET_i2s (&n->id)); + disconnect_neighbour (n); + return; + } + try_transmission_to_peer (n); + send_keepalive (n); + break; + case S_CONNECTED_SWITCHING_CONNECT_SENT: + if (0 == delay.rel_value) + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Connection to `%s' timed out, missing KEEPALIVE_RESPONSEs (after trying to CONNECT on alternative address)\n", + GNUNET_i2s (&n->id)); + disconnect_neighbour (n); + return; + } + try_transmission_to_peer (n); + send_keepalive (n); + break; + case S_DISCONNECT: GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Throttling read (%llu bytes excess at %u b/s), waiting %llu ms before reading more.\n", - (unsigned long long) n->in_tracker. - consumption_since_last_update__, - (unsigned int) n->in_tracker.available_bytes_per_s__, - (unsigned long long) ret.rel_value); -#endif - GNUNET_STATISTICS_update (GST_stats, - gettext_noop ("# ms throttling suggested"), - (int64_t) ret.rel_value, GNUNET_NO); - } - return ret; + "Cleaning up connection to `%s' after sending DISCONNECT\n", + GNUNET_i2s (&n->id)); + n->state = S_DISCONNECT_FINISHED; + free_neighbour (n, GNUNET_NO); + return; + case S_DISCONNECT_FINISHED: + /* how did we get here!? */ + GNUNET_assert (0); + break; + default: + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Unhandled state `%s' \n",print_state (n->state)); + GNUNET_break (0); + break; + } + if ( (S_CONNECTED_SWITCHING_CONNECT_SENT == n->state) || + (S_CONNECTED_SWITCHING_BLACKLIST == n->state) || + (S_CONNECTED == n->state) ) + { + /* if we are *now* in one of these three states, we're sending + keep alive messages, so we need to consider the keepalive + delay, not just the connection timeout */ + delay = GNUNET_TIME_relative_min (GNUNET_TIME_absolute_get_remaining (n->keep_alive_time), + delay); + } + if (GNUNET_SCHEDULER_NO_TASK == n->task) + n->task = GNUNET_SCHEDULER_add_delayed (delay, + &master_task, + n); } /** - * Keep the connection to the given neighbour alive longer, - * we received a KEEPALIVE (or equivalent). + * Send a SESSION_ACK message to the neighbour to confirm that we + * got his CONNECT_ACK. * - * @param neighbour neighbour to keep alive + * @param n neighbour to send the SESSION_ACK to */ -void -GST_neighbours_keepalive (const struct GNUNET_PeerIdentity *neighbour) +static void +send_session_ack_message (struct NeighbourMapEntry *n) { - struct NeighbourMapEntry *n; - - // This can happen during shutdown - if (neighbours == NULL) - { - return; - } - - n = lookup_neighbour (neighbour); - if (NULL == n) - { - GNUNET_STATISTICS_update (GST_stats, - gettext_noop - ("# KEEPALIVE messages discarded (not connected)"), - 1, GNUNET_NO); - return; - } - GNUNET_SCHEDULER_cancel (n->timeout_task); - n->timeout_task = - GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT, - &neighbour_timeout_task, n); - - /* send reply to measure latency */ - if (S_CONNECTED != n->state) - return; - - struct GNUNET_MessageHeader m; - - m.size = htons (sizeof (struct GNUNET_MessageHeader)); - m.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_KEEPALIVE_RESPONSE); + struct GNUNET_MessageHeader msg; - send_with_session(n, - (const void *) &m, sizeof (m), - UINT32_MAX, - GNUNET_TIME_UNIT_FOREVER_REL, - NULL, NULL); + msg.size = htons (sizeof (struct GNUNET_MessageHeader)); + msg.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_ACK); + (void) send_with_session(n, + (const char *) &msg, sizeof (struct GNUNET_MessageHeader), + UINT32_MAX, GNUNET_TIME_UNIT_FOREVER_REL, + NULL, NULL); } + /** - * We received a KEEP_ALIVE_RESPONSE message and use this to calculate latency - * to this peer + * We received a 'SESSION_CONNECT_ACK' message from the other peer. + * Consider switching to it. * - * @param neighbour neighbour to keep alive + * @param message possibly a 'struct SessionConnectMessage' (check format) + * @param peer identity of the peer to switch the address for + * @param address address of the other peer, NULL if other peer + * connected to us + * @param session session to use (or NULL) * @param ats performance data * @param ats_count number of entries in ats */ void -GST_neighbours_keepalive_response (const struct GNUNET_PeerIdentity *neighbour, +GST_neighbours_handle_connect_ack (const struct GNUNET_MessageHeader *message, + const struct GNUNET_PeerIdentity *peer, + const struct GNUNET_HELLO_Address *address, + struct Session *session, const struct GNUNET_ATS_Information *ats, uint32_t ats_count) { + const struct SessionConnectMessage *scm; + struct GNUNET_TIME_Absolute ts; struct NeighbourMapEntry *n; - struct GNUNET_ATS_Information *ats_new; - uint32_t latency; - if (neighbours == NULL) + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Received CONNECT_ACK message from peer `%s'\n", + GNUNET_i2s (peer)); + if (ntohs (message->size) != sizeof (struct SessionConnectMessage)) { - // This can happen during shutdown + GNUNET_break_op (0); return; } - - n = lookup_neighbour (neighbour); - if ((NULL == n) || (n->state != S_CONNECTED)) + scm = (const struct SessionConnectMessage *) message; + GNUNET_break_op (ntohl (scm->reserved) == 0); + if (NULL == (n = lookup_neighbour (peer))) { GNUNET_STATISTICS_update (GST_stats, gettext_noop - ("# KEEPALIVE_RESPONSE messages discarded (not connected)"), + ("# unexpected CONNECT_ACK messages (no peer)"), 1, GNUNET_NO); return; } - if (n->expect_latency_response != GNUNET_YES) + ts = GNUNET_TIME_absolute_ntoh (scm->timestamp); + switch (n->state) { + case S_NOT_CONNECTED: + GNUNET_break (0); + free_neighbour (n, GNUNET_NO); + return; + case S_INIT_ATS: + case S_INIT_BLACKLIST: GNUNET_STATISTICS_update (GST_stats, gettext_noop - ("# KEEPALIVE_RESPONSE messages discarded (not expected)"), + ("# unexpected CONNECT_ACK messages (not ready)"), 1, GNUNET_NO); - return; + break; + case S_CONNECT_SENT: + if (ts.abs_value != n->primary_address.connect_timestamp.abs_value) + break; /* ACK does not match our original CONNECT message */ + n->state = S_CONNECTED; + n->timeout = GNUNET_TIME_relative_to_absolute (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT); + GNUNET_STATISTICS_set (GST_stats, + gettext_noop ("# peers connected"), + ++neighbours_connected, + GNUNET_NO); + connect_notify_cb (callback_cls, &n->id, ats, ats_count); + set_address (&n->primary_address, + n->primary_address.address, + n->primary_address.session, + n->primary_address.bandwidth_in, + n->primary_address.bandwidth_out, + GNUNET_YES); + send_session_ack_message (n); + break; + case S_CONNECT_RECV_ATS: + case S_CONNECT_RECV_BLACKLIST: + case S_CONNECT_RECV_ACK: + GNUNET_STATISTICS_update (GST_stats, + gettext_noop + ("# unexpected CONNECT_ACK messages (not ready)"), + 1, GNUNET_NO); + break; + case S_CONNECTED: + /* duplicate CONNECT_ACK, let's answer by duplciate SESSION_ACK just in case */ + send_session_ack_message (n); + break; + case S_RECONNECT_ATS: + case S_RECONNECT_BLACKLIST: + /* we didn't expect any CONNECT_ACK, as we are waiting for ATS + to give us a new address... */ + GNUNET_STATISTICS_update (GST_stats, + gettext_noop + ("# unexpected CONNECT_ACK messages (waiting on ATS)"), + 1, GNUNET_NO); + break; + case S_RECONNECT_SENT: + /* new address worked; go back to connected! */ + n->state = S_CONNECTED; + send_session_ack_message (n); + break; + case S_CONNECTED_SWITCHING_BLACKLIST: + /* duplicate CONNECT_ACK, let's answer by duplciate SESSION_ACK just in case */ + send_session_ack_message (n); + break; + case S_CONNECTED_SWITCHING_CONNECT_SENT: + /* new address worked; adopt it and go back to connected! */ + n->state = S_CONNECTED; + n->timeout = GNUNET_TIME_relative_to_absolute (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT); + GNUNET_break (GNUNET_NO == n->alternative_address.ats_active); + set_address (&n->primary_address, + n->alternative_address.address, + n->alternative_address.session, + n->alternative_address.bandwidth_in, + n->alternative_address.bandwidth_out, + GNUNET_YES); + free_address (&n->alternative_address); + send_session_ack_message (n); + break; + case S_DISCONNECT: + GNUNET_STATISTICS_update (GST_stats, + gettext_noop + ("# unexpected CONNECT_ACK messages (disconnecting)"), + 1, GNUNET_NO); + break; + case S_DISCONNECT_FINISHED: + GNUNET_assert (0); + break; + default: + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Unhandled state `%s' \n",print_state (n->state)); + GNUNET_break (0); + break; } - n->expect_latency_response = GNUNET_NO; +} - GNUNET_assert (n->keep_alive_sent.abs_value != - GNUNET_TIME_absolute_get_zero ().abs_value); - n->latency = - GNUNET_TIME_absolute_get_difference (n->keep_alive_sent, - GNUNET_TIME_absolute_get ()); -#if DEBUG_TRANSPORT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Latency for peer `%s' is %llu ms\n", - GNUNET_i2s (&n->id), n->latency.rel_value); -#endif +/** + * A session was terminated. Take note; if needed, try to get + * an alternative address from ATS. + * + * @param peer identity of the peer where the session died + * @param session session that is gone + */ +void +GST_neighbours_session_terminated (const struct GNUNET_PeerIdentity *peer, + struct Session *session) +{ + struct NeighbourMapEntry *n; + struct BlackListCheckContext *bcc; + struct BlackListCheckContext *bcc_next; - if (n->latency.rel_value == GNUNET_TIME_relative_get_forever ().rel_value) + /* make sure to cancel all ongoing blacklist checks involving 'session' */ + bcc_next = bc_head; + while (NULL != (bcc = bcc_next)) { - GNUNET_ATS_address_update (GST_ats, n->address, n->session, ats, ats_count); + bcc_next = bcc->next; + if (bcc->na.session == session) + { + GST_blacklist_test_cancel (bcc->bc); + GNUNET_HELLO_address_free (bcc->na.address); + GNUNET_CONTAINER_DLL_remove (bc_head, + bc_tail, + bcc); + GNUNET_free (bcc); + } } - else + if (NULL == (n = lookup_neighbour (peer))) + return; /* can't affect us */ + if (session != n->primary_address.session) { - ats_new = - GNUNET_malloc (sizeof (struct GNUNET_ATS_Information) * - (ats_count + 1)); - memcpy (ats_new, ats, sizeof (struct GNUNET_ATS_Information) * ats_count); - - /* add latency */ - ats_new[ats_count].type = htonl (GNUNET_ATS_QUALITY_NET_DELAY); - if (n->latency.rel_value > UINT32_MAX) - latency = UINT32_MAX; - else - latency = n->latency.rel_value; - ats_new[ats_count].value = htonl (latency); + if (session == n->alternative_address.session) + { + free_address (&n->alternative_address); + if ( (S_CONNECTED_SWITCHING_BLACKLIST == n->state) || + (S_CONNECTED_SWITCHING_CONNECT_SENT == n->state) ) + n->state = S_CONNECTED; + else + GNUNET_break (0); + } + return; /* doesn't affect us further */ + } - GNUNET_ATS_address_update (GST_ats, n->address, n->session, ats_new, - ats_count + 1); - GNUNET_free (ats_new); + n->expect_latency_response = GNUNET_NO; + + switch (n->state) + { + case S_NOT_CONNECTED: + GNUNET_break (0); + free_neighbour (n, GNUNET_NO); + return; + case S_INIT_ATS: + GNUNET_break (0); + free_neighbour (n, GNUNET_NO); + return; + case S_INIT_BLACKLIST: + case S_CONNECT_SENT: + free_address (&n->primary_address); + n->state = S_INIT_ATS; + n->timeout = GNUNET_TIME_relative_to_absolute (ATS_RESPONSE_TIMEOUT); + // FIXME: need to ask ATS for suggestions again? + GNUNET_ATS_suggest_address (GST_ats, &n->id); + break; + case S_CONNECT_RECV_ATS: + case S_CONNECT_RECV_BLACKLIST: + case S_CONNECT_RECV_ACK: + /* error on inbound session; free neighbour entirely */ + free_address (&n->primary_address); + free_neighbour (n, GNUNET_NO); + return; + case S_CONNECTED: + free_address (&n->primary_address); + n->state = S_RECONNECT_ATS; + n->timeout = GNUNET_TIME_relative_to_absolute (ATS_RESPONSE_TIMEOUT); + /* FIXME: is this ATS call needed? */ + GNUNET_ATS_suggest_address (GST_ats, &n->id); + break; + case S_RECONNECT_ATS: + /* we don't have an address, how can it go down? */ + GNUNET_break (0); + break; + case S_RECONNECT_BLACKLIST: + case S_RECONNECT_SENT: + n->state = S_RECONNECT_ATS; + n->timeout = GNUNET_TIME_relative_to_absolute (ATS_RESPONSE_TIMEOUT); + // FIXME: need to ask ATS for suggestions again? + GNUNET_ATS_suggest_address (GST_ats, &n->id); + break; + case S_CONNECTED_SWITCHING_BLACKLIST: + /* primary went down while we were checking secondary against + blacklist, adopt secondary as primary */ + free_address (&n->primary_address); + n->primary_address = n->alternative_address; + memset (&n->alternative_address, 0, sizeof (struct NeighbourAddress)); + n->timeout = GNUNET_TIME_relative_to_absolute (FAST_RECONNECT_TIMEOUT); + n->state = S_RECONNECT_BLACKLIST; + break; + case S_CONNECTED_SWITCHING_CONNECT_SENT: + /* primary went down while we were waiting for CONNECT_ACK on secondary; + secondary as primary */ + free_address (&n->primary_address); + n->primary_address = n->alternative_address; + memset (&n->alternative_address, 0, sizeof (struct NeighbourAddress)); + n->timeout = GNUNET_TIME_relative_to_absolute (FAST_RECONNECT_TIMEOUT); + n->state = S_RECONNECT_SENT; + break; + case S_DISCONNECT: + free_address (&n->primary_address); + break; + case S_DISCONNECT_FINISHED: + /* neighbour was freed and plugins told to terminate session */ + break; + default: + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Unhandled state `%s' \n",print_state (n->state)); + GNUNET_break (0); + break; } + if (GNUNET_SCHEDULER_NO_TASK != n->task) + GNUNET_SCHEDULER_cancel (n->task); + n->task = GNUNET_SCHEDULER_add_now (&master_task, n); } /** - * Change the incoming quota for the given peer. + * We received a 'SESSION_ACK' message from the other peer. + * If we sent a 'CONNECT_ACK' last, this means we are now + * connected. Otherwise, do nothing. * - * @param neighbour identity of peer to change qutoa for - * @param quota new quota + * @param message possibly a 'struct SessionConnectMessage' (check format) + * @param peer identity of the peer to switch the address for + * @param address address of the other peer, NULL if other peer + * connected to us + * @param session session to use (or NULL) + * @param ats performance data + * @param ats_count number of entries in ats */ void -GST_neighbours_set_incoming_quota (const struct GNUNET_PeerIdentity *neighbour, - struct GNUNET_BANDWIDTH_Value32NBO quota) +GST_neighbours_handle_session_ack (const struct GNUNET_MessageHeader *message, + const struct GNUNET_PeerIdentity *peer, + const struct GNUNET_HELLO_Address *address, + struct Session *session, + const struct GNUNET_ATS_Information *ats, + uint32_t ats_count) { struct NeighbourMapEntry *n; - // This can happen during shutdown - if (neighbours == NULL) + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Received SESSION_ACK message from peer `%s'\n", + GNUNET_i2s (peer)); + if (ntohs (message->size) != sizeof (struct GNUNET_MessageHeader)) { + GNUNET_break_op (0); return; } - - n = lookup_neighbour (neighbour); - if (n == NULL) + if (NULL == (n = lookup_neighbour (peer))) + return; + /* check if we are in a plausible state for having sent + a CONNECT_ACK. If not, return, otherwise break */ + if ( ( (S_CONNECT_RECV_ACK != n->state) && + (S_CONNECT_SENT != n->state) ) || + (2 != n->send_connect_ack) ) { GNUNET_STATISTICS_update (GST_stats, - gettext_noop - ("# SET QUOTA messages ignored (no such peer)"), - 1, GNUNET_NO); + gettext_noop ("# unexpected SESSION ACK messages"), 1, + GNUNET_NO); return; } -#if DEBUG_TRANSPORT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Setting inbound quota of %u Bps for peer `%s' to all clients\n", - ntohl (quota.value__), GNUNET_i2s (&n->id)); -#endif - GNUNET_BANDWIDTH_tracker_update_quota (&n->in_tracker, quota); - if (0 != ntohl (quota.value__)) - return; -#if DEBUG_TRANSPORT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Disconnecting peer `%4s' due to `%s'\n", - GNUNET_i2s (&n->id), "SET_QUOTA"); -#endif - if (is_connected (n)) - GNUNET_STATISTICS_update (GST_stats, - gettext_noop ("# disconnects due to quota of 0"), - 1, GNUNET_NO); - disconnect_neighbour (n); + n->state = S_CONNECTED; + n->timeout = GNUNET_TIME_relative_to_absolute (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT); + GNUNET_STATISTICS_set (GST_stats, + gettext_noop ("# peers connected"), + ++neighbours_connected, + GNUNET_NO); + connect_notify_cb (callback_cls, &n->id, ats, ats_count); + set_address (&n->primary_address, + n->primary_address.address, + n->primary_address.session, + n->primary_address.bandwidth_in, + n->primary_address.bandwidth_out, + GNUNET_YES); } /** - * Closure for the neighbours_iterate function. - */ -struct IteratorContext -{ - /** - * Function to call on each connected neighbour. - */ - GST_NeighbourIterator cb; - - /** - * Closure for 'cb'. - */ - void *cb_cls; -}; - - -/** - * Call the callback from the closure for each connected neighbour. + * Test if we're connected to the given peer. * - * @param cls the 'struct IteratorContext' - * @param key the hash of the public key of the neighbour - * @param value the 'struct NeighbourMapEntry' - * @return GNUNET_OK (continue to iterate) + * @param target peer to test + * @return GNUNET_YES if we are connected, GNUNET_NO if not */ -static int -neighbours_iterate (void *cls, const GNUNET_HashCode * key, void *value) +int +GST_neighbours_test_connected (const struct GNUNET_PeerIdentity *target) { - struct IteratorContext *ic = cls; - struct NeighbourMapEntry *n = value; - - if (!is_connected (n)) - return GNUNET_OK; - - ic->cb (ic->cb_cls, &n->id, NULL, 0, n->address); - return GNUNET_OK; + return test_connected (lookup_neighbour (target)); } /** - * Iterate over all connected neighbours. - * - * @param cb function to call - * @param cb_cls closure for cb - */ -void -GST_neighbours_iterate (GST_NeighbourIterator cb, void *cb_cls) -{ - struct IteratorContext ic; - - // This can happen during shutdown - if (neighbours == NULL) - { - return; - } - - ic.cb = cb; - ic.cb_cls = cb_cls; - GNUNET_CONTAINER_multihashmap_iterate (neighbours, &neighbours_iterate, &ic); -} - -/** - * If we have an active connection to the given target, it must be shutdown. + * Change the incoming quota for the given peer. * - * @param target peer to disconnect from + * @param neighbour identity of peer to change qutoa for + * @param quota new quota */ void -GST_neighbours_force_disconnect (const struct GNUNET_PeerIdentity *target) +GST_neighbours_set_incoming_quota (const struct GNUNET_PeerIdentity *neighbour, + struct GNUNET_BANDWIDTH_Value32NBO quota) { struct NeighbourMapEntry *n; - // This can happen during shutdown - if (neighbours == NULL) + if (NULL == (n = lookup_neighbour (neighbour))) { + GNUNET_STATISTICS_update (GST_stats, + gettext_noop + ("# SET QUOTA messages ignored (no such peer)"), + 1, GNUNET_NO); return; } - - n = lookup_neighbour (target); - if (NULL == n) - return; /* not active */ + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Setting inbound quota of %u Bps for peer `%s' to all clients\n", + ntohl (quota.value__), GNUNET_i2s (&n->id)); + GNUNET_BANDWIDTH_tracker_update_quota (&n->in_tracker, quota); + if (0 != ntohl (quota.value__)) + return; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Disconnecting peer `%4s' due to `%s'\n", + GNUNET_i2s (&n->id), "SET_QUOTA"); + if (GNUNET_YES == test_connected (n)) + GNUNET_STATISTICS_update (GST_stats, + gettext_noop ("# disconnects due to quota of 0"), + 1, GNUNET_NO); disconnect_neighbour (n); } @@ -2309,12 +2890,9 @@ GST_neighbours_handle_disconnect_message (const struct GNUNET_PeerIdentity const struct SessionDisconnectMessage *sdm; GNUNET_HashCode hc; -#if DEBUG_TRANSPORT GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Received DISCONNECT message from peer `%s'\n", GNUNET_i2s (peer)); -#endif - if (ntohs (msg->size) != sizeof (struct SessionDisconnectMessage)) { // GNUNET_break_op (0); @@ -2325,11 +2903,9 @@ GST_neighbours_handle_disconnect_message (const struct GNUNET_PeerIdentity return; } sdm = (const struct SessionDisconnectMessage *) msg; - n = lookup_neighbour (peer); - if (NULL == n) + if (NULL == (n = lookup_neighbour (peer))) return; /* gone already */ - if (GNUNET_TIME_absolute_ntoh (sdm->timestamp).abs_value <= - n->connect_ts.abs_value) + if (GNUNET_TIME_absolute_ntoh (sdm->timestamp).abs_value <= n->connect_ack_timestamp.abs_value) { GNUNET_STATISTICS_update (GST_stats, gettext_noop @@ -2361,360 +2937,210 @@ GST_neighbours_handle_disconnect_message (const struct GNUNET_PeerIdentity GNUNET_break_op (0); return; } - GST_neighbours_force_disconnect (peer); + if (GNUNET_YES == test_connected (n)) + GNUNET_STATISTICS_update (GST_stats, + gettext_noop + ("# other peer asked to disconnect from us"), 1, + GNUNET_NO); + disconnect_neighbour (n); } /** - * We received a 'SESSION_CONNECT_ACK' message from the other peer. - * Consider switching to it. - * - * @param message possibly a 'struct SessionConnectMessage' (check format) - * @param peer identity of the peer to switch the address for - * @param address address of the other peer, NULL if other peer - * connected to us - * @param session session to use (or NULL) - * @param ats performance data - * @param ats_count number of entries in ats + * Closure for the neighbours_iterate function. */ -void -GST_neighbours_handle_connect_ack (const struct GNUNET_MessageHeader *message, - const struct GNUNET_PeerIdentity *peer, - const struct GNUNET_HELLO_Address *address, - struct Session *session, - const struct GNUNET_ATS_Information *ats, - uint32_t ats_count) +struct IteratorContext { - const struct SessionConnectMessage *scm; - struct GNUNET_MessageHeader msg; - struct NeighbourMapEntry *n; - size_t msg_len; - size_t ret; - - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Received CONNECT_ACK message from peer `%s'\n", - GNUNET_i2s (peer)); - - if (ntohs (message->size) != sizeof (struct SessionConnectMessage)) - { - GNUNET_break_op (0); - return; - } - scm = (const struct SessionConnectMessage *) message; - GNUNET_break_op (ntohl (scm->reserved) == 0); - n = lookup_neighbour (peer); - if (NULL == n) - { - /* we did not send 'CONNECT' -- at least not recently */ - GNUNET_STATISTICS_update (GST_stats, - gettext_noop - ("# unexpected CONNECT_ACK messages (no peer)"), - 1, GNUNET_NO); - return; - } - - /* Additional check - * - * ((n->state != S_CONNECT_RECV) && (n->address != NULL)): - * - * We also received an CONNECT message, switched from SENDT to RECV and - * ATS already suggested us an address after a successful blacklist check + /** + * Function to call on each connected neighbour. */ + GST_NeighbourIterator cb; - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Received CONNECT_ACK message from peer `%s' in state `%s'\n", - GNUNET_i2s (peer), - print_state(n->state)); - - if ((n->address != NULL) && (n->state == S_CONNECTED)) - { - /* After fast reconnect: send ACK (ACK) even when we are connected */ - msg_len = sizeof (msg); - msg.size = htons (msg_len); - msg.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_ACK); - - ret = send_with_session(n, - (const char *) &msg, msg_len, - UINT32_MAX, GNUNET_TIME_UNIT_FOREVER_REL, - NULL, NULL); - - if (ret == GNUNET_SYSERR) - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Failed to send SESSION_ACK to `%4s' using address '%s' session %X\n", - GNUNET_i2s (&n->id), GST_plugins_a2s (n->address), n->session); - return; - } - - if ((n->state != S_CONNECT_SENT) && - ((n->state != S_CONNECT_RECV) && (n->address != NULL))) - { - GNUNET_STATISTICS_update (GST_stats, - gettext_noop - ("# unexpected CONNECT_ACK messages"), 1, - GNUNET_NO); - return; - } - if (n->state != S_CONNECTED) - change_state (n, S_CONNECTED); - - if (NULL != session) - { - GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK, - "transport-ats", - "Giving ATS session %p of plugin %s for peer %s\n", - session, address->transport_name, GNUNET_i2s (peer)); - } - GNUNET_ATS_address_update (GST_ats, address, session, ats, ats_count); - GNUNET_assert (NULL != n->address); + /** + * Closure for 'cb'. + */ + void *cb_cls; +}; - if ((n->address_state == FRESH) && (0 == GNUNET_HELLO_address_cmp(address, n->address))) - { - GST_validation_set_address_use (n->address, n->session, GNUNET_YES); - GNUNET_ATS_address_in_use (GST_ats, n->address, n->session, GNUNET_YES); - n->address_state = USED; - } - GST_neighbours_set_incoming_quota (&n->id, n->bandwidth_in); +/** + * Call the callback from the closure for each connected neighbour. + * + * @param cls the 'struct IteratorContext' + * @param key the hash of the public key of the neighbour + * @param value the 'struct NeighbourMapEntry' + * @return GNUNET_OK (continue to iterate) + */ +static int +neighbours_iterate (void *cls, const GNUNET_HashCode * key, void *value) +{ + struct IteratorContext *ic = cls; + struct NeighbourMapEntry *n = value; - /* send ACK (ACK) */ - msg_len = sizeof (msg); - msg.size = htons (msg_len); - msg.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_ACK); + if (GNUNET_YES == test_connected (n)) + ic->cb (ic->cb_cls, &n->id, NULL, 0, n->primary_address.address); + return GNUNET_OK; +} - ret = send_with_session(n, - (const char *) &msg, msg_len, - UINT32_MAX, GNUNET_TIME_UNIT_FOREVER_REL, - NULL, NULL); - if (ret == GNUNET_SYSERR) - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Failed to send SESSION_ACK to `%4s' using address '%s' session %X\n", - GNUNET_i2s (&n->id), GST_plugins_a2s (n->address), n->session); +/** + * Iterate over all connected neighbours. + * + * @param cb function to call + * @param cb_cls closure for cb + */ +void +GST_neighbours_iterate (GST_NeighbourIterator cb, void *cb_cls) +{ + struct IteratorContext ic; + if (NULL == neighbours) + return; /* can happen during shutdown */ + ic.cb = cb; + ic.cb_cls = cb_cls; + GNUNET_CONTAINER_multihashmap_iterate (neighbours, &neighbours_iterate, &ic); +} - if (n->keepalive_task == GNUNET_SCHEDULER_NO_TASK) - n->keepalive_task = GNUNET_SCHEDULER_add_now (&neighbour_keepalive_task, n); - neighbours_connected++; - GNUNET_STATISTICS_update (GST_stats, gettext_noop ("# peers connected"), 1, - GNUNET_NO); -#if DEBUG_TRANSPORT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Notify about connect of `%4s' using address '%s' session %X LINE %u\n", - GNUNET_i2s (&n->id), GST_plugins_a2s (n->address), n->session, - __LINE__); -#endif - connect_notify_cb (callback_cls, &n->id, ats, ats_count); - send_outbound_quota (peer, n->bandwidth_out); +/** + * If we have an active connection to the given target, it must be shutdown. + * + * @param target peer to disconnect from + */ +void +GST_neighbours_force_disconnect (const struct GNUNET_PeerIdentity *target) +{ + struct NeighbourMapEntry *n; + if (NULL == (n = lookup_neighbour (target))) + return; /* not active */ + if (GNUNET_YES == test_connected (n)) + GNUNET_STATISTICS_update (GST_stats, + gettext_noop + ("# disconnected from peer upon explicit request"), 1, + GNUNET_NO); + disconnect_neighbour (n); } -void -GST_neighbours_handle_ack (const struct GNUNET_MessageHeader *message, - const struct GNUNET_PeerIdentity *peer, - const struct GNUNET_HELLO_Address *address, - struct Session *session, - const struct GNUNET_ATS_Information *ats, - uint32_t ats_count) +/** + * Obtain current latency information for the given neighbour. + * + * @param peer to get the latency for + * @return observed latency of the address, FOREVER if the + * the connection is not up + */ +struct GNUNET_TIME_Relative +GST_neighbour_get_latency (const struct GNUNET_PeerIdentity *peer) { struct NeighbourMapEntry *n; -#if DEBUG_TRANSPORT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Received ACK message from peer `%s'\n", - GNUNET_i2s (peer)); -#endif - - if (ntohs (message->size) != sizeof (struct GNUNET_MessageHeader)) - { - GNUNET_break_op (0); - return; - } n = lookup_neighbour (peer); - if (NULL == n) + if (NULL == n) + return GNUNET_TIME_UNIT_FOREVER_REL; + switch (n->state) { + case S_CONNECTED: + case S_RECONNECT_SENT: + case S_RECONNECT_ATS: + return n->latency; + case S_NOT_CONNECTED: + case S_INIT_BLACKLIST: + case S_INIT_ATS: + case S_CONNECT_SENT: + case S_CONNECT_RECV_BLACKLIST: + case S_DISCONNECT: + case S_DISCONNECT_FINISHED: + return GNUNET_TIME_UNIT_FOREVER_REL; + default: + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Unhandled state `%s' \n",print_state (n->state)); GNUNET_break (0); - return; - } - if (S_CONNECTED == n->state) - return; - if (!is_connecting (n)) - { - GNUNET_STATISTICS_update (GST_stats, - gettext_noop ("# unexpected ACK messages"), 1, - GNUNET_NO); - return; - } - change_state (n, S_CONNECTED); - if (NULL != session) - GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK, - "transport-ats", - "Giving ATS session %p of plugin %s for peer %s\n", - session, address->transport_name, GNUNET_i2s (peer)); - GNUNET_ATS_address_update (GST_ats, address, session, ats, ats_count); - GNUNET_assert (n->address != NULL); - - if ((n->address_state == FRESH) && (0 == GNUNET_HELLO_address_cmp(address, n->address))) - { - GST_validation_set_address_use (n->address, n->session, GNUNET_YES); - GNUNET_ATS_address_in_use (GST_ats, n->address, n->session, GNUNET_YES); - n->address_state = USED; + break; } - - - neighbours_connected++; - GNUNET_STATISTICS_update (GST_stats, gettext_noop ("# peers connected"), 1, - GNUNET_NO); - - GST_neighbours_set_incoming_quota (&n->id, n->bandwidth_in); - if (n->keepalive_task == GNUNET_SCHEDULER_NO_TASK) - n->keepalive_task = GNUNET_SCHEDULER_add_now (&neighbour_keepalive_task, n); -#if DEBUG_TRANSPORT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Notify about connect of `%4s' using address '%s' session %X LINE %u\n", - GNUNET_i2s (&n->id), GST_plugins_a2s (n->address), n->session, - __LINE__); -#endif - connect_notify_cb (callback_cls, &n->id, ats, ats_count); - send_outbound_quota (peer, n->bandwidth_out); + return GNUNET_TIME_UNIT_FOREVER_REL; } -struct BlackListCheckContext -{ - struct GNUNET_ATS_Information *ats; - - uint32_t ats_count; - - struct Session *session; - - struct GNUNET_HELLO_Address *address; - - struct GNUNET_TIME_Absolute ts; -}; - -static void -handle_connect_blacklist_cont (void *cls, - const struct GNUNET_PeerIdentity *peer, - int result) +/** + * Obtain current address information for the given neighbour. + * + * @param peer + * @return address currently used + */ +struct GNUNET_HELLO_Address * +GST_neighbour_get_current_address (const struct GNUNET_PeerIdentity *peer) { struct NeighbourMapEntry *n; - struct BlackListCheckContext *bcc = cls; - -#if DEBUG_TRANSPORT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Blacklist check due to CONNECT message: `%s'\n", - GNUNET_i2s (peer), - (result == GNUNET_OK) ? "ALLOWED" : "FORBIDDEN"); -#endif - - /* not allowed */ - if (GNUNET_OK != result) - { - GNUNET_HELLO_address_free (bcc->address); - GNUNET_free (bcc); - return; - } n = lookup_neighbour (peer); if (NULL == n) - n = setup_neighbour (peer); - - if (bcc->ts.abs_value > n->connect_ts.abs_value) - { - if (NULL != bcc->session) - GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK, - "transport-ats", - "Giving ATS session %p of address `%s' for peer %s\n", - bcc->session, GST_plugins_a2s (bcc->address), - GNUNET_i2s (peer)); - /* Tell ATS about the session, so ATS can suggest it if it likes it. */ - - GNUNET_ATS_address_update (GST_ats, bcc->address, bcc->session, bcc->ats, - bcc->ats_count); - n->connect_ts = bcc->ts; - } - - GNUNET_HELLO_address_free (bcc->address); - GNUNET_free (bcc); - - if (n->state != S_CONNECT_RECV) - change_state (n, S_CONNECT_RECV); - - - /* Ask ATS for an address to connect via that address */ - if (n->ats_suggest != GNUNET_SCHEDULER_NO_TASK) - GNUNET_SCHEDULER_cancel (n->ats_suggest); - n->ats_suggest = - GNUNET_SCHEDULER_add_delayed (ATS_RESPONSE_TIMEOUT, ats_suggest_cancel, - n); - GNUNET_ATS_suggest_address (GST_ats, peer); + return NULL; + return n->primary_address.address; } + /** - * We received a 'SESSION_CONNECT' message from the other peer. - * Consider switching to it. + * Initialize the neighbours subsystem. * - * @param message possibly a 'struct SessionConnectMessage' (check format) - * @param peer identity of the peer to switch the address for - * @param address address of the other peer, NULL if other peer - * connected to us - * @param session session to use (or NULL) - * @param ats performance data - * @param ats_count number of entries in ats (excluding 0-termination) + * @param cls closure for callbacks + * @param connect_cb function to call if we connect to a peer + * @param disconnect_cb function to call if we disconnect from a peer + * @param peer_address_cb function to call if we change an active address + * of a neighbour */ void -GST_neighbours_handle_connect (const struct GNUNET_MessageHeader *message, - const struct GNUNET_PeerIdentity *peer, - const struct GNUNET_HELLO_Address *address, - struct Session *session, - const struct GNUNET_ATS_Information *ats, - uint32_t ats_count) +GST_neighbours_start (void *cls, + GNUNET_TRANSPORT_NotifyConnect connect_cb, + GNUNET_TRANSPORT_NotifyDisconnect disconnect_cb, + GNUNET_TRANSPORT_PeerIterateCallback peer_address_cb) { - const struct SessionConnectMessage *scm; - struct BlackListCheckContext *bcc = NULL; - struct NeighbourMapEntry *n; + callback_cls = cls; + connect_notify_cb = connect_cb; + disconnect_notify_cb = disconnect_cb; + address_change_cb = peer_address_cb; + neighbours = GNUNET_CONTAINER_multihashmap_create (NEIGHBOUR_TABLE_SIZE); +} -#if DEBUG_TRANSPORT - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Received CONNECT message from peer `%s'\n", GNUNET_i2s (peer)); -#endif - if (ntohs (message->size) != sizeof (struct SessionConnectMessage)) - { - GNUNET_break_op (0); - return; - } +/** + * Disconnect from the given neighbour. + * + * @param cls unused + * @param key hash of neighbour's public key (not used) + * @param value the 'struct NeighbourMapEntry' of the neighbour + * @return GNUNET_OK (continue to iterate) + */ +static int +disconnect_all_neighbours (void *cls, const GNUNET_HashCode * key, void *value) +{ + struct NeighbourMapEntry *n = value; - scm = (const struct SessionConnectMessage *) message; - GNUNET_break_op (ntohl (scm->reserved) == 0); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Disconnecting peer `%4s', %s\n", + GNUNET_i2s (&n->id), "SHUTDOWN_TASK"); + n->state = S_DISCONNECT_FINISHED; + free_neighbour (n, GNUNET_NO); + return GNUNET_OK; +} - GNUNET_ATS_address_update (GST_ats, address, session, ats, ats_count); - n = lookup_neighbour (peer); - if ((n != NULL) && ((S_CONNECTED == n->state) || (S_FAST_RECONNECT == n->state))) - { - /* connected peer switches addresses or is trying to do a fast reconnect*/ +/** + * Cleanup the neighbours subsystem. + */ +void +GST_neighbours_stop () +{ + if (NULL == neighbours) return; - } - - - /* we are not connected to this peer */ - /* do blacklist check */ - bcc = - GNUNET_malloc (sizeof (struct BlackListCheckContext) + - sizeof (struct GNUNET_ATS_Information) * (ats_count + 1)); - bcc->ts = GNUNET_TIME_absolute_ntoh (scm->timestamp); - bcc->ats_count = ats_count + 1; - bcc->address = GNUNET_HELLO_address_copy (address); - bcc->session = session; - bcc->ats = (struct GNUNET_ATS_Information *) &bcc[1]; - memcpy (bcc->ats, ats, sizeof (struct GNUNET_ATS_Information) * ats_count); - bcc->ats[ats_count].type = htonl (GNUNET_ATS_QUALITY_NET_DELAY); - bcc->ats[ats_count].value = - htonl ((uint32_t) GST_neighbour_get_latency (peer).rel_value); - GST_blacklist_test_allowed (peer, address->transport_name, - &handle_connect_blacklist_cont, bcc); + GNUNET_CONTAINER_multihashmap_iterate (neighbours, + &disconnect_all_neighbours, + NULL); + GNUNET_CONTAINER_multihashmap_destroy (neighbours); + neighbours = NULL; + callback_cls = NULL; + connect_notify_cb = NULL; + disconnect_notify_cb = NULL; + address_change_cb = NULL; } |