aboutsummaryrefslogtreecommitdiff
path: root/src/peerstore
diff options
context:
space:
mode:
authorOmar Tarabai <tarabai@devegypt.com>2014-05-30 16:06:00 +0000
committerOmar Tarabai <tarabai@devegypt.com>2014-05-30 16:06:00 +0000
commit95cdeb5c0bb1f14f3959863e6bf4675db48ea177 (patch)
tree545e2a910c0efdae9dc2e2af8da45efa007cdf32 /src/peerstore
parent02f9d1e7389d0da0a475ae0035b67801c7ca2d06 (diff)
peerstore: towards watch functionality
Diffstat (limited to 'src/peerstore')
-rw-r--r--src/peerstore/gnunet-service-peerstore.c79
-rw-r--r--src/peerstore/peerstore.h18
-rw-r--r--src/peerstore/peerstore_api.c142
-rw-r--r--src/peerstore/peerstore_common.c32
-rw-r--r--src/peerstore/peerstore_common.h10
-rw-r--r--src/peerstore/test_peerstore_api.c38
6 files changed, 275 insertions, 44 deletions
diff --git a/src/peerstore/gnunet-service-peerstore.c b/src/peerstore/gnunet-service-peerstore.c
index c410630c9e..70d79ea5e2 100644
--- a/src/peerstore/gnunet-service-peerstore.c
+++ b/src/peerstore/gnunet-service-peerstore.c
@@ -29,7 +29,23 @@
#include "gnunet_peerstore_plugin.h"
#include "peerstore_common.h"
-//TODO: GNUNET_SERVER_receive_done() ?
+/**
+ * Context of a PEERSTORE watch
+ */
+struct WatchContext
+{
+
+ /**
+ * Hash of key of watched record
+ */
+ struct GNUNET_HashCode keyhash;
+
+ /**
+ * Client requested the watch
+ */
+ struct GNUNET_SERVER_Client *client;
+
+};
/**
* Interval for expired records cleanup (in seconds)
@@ -52,6 +68,11 @@ char *db_lib_name;
static struct GNUNET_PEERSTORE_PluginFunctions *db;
/**
+ * Hashmap with all watch requests
+ */
+static struct GNUNET_CONTAINER_MultiHashMap *watchers;
+
+/**
* Task run during shutdown.
*
* @param cls unused
@@ -67,7 +88,8 @@ shutdown_task (void *cls,
GNUNET_free (db_lib_name);
db_lib_name = NULL;
}
-
+ if(NULL != watchers)
+ GNUNET_CONTAINER_multihashmap_destroy(watchers);
GNUNET_SCHEDULER_shutdown();
}
@@ -132,6 +154,55 @@ int record_iterator(void *cls,
}
/**
+ * Handle a watch cancel request from client
+ *
+ * @param cls unused
+ * @param client identification of the client
+ * @param message the actual message
+ */
+void handle_watch_cancel (void *cls,
+ struct GNUNET_SERVER_Client *client,
+ const struct GNUNET_MessageHeader *message)
+{
+ struct StoreKeyHashMessage *hm;
+
+ GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "Received a watch cancel request from client.\n");
+ if(NULL == watchers)
+ {
+ GNUNET_log(GNUNET_ERROR_TYPE_WARNING,
+ "Received a watch cancel request when we don't have any watchers.\n");
+ GNUNET_SERVER_receive_done(client, GNUNET_SYSERR);
+ return;
+ }
+ hm = (struct StoreKeyHashMessage *) message;
+ GNUNET_CONTAINER_multihashmap_remove(watchers, &hm->keyhash, client);
+ GNUNET_SERVER_receive_done(client, GNUNET_OK);
+}
+
+/**
+ * Handle a watch request from client
+ *
+ * @param cls unused
+ * @param client identification of the client
+ * @param message the actual message
+ */
+void handle_watch (void *cls,
+ struct GNUNET_SERVER_Client *client,
+ const struct GNUNET_MessageHeader *message)
+{
+ struct StoreKeyHashMessage *hm;
+
+ GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "Received a watch request from client.\n");
+ hm = (struct StoreKeyHashMessage *) message;
+ GNUNET_SERVER_client_mark_monitor(client);
+ if(NULL == watchers)
+ watchers = GNUNET_CONTAINER_multihashmap_create(10, GNUNET_NO);
+ GNUNET_CONTAINER_multihashmap_put(watchers, &hm->keyhash,
+ client, GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
+ GNUNET_SERVER_receive_done(client, GNUNET_OK);
+}
+
+/**
* Handle an iterate request from client
*
* @param cls unused
@@ -232,7 +303,7 @@ void handle_store (void *cls,
tc = GNUNET_SERVER_transmit_context_create (client);
GNUNET_SERVER_transmit_context_append_data(tc, NULL, 0, response_type);
GNUNET_SERVER_transmit_context_run (tc, GNUNET_TIME_UNIT_FOREVER_REL);
-
+ //TODO: notify watchers, if a client is disconnected, remove its watch entry
}
/**
@@ -250,6 +321,8 @@ run (void *cls,
static const struct GNUNET_SERVER_MessageHandler handlers[] = {
{&handle_store, NULL, GNUNET_MESSAGE_TYPE_PEERSTORE_STORE, 0},
{&handle_iterate, NULL, GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE, 0},
+ {&handle_watch, NULL, GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH, sizeof(struct StoreKeyHashMessage)},
+ {&handle_watch_cancel, NULL, GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH_CANCEL, sizeof(struct StoreKeyHashMessage)},
{NULL, NULL, 0, 0}
};
char *database;
diff --git a/src/peerstore/peerstore.h b/src/peerstore/peerstore.h
index 7c6e6bdbc6..5adf9f3631 100644
--- a/src/peerstore/peerstore.h
+++ b/src/peerstore/peerstore.h
@@ -77,6 +77,24 @@ struct StoreRecordMessage
};
+/**
+ * Message carrying record key hash
+ */
+struct StoreKeyHashMessage
+{
+
+ /**
+ * GNUnet message header
+ */
+ struct GNUNET_MessageHeader header;
+
+ /**
+ * Hash of a record key
+ */
+ struct GNUNET_HashCode keyhash;
+
+};
+
GNUNET_NETWORK_STRUCT_END
#endif
diff --git a/src/peerstore/peerstore_api.c b/src/peerstore/peerstore_api.c
index 14b1c3e889..c9a68f4bf1 100644
--- a/src/peerstore/peerstore_api.c
+++ b/src/peerstore/peerstore_api.c
@@ -76,14 +76,9 @@ struct GNUNET_PEERSTORE_Handle
struct GNUNET_PEERSTORE_IterateContext *iterate_tail;
/**
- * Head of WATCH requests (active and inactive).
+ * Hashmap of watch requests
*/
- struct GNUNET_PEERSTORE_WatchContext *watch_head;
-
- /**
- * Tail of WATCH requests (active and inactive).
- */
- struct GNUNET_PEERSTORE_WatchContext *watch_tail;
+ struct GNUNET_CONTAINER_MultiHashMap *watches;
};
@@ -215,6 +210,11 @@ struct GNUNET_PEERSTORE_WatchContext
void *callback_cls;
/**
+ * Hash of the combined key
+ */
+ struct GNUNET_HashCode keyhash;
+
+ /**
* #GNUNET_YES / #GNUNET_NO
* if sent, cannot be canceled
*/
@@ -284,27 +284,6 @@ handle_client_error (void *cls, enum GNUNET_MQ_Error error)
}
/**
- * Should be called only after destroying MQ and connection
- */
-static void
-cleanup_handle(struct GNUNET_PEERSTORE_Handle *h)
-{
- struct GNUNET_PEERSTORE_StoreContext *sc;
- struct GNUNET_PEERSTORE_IterateContext *ic;
-
- while (NULL != (sc = h->store_head))
- {
- GNUNET_CONTAINER_DLL_remove(h->store_head, h->store_tail, sc);
- GNUNET_free(sc);
- }
- while (NULL != (ic = h->iterate_head))
- {
- GNUNET_CONTAINER_DLL_remove(h->iterate_head, h->iterate_tail, ic);
- GNUNET_free(ic);
- }
-}
-
-/**
* Close the existing connection to PEERSTORE and reconnect.
*
* @param h handle to the service
@@ -312,7 +291,6 @@ cleanup_handle(struct GNUNET_PEERSTORE_Handle *h)
static void
reconnect (struct GNUNET_PEERSTORE_Handle *h)
{
-
LOG(GNUNET_ERROR_TYPE_DEBUG, "Reconnecting...\n");
if (NULL != h->mq)
{
@@ -324,12 +302,13 @@ reconnect (struct GNUNET_PEERSTORE_Handle *h)
GNUNET_CLIENT_disconnect (h->client);
h->client = NULL;
}
- cleanup_handle(h);
h->client = GNUNET_CLIENT_connect ("peerstore", h->cfg);
+ //FIXME: retry connecting if fails again (client == NULL)
h->mq = GNUNET_MQ_queue_for_connection_client(h->client,
mq_handlers,
&handle_client_error,
h);
+ //FIXME: resend pending requests after reconnecting
}
@@ -373,6 +352,11 @@ GNUNET_PEERSTORE_connect (const struct GNUNET_CONFIGURATION_Handle *cfg)
void
GNUNET_PEERSTORE_disconnect(struct GNUNET_PEERSTORE_Handle *h)
{
+ if(NULL != h->watches)
+ {
+ GNUNET_CONTAINER_multihashmap_destroy(h->watches);
+ h->watches = NULL;
+ }
if(NULL != h->mq)
{
GNUNET_MQ_destroy(h->mq);
@@ -383,7 +367,6 @@ GNUNET_PEERSTORE_disconnect(struct GNUNET_PEERSTORE_Handle *h)
GNUNET_CLIENT_disconnect (h->client);
h->client = NULL;
}
- cleanup_handle(h);
GNUNET_free(h);
LOG(GNUNET_ERROR_TYPE_DEBUG, "Disconnected, BYE!\n");
}
@@ -655,7 +638,7 @@ GNUNET_PEERSTORE_iterate_cancel (struct GNUNET_PEERSTORE_IterateContext *ic)
*/
struct GNUNET_PEERSTORE_IterateContext *
GNUNET_PEERSTORE_iterate (struct GNUNET_PEERSTORE_Handle *h,
- char *sub_system,
+ const char *sub_system,
const struct GNUNET_PeerIdentity *peer,
const char *key,
struct GNUNET_TIME_Relative timeout,
@@ -698,7 +681,54 @@ GNUNET_PEERSTORE_iterate (struct GNUNET_PEERSTORE_Handle *h,
*/
void handle_watch_result (void *cls, const struct GNUNET_MessageHeader *msg)
{
+ /*struct GNUNET_PEERSTORE_Handle *h = cls;
+ struct GNUNET_PEERSTORE_WatchContext *wc;
+ GNUNET_PEERSTORE_Processor callback;
+ void *callback_cls;
+
+
+
+ struct GNUNET_PEERSTORE_IterateContext *ic;
+ uint16_t msg_type;
+ struct GNUNET_PEERSTORE_Record *record;
+ int continue_iter;
+
+ ic = h->iterate_head;
+ if(NULL == ic)
+ {
+ LOG(GNUNET_ERROR_TYPE_ERROR, "Unexpected iteration response, this should not happen.\n");
+ reconnect(h);
+ return;
+ }
+ callback = ic->callback;
+ callback_cls = ic->callback_cls;
+ if(NULL == msg) * Connection error *
+ {
+ if(NULL != callback)
+ callback(callback_cls, NULL,
+ _("Error communicating with `PEERSTORE' service."));
+ reconnect(h);
+ return;
+ }
+ msg_type = ntohs(msg->type);
+ if(GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_END == msg_type)
+ {
+ GNUNET_PEERSTORE_iterate_cancel(ic);
+ if(NULL != callback)
+ callback(callback_cls, NULL, NULL);
+ return;
+ }
+ if(NULL != callback)
+ {
+ record = PEERSTORE_parse_record_message(msg);
+ if(NULL == record)
+ continue_iter = callback(callback_cls, record, _("Received a malformed response from service."));
+ else
+ continue_iter = callback(callback_cls, record, NULL);
+ if(GNUNET_NO == continue_iter)
+ ic->callback = NULL;
+ }*/
}
/**
@@ -715,6 +745,36 @@ void watch_request_sent (void *cls)
}
/**
+ * Cancel a watch request
+ *
+ * @wc handle to the watch request
+ */
+void
+GNUNET_PEERSTORE_watch_cancel(struct GNUNET_PEERSTORE_WatchContext *wc)
+{
+ struct GNUNET_PEERSTORE_Handle *h = wc->h;
+ struct GNUNET_MQ_Envelope *ev;
+ struct StoreKeyHashMessage *hm;
+
+ LOG(GNUNET_ERROR_TYPE_DEBUG, "Canceling watch.\n");
+ if(GNUNET_YES == wc->request_sent) /* If request already sent to service, send a cancel request. */
+ {
+ ev = GNUNET_MQ_msg(hm, GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH_CANCEL);
+ GNUNET_MQ_send(h->mq, ev);
+ wc->callback = NULL;
+ wc->callback_cls = NULL;
+ }
+ if(NULL != wc->ev)
+ {
+ GNUNET_MQ_send_cancel(wc->ev);
+ wc->ev = NULL;
+ }
+ GNUNET_CONTAINER_multihashmap_remove(h->watches, &wc->keyhash, wc);
+ GNUNET_free(wc);
+
+}
+
+/**
* Request watching a given key
* User will be notified with any new values added to key
*
@@ -728,28 +788,28 @@ void watch_request_sent (void *cls)
*/
struct GNUNET_PEERSTORE_WatchContext *
GNUNET_PEERSTORE_watch (struct GNUNET_PEERSTORE_Handle *h,
- char *sub_system,
+ const char *sub_system,
const struct GNUNET_PeerIdentity *peer,
const char *key,
GNUNET_PEERSTORE_Processor callback, void *callback_cls)
{
struct GNUNET_MQ_Envelope *ev;
+ struct StoreKeyHashMessage *hm;
struct GNUNET_PEERSTORE_WatchContext *wc;
- ev = PEERSTORE_create_record_mq_envelope(sub_system,
- peer,
- key,
- NULL,
- 0,
- NULL,
- GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH);
+ ev = GNUNET_MQ_msg(hm, GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH);
+ PEERSTORE_hash_key(sub_system, peer, key, &hm->keyhash);
wc = GNUNET_new(struct GNUNET_PEERSTORE_WatchContext);
wc->callback = callback;
wc->callback_cls = callback_cls;
wc->ev = ev;
wc->h = h;
wc->request_sent = GNUNET_NO;
- GNUNET_CONTAINER_DLL_insert(h->watch_head, h->watch_tail, wc);
+ wc->keyhash = hm->keyhash;
+ if(NULL == h->watches)
+ h->watches = GNUNET_CONTAINER_multihashmap_create(5, GNUNET_NO);
+ GNUNET_CONTAINER_multihashmap_put(h->watches, &wc->keyhash,
+ wc, GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
LOG(GNUNET_ERROR_TYPE_DEBUG,
"Sending a watch request for sub system `%s'.\n", sub_system);
GNUNET_MQ_notify_sent(ev, &watch_request_sent, wc);
diff --git a/src/peerstore/peerstore_common.c b/src/peerstore/peerstore_common.c
index 5783973b6e..2b62abf195 100644
--- a/src/peerstore/peerstore_common.c
+++ b/src/peerstore/peerstore_common.c
@@ -26,6 +26,38 @@
#include "peerstore_common.h"
/**
+ * Creates a hash of the given key combination
+ *
+ */
+void
+PEERSTORE_hash_key(const char *sub_system,
+ const struct GNUNET_PeerIdentity *peer,
+ const char *key,
+ struct GNUNET_HashCode *ret)
+{
+ size_t sssize;
+ size_t psize;
+ size_t ksize;
+ size_t totalsize;
+ void *block;
+ void *blockptr;
+
+ sssize = strlen(sub_system) + 1;
+ psize = sizeof(struct GNUNET_PeerIdentity);
+ ksize = strlen(sub_system) + 1;
+ totalsize = sssize + psize + ksize;
+ block = GNUNET_malloc(totalsize);
+ blockptr = block;
+ memcpy(blockptr, sub_system, sssize);
+ blockptr += sssize;
+ memcpy(blockptr, peer, psize);
+ blockptr += psize;
+ memcpy(blockptr, key, ksize);
+ GNUNET_CRYPTO_hash(block, totalsize, ret);
+ GNUNET_free(block);
+}
+
+/**
* Creates a record message ready to be sent
*
* @param sub_system sub system string
diff --git a/src/peerstore/peerstore_common.h b/src/peerstore/peerstore_common.h
index cd918497b7..20cb9c0e74 100644
--- a/src/peerstore/peerstore_common.h
+++ b/src/peerstore/peerstore_common.h
@@ -27,6 +27,16 @@
#include "peerstore.h"
/**
+ * Creates a hash of the given key combination
+ *
+ */
+void
+PEERSTORE_hash_key(const char *sub_system,
+ const struct GNUNET_PeerIdentity *peer,
+ const char *key,
+ struct GNUNET_HashCode *ret);
+
+/**
* Creates a record message ready to be sent
*
* @param sub_system sub system string
diff --git a/src/peerstore/test_peerstore_api.c b/src/peerstore/test_peerstore_api.c
index fe62933e7a..7a512f664c 100644
--- a/src/peerstore/test_peerstore_api.c
+++ b/src/peerstore/test_peerstore_api.c
@@ -27,6 +27,8 @@
#include "gnunet_peerstore_service.h"
#include <inttypes.h>
+//TODO: test single cycle of watch, store, iterate
+
static int ok = 1;
static int counter = 0;
@@ -76,6 +78,25 @@ void store_cont(void *cls, int success)
NULL);
}
+int watch_cb (void *cls,
+ struct GNUNET_PEERSTORE_Record *record,
+ char *emsg)
+{
+ if(NULL != emsg)
+ {
+ printf("Error received: %s.\n", emsg);
+ return GNUNET_YES;
+ }
+
+ printf("Watch Record:\n");
+ printf("Sub system: %s\n", record->sub_system);
+ printf("Peer: %s\n", GNUNET_i2s (record->peer));
+ printf("Key: %s\n", record->key);
+ printf("Value: %.*s\n", (int)record->value_size, (char *)record->value);
+ printf("Expiry: %" PRIu64 "\n", record->expiry->abs_value_us);
+ return GNUNET_YES;
+}
+
static void
run (void *cls,
const struct GNUNET_CONFIGURATION_Handle *cfg,
@@ -91,6 +112,12 @@ run (void *cls,
expiry = GNUNET_TIME_absolute_get();
h = GNUNET_PEERSTORE_connect(cfg);
GNUNET_assert(NULL != h);
+ GNUNET_PEERSTORE_watch(h,
+ "peerstore-test",
+ &pid,
+ "peerstore-test-key",
+ &watch_cb,
+ NULL);
GNUNET_PEERSTORE_store(h,
"peerstore-test",
&pid,
@@ -103,6 +130,17 @@ run (void *cls,
}
+int iterator (void *cls, const struct GNUNET_HashCode *key, void *value)
+{
+ struct GNUNET_CONTAINER_MultiHashMap *map = cls;
+ uint32_t *x = value;
+
+ printf("Received value: %d\n", *x);
+ if(*x == 2)
+ GNUNET_CONTAINER_multihashmap_remove(map, key, value);
+ return GNUNET_YES;
+}
+
int
main (int argc, char *argv[])
{