diff options
Diffstat (limited to 'src/transport/gnunet-service-transport_neighbours.c')
-rw-r--r-- | src/transport/gnunet-service-transport_neighbours.c | 537 |
1 files changed, 422 insertions, 115 deletions
diff --git a/src/transport/gnunet-service-transport_neighbours.c b/src/transport/gnunet-service-transport_neighbours.c index c580168..cad325d 100644 --- a/src/transport/gnunet-service-transport_neighbours.c +++ b/src/transport/gnunet-service-transport_neighbours.c @@ -40,6 +40,7 @@ #include "transport.h" + /** * Size of the neighbour hash map. */ @@ -91,6 +92,177 @@ */ #define BLACKLIST_RESPONSE_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS, 500) +#define DEBUG_MALLOC GNUNET_NO + +#if DEBUG_MALLOC + +struct Allocator +{ + struct Allocator *prev; + struct Allocator *next; + + unsigned int bytes_alloced; + unsigned int max_alloced; + unsigned int diff; + unsigned int line; + + struct GNUNET_TIME_Absolute max_alloced_when; + struct GNUNET_TIME_Absolute last_alloced_when; + +}; + +struct Allocator *aehead; +struct Allocator *aetail; + +struct Allocation +{ + struct Allocation *prev; + struct Allocation *next; + + struct Allocator *alloc; + unsigned int bytes_alloced; + void *p; + unsigned int line; +}; + +struct Allocation *ahead; +struct Allocation *atail; + +static int bytes_alloced; + +static struct Allocator * +find_allocator (int line) +{ + struct Allocator *cur = aehead; + while (NULL != cur) + { + if (line == cur->line) + return cur; + cur = cur->next; + } + return cur; +} + +static void +print_allocators () +{ + static int start = GNUNET_YES; + static struct GNUNET_TIME_Absolute next; + static struct GNUNET_TIME_Relative rem; + struct Allocator *cur = aehead; + if (start) + { + next = GNUNET_TIME_UNIT_ZERO_ABS; + start = GNUNET_NO; + } + if (0 == (rem = GNUNET_TIME_absolute_get_remaining(next)).rel_value) + { + fprintf (stderr, "Allocated in `%s' total: %5u bytes\n", __FILE__, bytes_alloced); + while (NULL != cur) + { + char *last_alloc = GNUNET_strdup (GNUNET_STRINGS_absolute_time_to_string(cur->max_alloced_when)); + fprintf (stderr, "Allocated from line %4u :%5u bytes (diff %5i bytes, max alloc: %5u @ %s, last alloc %s)\n", + cur->line, cur->bytes_alloced, cur->diff, cur->max_alloced, + last_alloc, + GNUNET_STRINGS_absolute_time_to_string(cur->last_alloced_when)); + GNUNET_free (last_alloc); + cur->diff = 0; + cur = cur->next; + } + fprintf (stderr, "\n"); + next = GNUNET_TIME_absolute_add(GNUNET_TIME_absolute_get(), GNUNET_TIME_UNIT_SECONDS); + } +} + +#endif + +static void +MEMDEBUG_add_alloc (void *p, size_t size, int line) +{ +#if DEBUG_MALLOC + struct Allocation *alloc = GNUNET_malloc (sizeof (struct Allocation)); + struct Allocator *allocator = find_allocator(line); + if (NULL == allocator) + { + allocator = GNUNET_malloc (sizeof (struct Allocator)); + allocator->line = line; + GNUNET_CONTAINER_DLL_insert (aehead, aetail, allocator); + } + alloc->alloc = allocator; + alloc->p = p; + alloc->line = line; + alloc->bytes_alloced = size; + allocator->bytes_alloced += size; + allocator->last_alloced_when = GNUNET_TIME_absolute_get(); + if (allocator->bytes_alloced >= allocator->max_alloced) + { + allocator->max_alloced = allocator->bytes_alloced; + allocator->max_alloced_when = allocator->last_alloced_when; + } + allocator->diff += size; + GNUNET_CONTAINER_DLL_insert (ahead, atail, alloc); + print_allocators (); + bytes_alloced += size; +#endif +} + + +static void * +MEMDEBUG_malloc (size_t size, int line) +{ + void * ret; + + ret = GNUNET_malloc (size); +#if DEBUG_MALLOC + if (NULL != ret) + MEMDEBUG_add_alloc (ret, size, line); +#endif + return ret; + +} + +static void +MEMDEBUG_free (void * alloc, int line) +{ +#if DEBUG_MALLOC + struct Allocation *cur; + struct Allocator *allocator; + cur = ahead; + while (NULL != cur) + { + if (alloc == cur->p) + break; + cur = cur->next; + } + if (NULL == cur) + { + fprintf (stderr, "Unmonitored free from line %4u\n", line); + GNUNET_break (0); + return; + } + allocator = cur->alloc; + if (NULL == allocator) + { + GNUNET_break (0); + } + GNUNET_CONTAINER_DLL_remove (ahead, atail, cur); + allocator->bytes_alloced -= cur->bytes_alloced; + allocator->diff -= cur->bytes_alloced; + GNUNET_assert (allocator->bytes_alloced >= 0); + bytes_alloced -= cur->bytes_alloced; + GNUNET_assert (bytes_alloced >= 0); + GNUNET_free (cur); +#endif + GNUNET_free (alloc); +} + +static void +MEMDEBUG_free_non_null (void * alloc, int line) +{ + if (alloc != NULL) + MEMDEBUG_free (alloc, line); +} + GNUNET_NETWORK_STRUCT_BEGIN @@ -221,10 +393,10 @@ struct MessageQueue * Possible state of a neighbour. Initially, we are S_NOT_CONNECTED. * * Then, there are two main paths. If we receive a CONNECT message, we - * first run a check against the blacklist and ask ATS for a - * suggestion. (S_CONNECT_RECV_ATS). If the blacklist comes back - * positive, we give the address to ATS. If ATS makes a suggestion, - * we ALSO give that suggestion to the blacklist + * first run a check against the blacklist (S_CONNECT_RECV_BLACKLIST_INBOUND). + * If this check is successful, we give the inbound address to ATS. + * After the check we ask ATS for a suggestion (S_CONNECT_RECV_ATS). + * If ATS makes a suggestion, we ALSO give that suggestion to the blacklist * (S_CONNECT_RECV_BLACKLIST). Once the blacklist approves the * address we got from ATS, we send our CONNECT_ACK and go to * S_CONNECT_RECV_ACK. If we receive a SESSION_ACK, we go to @@ -302,6 +474,11 @@ enum State S_CONNECT_SENT, /** + * Received a CONNECT, do a blacklist check for inbound address + */ + S_CONNECT_RECV_BLACKLIST_INBOUND, + + /** * Received a CONNECT, asking ATS about address suggestions. */ S_CONNECT_RECV_ATS, @@ -363,7 +540,13 @@ enum State S_DISCONNECT, /** - * We're finished with the disconnect; clean up state now! + * We're finished with the disconnect; and are cleaning up the state + * now! We put the struct into this state when we are really in the + * task that calls 'free' on it and are about to remove the record + * from the map. We should never find a 'struct NeighbourMapEntry' + * in this state in the map. Accessing a 'struct NeighbourMapEntry' + * in this state virtually always means using memory that has been + * freed (the exception being the cleanup code in 'free_neighbour'). */ S_DISCONNECT_FINISHED }; @@ -474,6 +657,11 @@ struct NeighbourMapEntry struct GNUNET_TIME_Absolute connect_ack_timestamp; /** + * ATS address suggest handle + */ + struct GNUNET_ATS_SuggestHandle *suggest_handle; + + /** * Time where we should cut the connection (timeout) if we don't * make progress in the state machine (or get a KEEPALIVE_RESPONSE * if we are in S_CONNECTED). @@ -584,7 +772,7 @@ static void *callback_cls; /** * Function to call when we connected to a neighbour. */ -static GNUNET_TRANSPORT_NotifyConnect connect_notify_cb; +static NotifyConnect connect_notify_cb; /** * Function to call when we disconnected from a neighbour. @@ -629,56 +817,40 @@ print_state (int state) { case S_NOT_CONNECTED: return "S_NOT_CONNECTED"; - break; case S_INIT_ATS: return "S_INIT_ATS"; - break; case S_INIT_BLACKLIST: return "S_INIT_BLACKLIST"; - break; case S_CONNECT_SENT: return "S_CONNECT_SENT"; - break; + case S_CONNECT_RECV_BLACKLIST_INBOUND: + return "S_CONNECT_RECV_BLACKLIST_INBOUND"; case S_CONNECT_RECV_ATS: return "S_CONNECT_RECV_ATS"; - break; case S_CONNECT_RECV_BLACKLIST: return "S_CONNECT_RECV_BLACKLIST"; - break; case S_CONNECT_RECV_ACK: return "S_CONNECT_RECV_ACK"; - break; case S_CONNECTED: return "S_CONNECTED"; - break; case S_RECONNECT_ATS: return "S_RECONNECT_ATS"; - break; case S_RECONNECT_BLACKLIST: return "S_RECONNECT_BLACKLIST"; - break; case S_RECONNECT_SENT: return "S_RECONNECT_SENT"; - break; case S_CONNECTED_SWITCHING_BLACKLIST: return "S_CONNECTED_SWITCHING_BLACKLIST"; - break; case S_CONNECTED_SWITCHING_CONNECT_SENT: return "S_CONNECTED_SWITCHING_CONNECT_SENT"; - break; case S_DISCONNECT: return "S_DISCONNECT"; - break; case S_DISCONNECT_FINISHED: return "S_DISCONNECT_FINISHED"; - break; default: - return "UNDEFINED"; GNUNET_break (0); - break; + return "UNDEFINED"; } - GNUNET_break (0); - return "UNDEFINED"; } /** @@ -698,6 +870,7 @@ test_connected (struct NeighbourMapEntry *n) case S_INIT_ATS: case S_INIT_BLACKLIST: case S_CONNECT_SENT: + case S_CONNECT_RECV_BLACKLIST_INBOUND: case S_CONNECT_RECV_ATS: case S_CONNECT_RECV_BLACKLIST: case S_CONNECT_RECV_ACK: @@ -757,11 +930,14 @@ free_address (struct NeighbourAddress *na) { GST_validation_set_address_use (na->address, na->session, GNUNET_NO, __LINE__); GNUNET_ATS_address_in_use (GST_ats, na->address, na->session, GNUNET_NO); + address_change_cb (NULL, &na->address->peer, NULL); } + na->ats_active = GNUNET_NO; if (NULL != na->address) { - GNUNET_HELLO_address_free (na->address); + MEMDEBUG_free (na->address, __LINE__); + //GNUNET_HELLO_address_free (na->address); na->address = NULL; } na->session = NULL; @@ -789,7 +965,6 @@ set_address (struct NeighbourAddress *na, int is_active) { struct GNUNET_TRANSPORT_PluginFunctions *papi; - if (NULL == (papi = GST_plugins_find (address->transport_name))) { GNUNET_break (0); @@ -804,6 +979,8 @@ set_address (struct NeighbourAddress *na, na->ats_active = is_active; GNUNET_ATS_address_in_use (GST_ats, na->address, na->session, is_active); GST_validation_set_address_use (na->address, na->session, is_active, __LINE__); + if (is_active) + address_change_cb (NULL, &address->peer, address); } if (GNUNET_YES == is_active) { @@ -825,6 +1002,7 @@ set_address (struct NeighbourAddress *na, return; } na->address = GNUNET_HELLO_address_copy (address); + MEMDEBUG_add_alloc (na->address, GNUNET_HELLO_address_get_size (na->address), __LINE__); na->bandwidth_in = bandwidth_in; na->bandwidth_out = bandwidth_out; na->session = session; @@ -832,10 +1010,9 @@ set_address (struct NeighbourAddress *na, if (GNUNET_YES == is_active) { /* Telling ATS about new session */ - GNUNET_ATS_address_update (GST_ats, na->address, na->session, NULL, 0); GNUNET_ATS_address_in_use (GST_ats, na->address, na->session, GNUNET_YES); GST_validation_set_address_use (na->address, na->session, GNUNET_YES, __LINE__); - + address_change_cb (NULL, &address->peer, address); /* FIXME: is this the right place to set quotas? */ GST_neighbours_set_incoming_quota (&address->peer, bandwidth_in); send_outbound_quota (&address->peer, bandwidth_out); @@ -855,6 +1032,7 @@ free_neighbour (struct NeighbourMapEntry *n, int keep_sessions) { struct MessageQueue *mq; struct GNUNET_TRANSPORT_PluginFunctions *papi; + struct GNUNET_HELLO_Address *backup_primary; n->is_active = NULL; /* always free'd by its own continuation! */ @@ -863,8 +1041,8 @@ free_neighbour (struct NeighbourMapEntry *n, int keep_sessions) { GNUNET_CONTAINER_DLL_remove (n->messages_head, n->messages_tail, mq); if (NULL != mq->cont) - mq->cont (mq->cont_cls, GNUNET_SYSERR); - GNUNET_free (mq); + mq->cont (mq->cont_cls, GNUNET_SYSERR, mq->message_buf_size, 0); + MEMDEBUG_free (mq, __LINE__); } /* It is too late to send other peer disconnect notifications, but at least internally we need to get clean... */ @@ -877,6 +1055,20 @@ free_neighbour (struct NeighbourMapEntry *n, int keep_sessions) disconnect_notify_cb (callback_cls, &n->id); } + n->state = S_DISCONNECT_FINISHED; + + if (NULL != n->primary_address.address) + { + backup_primary = GNUNET_HELLO_address_copy(n->primary_address.address); + MEMDEBUG_add_alloc (backup_primary, GNUNET_HELLO_address_get_size(backup_primary), __LINE__); + } + else + backup_primary = NULL; + + /* free addresses and mark as unused */ + free_address (&n->primary_address); + free_address (&n->alternative_address); + /* FIXME-PLUGIN-API: This does not seem to guarantee that all transport sessions eventually get killed due to inactivity; they MUST have their own timeout logic (but at least TCP doesn't have @@ -886,26 +1078,28 @@ free_neighbour (struct NeighbourMapEntry *n, int keep_sessions) API gives us not even the means to selectively kill only one of them! Killing all sessions like this seems to be very, very wrong. */ + + /* cut transport-level connection */ if ((GNUNET_NO == keep_sessions) && - (NULL != n->primary_address.address) && - (NULL != (papi = GST_plugins_find (n->primary_address.address->transport_name)))) + (NULL != backup_primary) && + (NULL != (papi = GST_plugins_find (backup_primary->transport_name)))) papi->disconnect (papi->cls, &n->id); - n->state = S_DISCONNECT_FINISHED; + MEMDEBUG_free_non_null (backup_primary, __LINE__); GNUNET_assert (GNUNET_YES == GNUNET_CONTAINER_multihashmap_remove (neighbours, &n->id.hashPubKey, n)); - /* cut transport-level connection */ - free_address (&n->primary_address); - free_address (&n->alternative_address); - // FIXME-ATS-API: we might want to be more specific about // which states we do this from in the future (ATS should // have given us a 'suggest_address' handle, and if we have // such a handle, we should cancel the operation here! - GNUNET_ATS_suggest_address_cancel (GST_ats, &n->id); + if (NULL != n->suggest_handle) + { + GNUNET_ATS_suggest_address_cancel (GST_ats, &n->id); + n->suggest_handle = NULL; + } if (GNUNET_SCHEDULER_NO_TASK != n->task) { @@ -913,10 +1107,9 @@ free_neighbour (struct NeighbourMapEntry *n, int keep_sessions) n->task = GNUNET_SCHEDULER_NO_TASK; } /* free rest of memory */ - GNUNET_free (n); + MEMDEBUG_free (n, __LINE__); } - /** * Transmit a message using the current session of the given * neighbour. @@ -940,15 +1133,15 @@ send_with_session (struct NeighbourMapEntry *n, struct GNUNET_TRANSPORT_PluginFunctions *papi; GNUNET_assert (n->primary_address.session != NULL); - if ( ( (NULL == (papi = GST_plugins_find (n->primary_address.address->transport_name))) || + if ( ((NULL == (papi = GST_plugins_find (n->primary_address.address->transport_name)) || (-1 == papi->send (papi->cls, n->primary_address.session, msgbuf, msgbuf_size, priority, timeout, - cont, cont_cls))) && - (NULL != cont) ) - cont (cont_cls, &n->id, GNUNET_SYSERR); + cont, cont_cls)))) && + (NULL != cont)) + cont (cont_cls, &n->id, GNUNET_SYSERR, msgbuf_size, 0); GNUNET_break (NULL != papi); } @@ -973,10 +1166,12 @@ master_task (void *cls, * @param cls NULL * @param target identity of the neighbour that was disconnected * @param result GNUNET_OK if the disconnect got out successfully + * @param payload bytes payload + * @param physical bytes physical */ static void send_disconnect_cont (void *cls, const struct GNUNET_PeerIdentity *target, - int result) + int result, size_t payload, size_t physical) { struct NeighbourMapEntry *n; @@ -1056,7 +1251,8 @@ disconnect_neighbour (struct NeighbourMapEntry *n) case S_CONNECT_SENT: send_disconnect (n); n->state = S_DISCONNECT; - break; + break; + case S_CONNECT_RECV_BLACKLIST_INBOUND: case S_CONNECT_RECV_ATS: case S_CONNECT_RECV_BLACKLIST: /* we never ACK'ed the other peer's request, no need to send DISCONNECT */ @@ -1118,18 +1314,20 @@ disconnect_neighbour (struct NeighbourMapEntry *n) * @param cls the 'struct MessageQueue' of the message * @param receiver intended receiver * @param success whether it worked or not + * @param size_payload bytes payload sent + * @param physical bytes sent on wire */ static void transmit_send_continuation (void *cls, const struct GNUNET_PeerIdentity *receiver, - int success) + int success, size_t size_payload, size_t physical) { struct MessageQueue *mq = cls; struct NeighbourMapEntry *n; if (NULL == (n = lookup_neighbour (receiver))) { - GNUNET_free (mq); + MEMDEBUG_free (mq, __LINE__); return; /* disconnect or other error while transmitting, can happen */ } if (n->is_active == mq) @@ -1141,7 +1339,18 @@ transmit_send_continuation (void *cls, GNUNET_SCHEDULER_cancel (n->task); n->task = GNUNET_SCHEDULER_add_now (&master_task, n); } - GNUNET_assert (bytes_in_send_queue >= mq->message_buf_size); + if (bytes_in_send_queue < mq->message_buf_size) + { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Bytes_in_send_queue `%u', Message_size %u, result: %s, payload %u, on wire %u\n", + bytes_in_send_queue, mq->message_buf_size, + (GNUNET_OK == success) ? "OK" : "FAIL", + size_payload, physical); + GNUNET_break (0); + } + + + GNUNET_break (size_payload == mq->message_buf_size); bytes_in_send_queue -= mq->message_buf_size; GNUNET_STATISTICS_set (GST_stats, gettext_noop @@ -1163,8 +1372,8 @@ transmit_send_continuation (void *cls, ntohs (((struct GNUNET_MessageHeader *) mq->message_buf)->type), (success == GNUNET_OK) ? "success" : "FAILURE"); if (NULL != mq->cont) - mq->cont (mq->cont_cls, success); - GNUNET_free (mq); + mq->cont (mq->cont_cls, success, size_payload, physical); + MEMDEBUG_free (mq, __LINE__); } @@ -1216,7 +1425,7 @@ try_transmission_to_peer (struct NeighbourMapEntry *n) 1, GNUNET_NO); GNUNET_CONTAINER_DLL_remove (n->messages_head, n->messages_tail, mq); n->is_active = mq; - transmit_send_continuation (mq, &n->id, GNUNET_SYSERR); /* timeout */ + transmit_send_continuation (mq, &n->id, GNUNET_SYSERR, mq->message_buf_size, 0); /* timeout */ } if (NULL == mq) return; /* no more messages */ @@ -1231,8 +1440,9 @@ try_transmission_to_peer (struct NeighbourMapEntry *n) /** * Send keepalive message to the neighbour. Must only be called - * if we are on 'connected' state. Will internally determine - * if a keepalive is truly needed (so can always be called). + * if we are on 'connected' state or while trying to switch addresses. + * Will internally determine if a keepalive is truly needed (so can + * always be called). * * @param n neighbour that went idle and needs a keepalive */ @@ -1241,7 +1451,9 @@ send_keepalive (struct NeighbourMapEntry *n) { struct GNUNET_MessageHeader m; - GNUNET_assert (S_CONNECTED == n->state); + GNUNET_assert ((S_CONNECTED == n->state) || + (S_CONNECTED_SWITCHING_BLACKLIST == n->state) || + (S_CONNECTED_SWITCHING_CONNECT_SENT)); if (GNUNET_TIME_absolute_get_remaining (n->keep_alive_time).rel_value > 0) return; /* no keepalive needed at this time */ m.size = htons (sizeof (struct GNUNET_MessageHeader)); @@ -1336,7 +1548,7 @@ GST_neighbours_keepalive_response (const struct GNUNET_PeerIdentity *neighbour, n->expect_latency_response = GNUNET_NO; n->latency = GNUNET_TIME_absolute_get_duration (n->last_keep_alive_time); n->timeout = GNUNET_TIME_relative_to_absolute (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT); - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Latency for peer `%s' is %llu ms\n", GNUNET_i2s (&n->id), n->latency.rel_value); memcpy (ats_new, ats, sizeof (struct GNUNET_ATS_Information) * ats_count); @@ -1466,14 +1678,14 @@ GST_neighbours_send (const struct GNUNET_PeerIdentity *target, const void *msg, { GNUNET_break (0); if (NULL != cont) - cont (cont_cls, GNUNET_SYSERR); + cont (cont_cls, GNUNET_SYSERR, msg_size, 0); return; } if (GNUNET_YES != test_connected (n)) { GNUNET_break (0); if (NULL != cont) - cont (cont_cls, GNUNET_SYSERR); + cont (cont_cls, GNUNET_SYSERR, msg_size, 0); return; } bytes_in_send_queue += msg_size; @@ -1481,7 +1693,7 @@ GST_neighbours_send (const struct GNUNET_PeerIdentity *target, const void *msg, gettext_noop ("# bytes in message queue for other peers"), bytes_in_send_queue, GNUNET_NO); - mq = GNUNET_malloc (sizeof (struct MessageQueue) + msg_size); + mq = MEMDEBUG_malloc (sizeof (struct MessageQueue) + msg_size, __LINE__); mq->cont = cont; mq->cont_cls = cont_cls; memcpy (&mq[1], msg, msg_size); @@ -1508,7 +1720,7 @@ send_session_connect (struct NeighbourAddress *na) { struct GNUNET_TRANSPORT_PluginFunctions *papi; struct SessionConnectMessage connect_msg; - + if (NULL == (papi = GST_plugins_find (na->address->transport_name))) { GNUNET_break (0); @@ -1532,6 +1744,7 @@ send_session_connect (struct NeighbourAddress *na) UINT_MAX, GNUNET_TIME_UNIT_FOREVER_REL, NULL, NULL); + } @@ -1549,7 +1762,7 @@ send_session_connect_ack_message (const struct GNUNET_HELLO_Address *address, { struct GNUNET_TRANSPORT_PluginFunctions *papi; struct SessionConnectMessage connect_msg; - + if (NULL == (papi = GST_plugins_find (address->transport_name))) { GNUNET_break (0); @@ -1572,6 +1785,7 @@ send_session_connect_ack_message (const struct GNUNET_HELLO_Address *address, UINT_MAX, GNUNET_TIME_UNIT_FOREVER_REL, NULL, NULL); + } @@ -1589,7 +1803,7 @@ setup_neighbour (const struct GNUNET_PeerIdentity *peer) GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Creating new neighbour entry for `%s'\n", GNUNET_i2s (peer)); - n = GNUNET_malloc (sizeof (struct NeighbourMapEntry)); + n = MEMDEBUG_malloc (sizeof (struct NeighbourMapEntry), __LINE__); n->id = *peer; n->state = S_NOT_CONNECTED; n->latency = GNUNET_TIME_UNIT_FOREVER_REL; @@ -1668,6 +1882,7 @@ GST_neighbours_try_connect (const struct GNUNET_PeerIdentity *target) case S_INIT_ATS: case S_INIT_BLACKLIST: case S_CONNECT_SENT: + case S_CONNECT_RECV_BLACKLIST_INBOUND: case S_CONNECT_RECV_ATS: case S_CONNECT_RECV_BLACKLIST: case S_CONNECT_RECV_ACK: @@ -1704,7 +1919,7 @@ GST_neighbours_try_connect (const struct GNUNET_PeerIdentity *target) n->timeout = GNUNET_TIME_relative_to_absolute (ATS_RESPONSE_TIMEOUT); GNUNET_ATS_reset_backoff (GST_ats, target); - GNUNET_ATS_suggest_address (GST_ats, target); + n->suggest_handle = GNUNET_ATS_suggest_address (GST_ats, target); } @@ -1728,14 +1943,6 @@ handle_test_blacklist_cont (void *cls, "Connection to new address of peer `%s' based on blacklist is `%s'\n", GNUNET_i2s (peer), (GNUNET_OK == result) ? "allowed" : "FORBIDDEN"); - if (GNUNET_OK == result) - { - /* valid new address, let ATS know! */ - GNUNET_ATS_address_update (GST_ats, - bcc->na.address, - bcc->na.session, - bcc->ats, bcc->ats_count); - } if (NULL == (n = lookup_neighbour (peer))) goto cleanup; /* nobody left to care about new address */ switch (n->state) @@ -1777,7 +1984,7 @@ handle_test_blacklist_cont (void *cls, n->state = S_INIT_ATS; n->timeout = GNUNET_TIME_relative_to_absolute (ATS_RESPONSE_TIMEOUT); // FIXME: do we need to ask ATS again for suggestions? - GNUNET_ATS_suggest_address (GST_ats, &n->id); + n->suggest_handle = GNUNET_ATS_suggest_address (GST_ats, &n->id); } break; case S_CONNECT_SENT: @@ -1791,9 +1998,23 @@ handle_test_blacklist_cont (void *cls, n->connect_ack_timestamp); } break; + case S_CONNECT_RECV_BLACKLIST_INBOUND: + if (GNUNET_OK == result) + { + /* valid new address, let ATS know! */ + GNUNET_ATS_address_add (GST_ats, + bcc->na.address, + bcc->na.session, + bcc->ats, bcc->ats_count); + } + n->state = S_CONNECT_RECV_ATS; + n->timeout = GNUNET_TIME_relative_to_absolute (ATS_RESPONSE_TIMEOUT); + GNUNET_ATS_reset_backoff (GST_ats, peer); + n->suggest_handle = GNUNET_ATS_suggest_address (GST_ats, peer); + break; case S_CONNECT_RECV_ATS: /* still waiting on ATS suggestion, don't care about blacklist */ - break; + break; case S_CONNECT_RECV_BLACKLIST: if (GNUNET_YES != address_matches (&bcc->na, &n->primary_address)) break; /* result for an address we currently don't care about */ @@ -1818,7 +2039,7 @@ handle_test_blacklist_cont (void *cls, n->timeout = GNUNET_TIME_relative_to_absolute (ATS_RESPONSE_TIMEOUT); // FIXME: do we need to ask ATS again for suggestions? GNUNET_ATS_reset_backoff (GST_ats, peer); - GNUNET_ATS_suggest_address (GST_ats, &n->id); + n->suggest_handle = GNUNET_ATS_suggest_address (GST_ats, &n->id); } break; case S_CONNECT_RECV_ACK: @@ -1863,7 +2084,7 @@ handle_test_blacklist_cont (void *cls, n->state = S_RECONNECT_ATS; n->timeout = GNUNET_TIME_relative_to_absolute (ATS_RESPONSE_TIMEOUT); // FIXME: do we need to ask ATS again for suggestions? - GNUNET_ATS_suggest_address (GST_ats, &n->id); + n->suggest_handle = GNUNET_ATS_suggest_address (GST_ats, &n->id); } break; case S_RECONNECT_SENT: @@ -1919,11 +2140,12 @@ handle_test_blacklist_cont (void *cls, break; } cleanup: - GNUNET_HELLO_address_free (bcc->na.address); + MEMDEBUG_free (bcc->na.address, __LINE__); + //GNUNET_HELLO_address_free (bcc->na.address); GNUNET_CONTAINER_DLL_remove (bc_head, bc_tail, bcc); - GNUNET_free (bcc); + MEMDEBUG_free (bcc, __LINE__); } @@ -1951,10 +2173,11 @@ check_blacklist (const struct GNUNET_PeerIdentity *peer, struct GST_BlacklistCheck *bc; bcc = - GNUNET_malloc (sizeof (struct BlackListCheckContext) + - sizeof (struct GNUNET_ATS_Information) * ats_count); + MEMDEBUG_malloc (sizeof (struct BlackListCheckContext) + + sizeof (struct GNUNET_ATS_Information) * ats_count, __LINE__); bcc->ats_count = ats_count; bcc->na.address = GNUNET_HELLO_address_copy (address); + MEMDEBUG_add_alloc (bcc->na.address, GNUNET_HELLO_address_get_size (address), __LINE__); bcc->na.session = session; bcc->na.connect_timestamp = ts; bcc->ats = (struct GNUNET_ATS_Information *) &bcc[1]; @@ -1998,6 +2221,7 @@ GST_neighbours_handle_connect (const struct GNUNET_MessageHeader *message, GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Received CONNECT message from peer `%s'\n", GNUNET_i2s (peer)); + if (ntohs (message->size) != sizeof (struct SessionConnectMessage)) { GNUNET_break_op (0); @@ -2013,18 +2237,21 @@ GST_neighbours_handle_connect (const struct GNUNET_MessageHeader *message, n = setup_neighbour (peer); n->send_connect_ack = 1; n->connect_ack_timestamp = ts; + switch (n->state) { case S_NOT_CONNECTED: - n->state = S_CONNECT_RECV_ATS; - n->timeout = GNUNET_TIME_relative_to_absolute (ATS_RESPONSE_TIMEOUT); - GNUNET_ATS_reset_backoff (GST_ats, peer); - GNUNET_ATS_suggest_address (GST_ats, peer); + n->state = S_CONNECT_RECV_BLACKLIST_INBOUND; + /* Do a blacklist check for the new address */ check_blacklist (peer, ts, address, session, ats, ats_count); break; case S_INIT_ATS: + /* CONNECT message takes priority over us asking ATS for address */ + n->state = S_CONNECT_RECV_BLACKLIST_INBOUND; + /* fallthrough */ case S_INIT_BLACKLIST: case S_CONNECT_SENT: + case S_CONNECT_RECV_BLACKLIST_INBOUND: case S_CONNECT_RECV_ATS: case S_CONNECT_RECV_BLACKLIST: case S_CONNECT_RECV_ACK: @@ -2068,7 +2295,7 @@ GST_neighbours_handle_connect (const struct GNUNET_MessageHeader *message, n = setup_neighbour (peer); n->state = S_CONNECT_RECV_ATS; GNUNET_ATS_reset_backoff (GST_ats, peer); - GNUNET_ATS_suggest_address (GST_ats, peer); + n->suggest_handle = GNUNET_ATS_suggest_address (GST_ats, peer); break; case S_DISCONNECT_FINISHED: /* should not be possible */ @@ -2118,6 +2345,8 @@ GST_neighbours_switch_to_address (const struct GNUNET_PeerIdentity *peer, if (NULL == (papi = GST_plugins_find (address->transport_name))) { /* we don't have the plugin for this address */ + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "2348 : `%s' \n", address->transport_name); GNUNET_ATS_address_destroyed (GST_ats, address, NULL); return; } @@ -2125,11 +2354,27 @@ GST_neighbours_switch_to_address (const struct GNUNET_PeerIdentity *peer, { GNUNET_break (0); if (strlen (address->transport_name) > 0) - GNUNET_ATS_address_destroyed (GST_ats, address, session); + GNUNET_ATS_address_destroyed (GST_ats, address, NULL); return; } + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "ATS tells us to switch to address '%s' session %p for " + "peer `%s' in state %s (quota in/out %u %u )\n", + (address->address_length != 0) ? GST_plugins_a2s (address): "<inbound>", + session, + GNUNET_i2s (peer), + print_state (n->state), + ntohl (bandwidth_in.value__), + ntohl (bandwidth_out.value__)); + if (NULL == session) + { session = papi->get_session (papi->cls, address); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Obtained new session for peer `%s' and address '%s': %p\n", + GNUNET_i2s (&address->peer), GST_plugins_a2s (address), session); + } if (NULL == session) { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, @@ -2138,10 +2383,6 @@ GST_neighbours_switch_to_address (const struct GNUNET_PeerIdentity *peer, GNUNET_ATS_address_destroyed (GST_ats, address, NULL); return; } - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "ATS tells us to switch to address '%s' for peer `%s'\n", - (address->address_length != 0) ? GST_plugins_a2s (address): "<inbound>", - GNUNET_i2s (peer)); switch (n->state) { case S_NOT_CONNECTED: @@ -2185,6 +2426,12 @@ GST_neighbours_switch_to_address (const struct GNUNET_PeerIdentity *peer, n->connect_ack_timestamp, address, session, ats, ats_count); break; + case S_CONNECT_RECV_BLACKLIST_INBOUND: + n->timeout = GNUNET_TIME_relative_to_absolute (BLACKLIST_RESPONSE_TIMEOUT); + check_blacklist (&n->id, + n->connect_ack_timestamp, + address, session, ats, ats_count); + break; case S_CONNECT_RECV_BLACKLIST: case S_CONNECT_RECV_ACK: /* ATS asks us to switch while we were trying to connect; switch to new @@ -2308,9 +2555,9 @@ master_task (void *cls, n->task = GNUNET_SCHEDULER_NO_TASK; delay = GNUNET_TIME_absolute_get_remaining (n->timeout); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "master task runs for neighbour `%s' in state %d with timeout in %llu ms\n", + "Master task runs for neighbour `%s' in state %s with timeout in %llu ms\n", GNUNET_i2s (&n->id), - n->state, + print_state(n->state), (unsigned long long) delay.rel_value); switch (n->state) { @@ -2352,6 +2599,17 @@ master_task (void *cls, return; } break; + case S_CONNECT_RECV_BLACKLIST_INBOUND: + if (0 == delay.rel_value) + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Connection to `%s' timed out waiting BLACKLIST to approve address to use for received CONNECT\n", + GNUNET_i2s (&n->id)); + n->state = S_DISCONNECT_FINISHED; + free_neighbour (n, GNUNET_NO); + return; + } + break; case S_CONNECT_RECV_ATS: if (0 == delay.rel_value) { @@ -2454,7 +2712,6 @@ master_task (void *cls, GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Cleaning up connection to `%s' after sending DISCONNECT\n", GNUNET_i2s (&n->id)); - n->state = S_DISCONNECT_FINISHED; free_neighbour (n, GNUNET_NO); return; case S_DISCONNECT_FINISHED: @@ -2530,6 +2787,7 @@ GST_neighbours_handle_connect_ack (const struct GNUNET_MessageHeader *message, GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Received CONNECT_ACK message from peer `%s'\n", GNUNET_i2s (peer)); + if (ntohs (message->size) != sizeof (struct SessionConnectMessage)) { GNUNET_break_op (0); @@ -2568,7 +2826,14 @@ GST_neighbours_handle_connect_ack (const struct GNUNET_MessageHeader *message, gettext_noop ("# peers connected"), ++neighbours_connected, GNUNET_NO); - connect_notify_cb (callback_cls, &n->id, ats, ats_count); + connect_notify_cb (callback_cls, &n->id, ats, ats_count, + n->primary_address.bandwidth_in, + n->primary_address.bandwidth_out); + /* Tell ATS that the outbound session we created to send CONNECT was successfull */ + GNUNET_ATS_address_add (GST_ats, + n->primary_address.address, + n->primary_address.session, + ats, ats_count); set_address (&n->primary_address, n->primary_address.address, n->primary_address.session, @@ -2577,6 +2842,7 @@ GST_neighbours_handle_connect_ack (const struct GNUNET_MessageHeader *message, GNUNET_YES); send_session_ack_message (n); break; + case S_CONNECT_RECV_BLACKLIST_INBOUND: case S_CONNECT_RECV_ATS: case S_CONNECT_RECV_BLACKLIST: case S_CONNECT_RECV_ACK: @@ -2612,6 +2878,10 @@ GST_neighbours_handle_connect_ack (const struct GNUNET_MessageHeader *message, n->state = S_CONNECTED; n->timeout = GNUNET_TIME_relative_to_absolute (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOUT); GNUNET_break (GNUNET_NO == n->alternative_address.ats_active); + GNUNET_ATS_address_add(GST_ats, + n->alternative_address.address, + n->alternative_address.session, + ats, ats_count); set_address (&n->primary_address, n->alternative_address.address, n->alternative_address.session, @@ -2644,8 +2914,10 @@ GST_neighbours_handle_connect_ack (const struct GNUNET_MessageHeader *message, * * @param peer identity of the peer where the session died * @param session session that is gone + * @return GNUNET_YES if this was a session used, GNUNET_NO if + * this session was not in use */ -void +int GST_neighbours_session_terminated (const struct GNUNET_PeerIdentity *peer, struct Session *session) { @@ -2661,15 +2933,16 @@ GST_neighbours_session_terminated (const struct GNUNET_PeerIdentity *peer, if (bcc->na.session == session) { GST_blacklist_test_cancel (bcc->bc); - GNUNET_HELLO_address_free (bcc->na.address); + MEMDEBUG_free (bcc->na.address, __LINE__); + //GNUNET_HELLO_address_free (bcc->na.address); GNUNET_CONTAINER_DLL_remove (bc_head, bc_tail, bcc); - GNUNET_free (bcc); + MEMDEBUG_free (bcc, __LINE__); } } if (NULL == (n = lookup_neighbour (peer))) - return; /* can't affect us */ + return GNUNET_NO; /* can't affect us */ if (session != n->primary_address.session) { if (session == n->alternative_address.session) @@ -2681,42 +2954,42 @@ GST_neighbours_session_terminated (const struct GNUNET_PeerIdentity *peer, else GNUNET_break (0); } - return; /* doesn't affect us further */ + return GNUNET_NO; /* doesn't affect us further */ } n->expect_latency_response = GNUNET_NO; - switch (n->state) { case S_NOT_CONNECTED: GNUNET_break (0); free_neighbour (n, GNUNET_NO); - return; + return GNUNET_YES; case S_INIT_ATS: GNUNET_break (0); free_neighbour (n, GNUNET_NO); - return; + return GNUNET_YES; case S_INIT_BLACKLIST: case S_CONNECT_SENT: free_address (&n->primary_address); n->state = S_INIT_ATS; n->timeout = GNUNET_TIME_relative_to_absolute (ATS_RESPONSE_TIMEOUT); // FIXME: need to ask ATS for suggestions again? - GNUNET_ATS_suggest_address (GST_ats, &n->id); + n->suggest_handle = GNUNET_ATS_suggest_address (GST_ats, &n->id); break; + case S_CONNECT_RECV_BLACKLIST_INBOUND: case S_CONNECT_RECV_ATS: case S_CONNECT_RECV_BLACKLIST: case S_CONNECT_RECV_ACK: /* error on inbound session; free neighbour entirely */ free_address (&n->primary_address); free_neighbour (n, GNUNET_NO); - return; + return GNUNET_YES; case S_CONNECTED: free_address (&n->primary_address); n->state = S_RECONNECT_ATS; n->timeout = GNUNET_TIME_relative_to_absolute (ATS_RESPONSE_TIMEOUT); /* FIXME: is this ATS call needed? */ - GNUNET_ATS_suggest_address (GST_ats, &n->id); + n->suggest_handle = GNUNET_ATS_suggest_address (GST_ats, &n->id); break; case S_RECONNECT_ATS: /* we don't have an address, how can it go down? */ @@ -2727,7 +3000,7 @@ GST_neighbours_session_terminated (const struct GNUNET_PeerIdentity *peer, n->state = S_RECONNECT_ATS; n->timeout = GNUNET_TIME_relative_to_absolute (ATS_RESPONSE_TIMEOUT); // FIXME: need to ask ATS for suggestions again? - GNUNET_ATS_suggest_address (GST_ats, &n->id); + n->suggest_handle = GNUNET_ATS_suggest_address (GST_ats, &n->id); break; case S_CONNECTED_SWITCHING_BLACKLIST: /* primary went down while we were checking secondary against @@ -2752,6 +3025,7 @@ GST_neighbours_session_terminated (const struct GNUNET_PeerIdentity *peer, break; case S_DISCONNECT_FINISHED: /* neighbour was freed and plugins told to terminate session */ + return GNUNET_NO; break; default: GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Unhandled state `%s' \n",print_state (n->state)); @@ -2761,6 +3035,7 @@ GST_neighbours_session_terminated (const struct GNUNET_PeerIdentity *peer, if (GNUNET_SCHEDULER_NO_TASK != n->task) GNUNET_SCHEDULER_cancel (n->task); n->task = GNUNET_SCHEDULER_add_now (&master_task, n); + return GNUNET_YES; } @@ -2814,7 +3089,13 @@ GST_neighbours_handle_session_ack (const struct GNUNET_MessageHeader *message, gettext_noop ("# peers connected"), ++neighbours_connected, GNUNET_NO); - connect_notify_cb (callback_cls, &n->id, ats, ats_count); + connect_notify_cb (callback_cls, &n->id, ats, ats_count, + n->primary_address.bandwidth_in, + n->primary_address.bandwidth_out); + GNUNET_ATS_address_add(GST_ats, + n->primary_address.address, + n->primary_address.session, + ats, ats_count); set_address (&n->primary_address, n->primary_address.address, n->primary_address.session, @@ -2888,7 +3169,7 @@ GST_neighbours_handle_disconnect_message (const struct GNUNET_PeerIdentity { struct NeighbourMapEntry *n; const struct SessionDisconnectMessage *sdm; - GNUNET_HashCode hc; + struct GNUNET_HashCode hc; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Received DISCONNECT message from peer `%s'\n", @@ -2972,13 +3253,31 @@ struct IteratorContext * @return GNUNET_OK (continue to iterate) */ static int -neighbours_iterate (void *cls, const GNUNET_HashCode * key, void *value) +neighbours_iterate (void *cls, const struct GNUNET_HashCode * key, void *value) { struct IteratorContext *ic = cls; struct NeighbourMapEntry *n = value; if (GNUNET_YES == test_connected (n)) - ic->cb (ic->cb_cls, &n->id, NULL, 0, n->primary_address.address); + { + struct GNUNET_BANDWIDTH_Value32NBO bandwidth_in; + struct GNUNET_BANDWIDTH_Value32NBO bandwidth_out; + + if (NULL != n->primary_address.address) + { + bandwidth_in = n->primary_address.bandwidth_in; + bandwidth_out = n->primary_address.bandwidth_out; + } + else + { + bandwidth_in = GNUNET_CONSTANTS_DEFAULT_BW_IN_OUT; + bandwidth_out = GNUNET_CONSTANTS_DEFAULT_BW_IN_OUT; + } + + ic->cb (ic->cb_cls, &n->id, NULL, 0, + n->primary_address.address, + bandwidth_in, bandwidth_out); + } return GNUNET_OK; } @@ -3041,14 +3340,20 @@ GST_neighbour_get_latency (const struct GNUNET_PeerIdentity *peer) switch (n->state) { case S_CONNECTED: + case S_CONNECTED_SWITCHING_CONNECT_SENT: + case S_CONNECTED_SWITCHING_BLACKLIST: case S_RECONNECT_SENT: case S_RECONNECT_ATS: + case S_RECONNECT_BLACKLIST: return n->latency; case S_NOT_CONNECTED: case S_INIT_BLACKLIST: case S_INIT_ATS: - case S_CONNECT_SENT: + case S_CONNECT_RECV_BLACKLIST_INBOUND: + case S_CONNECT_RECV_ATS: case S_CONNECT_RECV_BLACKLIST: + case S_CONNECT_RECV_ACK: + case S_CONNECT_SENT: case S_DISCONNECT: case S_DISCONNECT_FINISHED: return GNUNET_TIME_UNIT_FOREVER_REL; @@ -3087,18 +3392,20 @@ GST_neighbour_get_current_address (const struct GNUNET_PeerIdentity *peer) * @param disconnect_cb function to call if we disconnect from a peer * @param peer_address_cb function to call if we change an active address * of a neighbour + * @param max_fds maximum number of fds to use */ void GST_neighbours_start (void *cls, - GNUNET_TRANSPORT_NotifyConnect connect_cb, + NotifyConnect connect_cb, GNUNET_TRANSPORT_NotifyDisconnect disconnect_cb, - GNUNET_TRANSPORT_PeerIterateCallback peer_address_cb) + GNUNET_TRANSPORT_PeerIterateCallback peer_address_cb, + unsigned int max_fds) { callback_cls = cls; connect_notify_cb = connect_cb; disconnect_notify_cb = disconnect_cb; address_change_cb = peer_address_cb; - neighbours = GNUNET_CONTAINER_multihashmap_create (NEIGHBOUR_TABLE_SIZE); + neighbours = GNUNET_CONTAINER_multihashmap_create (NEIGHBOUR_TABLE_SIZE, GNUNET_NO); } @@ -3111,7 +3418,7 @@ GST_neighbours_start (void *cls, * @return GNUNET_OK (continue to iterate) */ static int -disconnect_all_neighbours (void *cls, const GNUNET_HashCode * key, void *value) +disconnect_all_neighbours (void *cls, const struct GNUNET_HashCode * key, void *value) { struct NeighbourMapEntry *n = value; |