aboutsummaryrefslogtreecommitdiff
path: root/src/psycutil
diff options
context:
space:
mode:
authorGabor X Toth <*@tg-x.net>2016-01-12 23:26:47 +0000
committerGabor X Toth <*@tg-x.net>2016-01-12 23:26:47 +0000
commit50eaf8d7de763d25b7dae7ffdee8d7c6b5fe71ea (patch)
treea8023bdb9c9446a45792d7100303265c78713a50 /src/psycutil
parent3cbdbe18dbd56def00c0014381ff90b4ee664904 (diff)
psycutil reorg: message, env, slicer
Diffstat (limited to 'src/psycutil')
-rw-r--r--src/psycutil/Makefile.am45
-rw-r--r--src/psycutil/psyc_env.c196
-rw-r--r--src/psycutil/psyc_message.c1329
-rw-r--r--src/psycutil/psyc_slicer.c610
-rw-r--r--src/psycutil/test_psyc_env.c96
5 files changed, 2276 insertions, 0 deletions
diff --git a/src/psycutil/Makefile.am b/src/psycutil/Makefile.am
new file mode 100644
index 0000000000..2a916fe30a
--- /dev/null
+++ b/src/psycutil/Makefile.am
@@ -0,0 +1,45 @@
+# This Makefile.am is in the public domain
+AM_CPPFLAGS = -I$(top_srcdir)/src/include
+
+pkgcfgdir= $(pkgdatadir)/config.d/
+
+libexecdir= $(pkglibdir)/libexec/
+
+if MINGW
+ WINFLAGS = -Wl,--no-undefined -Wl,--export-all-symbols
+endif
+
+if USE_COVERAGE
+ AM_CFLAGS = --coverage -O0
+ XLIB = -lgcov
+endif
+
+lib_LTLIBRARIES = libgnunetpsycutil.la
+
+libgnunetpsycutil_la_SOURCES = \
+ psyc_env.c \
+ psyc_message.c \
+ psyc_slicer.c
+libgnunetpsycutil_la_LIBADD = \
+ $(top_builddir)/src/util/libgnunetutil.la \
+ $(GN_LIBINTL) $(XLIB)
+libgnunetpsycutil_la_LDFLAGS = \
+ $(GN_LIB_LDFLAGS) $(WINFLAGS) \
+ -version-info 0:0:0
+
+if HAVE_TESTING
+check_PROGRAMS = \
+ test_psyc_env
+endif
+
+if ENABLE_TEST_RUN
+AM_TESTS_ENVIRONMENT=export GNUNET_PREFIX=$${GNUNET_PREFIX:-@libdir@};export PATH=$${GNUNET_PREFIX:-@prefix@}/bin:$$PATH;
+TESTS = $(check_PROGRAMS)
+endif
+
+test_psyc_env_SOURCES = \
+ test_psyc_env.c
+test_psyc_env_LDADD = \
+ libgnunetpsycutil.la \
+ $(top_builddir)/src/testing/libgnunettesting.la \
+ $(top_builddir)/src/util/libgnunetutil.la
diff --git a/src/psycutil/psyc_env.c b/src/psycutil/psyc_env.c
new file mode 100644
index 0000000000..9c9c1a96df
--- /dev/null
+++ b/src/psycutil/psyc_env.c
@@ -0,0 +1,196 @@
+/*
+ * This file is part of GNUnet.
+ * Copyright (C) 2013 Christian Grothoff (and other contributing authors)
+ *
+ * GNUnet is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published
+ * by the Free Software Foundation; either version 3, or (at your
+ * option) any later version.
+ *
+ * GNUnet is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with GNUnet; see the file COPYING. If not, write to the
+ * Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
+ * Boston, MA 02110-1301, USA.
+ */
+
+/**
+ * @author Gabor X Toth
+ *
+ * @file
+ * Library providing operations for the @e environment of
+ * PSYC and Social messages.
+ */
+
+#include "platform.h"
+#include "gnunet_util_lib.h"
+#include "gnunet_psyc_env.h"
+
+/**
+ * Environment for a message.
+ *
+ * Contains modifiers.
+ */
+struct GNUNET_PSYC_Environment
+{
+ struct GNUNET_PSYC_Modifier *mod_head;
+ struct GNUNET_PSYC_Modifier *mod_tail;
+ size_t mod_count;
+};
+
+
+/**
+ * Create an environment.
+ *
+ * @return A newly allocated environment.
+ */
+struct GNUNET_PSYC_Environment *
+GNUNET_PSYC_env_create ()
+{
+ return GNUNET_new (struct GNUNET_PSYC_Environment);
+}
+
+
+/**
+ * Add a modifier to the environment.
+ *
+ * @param env The environment.
+ * @param oper Operation to perform.
+ * @param name Name of the variable.
+ * @param value Value of the variable.
+ * @param value_size Size of @a value.
+ */
+void
+GNUNET_PSYC_env_add (struct GNUNET_PSYC_Environment *env,
+ enum GNUNET_PSYC_Operator oper, const char *name,
+ const void *value, size_t value_size)
+{
+ struct GNUNET_PSYC_Modifier *mod = GNUNET_new (struct GNUNET_PSYC_Modifier);
+ mod->oper = oper;
+ mod->name = name;
+ mod->value = value;
+ mod->value_size = value_size;
+ GNUNET_CONTAINER_DLL_insert_tail (env->mod_head, env->mod_tail, mod);
+ env->mod_count++;
+}
+
+
+/**
+ * Get the first modifier of the environment.
+ */
+struct GNUNET_PSYC_Modifier *
+GNUNET_PSYC_env_head (const struct GNUNET_PSYC_Environment *env)
+{
+ return env->mod_head;
+}
+
+
+/**
+ * Get the last modifier of the environment.
+ */
+struct GNUNET_PSYC_Modifier *
+GNUNET_PSYC_env_tail (const struct GNUNET_PSYC_Environment *env)
+{
+ return env->mod_tail;
+}
+
+
+/**
+ * Remove a modifier from the environment.
+ */
+void
+GNUNET_PSYC_env_remove (struct GNUNET_PSYC_Environment *env,
+ struct GNUNET_PSYC_Modifier *mod)
+{
+ GNUNET_CONTAINER_DLL_remove (env->mod_head, env->mod_tail, mod);
+}
+
+
+/**
+ * Get the modifier at the beginning of an environment and remove it.
+ *
+ * @param env
+ * @param oper
+ * @param name
+ * @param value
+ * @param value_size
+ *
+ * @return
+ */
+int
+GNUNET_PSYC_env_shift (struct GNUNET_PSYC_Environment *env,
+ enum GNUNET_PSYC_Operator *oper, const char **name,
+ const void **value, size_t *value_size)
+{
+ if (NULL == env->mod_head)
+ return GNUNET_NO;
+
+ struct GNUNET_PSYC_Modifier *mod = env->mod_head;
+ *oper = mod->oper;
+ *name = mod->name;
+ *value = mod->value;
+ *value_size = mod->value_size;
+
+ GNUNET_CONTAINER_DLL_remove (env->mod_head, env->mod_tail, mod);
+ GNUNET_free (mod);
+ env->mod_count--;
+
+ return GNUNET_YES;
+}
+
+
+/**
+ * Iterate through all modifiers in the environment.
+ *
+ * @param env The environment.
+ * @param it Iterator.
+ * @param it_cls Closure for iterator.
+ */
+void
+GNUNET_PSYC_env_iterate (const struct GNUNET_PSYC_Environment *env,
+ GNUNET_PSYC_Iterator it, void *it_cls)
+{
+ struct GNUNET_PSYC_Modifier *mod;
+ for (mod = env->mod_head; NULL != mod; mod = mod->next)
+ it (it_cls, mod->oper, mod->name, mod->value, mod->value_size);
+}
+
+
+/**
+ * Get the number of modifiers in the environment.
+ *
+ * @param env The environment.
+ *
+ * @return Number of modifiers.
+ */
+size_t
+GNUNET_PSYC_env_get_count (const struct GNUNET_PSYC_Environment *env)
+{
+ return env->mod_count;
+}
+
+
+/**
+ * Destroy an environment.
+ *
+ * @param env The environment to destroy.
+ */
+void
+GNUNET_PSYC_env_destroy (struct GNUNET_PSYC_Environment *env)
+{
+ struct GNUNET_PSYC_Modifier *mod, *prev = NULL;
+ for (mod = env->mod_head; NULL != mod; mod = mod->next)
+ {
+ if (NULL != prev)
+ GNUNET_free (prev);
+ prev = mod;
+ }
+ if (NULL != prev)
+ GNUNET_free (prev);
+
+ GNUNET_free (env);
+}
diff --git a/src/psycutil/psyc_message.c b/src/psycutil/psyc_message.c
new file mode 100644
index 0000000000..8c214d2b64
--- /dev/null
+++ b/src/psycutil/psyc_message.c
@@ -0,0 +1,1329 @@
+/*
+ * This file is part of GNUnet
+ * Copyright (C) 2013 Christian Grothoff (and other contributing authors)
+ *
+ * GNUnet is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published
+ * by the Free Software Foundation; either version 3, or (at your
+ * option) any later version.
+ *
+ * GNUnet is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with GNUnet; see the file COPYING. If not, write to the
+ * Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
+ * Boston, MA 02110-1301, USA.
+ */
+
+/**
+ * @file psycstore/psyc_util_lib.c
+ * @brief PSYC utilities; receiving/transmitting/logging PSYC messages.
+ * @author Gabor X Toth
+ */
+
+#include <inttypes.h>
+
+#include "platform.h"
+#include "gnunet_util_lib.h"
+#include "gnunet_psyc_util_lib.h"
+#include "gnunet_psyc_service.h"
+
+#define LOG(kind,...) GNUNET_log_from (kind, "psyc-util",__VA_ARGS__)
+
+
+struct GNUNET_PSYC_TransmitHandle
+{
+ /**
+ * Client connection to service.
+ */
+ struct GNUNET_CLIENT_MANAGER_Connection *client;
+
+ /**
+ * Message currently being received from the client.
+ */
+ struct GNUNET_MessageHeader *msg;
+
+ /**
+ * Callback to request next modifier from client.
+ */
+ GNUNET_PSYC_TransmitNotifyModifier notify_mod;
+
+ /**
+ * Closure for the notify callbacks.
+ */
+ void *notify_mod_cls;
+
+ /**
+ * Callback to request next data fragment from client.
+ */
+ GNUNET_PSYC_TransmitNotifyData notify_data;
+
+ /**
+ * Closure for the notify callbacks.
+ */
+ void *notify_data_cls;
+
+ /**
+ * Modifier of the environment that is currently being transmitted.
+ */
+ struct GNUNET_PSYC_Modifier *mod;
+
+ /**
+ *
+ */
+ const char *mod_value;
+
+ /**
+ * Number of bytes remaining to be transmitted from the current modifier value.
+ */
+ uint32_t mod_value_remaining;
+
+ /**
+ * State of the current message being received from client.
+ */
+ enum GNUNET_PSYC_MessageState state;
+
+ /**
+ * Number of PSYC_TRANSMIT_ACK messages we are still waiting for.
+ */
+ uint8_t acks_pending;
+
+ /**
+ * Is transmission paused?
+ */
+ uint8_t paused;
+
+ /**
+ * Are we currently transmitting a message?
+ */
+ uint8_t in_transmit;
+
+ /**
+ * Notify callback is currently being called.
+ */
+ uint8_t in_notify;
+
+};
+
+
+
+struct GNUNET_PSYC_ReceiveHandle
+{
+ /**
+ * Message callback.
+ */
+ GNUNET_PSYC_MessageCallback message_cb;
+
+ /**
+ * Message part callback.
+ */
+ GNUNET_PSYC_MessagePartCallback message_part_cb;
+
+ /**
+ * Closure for the callbacks.
+ */
+ void *cb_cls;
+
+ /**
+ * ID of the message being received from the PSYC service.
+ */
+ uint64_t message_id;
+
+ /**
+ * Public key of the slave from which a message is being received.
+ */
+ struct GNUNET_CRYPTO_EcdsaPublicKey slave_key;
+
+ /**
+ * State of the currently being received message from the PSYC service.
+ */
+ enum GNUNET_PSYC_MessageState state;
+
+ /**
+ * Flags for the currently being received message from the PSYC service.
+ */
+ enum GNUNET_PSYC_MessageFlags flags;
+
+ /**
+ * Expected value size for the modifier being received from the PSYC service.
+ */
+ uint32_t mod_value_size_expected;
+
+ /**
+ * Actual value size for the modifier being received from the PSYC service.
+ */
+ uint32_t mod_value_size;
+};
+
+
+/**** Messages ****/
+
+
+/**
+ * Create a PSYC message.
+ *
+ * @param method_name
+ * PSYC method for the message.
+ * @param env
+ * Environment for the message.
+ * @param data
+ * Data payload for the message.
+ * @param data_size
+ * Size of @a data.
+ *
+ * @return Message header with size information,
+ * followed by the message parts.
+ */
+struct GNUNET_PSYC_Message *
+GNUNET_PSYC_message_create (const char *method_name,
+ const struct GNUNET_PSYC_Environment *env,
+ const void *data,
+ size_t data_size)
+{
+ struct GNUNET_PSYC_Modifier *mod = NULL;
+ struct GNUNET_PSYC_MessageMethod *pmeth = NULL;
+ struct GNUNET_PSYC_MessageModifier *pmod = NULL;
+ struct GNUNET_MessageHeader *pmsg = NULL;
+ uint16_t env_size = 0;
+ if (NULL != env)
+ {
+ mod = GNUNET_PSYC_env_head (env);
+ while (NULL != mod)
+ {
+ env_size += sizeof (*pmod) + strlen (mod->name) + 1 + mod->value_size;
+ mod = mod->next;
+ }
+ }
+
+ struct GNUNET_PSYC_Message *msg;
+ uint16_t method_name_size = strlen (method_name) + 1;
+ if (method_name_size == 1)
+ return NULL;
+
+ uint16_t msg_size = sizeof (*msg) /* header */
+ + sizeof (*pmeth) + method_name_size /* method */
+ + env_size /* modifiers */
+ + ((0 < data_size) ? sizeof (*pmsg) + data_size : 0) /* data */
+ + sizeof (*pmsg); /* end of message */
+ msg = GNUNET_malloc (msg_size);
+ msg->header.size = htons (msg_size);
+ msg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE); /* FIXME */
+
+ pmeth = (struct GNUNET_PSYC_MessageMethod *) &msg[1];
+ pmeth->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD);
+ pmeth->header.size = htons (sizeof (*pmeth) + method_name_size);
+ memcpy (&pmeth[1], method_name, method_name_size);
+
+ uint16_t p = sizeof (*msg) + sizeof (*pmeth) + method_name_size;
+ if (NULL != env)
+ {
+ mod = GNUNET_PSYC_env_head (env);
+ while (NULL != mod)
+ {
+ uint16_t mod_name_size = strlen (mod->name) + 1;
+ pmod = (struct GNUNET_PSYC_MessageModifier *) ((char *) msg + p);
+ pmod->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER);
+ pmod->header.size = sizeof (*pmod) + mod_name_size + mod->value_size;
+ p += pmod->header.size;
+ pmod->header.size = htons (pmod->header.size);
+
+ pmod->oper = mod->oper;
+ pmod->name_size = htons (mod_name_size);
+ pmod->value_size = htonl (mod->value_size);
+
+ memcpy (&pmod[1], mod->name, mod_name_size);
+ if (0 < mod->value_size)
+ memcpy ((char *) &pmod[1] + mod_name_size, mod->value, mod->value_size);
+
+ mod = mod->next;
+ }
+ }
+
+ if (0 < data_size)
+ {
+ pmsg = (struct GNUNET_MessageHeader *) ((char *) msg + p);
+ pmsg->size = sizeof (*pmsg) + data_size;
+ p += pmsg->size;
+ pmsg->size = htons (pmsg->size);
+ pmsg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA);
+ memcpy (&pmsg[1], data, data_size);
+ }
+
+ pmsg = (struct GNUNET_MessageHeader *) ((char *) msg + p);
+ pmsg->size = htons (sizeof (*pmsg));
+ pmsg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END);
+
+ GNUNET_assert (p + sizeof (*pmsg) == msg_size);
+ return msg;
+}
+
+
+void
+GNUNET_PSYC_log_message (enum GNUNET_ErrorType kind,
+ const struct GNUNET_MessageHeader *msg)
+{
+ uint16_t size = ntohs (msg->size);
+ uint16_t type = ntohs (msg->type);
+ GNUNET_log (kind, "Message of type %d and size %u:\n", type, size);
+ switch (type)
+ {
+ case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE:
+ {
+ struct GNUNET_PSYC_MessageHeader *pmsg
+ = (struct GNUNET_PSYC_MessageHeader *) msg;
+ GNUNET_log (kind, "\tID: %" PRIu64 "\tflags: %x" PRIu32 "\n",
+ GNUNET_ntohll (pmsg->message_id), ntohl (pmsg->flags));
+ break;
+ }
+ case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD:
+ {
+ struct GNUNET_PSYC_MessageMethod *meth
+ = (struct GNUNET_PSYC_MessageMethod *) msg;
+ GNUNET_log (kind, "\t%.*s\n", size - sizeof (*meth), &meth[1]);
+ break;
+ }
+ case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER:
+ {
+ struct GNUNET_PSYC_MessageModifier *mod
+ = (struct GNUNET_PSYC_MessageModifier *) msg;
+ uint16_t name_size = ntohs (mod->name_size);
+ char oper = ' ' < mod->oper ? mod->oper : ' ';
+ GNUNET_log (kind, "\t%c%.*s\t%.*s\n", oper, name_size, &mod[1],
+ size - sizeof (*mod) - name_size,
+ ((char *) &mod[1]) + name_size);
+ break;
+ }
+ case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT:
+ case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA:
+ GNUNET_log (kind, "\t%.*s\n", size - sizeof (*msg), &msg[1]);
+ break;
+ }
+}
+
+
+/**** Transmitting messages ****/
+
+
+/**
+ * Create a transmission handle.
+ */
+struct GNUNET_PSYC_TransmitHandle *
+GNUNET_PSYC_transmit_create (struct GNUNET_CLIENT_MANAGER_Connection *client)
+{
+ struct GNUNET_PSYC_TransmitHandle *tmit = GNUNET_malloc (sizeof (*tmit));
+ tmit->client = client;
+ return tmit;
+}
+
+
+/**
+ * Destroy a transmission handle.
+ */
+void
+GNUNET_PSYC_transmit_destroy (struct GNUNET_PSYC_TransmitHandle *tmit)
+{
+ GNUNET_free (tmit);
+}
+
+
+/**
+ * Queue a message part for transmission.
+ *
+ * The message part is added to the current message buffer.
+ * When this buffer is full, it is added to the transmission queue.
+ *
+ * @param tmit
+ * Transmission handle.
+ * @param msg
+ * Message part, or NULL.
+ * @param tmit_now
+ * Transmit message now, or wait for buffer to fill up?
+ * #GNUNET_YES or #GNUNET_NO.
+ */
+static void
+transmit_queue_insert (struct GNUNET_PSYC_TransmitHandle *tmit,
+ const struct GNUNET_MessageHeader *msg,
+ uint8_t tmit_now)
+{
+ uint16_t size = (NULL != msg) ? ntohs (msg->size) : 0;
+
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Queueing message part of type %u and size %u (tmit_now: %u)).\n",
+ NULL != msg ? ntohs (msg->type) : 0, size, tmit_now);
+
+ if (NULL != tmit->msg)
+ {
+ if (NULL == msg
+ || GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD < tmit->msg->size + size)
+ {
+ /* End of message or buffer is full, add it to transmission queue
+ * and start with empty buffer */
+ tmit->msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
+ tmit->msg->size = htons (tmit->msg->size);
+ GNUNET_CLIENT_MANAGER_transmit (tmit->client, tmit->msg);
+ tmit->msg = NULL;
+ tmit->acks_pending++;
+ }
+ else
+ {
+ /* Message fits in current buffer, append */
+ tmit->msg = GNUNET_realloc (tmit->msg, tmit->msg->size + size);
+ memcpy ((char *) tmit->msg + tmit->msg->size, msg, size);
+ tmit->msg->size += size;
+ }
+ }
+
+ if (NULL == tmit->msg && NULL != msg)
+ {
+ /* Empty buffer, copy over message. */
+ tmit->msg = GNUNET_malloc (sizeof (*tmit->msg) + size);
+ tmit->msg->size = sizeof (*tmit->msg) + size;
+ memcpy (&tmit->msg[1], msg, size);
+ }
+
+ if (NULL != tmit->msg
+ && (GNUNET_YES == tmit_now
+ || (GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD
+ < tmit->msg->size + sizeof (struct GNUNET_MessageHeader))))
+ {
+ /* End of message or buffer is full, add it to transmission queue. */
+ tmit->msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
+ tmit->msg->size = htons (tmit->msg->size);
+ GNUNET_CLIENT_MANAGER_transmit (tmit->client, tmit->msg);
+ tmit->msg = NULL;
+ tmit->acks_pending++;
+ }
+}
+
+
+/**
+ * Request data from client to transmit.
+ *
+ * @param tmit Transmission handle.
+ */
+static void
+transmit_data (struct GNUNET_PSYC_TransmitHandle *tmit)
+{
+ int notify_ret = GNUNET_YES;
+ uint16_t data_size = 0;
+ char data[GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD] = "";
+ struct GNUNET_MessageHeader *msg = (struct GNUNET_MessageHeader *) data;
+ msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA);
+
+ if (NULL != tmit->notify_data)
+ {
+ data_size = GNUNET_PSYC_DATA_MAX_PAYLOAD;
+ tmit->in_notify = GNUNET_YES;
+ notify_ret = tmit->notify_data (tmit->notify_data_cls, &data_size, &msg[1]);
+ tmit->in_notify = GNUNET_NO;
+ }
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "transmit_data (ret: %d, size: %u): %.*s\n",
+ notify_ret, data_size, data_size, &msg[1]);
+ switch (notify_ret)
+ {
+ case GNUNET_NO:
+ if (0 == data_size)
+ {
+ /* Transmission paused, nothing to send. */
+ tmit->paused = GNUNET_YES;
+ return;
+ }
+ break;
+
+ case GNUNET_YES:
+ tmit->state = GNUNET_PSYC_MESSAGE_STATE_END;
+ break;
+
+ default:
+ LOG (GNUNET_ERROR_TYPE_ERROR,
+ "TransmitNotifyData callback returned error when requesting data.\n");
+
+ tmit->state = GNUNET_PSYC_MESSAGE_STATE_CANCEL;
+ msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL);
+ msg->size = htons (sizeof (*msg));
+ transmit_queue_insert (tmit, msg, GNUNET_YES);
+ tmit->in_transmit = GNUNET_NO;
+ return;
+ }
+
+ if (0 < data_size)
+ {
+ GNUNET_assert (data_size <= GNUNET_PSYC_DATA_MAX_PAYLOAD);
+ msg->size = htons (sizeof (*msg) + data_size);
+ transmit_queue_insert (tmit, msg, !notify_ret);
+ }
+
+ /* End of message. */
+ if (GNUNET_YES == notify_ret)
+ {
+ msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END);
+ msg->size = htons (sizeof (*msg));
+ transmit_queue_insert (tmit, msg, GNUNET_YES);
+ /* FIXME: wait for ACK before setting in_transmit to no */
+ tmit->in_transmit = GNUNET_NO;
+ }
+}
+
+
+/**
+ * Request a modifier from a client to transmit.
+ *
+ * @param tmit Transmission handle.
+ */
+static void
+transmit_mod (struct GNUNET_PSYC_TransmitHandle *tmit)
+{
+ uint16_t max_data_size = 0;
+ uint16_t data_size = 0;
+ char data[GNUNET_MULTICAST_FRAGMENT_MAX_PAYLOAD] = "";
+ struct GNUNET_MessageHeader *msg = (struct GNUNET_MessageHeader *) data;
+ int notify_ret = GNUNET_YES;
+
+ switch (tmit->state)
+ {
+ case GNUNET_PSYC_MESSAGE_STATE_MODIFIER:
+ {
+ struct GNUNET_PSYC_MessageModifier *mod
+ = (struct GNUNET_PSYC_MessageModifier *) msg;
+ msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER);
+ msg->size = sizeof (struct GNUNET_PSYC_MessageModifier);
+
+ if (NULL != tmit->notify_mod)
+ {
+ max_data_size = GNUNET_PSYC_MODIFIER_MAX_PAYLOAD;
+ data_size = max_data_size;
+ tmit->in_notify = GNUNET_YES;
+ notify_ret = tmit->notify_mod (tmit->notify_mod_cls, &data_size, &mod[1],
+ &mod->oper, &mod->value_size);
+ tmit->in_notify = GNUNET_NO;
+ }
+
+ mod->name_size = strnlen ((char *) &mod[1], data_size) + 1;
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "transmit_mod (ret: %d, size: %u + %u): %.*s\n",
+ notify_ret, mod->name_size, mod->value_size, data_size, &mod[1]);
+ if (mod->name_size < data_size)
+ {
+ tmit->mod_value_remaining
+ = mod->value_size - (data_size - mod->name_size);
+ mod->value_size = htonl (mod->value_size);
+ mod->name_size = htons (mod->name_size);
+ }
+ else if (0 < data_size)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Got invalid modifier name.\n");
+ notify_ret = GNUNET_SYSERR;
+ }
+ break;
+ }
+ case GNUNET_PSYC_MESSAGE_STATE_MOD_CONT:
+ {
+ msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT);
+ msg->size = sizeof (struct GNUNET_MessageHeader);
+
+ if (NULL != tmit->notify_mod)
+ {
+ max_data_size = GNUNET_PSYC_MOD_CONT_MAX_PAYLOAD;
+ data_size = max_data_size;
+ tmit->in_notify = GNUNET_YES;
+ notify_ret = tmit->notify_mod (tmit->notify_mod_cls,
+ &data_size, &msg[1], NULL, NULL);
+ tmit->in_notify = GNUNET_NO;
+ }
+ tmit->mod_value_remaining -= data_size;
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "transmit_mod (ret: %d, size: %u): %.*s\n",
+ notify_ret, data_size, data_size, &msg[1]);
+ break;
+ }
+ default:
+ GNUNET_assert (0);
+ }
+
+ switch (notify_ret)
+ {
+ case GNUNET_NO:
+ if (0 == data_size)
+ { /* Transmission paused, nothing to send. */
+ tmit->paused = GNUNET_YES;
+ return;
+ }
+ tmit->state
+ = (0 == tmit->mod_value_remaining)
+ ? GNUNET_PSYC_MESSAGE_STATE_MODIFIER
+ : GNUNET_PSYC_MESSAGE_STATE_MOD_CONT;
+ break;
+
+ case GNUNET_YES: /* End of modifiers. */
+ GNUNET_assert (0 == tmit->mod_value_remaining);
+ break;
+
+ default:
+ LOG (GNUNET_ERROR_TYPE_ERROR,
+ "TransmitNotifyModifier callback returned with error.\n");
+
+ tmit->state = GNUNET_PSYC_MESSAGE_STATE_CANCEL;
+ msg->type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL);
+ msg->size = htons (sizeof (*msg));
+ transmit_queue_insert (tmit, msg, GNUNET_YES);
+ tmit->in_transmit = GNUNET_NO;
+ return;
+ }
+
+ if (0 < data_size)
+ {
+ GNUNET_assert (data_size <= max_data_size);
+ msg->size = htons (msg->size + data_size);
+ transmit_queue_insert (tmit, msg, GNUNET_NO);
+ }
+
+ if (GNUNET_YES == notify_ret)
+ {
+ tmit->state = GNUNET_PSYC_MESSAGE_STATE_DATA;
+ if (0 == tmit->acks_pending)
+ transmit_data (tmit);
+ }
+ else
+ {
+ transmit_mod (tmit);
+ }
+}
+
+
+int
+transmit_notify_env (void *cls, uint16_t *data_size, void *data, uint8_t *oper,
+ uint32_t *full_value_size)
+
+{
+ struct GNUNET_PSYC_TransmitHandle *tmit = cls;
+ uint16_t name_size = 0;
+ uint32_t value_size = 0;
+ const char *value = NULL;
+
+ if (NULL != oper)
+ { /* New modifier */
+ if (NULL != tmit->mod)
+ tmit->mod = tmit->mod->next;
+ if (NULL == tmit->mod)
+ { /* No more modifiers, continue with data */
+ *data_size = 0;
+ return GNUNET_YES;
+ }
+
+ GNUNET_assert (tmit->mod->value_size < UINT32_MAX);
+ *full_value_size = tmit->mod->value_size;
+ *oper = tmit->mod->oper;
+ name_size = strlen (tmit->mod->name) + 1;
+
+ if (name_size + tmit->mod->value_size <= *data_size)
+ {
+ value_size = tmit->mod->value_size;
+ *data_size = name_size + value_size;
+ }
+ else /* full modifier does not fit in data, continuation needed */
+ {
+ value_size = *data_size - name_size;
+ tmit->mod_value = tmit->mod->value + value_size;
+ }
+
+ memcpy (data, tmit->mod->name, name_size);
+ memcpy ((char *)data + name_size, tmit->mod->value, value_size);
+ return GNUNET_NO;
+ }
+ else
+ { /* Modifier continuation */
+ GNUNET_assert (NULL != tmit->mod_value && 0 < tmit->mod_value_remaining);
+ value = tmit->mod_value;
+ if (tmit->mod_value_remaining <= *data_size)
+ {
+ value_size = tmit->mod_value_remaining;
+ tmit->mod_value = NULL;
+ }
+ else
+ {
+ value_size = *data_size;
+ tmit->mod_value += value_size;
+ }
+
+ if (*data_size < value_size)
+ {
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Value in environment larger than buffer: %u < %zu\n",
+ *data_size, value_size);
+ *data_size = 0;
+ return GNUNET_NO;
+ }
+
+ *data_size = value_size;
+ memcpy (data, value, value_size);
+ return (NULL == tmit->mod_value) ? GNUNET_YES : GNUNET_NO;
+ }
+}
+
+
+/**
+ * Transmit a message.
+ *
+ * @param tmit
+ * Transmission handle.
+ * @param method_name
+ * Which method should be invoked.
+ * @param env
+ * Environment for the message.
+ * Should stay available until the first call to notify_data.
+ * Can be NULL if there are no modifiers or @a notify_mod is
+ * provided instead.
+ * @param notify_mod
+ * Function to call to obtain modifiers.
+ * Can be NULL if there are no modifiers or @a env is provided instead.
+ * @param notify_data
+ * Function to call to obtain fragments of the data.
+ * @param notify_cls
+ * Closure for @a notify_mod and @a notify_data.
+ * @param flags
+ * Flags for the message being transmitted.
+ *
+ * @return #GNUNET_OK if the transmission was started.
+ * #GNUNET_SYSERR if another transmission is already going on.
+ */
+int
+GNUNET_PSYC_transmit_message (struct GNUNET_PSYC_TransmitHandle *tmit,
+ const char *method_name,
+ const struct GNUNET_PSYC_Environment *env,
+ GNUNET_PSYC_TransmitNotifyModifier notify_mod,
+ GNUNET_PSYC_TransmitNotifyData notify_data,
+ void *notify_cls,
+ uint32_t flags)
+{
+ if (GNUNET_NO != tmit->in_transmit)
+ return GNUNET_SYSERR;
+ tmit->in_transmit = GNUNET_YES;
+
+ size_t size = strlen (method_name) + 1;
+ struct GNUNET_PSYC_MessageMethod *pmeth;
+ tmit->msg = GNUNET_malloc (sizeof (*tmit->msg) + sizeof (*pmeth) + size);
+ tmit->msg->size = sizeof (*tmit->msg) + sizeof (*pmeth) + size;
+
+ if (NULL != notify_mod)
+ {
+ tmit->notify_mod = notify_mod;
+ tmit->notify_mod_cls = notify_cls;
+ }
+ else
+ {
+ tmit->notify_mod = &transmit_notify_env;
+ tmit->notify_mod_cls = tmit;
+ if (NULL != env)
+ {
+ struct GNUNET_PSYC_Modifier mod = {};
+ mod.next = GNUNET_PSYC_env_head (env);
+ tmit->mod = &mod;
+
+ struct GNUNET_PSYC_Modifier *m = tmit->mod;
+ while (NULL != (m = m->next))
+ {
+ if (m->oper != GNUNET_PSYC_OP_SET)
+ flags |= GNUNET_PSYC_MASTER_TRANSMIT_STATE_MODIFY;
+ }
+ }
+ else
+ {
+ tmit->mod = NULL;
+ }
+ }
+
+ pmeth = (struct GNUNET_PSYC_MessageMethod *) &tmit->msg[1];
+ pmeth->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD);
+ pmeth->header.size = htons (sizeof (*pmeth) + size);
+ pmeth->flags = htonl (flags);
+ memcpy (&pmeth[1], method_name, size);
+
+ tmit->state = GNUNET_PSYC_MESSAGE_STATE_MODIFIER;
+ tmit->notify_data = notify_data;
+ tmit->notify_data_cls = notify_cls;
+
+ transmit_mod (tmit);
+ return GNUNET_OK;
+}
+
+
+/**
+ * Resume transmission.
+ *
+ * @param tmit Transmission handle.
+ */
+void
+GNUNET_PSYC_transmit_resume (struct GNUNET_PSYC_TransmitHandle *tmit)
+{
+ if (GNUNET_YES != tmit->in_transmit || GNUNET_NO != tmit->in_notify)
+ return;
+
+ if (0 == tmit->acks_pending)
+ {
+ tmit->paused = GNUNET_NO;
+ transmit_data (tmit);
+ }
+}
+
+
+/**
+ * Abort transmission request.
+ *
+ * @param tmit Transmission handle.
+ */
+void
+GNUNET_PSYC_transmit_cancel (struct GNUNET_PSYC_TransmitHandle *tmit)
+{
+ if (GNUNET_NO == tmit->in_transmit)
+ return;
+
+ tmit->state = GNUNET_PSYC_MESSAGE_STATE_CANCEL;
+ tmit->in_transmit = GNUNET_NO;
+ tmit->paused = GNUNET_NO;
+
+ /* FIXME */
+ struct GNUNET_MessageHeader msg;
+ msg.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA);
+ msg.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL);
+ msg.size = htons (sizeof (msg));
+ transmit_queue_insert (tmit, &msg, GNUNET_YES);
+}
+
+
+/**
+ * Got acknowledgement of a transmitted message part, continue transmission.
+ *
+ * @param tmit Transmission handle.
+ */
+void
+GNUNET_PSYC_transmit_got_ack (struct GNUNET_PSYC_TransmitHandle *tmit)
+{
+ if (0 == tmit->acks_pending)
+ {
+ LOG (GNUNET_ERROR_TYPE_WARNING, "Ignoring extraneous message ACK\n");
+ GNUNET_break (0);
+ return;
+ }
+ tmit->acks_pending--;
+
+ switch (tmit->state)
+ {
+ case GNUNET_PSYC_MESSAGE_STATE_MODIFIER:
+ case GNUNET_PSYC_MESSAGE_STATE_MOD_CONT:
+ transmit_mod (tmit);
+ break;
+
+ case GNUNET_PSYC_MESSAGE_STATE_DATA:
+ transmit_data (tmit);
+ break;
+
+ case GNUNET_PSYC_MESSAGE_STATE_END:
+ case GNUNET_PSYC_MESSAGE_STATE_CANCEL:
+ break;
+
+ default:
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Ignoring message ACK in state %u.\n", tmit->state);
+ }
+}
+
+
+/**** Receiving messages ****/
+
+
+/**
+ * Create handle for receiving messages.
+ */
+struct GNUNET_PSYC_ReceiveHandle *
+GNUNET_PSYC_receive_create (GNUNET_PSYC_MessageCallback message_cb,
+ GNUNET_PSYC_MessagePartCallback message_part_cb,
+ void *cb_cls)
+{
+ struct GNUNET_PSYC_ReceiveHandle *recv = GNUNET_malloc (sizeof (*recv));
+ recv->message_cb = message_cb;
+ recv->message_part_cb = message_part_cb;
+ recv->cb_cls = cb_cls;
+ return recv;
+}
+
+
+/**
+ * Destroy handle for receiving messages.
+ */
+void
+GNUNET_PSYC_receive_destroy (struct GNUNET_PSYC_ReceiveHandle *recv)
+{
+ GNUNET_free (recv);
+}
+
+
+/**
+ * Reset stored data related to the last received message.
+ */
+void
+GNUNET_PSYC_receive_reset (struct GNUNET_PSYC_ReceiveHandle *recv)
+{
+ recv->state = GNUNET_PSYC_MESSAGE_STATE_START;
+ recv->flags = 0;
+ recv->message_id = 0;
+ recv->mod_value_size = 0;
+ recv->mod_value_size_expected = 0;
+}
+
+
+static void
+recv_error (struct GNUNET_PSYC_ReceiveHandle *recv)
+{
+ if (NULL != recv->message_part_cb)
+ recv->message_part_cb (recv->cb_cls, NULL, recv->message_id, recv->flags,
+ 0, NULL);
+
+ if (NULL != recv->message_cb)
+ recv->message_cb (recv->cb_cls, recv->message_id, recv->flags, NULL);
+
+ GNUNET_PSYC_receive_reset (recv);
+}
+
+
+/**
+ * Handle incoming PSYC message.
+ *
+ * @param recv Receive handle.
+ * @param msg The message.
+ *
+ * @return #GNUNET_OK on success,
+ * #GNUNET_SYSERR on receive error.
+ */
+int
+GNUNET_PSYC_receive_message (struct GNUNET_PSYC_ReceiveHandle *recv,
+ const struct GNUNET_PSYC_MessageHeader *msg)
+{
+ uint16_t size = ntohs (msg->header.size);
+ uint32_t flags = ntohl (msg->flags);
+ uint64_t message_id;
+
+ GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG,
+ (struct GNUNET_MessageHeader *) msg);
+
+ if (GNUNET_PSYC_MESSAGE_STATE_START == recv->state)
+ {
+ recv->message_id = GNUNET_ntohll (msg->message_id);
+ recv->flags = flags;
+ recv->slave_key = msg->slave_key;
+ recv->mod_value_size = 0;
+ recv->mod_value_size_expected = 0;
+ }
+ else if (GNUNET_ntohll (msg->message_id) != recv->message_id)
+ {
+ // FIXME
+ LOG (GNUNET_ERROR_TYPE_WARNING,
+ "Unexpected message ID. Got: %" PRIu64 ", expected: %" PRIu64 "\n",
+ GNUNET_ntohll (msg->message_id), recv->message_id);
+ GNUNET_break_op (0);
+ recv_error (recv);
+ return GNUNET_SYSERR;
+ }
+ else if (flags != recv->flags)
+ {
+ LOG (GNUNET_ERROR_TYPE_WARNING,
+ "Unexpected message flags. Got: %lu, expected: %lu\n",
+ flags, recv->flags);
+ GNUNET_break_op (0);
+ recv_error (recv);
+ return GNUNET_SYSERR;
+ }
+ message_id = recv->message_id;
+
+ uint16_t pos = 0, psize = 0, ptype, size_eq, size_min;
+
+ for (pos = 0; sizeof (*msg) + pos < size; pos += psize)
+ {
+ const struct GNUNET_MessageHeader *pmsg
+ = (const struct GNUNET_MessageHeader *) ((char *) &msg[1] + pos);
+ psize = ntohs (pmsg->size);
+ ptype = ntohs (pmsg->type);
+ size_eq = size_min = 0;
+
+ if (psize < sizeof (*pmsg) || sizeof (*msg) + pos + psize > size)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "Dropping message of type %u with invalid size %u.\n",
+ ptype, psize);
+ recv_error (recv);
+ return GNUNET_SYSERR;
+ }
+
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Received message part of type %u and size %u from PSYC.\n",
+ ptype, psize);
+ GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG, pmsg);
+
+ switch (ptype)
+ {
+ case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD:
+ size_min = sizeof (struct GNUNET_PSYC_MessageMethod);
+ break;
+ case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER:
+ size_min = sizeof (struct GNUNET_PSYC_MessageModifier);
+ break;
+ case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT:
+ case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA:
+ size_min = sizeof (struct GNUNET_MessageHeader);
+ break;
+ case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END:
+ case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL:
+ size_eq = sizeof (struct GNUNET_MessageHeader);
+ break;
+ default:
+ GNUNET_break_op (0);
+ recv_error (recv);
+ return GNUNET_SYSERR;
+ }
+
+ if (! ((0 < size_eq && psize == size_eq)
+ || (0 < size_min && size_min <= psize)))
+ {
+ GNUNET_break_op (0);
+ recv_error (recv);
+ return GNUNET_SYSERR;
+ }
+
+ switch (ptype)
+ {
+ case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD:
+ {
+ struct GNUNET_PSYC_MessageMethod *meth
+ = (struct GNUNET_PSYC_MessageMethod *) pmsg;
+
+ if (GNUNET_PSYC_MESSAGE_STATE_START != recv->state)
+ {
+ LOG (GNUNET_ERROR_TYPE_WARNING,
+ "Dropping out of order message method (%u).\n",
+ recv->state);
+ /* It is normal to receive an incomplete message right after connecting,
+ * but should not happen later.
+ * FIXME: add a check for this condition.
+ */
+ GNUNET_break_op (0);
+ recv_error (recv);
+ return GNUNET_SYSERR;
+ }
+
+ if ('\0' != *((char *) meth + psize - 1))
+ {
+ LOG (GNUNET_ERROR_TYPE_WARNING,
+ "Dropping message with malformed method. "
+ "Message ID: %" PRIu64 "\n", recv->message_id);
+ GNUNET_break_op (0);
+ recv_error (recv);
+ return GNUNET_SYSERR;
+ }
+ recv->state = GNUNET_PSYC_MESSAGE_STATE_METHOD;
+ break;
+ }
+ case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER:
+ {
+ if (!(GNUNET_PSYC_MESSAGE_STATE_METHOD == recv->state
+ || GNUNET_PSYC_MESSAGE_STATE_MODIFIER == recv->state
+ || GNUNET_PSYC_MESSAGE_STATE_MOD_CONT == recv->state))
+ {
+ LOG (GNUNET_ERROR_TYPE_WARNING,
+ "Dropping out of order message modifier (%u).\n",
+ recv->state);
+ GNUNET_break_op (0);
+ recv_error (recv);
+ return GNUNET_SYSERR;
+ }
+
+ struct GNUNET_PSYC_MessageModifier *mod
+ = (struct GNUNET_PSYC_MessageModifier *) pmsg;
+
+ uint16_t name_size = ntohs (mod->name_size);
+ recv->mod_value_size_expected = ntohl (mod->value_size);
+ recv->mod_value_size = psize - sizeof (*mod) - name_size;
+
+ if (psize < sizeof (*mod) + name_size
+ || '\0' != *((char *) &mod[1] + name_size - 1)
+ || recv->mod_value_size_expected < recv->mod_value_size)
+ {
+ LOG (GNUNET_ERROR_TYPE_WARNING, "Dropping malformed modifier.\n");
+ GNUNET_break_op (0);
+ recv_error (recv);
+ return GNUNET_SYSERR;
+ }
+ recv->state = GNUNET_PSYC_MESSAGE_STATE_MODIFIER;
+ break;
+ }
+ case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT:
+ {
+ recv->mod_value_size += psize - sizeof (*pmsg);
+
+ if (!(GNUNET_PSYC_MESSAGE_STATE_MODIFIER == recv->state
+ || GNUNET_PSYC_MESSAGE_STATE_MOD_CONT == recv->state)
+ || recv->mod_value_size_expected < recv->mod_value_size)
+ {
+ LOG (GNUNET_ERROR_TYPE_WARNING,
+ "Dropping out of order message modifier continuation "
+ "!(%u == %u || %u == %u) || %lu < %lu.\n",
+ GNUNET_PSYC_MESSAGE_STATE_MODIFIER, recv->state,
+ GNUNET_PSYC_MESSAGE_STATE_MOD_CONT, recv->state,
+ recv->mod_value_size_expected, recv->mod_value_size);
+ GNUNET_break_op (0);
+ recv_error (recv);
+ return GNUNET_SYSERR;
+ }
+ break;
+ }
+ case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA:
+ {
+ if (recv->state < GNUNET_PSYC_MESSAGE_STATE_METHOD
+ || recv->mod_value_size_expected != recv->mod_value_size)
+ {
+ LOG (GNUNET_ERROR_TYPE_WARNING,
+ "Dropping out of order message data fragment "
+ "(%u < %u || %lu != %lu).\n",
+ recv->state, GNUNET_PSYC_MESSAGE_STATE_METHOD,
+ recv->mod_value_size_expected, recv->mod_value_size);
+
+ GNUNET_break_op (0);
+ recv_error (recv);
+ return GNUNET_SYSERR;
+ }
+ recv->state = GNUNET_PSYC_MESSAGE_STATE_DATA;
+ break;
+ }
+ }
+
+ if (NULL != recv->message_part_cb)
+ recv->message_part_cb (recv->cb_cls, &recv->slave_key,
+ recv->message_id, recv->flags,
+ 0, // FIXME: data_offset
+ pmsg);
+
+ switch (ptype)
+ {
+ case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END:
+ case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL:
+ GNUNET_PSYC_receive_reset (recv);
+ break;
+ }
+ }
+
+ if (NULL != recv->message_cb)
+ recv->message_cb (recv->cb_cls, message_id, flags, msg);
+ return GNUNET_OK;
+}
+
+
+/**
+ * Check if @a data contains a series of valid message parts.
+ *
+ * @param data_size Size of @a data.
+ * @param data Data.
+ * @param[out] first_ptype Type of first message part.
+ * @param[out] last_ptype Type of last message part.
+ *
+ * @return Number of message parts found in @a data.
+ * or GNUNET_SYSERR if the message contains invalid parts.
+ */
+int
+GNUNET_PSYC_receive_check_parts (uint16_t data_size, const char *data,
+ uint16_t *first_ptype, uint16_t *last_ptype)
+{
+ const struct GNUNET_MessageHeader *pmsg;
+ uint16_t parts = 0, ptype = 0, psize = 0, pos = 0;
+ if (NULL != first_ptype)
+ *first_ptype = 0;
+ if (NULL != last_ptype)
+ *last_ptype = 0;
+
+ for (pos = 0; pos < data_size; pos += psize, parts++)
+ {
+ pmsg = (const struct GNUNET_MessageHeader *) (data + pos);
+ GNUNET_PSYC_log_message (GNUNET_ERROR_TYPE_DEBUG, pmsg);
+ psize = ntohs (pmsg->size);
+ ptype = ntohs (pmsg->type);
+ if (0 == parts && NULL != first_ptype)
+ *first_ptype = ptype;
+ if (NULL != last_ptype
+ && *last_ptype < GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END)
+ *last_ptype = ptype;
+ if (psize < sizeof (*pmsg)
+ || pos + psize > data_size
+ || ptype < GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD
+ || GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL < ptype)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "Invalid message part of type %u and size %u.\n",
+ ptype, psize);
+ return GNUNET_SYSERR;
+ }
+ /** @todo FIXME: check message part order */
+ }
+ return parts;
+}
+
+
+struct ParseMessageClosure
+{
+ struct GNUNET_PSYC_Environment *env;
+ const char **method_name;
+ const void **data;
+ uint16_t *data_size;
+ enum GNUNET_PSYC_MessageState msg_state;
+};
+
+
+static void
+parse_message_part_cb (void *cls,
+ const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key,
+ uint64_t message_id, uint32_t flags, uint64_t data_offset,
+ const struct GNUNET_MessageHeader *msg)
+{
+ struct ParseMessageClosure *pmc = cls;
+ if (NULL == msg)
+ {
+ pmc->msg_state = GNUNET_PSYC_MESSAGE_STATE_ERROR;
+ return;
+ }
+
+ switch (ntohs (msg->type))
+ {
+ case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD:
+ {
+ struct GNUNET_PSYC_MessageMethod *
+ pmeth = (struct GNUNET_PSYC_MessageMethod *) msg;
+ *pmc->method_name = (const char *) &pmeth[1];
+ pmc->msg_state = GNUNET_PSYC_MESSAGE_STATE_METHOD;
+ break;
+ }
+
+ case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER:
+ {
+ struct GNUNET_PSYC_MessageModifier *
+ pmod = (struct GNUNET_PSYC_MessageModifier *) msg;
+
+ const char *name = (const char *) &pmod[1];
+ const void *value = name + ntohs (pmod->name_size);
+ GNUNET_PSYC_env_add (pmc->env, pmod->oper, name, value,
+ ntohl (pmod->value_size));
+ pmc->msg_state = GNUNET_PSYC_MESSAGE_STATE_MODIFIER;
+ break;
+ }
+
+ case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA:
+ *pmc->data = &msg[1];
+ *pmc->data_size = ntohs (msg->size) - sizeof (*msg);
+ pmc->msg_state = GNUNET_PSYC_MESSAGE_STATE_DATA;
+ break;
+
+ case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END:
+ pmc->msg_state = GNUNET_PSYC_MESSAGE_STATE_END;
+ break;
+
+ default:
+ pmc->msg_state = GNUNET_PSYC_MESSAGE_STATE_ERROR;
+ }
+}
+
+
+/**
+ * Parse PSYC message.
+ *
+ * @param msg
+ * The PSYC message to parse.
+ * @param[out] method_name
+ * Pointer to the method name inside @a pmsg.
+ * @param env
+ * The environment for the message with a list of modifiers.
+ * @param[out] data
+ * Pointer to data inside @a pmsg.
+ * @param[out] data_size
+ * Size of @data is written here.
+ *
+ * @return #GNUNET_OK on success,
+ * #GNUNET_SYSERR on parse error.
+ */
+int
+GNUNET_PSYC_message_parse (const struct GNUNET_PSYC_MessageHeader *msg,
+ const char **method_name,
+ struct GNUNET_PSYC_Environment *env,
+ const void **data,
+ uint16_t *data_size)
+{
+ struct ParseMessageClosure cls;
+ cls.env = env;
+ cls.method_name = method_name;
+ cls.data = data;
+ cls.data_size = data_size;
+
+ struct GNUNET_PSYC_ReceiveHandle *
+ recv = GNUNET_PSYC_receive_create (NULL, parse_message_part_cb, &cls);
+ int ret = GNUNET_PSYC_receive_message (recv, msg);
+ GNUNET_PSYC_receive_destroy (recv);
+
+ if (GNUNET_OK != ret)
+ return GNUNET_SYSERR;
+
+ return (GNUNET_PSYC_MESSAGE_STATE_END == cls.msg_state)
+ ? GNUNET_OK
+ : GNUNET_NO;
+}
+
+
+/**
+ * Initialize PSYC message header.
+ */
+void
+GNUNET_PSYC_message_header_init (struct GNUNET_PSYC_MessageHeader *pmsg,
+ const struct GNUNET_MULTICAST_MessageHeader *mmsg,
+ uint32_t flags)
+{
+ uint16_t size = ntohs (mmsg->header.size);
+ uint16_t psize = sizeof (*pmsg) + size - sizeof (*mmsg);
+
+ pmsg->header.size = htons (psize);
+ pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
+ pmsg->message_id = mmsg->message_id;
+ pmsg->fragment_offset = mmsg->fragment_offset;
+ pmsg->flags = htonl (flags);
+
+ memcpy (&pmsg[1], &mmsg[1], size - sizeof (*mmsg));
+}
+
+
+/**
+ * Create a new PSYC message header from a multicast message.
+ */
+struct GNUNET_PSYC_MessageHeader *
+GNUNET_PSYC_message_header_create (const struct GNUNET_MULTICAST_MessageHeader *mmsg,
+ uint32_t flags)
+{
+ struct GNUNET_PSYC_MessageHeader *pmsg;
+ uint16_t size = ntohs (mmsg->header.size);
+ uint16_t psize = sizeof (*pmsg) + size - sizeof (*mmsg);
+
+ pmsg = GNUNET_malloc (psize);
+ GNUNET_PSYC_message_header_init (pmsg, mmsg, flags);
+ return pmsg;
+}
+
+
+/**
+ * Create a new PSYC message header from a PSYC message.
+ */
+struct GNUNET_PSYC_MessageHeader *
+GNUNET_PSYC_message_header_create_from_psyc (const struct GNUNET_PSYC_Message *msg)
+{
+ uint16_t msg_size = ntohs (msg->header.size);
+ struct GNUNET_PSYC_MessageHeader *
+ pmsg = GNUNET_malloc (sizeof (*pmsg) + msg_size - sizeof (*msg));
+ pmsg->header.type = htons (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE);
+ pmsg->header.size = htons (sizeof (*pmsg) + msg_size - sizeof (*msg));
+ memcpy (&pmsg[1], &msg[1], msg_size - sizeof (*msg));
+ return pmsg;
+}
diff --git a/src/psycutil/psyc_slicer.c b/src/psycutil/psyc_slicer.c
new file mode 100644
index 0000000000..fe99124164
--- /dev/null
+++ b/src/psycutil/psyc_slicer.c
@@ -0,0 +1,610 @@
+/*
+ * This file is part of GNUnet
+ * Copyright (C) 2013 Christian Grothoff (and other contributing authors)
+ *
+ * GNUnet is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published
+ * by the Free Software Foundation; either version 3, or (at your
+ * option) any later version.
+ *
+ * GNUnet is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with GNUnet; see the file COPYING. If not, write to the
+ * Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
+ * Boston, MA 02110-1301, USA.
+ */
+
+/**
+ * @author Gabor X Toth
+ *
+ * @file
+ * PSYC Slicer API
+ */
+
+#include <inttypes.h>
+
+#include "platform.h"
+#include "gnunet_util_lib.h"
+#include "gnunet_psyc_util_lib.h"
+
+#define LOG(kind,...) GNUNET_log_from (kind, "psyc-util-slicer",__VA_ARGS__)
+
+
+/**
+ * Handle for a try-and-slice instance.
+ */
+struct GNUNET_PSYC_Slicer
+{
+ /**
+ * Method handlers: H(method_name) -> SlicerMethodCallbacks
+ */
+ struct GNUNET_CONTAINER_MultiHashMap *method_handlers;
+
+ /**
+ * Modifier handlers: H(modifier_name) -> SlicerModifierCallbacks
+ */
+ struct GNUNET_CONTAINER_MultiHashMap *modifier_handlers;
+
+ /**
+ * Currently being processed message part.
+ */
+ const struct GNUNET_MessageHeader *msg;
+
+ /**
+ * ID of currently being received message.
+ */
+ uint64_t message_id;
+
+ /**
+ * Method name of currently being received message.
+ */
+ char *method_name;
+
+ /**
+ * Name of currently processed modifier.
+ */
+ char *mod_name;
+
+ /**
+ * Value of currently processed modifier.
+ */
+ char *mod_value;
+
+ /**
+ * Public key of the nym the current message originates from.
+ */
+ struct GNUNET_CRYPTO_EcdsaPublicKey nym_pub_key;
+
+ /**
+ * Size of @a method_name (including terminating \0).
+ */
+ uint16_t method_name_size;
+
+ /**
+ * Size of @a modifier_name (including terminating \0).
+ */
+ uint16_t mod_name_size;
+
+ /**
+ * Size of modifier value fragment.
+ */
+ uint16_t mod_value_size;
+
+ /**
+ * Full size of modifier value.
+ */
+ uint16_t mod_full_value_size;
+
+ /**
+ * Remaining bytes from the value of the current modifier.
+ */
+ uint16_t mod_value_remaining;
+
+ /**
+ * Operator of currently processed modifier.
+ */
+ uint8_t mod_oper;
+};
+
+
+/**
+ * Callbacks for a slicer method handler.
+ */
+struct SlicerMethodCallbacks
+{
+ GNUNET_PSYC_MethodCallback method_cb;
+ GNUNET_PSYC_ModifierCallback modifier_cb;
+ GNUNET_PSYC_DataCallback data_cb;
+ GNUNET_PSYC_EndOfMessageCallback eom_cb;
+ void *cls;
+};
+
+
+struct SlicerMethodRemoveClosure
+{
+ struct GNUNET_PSYC_Slicer *slicer;
+ struct SlicerMethodCallbacks rm_cbs;
+};
+
+
+/**
+ * Callbacks for a slicer method handler.
+ */
+struct SlicerModifierCallbacks
+{
+ GNUNET_PSYC_ModifierCallback modifier_cb;
+ void *cls;
+};
+
+
+struct SlicerModifierRemoveClosure
+{
+ struct GNUNET_PSYC_Slicer *slicer;
+ struct SlicerModifierCallbacks rm_cbs;
+};
+
+
+/**
+ * Call a method handler for an incoming message part.
+ */
+int
+slicer_method_handler_notify (void *cls, const struct GNUNET_HashCode *key,
+ void *value)
+{
+ struct GNUNET_PSYC_Slicer *slicer = cls;
+ const struct GNUNET_MessageHeader *msg = slicer->msg;
+ struct SlicerMethodCallbacks *cbs = value;
+ uint16_t ptype = ntohs (msg->type);
+
+ switch (ptype)
+ {
+ case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD:
+ {
+ if (NULL == cbs->method_cb)
+ break;
+ struct GNUNET_PSYC_MessageMethod *
+ meth = (struct GNUNET_PSYC_MessageMethod *) msg;
+ cbs->method_cb (cbs->cls, meth, slicer->message_id,
+ ntohl (meth->flags),
+ &slicer->nym_pub_key,
+ slicer->method_name);
+ break;
+ }
+
+ case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER:
+ {
+ if (NULL == cbs->modifier_cb)
+ break;
+ struct GNUNET_PSYC_MessageModifier *
+ mod = (struct GNUNET_PSYC_MessageModifier *) msg;
+ cbs->modifier_cb (cbs->cls, &mod->header, slicer->message_id,
+ mod->oper, (const char *) &mod[1],
+ (const void *) &mod[1] + ntohs (mod->name_size),
+ ntohs (mod->header.size) - sizeof (*mod) - ntohs (mod->name_size),
+ ntohs (mod->value_size));
+ break;
+ }
+
+ case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT:
+ {
+ if (NULL == cbs->modifier_cb)
+ break;
+ cbs->modifier_cb (cbs->cls, msg, slicer->message_id,
+ slicer->mod_oper, slicer->mod_name, &msg[1],
+ ntohs (msg->size) - sizeof (*msg),
+ slicer->mod_full_value_size);
+ break;
+ }
+
+ case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA:
+ {
+ if (NULL == cbs->data_cb)
+ break;
+ uint64_t data_offset = 0; // FIXME
+ cbs->data_cb (cbs->cls, msg, slicer->message_id,
+ data_offset, &msg[1], ntohs (msg->size) - sizeof (*msg));
+ break;
+ }
+
+ case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END:
+ if (NULL == cbs->eom_cb)
+ break;
+ cbs->eom_cb (cbs->cls, msg, slicer->message_id, GNUNET_NO);
+ break;
+
+ case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_CANCEL:
+ if (NULL == cbs->eom_cb)
+ break;
+ cbs->eom_cb (cbs->cls, msg, slicer->message_id, GNUNET_YES);
+ break;
+ }
+ return GNUNET_YES;
+}
+
+
+/**
+ * Call a method handler for an incoming message part.
+ */
+int
+slicer_modifier_handler_notify (void *cls, const struct GNUNET_HashCode *key,
+ void *value)
+{
+ struct GNUNET_PSYC_Slicer *slicer = cls;
+ struct SlicerModifierCallbacks *cbs = value;
+
+ cbs->modifier_cb (cbs->cls, slicer->msg, slicer->message_id, slicer->mod_oper,
+ slicer->mod_name, slicer->mod_value,
+ slicer->mod_value_size, slicer->mod_full_value_size);
+ return GNUNET_YES;
+}
+
+
+/**
+ * Process an incoming message part and call matching handlers.
+ *
+ * @param cls
+ * Closure.
+ * @param message_id
+ * ID of the message.
+ * @param flags
+ * Flags for the message.
+ * @see enum GNUNET_PSYC_MessageFlags
+ * @param msg
+ * The message part. as it arrived from the network.
+ */
+void
+GNUNET_PSYC_slicer_message (void *cls, const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_pub_key,
+ uint64_t message_id, uint32_t flags, uint64_t fragment_offset,
+ const struct GNUNET_MessageHeader *msg)
+{
+ struct GNUNET_PSYC_Slicer *slicer = cls;
+ slicer->nym_pub_key = *slave_pub_key;
+
+ uint16_t ptype = ntohs (msg->type);
+ if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD == ptype)
+ {
+ struct GNUNET_PSYC_MessageMethod *
+ meth = (struct GNUNET_PSYC_MessageMethod *) msg;
+ slicer->method_name_size = ntohs (meth->header.size) - sizeof (*meth);
+ slicer->method_name = GNUNET_malloc (slicer->method_name_size);
+ memcpy (slicer->method_name, &meth[1], slicer->method_name_size);
+ slicer->message_id = message_id;
+ }
+ else
+ {
+ GNUNET_assert (message_id == slicer->message_id);
+ }
+
+ char *nym_str = GNUNET_CRYPTO_ecdsa_public_key_to_string (slave_pub_key);
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Slicer received message of type %u and size %u, "
+ "with ID %" PRIu64 " and method %s from %s\n",
+ ptype, ntohs (msg->size), message_id, slicer->method_name, nym_str);
+ GNUNET_free (nym_str);
+
+ slicer->msg = msg;
+
+ /* try-and-slice modifier */
+
+ switch (ptype)
+ {
+ case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER:
+ {
+ struct GNUNET_PSYC_MessageModifier *
+ mod = (struct GNUNET_PSYC_MessageModifier *) msg;
+ slicer->mod_oper = mod->oper;
+ slicer->mod_name_size = ntohs (mod->name_size);
+ slicer->mod_name = GNUNET_malloc (slicer->mod_name_size);
+ memcpy (slicer->mod_name, &mod[1], slicer->mod_name_size);
+ slicer->mod_value = (char *) &mod[1] + slicer->mod_name_size;
+ slicer->mod_full_value_size = ntohs (mod->value_size);
+ slicer->mod_value_remaining = slicer->mod_full_value_size;
+ slicer->mod_value_size
+ = ntohs (mod->header.size) - sizeof (*mod) - slicer->mod_name_size;
+ }
+ case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT:
+ if (ptype == GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT)
+ {
+ slicer->mod_value = (char *) &msg[1];
+ slicer->mod_value_size = ntohs (msg->size) - sizeof (*msg);
+ }
+ slicer->mod_value_remaining -= slicer->mod_value_size;
+ char *name = GNUNET_malloc (slicer->mod_name_size);
+ memcpy (name, slicer->mod_name, slicer->mod_name_size);
+ do
+ {
+ struct GNUNET_HashCode key;
+ uint16_t name_len = strlen (name);
+ GNUNET_CRYPTO_hash (name, name_len, &key);
+ GNUNET_CONTAINER_multihashmap_get_multiple (slicer->modifier_handlers, &key,
+ slicer_modifier_handler_notify,
+ slicer);
+ char *p = strrchr (name, '_');
+ if (NULL == p)
+ break;
+ *p = '\0';
+ } while (1);
+ GNUNET_free (name);
+ }
+
+ /* try-and-slice method */
+
+ char *name = GNUNET_malloc (slicer->method_name_size);
+ memcpy (name, slicer->method_name, slicer->method_name_size);
+ do
+ {
+ struct GNUNET_HashCode key;
+ uint16_t name_len = strlen (name);
+ GNUNET_CRYPTO_hash (name, name_len, &key);
+ GNUNET_CONTAINER_multihashmap_get_multiple (slicer->method_handlers, &key,
+ slicer_method_handler_notify,
+ slicer);
+ char *p = strrchr (name, '_');
+ if (NULL == p)
+ break;
+ *p = '\0';
+ } while (1);
+ GNUNET_free (name);
+
+ if (GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END <= ptype)
+ GNUNET_free (slicer->method_name);
+
+ if (0 == slicer->mod_value_remaining && NULL != slicer->mod_name)
+ {
+ GNUNET_free (slicer->mod_name);
+ slicer->mod_name = NULL;
+ slicer->mod_name_size = 0;
+ slicer->mod_value_size = 0;
+ slicer->mod_full_value_size = 0;
+ slicer->mod_oper = 0;
+ }
+
+ slicer->msg = NULL;
+}
+
+
+/**
+ * Create a try-and-slice instance.
+ *
+ * A slicer processes incoming messages and notifies callbacks about matching
+ * methods or modifiers encountered.
+ *
+ * @return A new try-and-slice construct.
+ */
+struct GNUNET_PSYC_Slicer *
+GNUNET_PSYC_slicer_create (void)
+{
+ struct GNUNET_PSYC_Slicer *slicer = GNUNET_malloc (sizeof (*slicer));
+ slicer->method_handlers = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
+ slicer->modifier_handlers = GNUNET_CONTAINER_multihashmap_create (1, GNUNET_NO);
+ return slicer;
+}
+
+
+/**
+ * Add a method to the try-and-slice instance.
+ *
+ * The callbacks are called for messages with a matching @a method_name prefix.
+ *
+ * @param slicer
+ * The try-and-slice instance to extend.
+ * @param method_name
+ * Name of the given method, use empty string to match all.
+ * @param method_cb
+ * Method handler invoked upon a matching message.
+ * @param modifier_cb
+ * Modifier handler, invoked after @a method_cb
+ * for each modifier in the message.
+ * @param data_cb
+ * Data handler, invoked after @a modifier_cb for each data fragment.
+ * @param eom_cb
+ * Invoked upon reaching the end of a matching message.
+ * @param cls
+ * Closure for the callbacks.
+ */
+void
+GNUNET_PSYC_slicer_method_add (struct GNUNET_PSYC_Slicer *slicer,
+ const char *method_name,
+ GNUNET_PSYC_MethodCallback method_cb,
+ GNUNET_PSYC_ModifierCallback modifier_cb,
+ GNUNET_PSYC_DataCallback data_cb,
+ GNUNET_PSYC_EndOfMessageCallback eom_cb,
+ void *cls)
+{
+ struct GNUNET_HashCode key;
+ GNUNET_CRYPTO_hash (method_name, strlen (method_name), &key);
+
+ struct SlicerMethodCallbacks *cbs = GNUNET_malloc (sizeof (*cbs));
+ cbs->method_cb = method_cb;
+ cbs->modifier_cb = modifier_cb;
+ cbs->data_cb = data_cb;
+ cbs->eom_cb = eom_cb;
+ cbs->cls = cls;
+
+ GNUNET_CONTAINER_multihashmap_put (slicer->method_handlers, &key, cbs,
+ GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
+}
+
+
+int
+slicer_method_remove (void *cls, const struct GNUNET_HashCode *key, void *value)
+{
+ struct SlicerMethodRemoveClosure *rm_cls = cls;
+ struct GNUNET_PSYC_Slicer *slicer = rm_cls->slicer;
+ struct SlicerMethodCallbacks *rm_cbs = &rm_cls->rm_cbs;
+ struct SlicerMethodCallbacks *cbs = value;
+
+ if (cbs->method_cb == rm_cbs->method_cb
+ && cbs->modifier_cb == rm_cbs->modifier_cb
+ && cbs->data_cb == rm_cbs->data_cb
+ && cbs->eom_cb == rm_cbs->eom_cb)
+ {
+ GNUNET_CONTAINER_multihashmap_remove (slicer->method_handlers, key, cbs);
+ GNUNET_free (cbs);
+ return GNUNET_NO;
+ }
+ return GNUNET_YES;
+}
+
+
+/**
+ * Remove a registered method from the try-and-slice instance.
+ *
+ * Removes one matching handler registered with the given
+ * @a method_name and callbacks.
+ *
+ * @param slicer
+ * The try-and-slice instance.
+ * @param method_name
+ * Name of the method to remove.
+ * @param method_cb
+ * Method handler.
+ * @param modifier_cb
+ * Modifier handler.
+ * @param data_cb
+ * Data handler.
+ * @param eom_cb
+ * End of message handler.
+ *
+ * @return #GNUNET_OK if a method handler was removed,
+ * #GNUNET_NO if no handler matched the given method name and callbacks.
+ */
+int
+GNUNET_PSYC_slicer_method_remove (struct GNUNET_PSYC_Slicer *slicer,
+ const char *method_name,
+ GNUNET_PSYC_MethodCallback method_cb,
+ GNUNET_PSYC_ModifierCallback modifier_cb,
+ GNUNET_PSYC_DataCallback data_cb,
+ GNUNET_PSYC_EndOfMessageCallback eom_cb)
+{
+ struct GNUNET_HashCode key;
+ GNUNET_CRYPTO_hash (method_name, strlen (method_name), &key);
+
+ struct SlicerMethodRemoveClosure rm_cls;
+ rm_cls.slicer = slicer;
+ struct SlicerMethodCallbacks *rm_cbs = &rm_cls.rm_cbs;
+ rm_cbs->method_cb = method_cb;
+ rm_cbs->modifier_cb = modifier_cb;
+ rm_cbs->data_cb = data_cb;
+ rm_cbs->eom_cb = eom_cb;
+
+ return
+ (GNUNET_SYSERR
+ == GNUNET_CONTAINER_multihashmap_get_multiple (slicer->method_handlers, &key,
+ slicer_method_remove,
+ &rm_cls))
+ ? GNUNET_NO
+ : GNUNET_OK;
+}
+
+
+/**
+ * Watch a place for changed objects.
+ *
+ * @param slicer
+ * The try-and-slice instance.
+ * @param object_filter
+ * Object prefix to match.
+ * @param modifier_cb
+ * Function to call when encountering a state modifier.
+ * @param cls
+ * Closure for callback.
+ */
+void
+GNUNET_PSYC_slicer_modifier_add (struct GNUNET_PSYC_Slicer *slicer,
+ const char *object_filter,
+ GNUNET_PSYC_ModifierCallback modifier_cb,
+ void *cls)
+{
+ struct SlicerModifierCallbacks *cbs = GNUNET_malloc (sizeof *cbs);
+ cbs->modifier_cb = modifier_cb;
+ cbs->cls = cls;
+
+ struct GNUNET_HashCode key;
+ GNUNET_CRYPTO_hash (object_filter, strlen (object_filter), &key);
+ GNUNET_CONTAINER_multihashmap_put (slicer->modifier_handlers, &key, cbs,
+ GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
+}
+
+
+int
+slicer_modifier_remove (void *cls, const struct GNUNET_HashCode *key, void *value)
+{
+ struct SlicerModifierRemoveClosure *rm_cls = cls;
+ struct GNUNET_PSYC_Slicer *slicer = rm_cls->slicer;
+ struct SlicerModifierCallbacks *rm_cbs = &rm_cls->rm_cbs;
+ struct SlicerModifierCallbacks *cbs = value;
+
+ if (cbs->modifier_cb == rm_cbs->modifier_cb)
+ {
+ GNUNET_CONTAINER_multihashmap_remove (slicer->modifier_handlers, key, cbs);
+ GNUNET_free (cbs);
+ return GNUNET_NO;
+ }
+ return GNUNET_YES;
+}
+
+
+/**
+ * Remove a registered modifier from the try-and-slice instance.
+ *
+ * Removes one matching handler registered with the given
+ * @a object_filter and @a modifier_cb.
+ *
+ * @param slicer
+ * The try-and-slice instance.
+ * @param object_filter
+ * Object prefix to match.
+ * @param modifier_cb
+ * Function to call when encountering a state modifier changes.
+ */
+int
+GNUNET_PSYC_slicer_modifier_remove (struct GNUNET_PSYC_Slicer *slicer,
+ const char *object_filter,
+ GNUNET_PSYC_ModifierCallback modifier_cb)
+{
+ struct GNUNET_HashCode key;
+ GNUNET_CRYPTO_hash (object_filter, strlen (object_filter), &key);
+
+ struct SlicerModifierRemoveClosure rm_cls;
+ rm_cls.slicer = slicer;
+ struct SlicerModifierCallbacks *rm_cbs = &rm_cls.rm_cbs;
+ rm_cbs->modifier_cb = modifier_cb;
+
+ return
+ (GNUNET_SYSERR
+ == GNUNET_CONTAINER_multihashmap_get_multiple (slicer->modifier_handlers, &key,
+ slicer_modifier_remove,
+ &rm_cls))
+ ? GNUNET_NO
+ : GNUNET_OK;
+ }
+
+
+int
+slicer_method_free (void *cls, const struct GNUNET_HashCode *key, void *value)
+{
+ struct SlicerMethodCallbacks *cbs = value;
+ GNUNET_free (cbs);
+ return GNUNET_YES;
+}
+
+
+/**
+ * Destroy a given try-and-slice instance.
+ *
+ * @param slicer
+ * Slicer to destroy
+ */
+void
+GNUNET_PSYC_slicer_destroy (struct GNUNET_PSYC_Slicer *slicer)
+{
+ GNUNET_CONTAINER_multihashmap_iterate (slicer->method_handlers,
+ slicer_method_free, NULL);
+ GNUNET_CONTAINER_multihashmap_destroy (slicer->method_handlers);
+ GNUNET_free (slicer);
+}
diff --git a/src/psycutil/test_psyc_env.c b/src/psycutil/test_psyc_env.c
new file mode 100644
index 0000000000..021e7fe10c
--- /dev/null
+++ b/src/psycutil/test_psyc_env.c
@@ -0,0 +1,96 @@
+/*
+ * This file is part of GNUnet.
+ * Copyright (C) 2013 Christian Grothoff (and other contributing authors)
+ *
+ * GNUnet is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published
+ * by the Free Software Foundation; either version 3, or (at your
+ * option) any later version.
+ *
+ * GNUnet is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with GNUnet; see the file COPYING. If not, write to the
+ * Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
+ * Boston, MA 02110-1301, USA.
+ */
+
+/**
+ * @author Gabor X Toth
+ *
+ * @file
+ * Tests for the environment library.
+ */
+
+#include "platform.h"
+#include "gnunet_util_lib.h"
+#include "gnunet_testing_lib.h"
+#include "gnunet_psyc_util_lib.h"
+
+struct GNUNET_PSYC_Modifier mods[] = {
+ { .oper = GNUNET_PSYC_OP_SET,
+ .name = "_foo", .value = "foo", .value_size = 3 },
+
+ { .oper = GNUNET_PSYC_OP_ASSIGN,
+ .name = "_foo_bar", .value = "foo bar", .value_size = 7 },
+
+ { .oper = GNUNET_PSYC_OP_AUGMENT,
+ .name = "_foo_bar_baz", .value = "foo bar baz", .value_size = 11 }
+};
+
+struct ItCls
+{
+ size_t n;
+};
+
+int
+iterator (void *cls, enum GNUNET_PSYC_Operator oper,
+ const char *name, const char *value, uint32_t value_size)
+{
+ struct ItCls *it_cls = cls;
+ struct GNUNET_PSYC_Modifier *m = &mods[it_cls->n++];
+
+ GNUNET_assert (oper == m->oper);
+ GNUNET_assert (value_size == m->value_size);
+ GNUNET_assert (0 == memcmp (name, m->name, strlen (m->name)));
+ GNUNET_assert (0 == memcmp (value, m->value, m->value_size));
+
+ return GNUNET_YES;
+}
+
+int
+main (int argc, char *argv[])
+{
+ GNUNET_log_setup ("test-env", "WARNING", NULL);
+
+ struct GNUNET_PSYC_Environment *env = GNUNET_PSYC_env_create ();
+ GNUNET_assert (NULL != env);
+ int i, len = 3;
+
+ for (i = 0; i < len; i++)
+ {
+ GNUNET_PSYC_env_add (env, mods[i].oper, mods[i].name,
+ mods[i].value, mods[i].value_size);
+ }
+
+ struct ItCls it_cls = { .n = 0 };
+ GNUNET_PSYC_env_iterate (env, iterator, &it_cls);
+ GNUNET_assert (len == it_cls.n);
+
+ for (i = 0; i < len; i++)
+ {
+ enum GNUNET_PSYC_Operator oper;
+ const char *name;
+ const void *value;
+ size_t value_size;
+ GNUNET_PSYC_env_shift (env, &oper, &name, &value, &value_size);
+ GNUNET_assert (len - i - 1 == GNUNET_PSYC_env_get_count (env));
+ }
+
+ GNUNET_PSYC_env_destroy (env);
+
+ return 0;
+}