/*
This file is part of GNUnet.
Copyright (C) 2017 GNUnet e.V.
GNUnet is free software: you can redistribute it and/or modify it
under the terms of the GNU Affero General Public License as published
by the Free Software Foundation, either version 3 of the License,
or (at your option) any later version.
GNUnet is distributed in the hope that it will be useful, but
WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
/**
* @file cadet/gnunet-service-cadet_core.c
* @brief cadet service; interaction with CORE service
* @author Bartlomiej Polot
* @author Christian Grothoff
*
* All functions in this file should use the prefix GCO (Gnunet Cadet cOre (bottom))
*
* TODO:
* - Optimization: given BROKEN messages, destroy paths (?)
*/
#include "platform.h"
#include "gnunet-service-cadet_core.h"
#include "gnunet-service-cadet_paths.h"
#include "gnunet-service-cadet_peer.h"
#include "gnunet-service-cadet_connection.h"
#include "gnunet-service-cadet_tunnels.h"
#include "gnunet_core_service.h"
#include "gnunet_statistics_service.h"
#include "cadet_protocol.h"
/**
 * Log a message at @a level under the component name "cadet-cor".
 */
#define LOG(level, ...) GNUNET_log_from(level,"cadet-cor",__VA_ARGS__)
/**
 * Information we keep per direction for a route.
 *
 * Forward declaration only: `struct Rung` stores pointers to this type
 * before its full definition appears.
 */
struct RouteDirection;
/**
 * Set of route directions that have exactly the same number of messages
 * in their buffer.  Used so we can efficiently find all of those
 * route directions that have the current maximum of messages in the
 * buffer (in case we have to purge).
 *
 * Rungs are kept in a DLL (#rung_head / #rung_tail) ordered by
 * @e rung_off.  The list is sparse: a rung for a given offset is only
 * created when a route direction needs it.
 */
struct Rung
{
/**
* Next rung in the DLL (route directions with more buffer entries).
* Not necessarily @e rung_off + 1; users compare the offsets and
* create the missing rung on demand.
*/
struct Rung *next;
/**
* Previous rung in the DLL (route directions with fewer buffer entries).
* Not necessarily @e rung_off - 1; users compare the offsets and
* create the missing rung on demand.
*/
struct Rung *prev;
/**
* Head of the DLL of route directions with a number of buffer entries
* matching this rung.
*/
struct RouteDirection *rd_head;
/**
* Tail of the DLL of route directions with a number of buffer entries
* matching this rung.
*/
struct RouteDirection *rd_tail;
/**
* Total number of route directions in this rung.
* NOTE(review): no code visible in this file updates this counter.
*/
unsigned int num_routes;
/**
* Number of messages route directions at this rung have
* in their buffer.
*/
unsigned int rung_off;
};
/**
 * Information we keep per direction for a route.  Every
 * `struct CadetRoute` embeds two of these (towards the next and
 * towards the previous hop).
 */
struct RouteDirection
{
/**
* Previous entry in the DLL of route directions within the same `struct Rung`.
*/
struct RouteDirection *prev;
/**
* Next entry in the DLL of route directions within the same `struct Rung`.
*/
struct RouteDirection *next;
/**
* Rung of this route direction (matches length of the buffer DLL).
*/
struct Rung *rung;
/**
* Head of DLL of envelopes we have in the buffer for this direction.
* The head is the entry transmitted or discarded first.
*/
struct GNUNET_MQ_Envelope *env_head;
/**
* Tail of DLL of envelopes we have in the buffer for this direction.
* New messages are appended here.
*/
struct GNUNET_MQ_Envelope *env_tail;
/**
* Target peer.
*/
struct CadetPeer *hop;
/**
* Route this direction is part of.
*/
struct CadetRoute *my_route;
/**
* Message queue manager for @e hop.  Reset to NULL once the request
* has been cancelled.
*/
struct GCP_MessageQueueManager *mqm;
/**
* Is @e mqm currently ready for transmission?  Set to #GNUNET_YES by
* the readiness callback and back to #GNUNET_NO whenever a message is
* handed to @e mqm.
*/
int is_ready;
};
/**
 * Description of a segment of a `struct CadetConnection` at the
 * intermediate peers.  Routes are basically entries in a peer's
 * routing table for forwarding traffic.  At both endpoints, the
 * routes are terminated by a `struct CadetConnection`, which knows
 * the complete `struct CadetPath` that is formed by the individual
 * routes.
 */
struct CadetRoute
{
/**
* Information about the next hop on this route.
*/
struct RouteDirection next;
/**
* Information about the previous hop on this route.
*/
struct RouteDirection prev;
/**
* Unique identifier for the connection that uses this route; its
* `connection_of_tunnel` member is the key in the #routes map.
*/
struct GNUNET_CADET_ConnectionTunnelIdentifier cid;
/**
* When was this route last in use?  Also used as the cost of the
* route in the #route_heap.
*/
struct GNUNET_TIME_Absolute last_use;
/**
* Position of this route in the #route_heap.
*/
struct GNUNET_CONTAINER_HeapNode *hn;
/**
* Options for the route, control buffering
* (see the #GNUNET_CADET_OPTION_NOBUFFER check when routing).
*/
enum GNUNET_CADET_ChannelOption options;
};
/**
 * Handle to the CORE service.
 */
static struct GNUNET_CORE_Handle *core;
/**
 * Routes on which this peer is an intermediate, keyed by the
 * `connection_of_tunnel` member of the connection identifier.
 */
static struct GNUNET_CONTAINER_MultiShortmap *routes;
/**
 * Heap of routes, MIN-sorted by last activity.
 */
static struct GNUNET_CONTAINER_Heap *route_heap;
/**
 * Rung zero (always pointed to by #rung_head).  Statically allocated,
 * so it must never be passed to GNUNET_free().
 */
static struct Rung rung_zero;
/**
 * Head of the DLL of rungs.  The head always points to the rung of
 * route directions with no messages in the queue.
 */
static struct Rung *rung_head = &rung_zero;
/**
 * Tail of the #rung_head DLL.
 */
