aboutsummaryrefslogtreecommitdiff
path: root/src/dht/dht_api.c
diff options
context:
space:
mode:
authorBart Polot <bart@net.in.tum.de>2012-04-20 17:18:14 +0000
committerBart Polot <bart@net.in.tum.de>2012-04-20 17:18:14 +0000
commit90c9abc573f95de334a1f61faa089e79046d2f11 (patch)
tree89b596897cf1a17ea9c048748ec063b063f8d634 /src/dht/dht_api.c
parent6c889a1786be40c0d023e8971411bc327af352c6 (diff)
- Rewritten DHT monitoring
Diffstat (limited to 'src/dht/dht_api.c')
-rw-r--r--src/dht/dht_api.c238
1 files changed, 200 insertions, 38 deletions
diff --git a/src/dht/dht_api.c b/src/dht/dht_api.c
index 1030b79999..3f850b3e70 100644
--- a/src/dht/dht_api.c
+++ b/src/dht/dht_api.c
@@ -171,9 +171,19 @@ struct GNUNET_DHT_MonitorHandle
GNUNET_HashCode *key;
/**
- * Callback for each received message of interest.
+ * Callback for each received message of type get.
*/
- GNUNET_DHT_MonitorCB cb;
+ GNUNET_DHT_MonitorGetCB get_cb;
+
+ /**
+ * Callback for each received message of type get response.
+ */
+ GNUNET_DHT_MonitorGetRespCB get_resp_cb;
+
+ /**
+ * Callback for each received message of type put.
+ */
+ GNUNET_DHT_MonitorPutCB put_cb;
/**
* Closure for cb.
@@ -533,63 +543,205 @@ process_reply (void *cls, const GNUNET_HashCode * key, void *value)
return GNUNET_YES;
}
-
/**
- * Process a monitoring message from the service.
+ * Process a get monitor message from the service.
*
* @param handle The DHT handle.
- * @param msg Message from the service.
+ * @param msg Monitor get message from the service.
*
* @return GNUNET_OK if everything went fine,
* GNUNET_SYSERR if the message is malformed.
*/
static int
-process_monitor_message (struct GNUNET_DHT_Handle *handle,
- const struct GNUNET_MessageHeader *msg)
+process_monitor_get_message (struct GNUNET_DHT_Handle *handle,
+ const struct GNUNET_DHT_MonitorGetMessage *msg)
{
- struct GNUNET_DHT_MonitorMessage *m;
struct GNUNET_DHT_MonitorHandle *h;
size_t msize;
- if (ntohs (msg->type) < GNUNET_MESSAGE_TYPE_DHT_MONITOR_GET ||
- ntohs (msg->type) > GNUNET_MESSAGE_TYPE_DHT_MONITOR_PUT)
+ msize = ntohs (msg->header.size);
+ if (msize < sizeof (struct GNUNET_DHT_MonitorGetMessage))
return GNUNET_SYSERR;
- msize = ntohs (msg->size);
- if (msize < sizeof (struct GNUNET_DHT_MonitorMessage))
+
+ h = handle->monitor_head;
+ while (NULL != h)
+ {
+ int type_ok;
+ int key_ok;
+
+ type_ok = GNUNET_BLOCK_TYPE_ANY == h->type || h->type == ntohl(msg->type);
+ key_ok = NULL == h->key || memcmp (h->key, &msg->key,
+ sizeof (GNUNET_HashCode)) == 0;
+ if (type_ok && key_ok && NULL != h->get_cb)
+ {
+ h->get_cb (h->cb_cls,
+ ntohl (msg->options),
+ (enum GNUNET_BLOCK_Type) ntohl(msg->type),
+ ntohl (msg->hop_count),
+ ntohl (msg->desired_replication_level),
+ ntohl (msg->get_path_length),
+ (struct GNUNET_PeerIdentity *) &msg[1],
+ &msg->key);
+ }
+ h = h->next;
+ }
+ return GNUNET_OK;
+}
+
+
+/**
+ * Process a get response monitor message from the service.
+ *
+ * @param handle The DHT handle.
+ * @param msg Monitor get response message from the service.
+ *
+ * @return GNUNET_OK if everything went fine,
+ * GNUNET_SYSERR if the message is malformed.
+ */
+static int
+process_monitor_get_resp_message (struct GNUNET_DHT_Handle *handle,
+ const struct GNUNET_DHT_MonitorGetRespMessage
+ *msg)
+{
+ struct GNUNET_DHT_MonitorHandle *h;
+ size_t msize;
+
+ msize = ntohs (msg->header.size);
+ if (msize < sizeof (struct GNUNET_DHT_MonitorGetRespMessage))
return GNUNET_SYSERR;
- m = (struct GNUNET_DHT_MonitorMessage *) msg;
h = handle->monitor_head;
while (NULL != h)
{
- if (h->type == ntohl(m->type) &&
- (NULL == h->key ||
- memcmp (h->key, &m->key, sizeof (GNUNET_HashCode)) == 0))
+ int type_ok;
+ int key_ok;
+
+ type_ok = GNUNET_BLOCK_TYPE_ANY == h->type || h->type == ntohl(msg->type);
+ key_ok = NULL == h->key || memcmp (h->key, &msg->key,
+ sizeof (GNUNET_HashCode)) == 0;
+ if (type_ok && key_ok && NULL != h->get_resp_cb)
{
struct GNUNET_PeerIdentity *path;
uint32_t getl;
uint32_t putl;
- path = (struct GNUNET_PeerIdentity *) &m[1];
- getl = ntohl (m->get_path_length);
- putl = ntohl (m->put_path_length);
- h->cb (h->cb_cls, ntohs(msg->type),
- GNUNET_TIME_absolute_ntoh(m->expiration),
- &m->key,
- &path[getl], putl, path, getl,
- ntohl (m->desired_replication_level),
- ntohl (m->options), ntohl (m->type),
- (void *) &path[getl + putl],
- ntohs (msg->size) -
- sizeof (struct GNUNET_DHT_MonitorMessage) -
- sizeof (struct GNUNET_PeerIdentity) * (putl + getl));
+ path = (struct GNUNET_PeerIdentity *) &msg[1];
+ getl = ntohl (msg->get_path_length);
+ putl = ntohl (msg->put_path_length);
+ h->get_resp_cb (h->cb_cls,
+ (enum GNUNET_BLOCK_Type) ntohl(msg->type),
+ path, getl,
+ &path[getl], putl,
+ GNUNET_TIME_absolute_ntoh(msg->expiration_time),
+ &msg->key,
+ (void *) &path[getl + putl],
+ msize -
+ sizeof (struct GNUNET_DHT_MonitorGetRespMessage) -
+ sizeof (struct GNUNET_PeerIdentity) * (putl + getl));
}
h = h->next;
}
+ return GNUNET_OK;
+}
+
+
+/**
+ * Process a put monitor message from the service.
+ *
+ * @param handle The DHT handle.
+ * @param msg Monitor put message from the service.
+ *
+ * @return GNUNET_OK if everything went fine,
+ * GNUNET_SYSERR if the message is malformed.
+ */
+static int
+process_monitor_put_message (struct GNUNET_DHT_Handle *handle,
+ const struct GNUNET_DHT_MonitorPutMessage *msg)
+{
+ struct GNUNET_DHT_MonitorHandle *h;
+ size_t msize;
+
+ msize = ntohs (msg->header.size);
+ if (msize < sizeof (struct GNUNET_DHT_MonitorPutMessage))
+ return GNUNET_SYSERR;
+
+ h = handle->monitor_head;
+ while (NULL != h)
+ {
+ int type_ok;
+ int key_ok;
+
+ type_ok = GNUNET_BLOCK_TYPE_ANY == h->type || h->type == ntohl(msg->type);
+ key_ok = NULL == h->key || memcmp (h->key, &msg->key,
+ sizeof (GNUNET_HashCode)) == 0;
+ if (type_ok && key_ok && NULL != h->put_cb)
+ {
+ struct GNUNET_PeerIdentity *path;
+ uint32_t putl;
+ path = (struct GNUNET_PeerIdentity *) &msg[1];
+ putl = ntohl (msg->put_path_length);
+ h->put_cb (h->cb_cls,
+ ntohl (msg->options),
+ (enum GNUNET_BLOCK_Type) ntohl(msg->type),
+ ntohl (msg->hop_count),
+ ntohl (msg->desired_replication_level),
+ putl, path,
+ GNUNET_TIME_absolute_ntoh(msg->expiration_time),
+ &msg->key,
+ (void *) &path[putl],
+ msize -
+ sizeof (struct GNUNET_DHT_MonitorPutMessage) -
+ sizeof (struct GNUNET_PeerIdentity) * putl);
+ }
+ h = h->next;
+ }
return GNUNET_OK;
}
+
+/**
+ * Process a monitoring message from the service: demultiplex for proper type.
+ *
+ * @param handle The DHT handle.
+ * @param msg Message from the service.
+ *
+ * @return GNUNET_OK if everything went fine,
+ * GNUNET_SYSERR if the message is malformed.
+ */
+static int
+process_monitor_message (struct GNUNET_DHT_Handle *handle,
+ const struct GNUNET_MessageHeader *msg)
+{
+ switch (ntohs (msg->type))
+ {
+ case GNUNET_MESSAGE_TYPE_DHT_MONITOR_GET:
+ return process_monitor_get_message(handle,
+ (struct GNUNET_DHT_MonitorGetMessage *)
+ msg);
+
+ case GNUNET_MESSAGE_TYPE_DHT_MONITOR_GET_RESP:
+ {
+ return process_monitor_get_resp_message(
+ handle,
+ (struct GNUNET_DHT_MonitorGetRespMessage *) msg);
+ }
+ case GNUNET_MESSAGE_TYPE_DHT_MONITOR_PUT:
+ {
+ return process_monitor_put_message(handle,
+ (struct GNUNET_DHT_MonitorPutMessage *)
+ msg);
+ }
+ case GNUNET_MESSAGE_TYPE_DHT_MONITOR_PUT_RESP:
+ /* Not implemented yet */
+ GNUNET_break(0);
+ /* Fall through */
+ default:
+ GNUNET_break(0);
+ return GNUNET_SYSERR;
+ }
+}
+
/**
* Handler for messages received from the DHT service
* a demultiplexer which handles numerous message types
@@ -930,7 +1082,9 @@ GNUNET_DHT_get_stop (struct GNUNET_DHT_GetHandle *get_handle)
* @param handle Handle to the DHT service.
* @param type Type of blocks that are of interest.
* @param key Key of data of interest, NULL for all.
- * @param cb Callback to process all monitored data.
+ * @param get_cb Callback to process monitored get messages.
+ * @param get_resp_cb Callback to process monitored get response messages.
+ * @param put_cb Callback to process monitored put messages.
* @param cb_cls Closure for cb.
*
* @return Handle to stop monitoring.
@@ -939,18 +1093,21 @@ struct GNUNET_DHT_MonitorHandle *
GNUNET_DHT_monitor_start (struct GNUNET_DHT_Handle *handle,
enum GNUNET_BLOCK_Type type,
const GNUNET_HashCode *key,
- GNUNET_DHT_MonitorCB cb,
+ GNUNET_DHT_MonitorGetCB get_cb,
+ GNUNET_DHT_MonitorGetRespCB get_resp_cb,
+ GNUNET_DHT_MonitorPutCB put_cb,
void *cb_cls)
{
struct GNUNET_DHT_MonitorHandle *h;
- struct GNUNET_DHT_MonitorMessage *m;
+ struct GNUNET_DHT_MonitorStartMessage *m;
struct PendingMessage *pending;
h = GNUNET_malloc (sizeof (struct GNUNET_DHT_MonitorHandle));
GNUNET_CONTAINER_DLL_insert(handle->monitor_head, handle->monitor_tail, h);
- GNUNET_assert (NULL != cb);
- h->cb = cb;
+ h->get_cb = get_cb;
+ h->get_resp_cb = get_resp_cb;
+ h->put_cb = put_cb;
h->cb_cls = cb_cls;
h->type = type;
h->dht_handle = handle;
@@ -960,17 +1117,22 @@ GNUNET_DHT_monitor_start (struct GNUNET_DHT_Handle *handle,
memcpy (h->key, key, sizeof(GNUNET_HashCode));
}
- pending = GNUNET_malloc (sizeof (struct GNUNET_DHT_MonitorMessage) +
+ pending = GNUNET_malloc (sizeof (struct GNUNET_DHT_MonitorStartMessage) +
sizeof (struct PendingMessage));
- m = (struct GNUNET_DHT_MonitorMessage *) &pending[1];
+ m = (struct GNUNET_DHT_MonitorStartMessage *) &pending[1];
pending->msg = &m->header;
pending->handle = handle;
pending->free_on_send = GNUNET_YES;
m->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_MONITOR_GET);
- m->header.size = htons (sizeof (struct GNUNET_DHT_MonitorMessage));
+ m->header.size = htons (sizeof (struct GNUNET_DHT_MonitorStartMessage));
m->type = htonl(type);
- if (NULL != key)
+ m->get = (NULL != get_cb);
+ m->get_resp = (NULL != get_resp_cb);
+ m->put = (NULL != put_cb);
+ if (NULL != key) {
+ m->filter_key = 1;
memcpy (&m->key, key, sizeof(GNUNET_HashCode));
+ }
GNUNET_CONTAINER_DLL_insert (handle->pending_head, handle->pending_tail,
pending);
pending->in_pending_queue = GNUNET_YES;