aboutsummaryrefslogtreecommitdiff
path: root/src/core/gnunet-service-core_neighbours.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/core/gnunet-service-core_neighbours.c')
-rw-r--r--src/core/gnunet-service-core_neighbours.c528
1 files changed, 528 insertions, 0 deletions
diff --git a/src/core/gnunet-service-core_neighbours.c b/src/core/gnunet-service-core_neighbours.c
new file mode 100644
index 0000000..c4db40a
--- /dev/null
+++ b/src/core/gnunet-service-core_neighbours.c
@@ -0,0 +1,528 @@
+/*
+ This file is part of GNUnet.
+ (C) 2009, 2010, 2011 Christian Grothoff (and other contributing authors)
+
+ GNUnet is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published
+ by the Free Software Foundation; either version 3, or (at your
+ option) any later version.
+
+ GNUnet is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with GNUnet; see the file COPYING. If not, write to the
+ Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ Boston, MA 02111-1307, USA.
+*/
+
+/**
+ * @file core/gnunet-service-core_neighbours.c
+ * @brief code for managing low-level 'plaintext' connections with transport (key exchange may or may not be done yet)
+ * @author Christian Grothoff
+ */
+#include "platform.h"
+#include "gnunet_util_lib.h"
+#include "gnunet_statistics_service.h"
+#include "gnunet_transport_service.h"
+#include "gnunet-service-core.h"
+#include "gnunet-service-core_neighbours.h"
+#include "gnunet-service-core_kx.h"
+#include "gnunet-service-core_sessions.h"
+#include "gnunet_constants.h"
+
+
+/**
+ * Message ready for transmission via transport service. This struct
+ * is followed by the actual content of the message.
+ */
+struct NeighbourMessageEntry
+{
+
+ /**
+ * We keep messages in a doubly linked list.
+ */
+ struct NeighbourMessageEntry *next;
+
+ /**
+ * We keep messages in a doubly linked list.
+ */
+ struct NeighbourMessageEntry *prev;
+
+ /**
+ * By when are we supposed to transmit this message?
+ */
+ struct GNUNET_TIME_Absolute deadline;
+
+ /**
+ * How long is the message? (number of bytes following the "struct
+ * MessageEntry", but not including the size of "struct
+ * MessageEntry" itself!)
+ */
+ size_t size;
+
+};
+
+
+/**
+ * Data kept per transport-connected peer.
+ */
+struct Neighbour
+{
+
+ /**
+ * Head of the batched message queue (already ordered, transmit
+ * starting with the head).
+ */
+ struct NeighbourMessageEntry *message_head;
+
+ /**
+ * Tail of the batched message queue (already ordered, append new
+ * messages to tail).
+ */
+ struct NeighbourMessageEntry *message_tail;
+
+ /**
+ * Handle for pending requests for transmission to this peer
+ * with the transport service. NULL if no request is pending.
+ */
+ struct GNUNET_TRANSPORT_TransmitHandle *th;
+
+ /**
+ * Information about the key exchange with the other peer.
+ */
+ struct GSC_KeyExchangeInfo *kxinfo;
+
+ /**
+ * Identity of the other peer.
+ */
+ struct GNUNET_PeerIdentity peer;
+
+ /**
+ * ID of task used for re-trying plaintext scheduling.
+ */
+ GNUNET_SCHEDULER_TaskIdentifier retry_plaintext_task;
+
+};
+
+
+/**
+ * Map of peer identities to 'struct Neighbour'.
+ */
+static struct GNUNET_CONTAINER_MultiHashMap *neighbours;
+
+/**
+ * Transport service.
+ */
+static struct GNUNET_TRANSPORT_Handle *transport;
+
+
+/**
+ * Find the entry for the given neighbour.
+ *
+ * @param peer identity of the neighbour
+ * @return NULL if we are not connected, otherwise the
+ * neighbour's entry.
+ */
+static struct Neighbour *
+find_neighbour (const struct GNUNET_PeerIdentity *peer)
+{
+ return GNUNET_CONTAINER_multihashmap_get (neighbours, &peer->hashPubKey);
+}
+
+
+/**
+ * Free the given entry for the neighbour.
+ *
+ * @param n neighbour to free
+ */
+static void
+free_neighbour (struct Neighbour *n)
+{
+ struct NeighbourMessageEntry *m;
+
+#if DEBUG_CORE
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Destroying neighbour entry for peer `%4s'\n",
+ GNUNET_i2s (&n->peer));
+#endif
+ while (NULL != (m = n->message_head))
+ {
+ GNUNET_CONTAINER_DLL_remove (n->message_head, n->message_tail, m);
+ GNUNET_free (m);
+ }
+ if (NULL != n->th)
+ {
+ GNUNET_TRANSPORT_notify_transmit_ready_cancel (n->th);
+ n->th = NULL;
+ }
+ GNUNET_STATISTICS_update (GSC_stats,
+ gettext_noop
+ ("# sessions terminated by transport disconnect"),
+ 1, GNUNET_NO);
+ GSC_SESSIONS_end (&n->peer);
+ if (NULL != n->kxinfo)
+ {
+ GSC_KX_stop (n->kxinfo);
+ n->kxinfo = NULL;
+ }
+ if (n->retry_plaintext_task != GNUNET_SCHEDULER_NO_TASK)
+ {
+ GNUNET_SCHEDULER_cancel (n->retry_plaintext_task);
+ n->retry_plaintext_task = GNUNET_SCHEDULER_NO_TASK;
+ }
+ GNUNET_assert (GNUNET_OK ==
+ GNUNET_CONTAINER_multihashmap_remove (neighbours,
+ &n->peer.hashPubKey, n));
+ GNUNET_STATISTICS_set (GSC_stats,
+ gettext_noop ("# neighbour entries allocated"),
+ GNUNET_CONTAINER_multihashmap_size (neighbours),
+ GNUNET_NO);
+ GNUNET_free (n);
+}
+
+
+/**
+ * Check if we have encrypted messages for the specified neighbour
+ * pending, and if so, check with the transport about sending them
+ * out.
+ *
+ * @param n neighbour to check.
+ */
+static void
+process_queue (struct Neighbour *n);
+
+
+/**
+ * Function called when the transport service is ready to receive a
+ * message for the respective peer
+ *
+ * @param cls neighbour to use message from
+ * @param size number of bytes we can transmit
+ * @param buf where to copy the message
+ * @return number of bytes transmitted
+ */
+static size_t
+transmit_ready (void *cls, size_t size, void *buf)
+{
+ struct Neighbour *n = cls;
+ struct NeighbourMessageEntry *m;
+ size_t ret;
+ char *cbuf;
+
+ n->th = NULL;
+ m = n->message_head;
+ if (m == NULL)
+ {
+ GNUNET_break (0);
+ return 0;
+ }
+ GNUNET_CONTAINER_DLL_remove (n->message_head, n->message_tail, m);
+ if (buf == NULL)
+ {
+#if DEBUG_CORE
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Transmission of message of type %u and size %u failed\n",
+ (unsigned int)
+ ntohs (((struct GNUNET_MessageHeader *) &m[1])->type),
+ (unsigned int) m->size);
+#endif
+ GNUNET_free (m);
+ process_queue (n);
+ return 0;
+ }
+ cbuf = buf;
+ GNUNET_assert (size >= m->size);
+ memcpy (cbuf, &m[1], m->size);
+ ret = m->size;
+#if DEBUG_CORE
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Copied message of type %u and size %u into transport buffer for `%4s'\n",
+ (unsigned int)
+ ntohs (((struct GNUNET_MessageHeader *) &m[1])->type),
+ (unsigned int) ret, GNUNET_i2s (&n->peer));
+#endif
+ GNUNET_free (m);
+ process_queue (n);
+ GNUNET_STATISTICS_update (GSC_stats,
+ gettext_noop
+ ("# encrypted bytes given to transport"), ret,
+ GNUNET_NO);
+ return ret;
+}
+
+
+/**
+ * Check if we have messages for the specified neighbour pending, and
+ * if so, check with the transport about sending them out.
+ *
+ * @param n neighbour to check.
+ */
+static void
+process_queue (struct Neighbour *n)
+{
+ struct NeighbourMessageEntry *m;
+
+ if (n->th != NULL)
+ return; /* request already pending */
+ m = n->message_head;
+ if (m == NULL)
+ {
+ /* notify sessions that the queue is empty and more messages
+ * could thus be queued now */
+ GSC_SESSIONS_solicit (&n->peer);
+ return;
+ }
+#if DEBUG_CORE > 1
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Asking transport for transmission of %u bytes to `%4s' in next %llu ms\n",
+ (unsigned int) m->size, GNUNET_i2s (&n->peer),
+ (unsigned long long)
+ GNUNET_TIME_absolute_get_remaining (m->deadline).rel_value);
+#endif
+ n->th =
+ GNUNET_TRANSPORT_notify_transmit_ready (transport, &n->peer, m->size, 0,
+ GNUNET_TIME_absolute_get_remaining
+ (m->deadline), &transmit_ready,
+ n);
+ if (n->th != NULL)
+ return;
+ /* message request too large or duplicate request */
+ GNUNET_break (0);
+ /* discard encrypted message */
+ GNUNET_CONTAINER_DLL_remove (n->message_head, n->message_tail, m);
+ GNUNET_free (m);
+ process_queue (n);
+}
+
+
+
+/**
+ * Function called by transport to notify us that
+ * a peer connected to us (on the network level).
+ *
+ * @param cls closure
+ * @param peer the peer that connected
+ * @param atsi performance data
+ * @param atsi_count number of entries in ats (excluding 0-termination)
+ */
+static void
+handle_transport_notify_connect (void *cls,
+ const struct GNUNET_PeerIdentity *peer,
+ const struct GNUNET_ATS_Information *atsi,
+ uint32_t atsi_count)
+{
+ struct Neighbour *n;
+
+ if (0 == memcmp (peer, &GSC_my_identity, sizeof (struct GNUNET_PeerIdentity)))
+ {
+ GNUNET_break (0);
+ return;
+ }
+ n = find_neighbour (peer);
+ if (n != NULL)
+ {
+ /* duplicate connect notification!? */
+ GNUNET_break (0);
+ return;
+ }
+#if DEBUG_CORE
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Received connection from `%4s'.\n",
+ GNUNET_i2s (peer));
+#endif
+ n = GNUNET_malloc (sizeof (struct Neighbour));
+ n->peer = *peer;
+ GNUNET_assert (GNUNET_OK ==
+ GNUNET_CONTAINER_multihashmap_put (neighbours,
+ &n->peer.hashPubKey, n,
+ GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
+ GNUNET_STATISTICS_set (GSC_stats,
+ gettext_noop ("# neighbour entries allocated"),
+ GNUNET_CONTAINER_multihashmap_size (neighbours),
+ GNUNET_NO);
+ n->kxinfo = GSC_KX_start (peer);
+}
+
+
+/**
+ * Function called by transport telling us that a peer
+ * disconnected.
+ *
+ * @param cls closure
+ * @param peer the peer that disconnected
+ */
+static void
+handle_transport_notify_disconnect (void *cls,
+ const struct GNUNET_PeerIdentity *peer)
+{
+ struct Neighbour *n;
+
+#if DEBUG_CORE
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Peer `%4s' disconnected from us; received notification from transport.\n",
+ GNUNET_i2s (peer));
+#endif
+ n = find_neighbour (peer);
+ if (n == NULL)
+ {
+ GNUNET_break (0);
+ return;
+ }
+ free_neighbour (n);
+}
+
+
+/**
+ * Function called by the transport for each received message.
+ *
+ * @param cls closure
+ * @param peer (claimed) identity of the other peer
+ * @param message the message
+ * @param atsi performance data
+ * @param atsi_count number of entries in ats (excluding 0-termination)
+ */
+static void
+handle_transport_receive (void *cls, const struct GNUNET_PeerIdentity *peer,
+ const struct GNUNET_MessageHeader *message,
+ const struct GNUNET_ATS_Information *atsi,
+ uint32_t atsi_count)
+{
+ struct Neighbour *n;
+ uint16_t type;
+
+#if DEBUG_CORE > 1
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Received message of type %u from `%4s', demultiplexing.\n",
+ (unsigned int) ntohs (message->type), GNUNET_i2s (peer));
+#endif
+ if (0 == memcmp (peer, &GSC_my_identity, sizeof (struct GNUNET_PeerIdentity)))
+ {
+ GNUNET_break (0);
+ return;
+ }
+ n = find_neighbour (peer);
+ if (n == NULL)
+ {
+ /* received message from peer that is not connected!? */
+ GNUNET_break (0);
+ return;
+ }
+ type = ntohs (message->type);
+ switch (type)
+ {
+ case GNUNET_MESSAGE_TYPE_CORE_SET_KEY:
+ GSC_KX_handle_set_key (n->kxinfo, message);
+ break;
+ case GNUNET_MESSAGE_TYPE_CORE_PING:
+ GSC_KX_handle_ping (n->kxinfo, message);
+ break;
+ case GNUNET_MESSAGE_TYPE_CORE_PONG:
+ GSC_KX_handle_pong (n->kxinfo, message);
+ break;
+ case GNUNET_MESSAGE_TYPE_CORE_ENCRYPTED_MESSAGE:
+ GSC_KX_handle_encrypted_message (n->kxinfo, message, atsi, atsi_count);
+ break;
+ default:
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ _
+ ("Unsupported message of type %u (%u bytes) received from peer `%s'\n"),
+ (unsigned int) type, (unsigned int) ntohs (message->size),
+ GNUNET_i2s (peer));
+ return;
+ }
+}
+
+
+/**
+ * Transmit the given message to the given target.
+ *
+ * @param target peer that should receive the message (must be connected)
+ * @param msg message to transmit
+ * @param timeout by when should the transmission be done?
+ */
+void
+GSC_NEIGHBOURS_transmit (const struct GNUNET_PeerIdentity *target,
+ const struct GNUNET_MessageHeader *msg,
+ struct GNUNET_TIME_Relative timeout)
+{
+ struct NeighbourMessageEntry *me;
+ struct Neighbour *n;
+ size_t msize;
+
+ n = find_neighbour (target);
+ if (NULL == n)
+ {
+ GNUNET_break (0);
+ return;
+ }
+ msize = ntohs (msg->size);
+ me = GNUNET_malloc (sizeof (struct NeighbourMessageEntry) + msize);
+ me->deadline = GNUNET_TIME_relative_to_absolute (timeout);
+ me->size = msize;
+ memcpy (&me[1], msg, msize);
+ GNUNET_CONTAINER_DLL_insert_tail (n->message_head, n->message_tail, me);
+ process_queue (n);
+}
+
+
+/**
+ * Initialize neighbours subsystem.
+ */
+int
+GSC_NEIGHBOURS_init ()
+{
+ neighbours = GNUNET_CONTAINER_multihashmap_create (128);
+ transport =
+ GNUNET_TRANSPORT_connect (GSC_cfg, &GSC_my_identity, NULL,
+ &handle_transport_receive,
+ &handle_transport_notify_connect,
+ &handle_transport_notify_disconnect);
+ if (NULL == transport)
+ {
+ GNUNET_CONTAINER_multihashmap_destroy (neighbours);
+ neighbours = NULL;
+ return GNUNET_SYSERR;
+ }
+ return GNUNET_OK;
+}
+
+
+/**
+ * Wrapper around 'free_neighbour'.
+ *
+ * @param cls unused
+ * @param key peer identity
+ * @param value the 'struct Neighbour' to free
+ * @return GNUNET_OK (continue to iterate)
+ */
+static int
+free_neighbour_helper (void *cls, const GNUNET_HashCode * key, void *value)
+{
+ struct Neighbour *n = value;
+
+ /* transport should have 'disconnected' all neighbours... */
+ GNUNET_break (0);
+ free_neighbour (n);
+ return GNUNET_OK;
+}
+
+
+/**
+ * Shutdown neighbours subsystem.
+ */
+void
+GSC_NEIGHBOURS_done ()
+{
+ if (NULL == transport)
+ return;
+ GNUNET_TRANSPORT_disconnect (transport);
+ transport = NULL;
+ GNUNET_CONTAINER_multihashmap_iterate (neighbours, &free_neighbour_helper,
+ NULL);
+ GNUNET_CONTAINER_multihashmap_destroy (neighbours);
+ neighbours = NULL;
+}
+
+/* end of gnunet-service-core_neighbours.c */