static struct Rung *rung_tail = &rung_zero;
/**
 * Maximum number of concurrent routes this peer will support.
 */
static unsigned long long max_routes;
/**
 * Maximum number of envelopes we will buffer at this peer.
 */
static unsigned long long max_buffers;
/**
 * Current number of envelopes we have buffered at this peer.
 */
static unsigned long long cur_buffers;
/**
 * Task to timeout routes; NULL if no such task is scheduled.
 */
static struct GNUNET_SCHEDULER_Task *timeout_task;
/**
 * Look up the route registered for a connection identifier.
 *
 * @param cid connection identifier; its `connection_of_tunnel` member
 *        is used as the key into the #routes map
 * @return the entry stored in #routes for that key (callers treat
 *         NULL as "no such route")
 */
static struct CadetRoute *
get_route (const struct GNUNET_CADET_ConnectionTunnelIdentifier *cid)
{
  struct CadetRoute *found;

  found = GNUNET_CONTAINER_multishortmap_get (routes,
                                              &cid->connection_of_tunnel);
  return found;
}
/**
 * Move @a dir from its current rung to the rung for one buffered
 * message less, creating that rung if it does not exist yet.
 *
 * The rung @a dir leaves is kept in the rung list even if it becomes
 * empty.  Aborts (assertion) if @a dir is in the first rung.
 *
 * @param dir direction to lower in rung.
 */
static void
lower_rung (struct RouteDirection *dir)
{
  struct Rung *const from = dir->rung;
  struct Rung *target;

  /* detach from the rung we are leaving */
  GNUNET_CONTAINER_DLL_remove (from->rd_head,
                               from->rd_tail,
                               dir);
  target = from->prev;
  GNUNET_assert (NULL != target);
  if (target->rung_off != from->rung_off - 1)
  {
    /* the list is sparse: insert the missing rung right below 'from' */
    struct Rung *fresh = GNUNET_new (struct Rung);

    fresh->rung_off = from->rung_off - 1;
    GNUNET_CONTAINER_DLL_insert_after (rung_head,
                                       rung_tail,
                                       target,
                                       fresh);
    target = fresh;
  }
  /* attach to the lower rung */
  GNUNET_CONTAINER_DLL_insert (target->rd_head,
                               target->rd_tail,
                               dir);
  dir->rung = target;
}
/**
 * Drop one buffered envelope of a route direction: unlink @a env from
 * the buffer of @a dir, discard it, move @a dir down one rung and
 * publish the new global buffer count.
 *
 * @param dir direction that contains the @a env in the buffer
 * @param env envelope to discard
 */
static void
discard_buffer (struct RouteDirection *dir,
                struct GNUNET_MQ_Envelope *env)
{
  /* take the envelope out of the per-direction buffer ... */
  GNUNET_MQ_dll_remove (&dir->env_head, &dir->env_tail, env);
  --cur_buffers;
  /* ... release it ... */
  GNUNET_MQ_discard (env);
  /* ... and account for the shorter buffer */
  lower_rung (dir);
  GNUNET_STATISTICS_set (stats, "# buffer use", cur_buffers, GNUNET_NO);
}
/**
 * Discard one message from every route direction in the highest rung,
 * to make space, then remove and free that rung.
 *
 * Must not be called while the highest rung is the statically
 * allocated rung zero (#rung_head): there is nothing to discard there
 * and the rung must not be freed.
 */
static void
discard_all_from_rung_tail (void)
{
  struct Rung *tail = rung_tail;
  struct RouteDirection *dir;

  /* rung zero is static storage and holds no buffered messages */
  GNUNET_assert (tail != rung_head);
  /* discard_buffer() moves 'dir' out of 'tail', so the head changes
     on every iteration until the rung is empty */
  while (NULL != (dir = tail->rd_head))
  {
    LOG (GNUNET_ERROR_TYPE_DEBUG,
         "Queue full due to new message, dropping old message on connection %s\n",
         GNUNET_sh2s (&dir->my_route->cid.connection_of_tunnel));
    GNUNET_STATISTICS_update (stats,
                              "# messages dropped due to full buffer",
                              1,
                              GNUNET_NO);
    discard_buffer (dir,
                    dir->env_head);
  }
  GNUNET_CONTAINER_DLL_remove (rung_head,
                               rung_tail,
                               tail);
  GNUNET_free (tail);
}
/**
 * We received message @a msg from @a prev.  Find its route by @a cid
 * and forward to the next hop.  Drop and signal broken route if we do
 * not have a route.
 *
 * If the outgoing direction is ready, the message is sent right away.
 * Otherwise a copy is buffered, subject to the global #max_buffers
 * limit: when the limit is reached, messages are first dropped from
 * the route directions with the longest buffers, and only if this
 * direction itself has the longest buffer is its own oldest message
 * dropped.  If nothing at all can be evicted, the new message is dropped.
 *
 * @param prev previous hop (sender)
 * @param cid connection identifier, tells us which route to use
 * @param msg the message to forward
 */
static void
route_message (struct CadetPeer *prev,
               const struct GNUNET_CADET_ConnectionTunnelIdentifier *cid,
               const struct GNUNET_MessageHeader *msg)
{
  struct CadetRoute *route;
  struct RouteDirection *dir;
  struct Rung *rung;
  struct Rung *nxt;
  struct GNUNET_MQ_Envelope *env;

  route = get_route (cid);
  if (NULL == route)
  {
    struct GNUNET_CADET_ConnectionBrokenMessage *bm;

    LOG (GNUNET_ERROR_TYPE_DEBUG,
         "Failed to route message of type %u from %s on connection %s: no route\n",
         ntohs (msg->type),
         GCP_2s (prev),
         GNUNET_sh2s (&cid->connection_of_tunnel));
    switch (ntohs (msg->type))
    {
    case GNUNET_MESSAGE_TYPE_CADET_CONNECTION_DESTROY:
    case GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN:
      /* No need to respond to these! */
      return;
    default:
      break;
    }
    /* tell the sender that this connection does not work via us */
    env = GNUNET_MQ_msg (bm,
                         GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN);
    bm->cid = *cid;
    bm->peer1 = my_full_id;
    GCP_send_ooo (prev,
                  env);
    return;
  }
  /* route is in use: refresh its position in the timeout heap */
  route->last_use = GNUNET_TIME_absolute_get ();
  GNUNET_CONTAINER_heap_update_cost (route->hn,
                                     route->last_use.abs_value_us);
  /* forward away from the hop the message came from */
  dir = (prev == route->prev.hop) ? &route->next : &route->prev;
  if (GNUNET_YES == dir->is_ready)
  {
    LOG (GNUNET_ERROR_TYPE_DEBUG,
         "Routing message of type %u from %s to %s on connection %s\n",
         ntohs (msg->type),
         GCP_2s (prev),
         GNUNET_i2s (GCP_get_id (dir->hop)),
         GNUNET_sh2s (&cid->connection_of_tunnel));
    dir->is_ready = GNUNET_NO;
    GCP_send (dir->mqm,
              GNUNET_MQ_msg_copy (msg));
    return;
  }
  /* Check if buffering is disallowed, and if so, make sure we only queue
     one message per direction. */
  if ( (0 != (route->options & GNUNET_CADET_OPTION_NOBUFFER)) &&
       (NULL != dir->env_head) )
    discard_buffer (dir,
                    dir->env_head);
  /* Check for duplicates */
  for (const struct GNUNET_MQ_Envelope *buffered = dir->env_head;
       NULL != buffered;
       buffered = GNUNET_MQ_env_next (buffered))
  {
    const struct GNUNET_MessageHeader *hdr = GNUNET_MQ_env_get_msg (buffered);

    if ( (hdr->size == msg->size) &&
         (0 == memcmp (hdr,
                       msg,
                       ntohs (msg->size))) )
    {
      LOG (GNUNET_ERROR_TYPE_DEBUG,
           "Received duplicate of message already in buffer, dropping\n");
      GNUNET_STATISTICS_update (stats,
                                "# messages dropped due to duplicate in buffer",
                                1,
                                GNUNET_NO);
      return;
    }
  }
  rung = dir->rung;
  /* Need to make room?  Easy case first: drop messages from route
     directions in rungs above ours.  We loop because the highest rung
     may be an empty left-over (lower_rung() does not remove empty
     rungs), in which case purging it frees nothing.  'rung' itself is
     never the tail while it has a successor, so it stays valid. */
  while ( (cur_buffers >= max_buffers) &&
          (NULL != rung->next) )
    discard_all_from_rung_tail ();
  if (cur_buffers >= max_buffers)
  {
    /* We are in the highest rung, drop our own! */
    if (NULL == dir->env_head)
    {
      /* Nothing is buffered that we could evict (e.g. the buffer limit
         is zero), so the new message itself has to go. */
      LOG (GNUNET_ERROR_TYPE_DEBUG,
           "Queue full, dropping new message of type %u on connection %s\n",
           ntohs (msg->type),
           GNUNET_sh2s (&dir->my_route->cid.connection_of_tunnel));
      GNUNET_STATISTICS_update (stats,
                                "# messages dropped due to full buffer",
                                1,
                                GNUNET_NO);
      return;
    }
    LOG (GNUNET_ERROR_TYPE_DEBUG,
         "Queue full due to new message of type %u on connection %s, dropping old message\n",
         ntohs (msg->type),
         GNUNET_sh2s (&dir->my_route->cid.connection_of_tunnel));
    GNUNET_STATISTICS_update (stats,
                              "# messages dropped due to full buffer",
                              1,
                              GNUNET_NO);
    discard_buffer (dir,
                    dir->env_head);
    /* discard_buffer() moved us down one rung */
    rung = dir->rung;
  }
  /* remove 'dir' from current rung */
  GNUNET_CONTAINER_DLL_remove (rung->rd_head,
                               rung->rd_tail,
                               dir);
  /* make 'nxt' point to the next higher rung, create if necessary */
  nxt = rung->next;
  if ( (NULL == nxt) ||
       (rung->rung_off + 1 != nxt->rung_off) )
  {
    nxt = GNUNET_new (struct Rung);
    nxt->rung_off = rung->rung_off + 1;
    GNUNET_CONTAINER_DLL_insert_after (rung_head,
                                       rung_tail,
                                       rung,
                                       nxt);
  }
  /* insert 'dir' into next higher rung */
  GNUNET_CONTAINER_DLL_insert (nxt->rd_head,
                               nxt->rd_tail,
                               dir);
  dir->rung = nxt;
  /* add message into 'dir' buffer */
  LOG (GNUNET_ERROR_TYPE_DEBUG,
       "Queueing new message of type %u from %s to %s on connection %s\n",
       ntohs (msg->type),
       GCP_2s (prev),
       GNUNET_i2s (GCP_get_id (dir->hop)),
       GNUNET_sh2s (&cid->connection_of_tunnel));
  env = GNUNET_MQ_msg_copy (msg);
  GNUNET_MQ_dll_insert_tail (&dir->env_head,
                             &dir->env_tail,
                             env);
  cur_buffers++;
  GNUNET_STATISTICS_set (stats,
                         "# buffer use",
                         cur_buffers,
                         GNUNET_NO);
  /* Clean up 'rung' if now empty (and not head) */
  if ( (NULL == rung->rd_head) &&
       (rung != rung_head) )
  {
    GNUNET_CONTAINER_DLL_remove (rung_head,
                                 rung_tail,
                                 rung);
    GNUNET_free (rung);
  }
}
/**
 * Validate the size of a connection-create message: whatever follows
 * the fixed-size header must be a whole number of peer identities.
 *
 * @param cls Closure (unused).
 * @param msg Message to check.
 *
 * @return #GNUNET_YES if size is correct, #GNUNET_NO otherwise.
 */
static int
check_connection_create (void *cls,
                         const struct GNUNET_CADET_ConnectionCreateMessage *msg)
{
  const uint16_t payload = ntohs (msg->header.size) - sizeof (*msg);

  if (0 == (payload % sizeof (struct GNUNET_PeerIdentity)))
    return GNUNET_YES;
  /* trailing bytes that do not form a full peer identity */
  GNUNET_break_op (0);
  return GNUNET_NO;
}
/**
* Free internal data of a route direction.
*
* @param dir direction to destroy (do NOT free memory of 'dir' itself)
*/
static void
destroy_direction (struct RouteDirection *dir)
{
struct GNUNET_MQ_Envelope *env;
while (NULL != (env = dir->env_head))
{
GNUNET_STATISTICS_update (stats,
"# messages dropped due to route destruction",
1,
GNUNET_NO);
discard_buffer (dir,
env);
}
if (NULL != dir->mqm)
{
GCP_request_mq_cancel (dir->mqm,
NULL);
dir->mqm = NULL;
}
GNUNET_CONTAINER_DLL_remove (rung_head->rd_head,
rung_head->rd_tail,
dir);
}
/**
 * Tear down a route: remove it from the timeout heap and the routing
 * table, update the route statistic, clean up both directions and
 * free the route.
 *
 * @param route route to destroy
 */
static void
destroy_route (struct CadetRoute *route)
{
  struct CadetRoute *from_heap;
  int removed;

  LOG (GNUNET_ERROR_TYPE_DEBUG,
       "Destroying route from %s to %s of connection %s\n",
       GNUNET_i2s (GCP_get_id (route->prev.hop)),
       GNUNET_i2s2 (GCP_get_id (route->next.hop)),
       GNUNET_sh2s (&route->cid.connection_of_tunnel));
  from_heap = GNUNET_CONTAINER_heap_remove_node (route->hn);
  GNUNET_assert (route == from_heap);
  removed = GNUNET_CONTAINER_multishortmap_remove (routes,
                                                   &route->cid.connection_of_tunnel,
                                                   route);
  GNUNET_assert (GNUNET_YES == removed);
  GNUNET_STATISTICS_set (stats,
                         "# routes",
                         GNUNET_CONTAINER_multishortmap_size (routes),
                         GNUNET_NO);
  destroy_direction (&route->prev);
  destroy_direction (&route->next);
  GNUNET_free (route);
}
/**
 * Notify the hop of @a target that a route is broken between @a peer1
 * and @a peer2.  The notification is handed over together with the
 * cancellation of the message queue request of @a target, so @a target
 * has no message queue manager afterwards.  Does nothing if @a target
 * has no message queue manager anymore.
 *
 * @param target where to send the message
 * @param cid connection identifier to use
 * @param peer1 one of the peers where a link is broken, may be NULL
 * @param peer2 another one of the peers where a link is broken, may be NULL
 */
static void
send_broken (struct RouteDirection *target,
             const struct GNUNET_CADET_ConnectionTunnelIdentifier *cid,
             const struct GNUNET_PeerIdentity *peer1,
             const struct GNUNET_PeerIdentity *peer2)
{
  struct GNUNET_CADET_ConnectionBrokenMessage *broken;
  struct GNUNET_MQ_Envelope *notice;

  if (NULL == target->mqm)
    return; /* Can't send notification, connection is down! */
  LOG (GNUNET_ERROR_TYPE_DEBUG,
       "Notifying %s about BROKEN route at %s-%s of connection %s\n",
       GCP_2s (target->hop),
       GNUNET_i2s (peer1),
       GNUNET_i2s2 (peer2),
       GNUNET_sh2s (&cid->connection_of_tunnel));
  notice = GNUNET_MQ_msg (broken,
                          GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN);
  broken->cid = *cid;
  /* peers that are not given are left as initialised by GNUNET_MQ_msg() */
  if (NULL != peer1)
    broken->peer1 = *peer1;
  if (NULL != peer2)
    broken->peer2 = *peer2;
  GCP_request_mq_cancel (target->mqm, notice);
  target->mqm = NULL;
}
/**
* Function called to check if any routes have timed out, and if
* so, to clean them up. Finally, schedules itself again at the
* earliest time where there might be more work.
*
* @param cls NULL
*/
static void
timeout_cb (void *cls)
{
struct CadetRoute *r;
struct GNUNET_TIME_Relative linger;
struct GNUNET_TIME_Absolute exp;
timeout_task = NULL;
linger = GNUNET_TIME_relative_multiply (keepalive_period,
3);
while (NULL != (r = GNUNET_CONTAINER_heap_peek (route_heap)))
{
exp = GNUNET_TIME_absolute_add (r->last_use,
linger);
if (0 != GNUNET_TIME_absolute_get_remaining (exp).rel_value_us)
{
/* Route not yet timed out, wait until it does. */
timeout_task = GNUNET_SCHEDULER_add_at (exp,
&timeout_cb,
NULL);
return;
}
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"Sending BROKEN due to timeout (%s was last use, %s linger)\n",
GNUNET_STRINGS_absolute_time_to_string (r->last_use),
GNUNET_STRINGS_relative_time_to_string (linger,
GNUNET_YES));
send_broken (&r->prev,
&r->cid,
NULL,
NULL);
send_broken (&r->next,
&r->cid,
NULL,
NULL);
destroy_route (r);
}
/* No more routes left, so no need for a #timeout_task */
}
/**
 * Function called when the message queue of a route direction
 * becomes available/unavailable.  We expect this function to
 * be called immediately when we register, and then again
 * later if the connection ever goes down.
 *
 * When the queue becomes available, the oldest buffered message (if
 * any) is transmitted.  When it becomes unavailable, the neighbour in
 * the other direction of the route is told that the route is broken
 * between us and the hop of this direction, and the route is destroyed.
 *
 * @param cls the `struct RouteDirection`
 * @param ready #GNUNET_YES if sending is now possible,
 *              #GNUNET_NO if sending is no longer possible
 *              #GNUNET_SYSERR if sending is no longer possible
 *                             and the last envelope was discarded
 */
static void
dir_ready_cb (void *cls,
              int ready)
{
  struct RouteDirection *dir = cls;
  struct CadetRoute *route = dir->my_route;
  struct RouteDirection *odir;

  if (GNUNET_YES == ready)
  {
    struct GNUNET_MQ_Envelope *env;

    dir->is_ready = GNUNET_YES;
    if (NULL != (env = dir->env_head))
    {
      /* hand the oldest buffered message to the queue */
      GNUNET_MQ_dll_remove (&dir->env_head,
                            &dir->env_tail,
                            env);
      cur_buffers--;
      GNUNET_STATISTICS_set (stats,
                             "# buffer use",
                             cur_buffers,
                             GNUNET_NO);
      lower_rung (dir);
      /* the queue is busy again until the next callback */
      dir->is_ready = GNUNET_NO;
      GCP_send (dir->mqm,
                env);
    }
    return;
  }
  /* 'dir' went down: the only neighbour we may still be able to reach
     is the one in the opposite direction, and the broken link is the
     one between us and the hop of 'dir'. */
  odir = (dir == &route->next) ? &route->prev : &route->next;
  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
              "Sending BROKEN due to MQ going down\n");
  send_broken (odir,
               &route->cid,
               GCP_get_id (dir->hop),
               &my_full_id);
  destroy_route (route);
}
/**
* Initialize one of the directions of a route.
*
* @param route route the direction belongs to
* @param dir direction to initialize
* @param hop next hop on in the @a dir
*/
static void
dir_init (struct RouteDirection *dir,
struct CadetRoute *route,
struct CadetPeer *hop)
{
dir->hop = hop;
dir->my_route = route;
dir->mqm = GCP_request_mq (hop,
&dir_ready_cb,
dir);
GNUNET_CONTAINER_DLL_insert (rung_head->rd_head,
rung_head->rd_tail,
dir);
dir->rung = rung_head;
GNUNET_assert (GNUNET_YES == dir->is_ready);
}
/**
 * We could not create the desired route.  Send a
 * #GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN message to @a target,
 * naming ourselves as the first peer of the broken link.
 *
 * @param target who should receive the message
 * @param cid identifier of the connection/route that failed
 * @param failure_at neighbour with which we failed to route,
 *        or NULL.
 */
static void
send_broken_without_mqm (struct CadetPeer *target,
                         const struct GNUNET_CADET_ConnectionTunnelIdentifier *cid,
                         const struct GNUNET_PeerIdentity *failure_at)
{
  struct GNUNET_CADET_ConnectionBrokenMessage *broken;
  struct GNUNET_MQ_Envelope *notice
    = GNUNET_MQ_msg (broken,
                     GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN);

  broken->peer1 = my_full_id;
  broken->cid = *cid;
  if (NULL != failure_at)
    broken->peer2 = *failure_at;
  GCP_send_ooo (target, notice);
}
/**
* Handle for #GNUNET_MESSAGE_TYPE_CADET_CONNECTION_CREATE
*
* @param cls Closure (CadetPeer for neighbor that sent the message).
* @param msg Message itself.
*/
static void
handle_connection_create (void *cls,
const struct GNUNET_CADET_ConnectionCreateMessage *msg)
{
struct CadetPeer *sender = cls;
struct CadetPeer *next;
const struct GNUNET_PeerIdentity *pids = (const struct GNUNET_PeerIdentity *) &msg[1];
struct CadetRoute *route;
uint16_t size = ntohs (msg->header.size) - sizeof (*msg);
unsigned int path_length;
unsigned int off;
enum GNUNET_CADET_ChannelOption options;
options = (enum GNUNET_CADET_ChannelOption) ntohl (msg->options);
path_length = size / sizeof (struct GNUNET_PeerIdentity);
if (0 == path_length)
{
LOG (GNUNET_ERROR_TYPE_DEBUG,
"Dropping CADET_CONNECTION_CREATE with empty path\n");
GNUNET_break_op (0);
return;
}
LOG (GNUNET_ERROR_TYPE_DEBUG,
"Handling CADET_CONNECTION_CREATE from %s for CID %s with %u hops\n",
GCP_2s (sender),
GNUNET_sh2s (&msg->cid.connection_of_tunnel),
path_length);
/* Check for loops */
{
struct GNUNET_CONTAINER_MultiPeerMap *map;
map = GNUNET_CONTAINER_multipeermap_create (path_length * 2,
GNUNET_YES);
GNUNET_assert (NULL != map);
for (unsigned int i=0;icid)))
{
/* Duplicate CREATE, pass it on, previous one might have been lost! */
LOG (GNUNET_ERROR_TYPE_DEBUG,
"Passing on duplicate CADET_CONNECTION_CREATE message on connection %s\n",
GNUNET_sh2s (&msg->cid.connection_of_tunnel));
route_message (sender,
&msg->cid,
&msg->header);
return;
}
if (off == path_length - 1)
{
/* We are the destination, create connection */
struct CadetConnection *cc;
struct CadetPeerPath *path;
struct CadetPeer *origin;
cc = GCC_lookup (&msg->cid);
if (NULL != cc)
{
LOG (GNUNET_ERROR_TYPE_DEBUG,
"Received duplicate CADET_CONNECTION_CREATE message on connection %s\n",
GNUNET_sh2s (&msg->cid.connection_of_tunnel));
GCC_handle_duplicate_create (cc);
return;
}
origin = GCP_get (&pids[0],
GNUNET_YES);
LOG (GNUNET_ERROR_TYPE_DEBUG,
"I am destination for CADET_CONNECTION_CREATE message from %s for connection %s, building inverse path\n",
GCP_2s (origin),
GNUNET_sh2s (&msg->cid.connection_of_tunnel));
path = GCPP_get_path_from_route (path_length - 1,
pids);
if (GNUNET_OK !=
GCT_add_inbound_connection (GCP_get_tunnel (origin,
GNUNET_YES),
&msg->cid,
(enum GNUNET_CADET_ChannelOption) ntohl (msg->options),
path))
{
/* Send back BROKEN: duplicate connection on the same path,
we will use the other one. */
LOG (GNUNET_ERROR_TYPE_DEBUG,
"Received CADET_CONNECTION_CREATE from %s for %s, but %s already has a connection. Sending BROKEN\n",
GCP_2s (sender),
GNUNET_sh2s (&msg->cid.connection_of_tunnel),
GCPP_2s (path));
send_broken_without_mqm (sender,
&msg->cid,
NULL);
return;
}
return;
}
/* We are merely a hop on the way, check if we can support the route */
next = GCP_get (&pids[off + 1],
GNUNET_NO);
if ( (NULL == next) ||
(GNUNET_NO == GCP_has_core_connection (next)) )
{
/* unworkable, send back BROKEN notification */
LOG (GNUNET_ERROR_TYPE_DEBUG,
"Received CADET_CONNECTION_CREATE from %s for %s. Next hop %s:%u is down. Sending BROKEN\n",
GCP_2s (sender),
GNUNET_sh2s (&msg->cid.connection_of_tunnel),
GNUNET_i2s (&pids[off + 1]),
off + 1);
send_broken_without_mqm (sender,
&msg->cid,
&pids[off + 1]);
return;
}
if (max_routes <= GNUNET_CONTAINER_multishortmap_size (routes))
{
LOG (GNUNET_ERROR_TYPE_DEBUG,
"Received CADET_CONNECTION_CREATE from %s for %s. We have reached our route limit. Sending BROKEN\n",
GCP_2s (sender),
GNUNET_sh2s (&msg->cid.connection_of_tunnel));
send_broken_without_mqm (sender,
&msg->cid,
&pids[off - 1]);
return;
}
/* Workable route, create routing entry */
LOG (GNUNET_ERROR_TYPE_DEBUG,
"Received CADET_CONNECTION_CREATE from %s for %s. Next hop %s:%u is up. Creating route\n",
GCP_2s (sender),
GNUNET_sh2s (&msg->cid.connection_of_tunnel),
GNUNET_i2s (&pids[off + 1]),
off + 1);
route = GNUNET_new (struct CadetRoute);
route->options = options;
route->cid = msg->cid;
route->last_use = GNUNET_TIME_absolute_get ();
dir_init (&route->prev,
route,
sender);
dir_init (&route->next,
route,
next);
GNUNET_assert (GNUNET_OK ==
GNUNET_CONTAINER_multishortmap_put (routes,
&route->cid.connection_of_tunnel,
route,
GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
GNUNET_STATISTICS_set (stats,
"# routes",
GNUNET_CONTAINER_multishortmap_size (routes),
GNUNET_NO);
route->hn = GNUNET_CONTAINER_heap_insert (route_heap,
route,
route->last_use.abs_value_us);
if (NULL == timeout_task)
timeout_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply (keepalive_period,
3),
&timeout_cb,
NULL);
/* also pass CREATE message along to next hop */
route_message (sender,
&msg->cid,
&msg->header);
}
/**
 * Handle for #GNUNET_MESSAGE_TYPE_CADET_CONNECTION_CREATE_ACK
 *
 * If the connection ends at this peer, the ACK is given to the
 * connection (provided it came from the peer at offset 0 of the
 * connection's path); otherwise it is routed onwards.
 *
 * @param cls Closure (CadetPeer for neighbor that sent the message).
 * @param msg Message itself.
 */
static void
handle_connection_create_ack (void *cls,
                              const struct GNUNET_CADET_ConnectionCreateAckMessage *msg)
{
  struct CadetPeer *peer = cls;
  struct CadetConnection *cc = GCC_lookup (&msg->cid);
  struct CadetPeerPath *path;
  unsigned int len;

  if (NULL == cc)
  {
    /* We're just an intermediary peer, route the message along its path */
    route_message (peer, &msg->cid, &msg->header);
    return;
  }
  /* The connection ends here: verify ACK came from the right direction */
  path = GCC_get_path (cc, &len);
  if (peer != GCPP_get_peer_at_offset (path, 0))
  {
    /* received ACK from unexpected direction, ignore! */
    GNUNET_break_op (0);
    return;
  }
  LOG (GNUNET_ERROR_TYPE_DEBUG,
       "Received CONNECTION_CREATE_ACK for connection %s.\n",
       GNUNET_sh2s (&msg->cid.connection_of_tunnel));
  GCC_handle_connection_create_ack (cc);
}
/**
 * Handle for #GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN
 *
 * If the connection ends at this peer, it is destroyed (provided the
 * message came from the peer at offset 0 of the connection's path).
 * Otherwise the message is routed onwards and our route for the
 * connection, if any, is destroyed.
 *
 * @param cls Closure (CadetPeer for neighbor that sent the message).
 * @param msg Message itself.
 * @deprecated duplicate logic with #handle_destroy(); dedup!
 */
static void
handle_connection_broken (void *cls,
                          const struct GNUNET_CADET_ConnectionBrokenMessage *msg)
{
  struct CadetPeer *peer = cls;
  struct CadetConnection *cc = GCC_lookup (&msg->cid);
  struct CadetPeerPath *path;
  unsigned int len;

  if (NULL == cc)
  {
    struct CadetRoute *route;

    /* We're just an intermediary peer, route the message along its path */
    route_message (peer, &msg->cid, &msg->header);
    route = get_route (&msg->cid);
    if (NULL != route)
      destroy_route (route);
    /* FIXME: also destroy paths we MAY have up to the specified link! */
    return;
  }
  /* The connection ends here: verify message came from the right direction */
  path = GCC_get_path (cc, &len);
  if (peer != GCPP_get_peer_at_offset (path, 0))
  {
    /* received message from unexpected direction, ignore! */
    GNUNET_break_op (0);
    return;
  }
  LOG (GNUNET_ERROR_TYPE_DEBUG,
       "Received CONNECTION_BROKEN for connection %s. Destroying it.\n",
       GNUNET_sh2s (&msg->cid.connection_of_tunnel));
  GCC_destroy_without_core (cc);
  /* FIXME: also destroy the path up to the specified link! */
}
/**
 * Handle for #GNUNET_MESSAGE_TYPE_CADET_CONNECTION_DESTROY
 *
 * If the connection ends at this peer, it is destroyed (provided the
 * message came from the peer at offset 0 of the connection's path).
 * Otherwise the message is routed onwards and our route for the
 * connection, if any, is destroyed.
 *
 * @param cls Closure (CadetPeer for neighbor that sent the message).
 * @param msg Message itself.
 */
static void
handle_connection_destroy (void *cls,
                           const struct GNUNET_CADET_ConnectionDestroyMessage *msg)
{
  struct CadetPeer *peer = cls;
  struct CadetConnection *cc = GCC_lookup (&msg->cid);
  struct CadetPeerPath *path;
  unsigned int len;

  if (NULL == cc)
  {
    struct CadetRoute *route;

    /* We're just an intermediary peer, route the message along its path */
    LOG (GNUNET_ERROR_TYPE_DEBUG,
         "Received CONNECTION_DESTROY for connection %s. Destroying route.\n",
         GNUNET_sh2s (&msg->cid.connection_of_tunnel));
    route_message (peer, &msg->cid, &msg->header);
    route = get_route (&msg->cid);
    if (NULL != route)
      destroy_route (route);
    return;
  }
  /* The connection ends here: verify message came from the right direction */
  path = GCC_get_path (cc, &len);
  if (peer != GCPP_get_peer_at_offset (path, 0))
  {
    /* received message from unexpected direction, ignore! */
    GNUNET_break_op (0);
    return;
  }
  LOG (GNUNET_ERROR_TYPE_DEBUG,
       "Received CONNECTION_DESTROY for connection %s. Destroying connection.\n",
       GNUNET_sh2s (&msg->cid.connection_of_tunnel));
  GCC_destroy_without_core (cc);
}
/**
 * Handle for #GNUNET_MESSAGE_TYPE_CADET_TUNNEL_KX
 *
 * If the connection ends at this peer, the key exchange message is
 * given to the connection (provided it came from the peer at offset 0
 * of the connection's path); otherwise it is routed onwards.
 *
 * @param cls Closure (CadetPeer for neighbor that sent the message).
 * @param msg Message itself.
 */
static void
handle_tunnel_kx (void *cls,
                  const struct GNUNET_CADET_TunnelKeyExchangeMessage *msg)
{
  struct CadetPeer *peer = cls;
  struct CadetConnection *cc;
  struct CadetPeerPath *path;
  unsigned int len;

  LOG (GNUNET_ERROR_TYPE_DEBUG,
       "Routing KX with ephemeral %s on CID %s\n",
       GNUNET_e2s (&msg->ephemeral_key),
       GNUNET_sh2s (&msg->cid.connection_of_tunnel));
  cc = GCC_lookup (&msg->cid);
  if (NULL == cc)
  {
    /* We're just an intermediary peer, route the message along its path */
    route_message (peer, &msg->cid, &msg->header);
    return;
  }
  /* The connection ends here: verify message came from the right direction */
  path = GCC_get_path (cc, &len);
  if (peer != GCPP_get_peer_at_offset (path, 0))
  {
    /* received message from unexpected direction, ignore! */
    GNUNET_break_op (0);
    return;
  }
  GCC_handle_kx (cc, msg);
}
/**
 * Handle for #GNUNET_MESSAGE_TYPE_CADET_TUNNEL_KX_AUTH
 *
 * If the connection ends at this peer, the message is given to the
 * connection (provided it came from the peer at offset 0 of the
 * connection's path); otherwise it is routed onwards.
 *
 * @param cls Closure (CadetPeer for neighbor that sent the message).
 * @param msg Message itself.
 */
static void
handle_tunnel_kx_auth (void *cls,
                       const struct GNUNET_CADET_TunnelKeyExchangeAuthMessage *msg)
{
  struct CadetPeer *peer = cls;
  struct CadetConnection *cc = GCC_lookup (&msg->kx.cid);
  struct CadetPeerPath *path;
  unsigned int len;

  if (NULL == cc)
  {
    /* We're just an intermediary peer, route the message along its path */
    route_message (peer, &msg->kx.cid, &msg->kx.header);
    return;
  }
  /* The connection ends here: verify message came from the right direction */
  path = GCC_get_path (cc, &len);
  if (peer != GCPP_get_peer_at_offset (path, 0))
  {
    /* received message from unexpected direction, ignore! */
    GNUNET_break_op (0);
    return;
  }
  GCC_handle_kx_auth (cc, msg);
}
/**
 * Size check for encrypted tunnel messages.  No constraint is imposed
 * here: every message is accepted.
 *
 * @param cls Closure (unused).
 * @param msg Message to check (unused).
 *
 * @return always #GNUNET_YES
 */
static int
check_tunnel_encrypted (void *cls,
                        const struct GNUNET_CADET_TunnelEncryptedMessage *msg)
{
  (void) cls;
  (void) msg;
  return GNUNET_YES;
}
/**
 * Handle for #GNUNET_MESSAGE_TYPE_CADET_TUNNEL_ENCRYPTED.
 *
 * If the connection ends at this peer, the message is given to the
 * connection (provided it came from the peer at offset 0 of the
 * connection's path); otherwise it is routed onwards.
 *
 * @param cls Closure (CadetPeer for neighbor that sent the message).
 * @param msg Message itself.
 */
static void
handle_tunnel_encrypted (void *cls,
                         const struct GNUNET_CADET_TunnelEncryptedMessage *msg)
{
  struct CadetPeer *peer = cls;
  struct CadetConnection *cc = GCC_lookup (&msg->cid);
  struct CadetPeerPath *path;
  unsigned int len;

  if (NULL == cc)
  {
    /* We're just an intermediary peer, route the message along its path */
    route_message (peer, &msg->cid, &msg->header);
    return;
  }
  /* The connection ends here: verify message came from the right direction */
  path = GCC_get_path (cc, &len);
  if (peer != GCPP_get_peer_at_offset (path, 0))
  {
    /* received message from unexpected direction, ignore! */
    GNUNET_break_op (0);
    return;
  }
  GCC_handle_encrypted (cc, msg);
}
/**
 * Function called after #GNUNET_CORE_connect has succeeded (or failed
 * for good).  We only use it for a sanity check: the identity reported
 * by CORE must match our own.  A mismatch or a failed connect is
 * logged via GNUNET_break(), nothing else happens.
 *
 * Implementations of this function must not call
 * #GNUNET_CORE_disconnect (other than by scheduling a new task to
 * do this later).
 *
 * @param cls closure
 * @param my_identity ID of this peer, NULL if we failed
 */
static void
core_init_cb (void *cls,
              const struct GNUNET_PeerIdentity *my_identity)
{
  if (NULL != my_identity)
  {
    GNUNET_break (0 ==
                  memcmp (my_identity,
                          &my_full_id,
                          sizeof (struct GNUNET_PeerIdentity)));
    return;
  }
  /* connecting to CORE failed */
  GNUNET_break (0);
}
/**
 * Method called whenever a given peer connects.  Looks up (creating
 * if necessary) our peer object and gives it the message queue.
 *
 * @param cls closure
 * @param peer peer identity this notification is about
 * @param mq message queue for sending to @a peer
 * @return the `struct CadetPeer` of @a peer
 */
static void *
core_connect_cb (void *cls,
                 const struct GNUNET_PeerIdentity *peer,
                 struct GNUNET_MQ_Handle *mq)
{
  struct CadetPeer *neighbour;

  LOG (GNUNET_ERROR_TYPE_DEBUG,
       "CORE connection to peer %s was established.\n",
       GNUNET_i2s (peer));
  neighbour = GCP_get (peer, GNUNET_YES);
  GCP_set_mq (neighbour, mq);
  return neighbour;
}
/**
 * Method called whenever a peer disconnects.  Takes the message queue
 * away from our peer object.
 *
 * @param cls closure
 * @param peer peer identity this notification is about
 * @param peer_cls the `struct CadetPeer` of @a peer
 */
static void
core_disconnect_cb (void *cls,
                    const struct GNUNET_PeerIdentity *peer,
                    void *peer_cls)
{
  struct CadetPeer *neighbour = peer_cls;

  LOG (GNUNET_ERROR_TYPE_DEBUG,
       "CORE connection to peer %s went down.\n",
       GNUNET_i2s (peer));
  GCP_set_mq (neighbour, NULL);
}
/**
* Initialize the CORE subsystem.
*
* @param c Configuration.
*/
void
GCO_init (const struct GNUNET_CONFIGURATION_Handle *c)
{
struct GNUNET_MQ_MessageHandler handlers[] = {
GNUNET_MQ_hd_var_size (connection_create,
GNUNET_MESSAGE_TYPE_CADET_CONNECTION_CREATE,
struct GNUNET_CADET_ConnectionCreateMessage,
NULL),
GNUNET_MQ_hd_fixed_size (connection_create_ack,
GNUNET_MESSAGE_TYPE_CADET_CONNECTION_CREATE_ACK,
struct GNUNET_CADET_ConnectionCreateAckMessage,
NULL),
GNUNET_MQ_hd_fixed_size (connection_broken,
GNUNET_MESSAGE_TYPE_CADET_CONNECTION_BROKEN,
struct GNUNET_CADET_ConnectionBrokenMessage,
NULL),
GNUNET_MQ_hd_fixed_size (connection_destroy,
GNUNET_MESSAGE_TYPE_CADET_CONNECTION_DESTROY,
struct GNUNET_CADET_ConnectionDestroyMessage,
NULL),
GNUNET_MQ_hd_fixed_size (tunnel_kx,
GNUNET_MESSAGE_TYPE_CADET_TUNNEL_KX,
struct GNUNET_CADET_TunnelKeyExchangeMessage,
NULL),
GNUNET_MQ_hd_fixed_size (tunnel_kx_auth,
GNUNET_MESSAGE_TYPE_CADET_TUNNEL_KX_AUTH,
struct GNUNET_CADET_TunnelKeyExchangeAuthMessage,
NULL),
GNUNET_MQ_hd_var_size (tunnel_encrypted,
GNUNET_MESSAGE_TYPE_CADET_TUNNEL_ENCRYPTED,
struct GNUNET_CADET_TunnelEncryptedMessage,
NULL),
GNUNET_MQ_handler_end ()
};
if (GNUNET_OK !=
GNUNET_CONFIGURATION_get_value_number (c,
"CADET",
"MAX_ROUTES",
&max_routes))
max_routes = 5000;
if (GNUNET_OK !=
GNUNET_CONFIGURATION_get_value_number (c,
"CADET",
"MAX_MSGS_QUEUE",
&max_buffers))
max_buffers = 10000;
routes = GNUNET_CONTAINER_multishortmap_create (1024,
GNUNET_NO);
route_heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
core = GNUNET_CORE_connect (c,
NULL,
&core_init_cb,
&core_connect_cb,
&core_disconnect_cb,
handlers);
}
/**
 * Shut down the CORE subsystem: disconnect from CORE (if connected),
 * destroy the routing table and the timeout heap and cancel the
 * timeout task (if scheduled).  Asserts that no routes are left once
 * the CORE connection is gone.
 */
void
GCO_shutdown ()
{
  /* disconnect first, then check that no routes are left */
  if (NULL != core)
  {
    GNUNET_CORE_disconnect (core);
    core = NULL;
  }
  GNUNET_assert (0 == GNUNET_CONTAINER_multishortmap_size (routes));

  GNUNET_CONTAINER_multishortmap_destroy (routes);
  routes = NULL;

  GNUNET_CONTAINER_heap_destroy (route_heap);
  route_heap = NULL;

  if (NULL == timeout_task)
    return;
  GNUNET_SCHEDULER_cancel (timeout_task);
  timeout_task = NULL;
}
/* end of gnunet-service-cadet_core.c */