aboutsummaryrefslogtreecommitdiff
path: root/src/peerstore/gnunet-service-peerstore.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/peerstore/gnunet-service-peerstore.c')
-rw-r--r--src/peerstore/gnunet-service-peerstore.c94
1 files changed, 72 insertions, 22 deletions
diff --git a/src/peerstore/gnunet-service-peerstore.c b/src/peerstore/gnunet-service-peerstore.c
index 70d79ea5e2..706fcaaae3 100644
--- a/src/peerstore/gnunet-service-peerstore.c
+++ b/src/peerstore/gnunet-service-peerstore.c
@@ -73,6 +73,11 @@ static struct GNUNET_PEERSTORE_PluginFunctions *db;
static struct GNUNET_CONTAINER_MultiHashMap *watchers;
/**
+ * Our notification context.
+ */
+static struct GNUNET_SERVER_NotificationContext *nc;
+
+/**
* Task run during shutdown.
*
* @param cls unused
@@ -88,8 +93,8 @@ shutdown_task (void *cls,
GNUNET_free (db_lib_name);
db_lib_name = NULL;
}
- if(NULL != watchers)
- GNUNET_CONTAINER_multihashmap_destroy(watchers);
+ GNUNET_SERVER_notification_context_destroy(nc);
+ GNUNET_CONTAINER_multihashmap_destroy(watchers);
GNUNET_SCHEDULER_shutdown();
}
@@ -154,6 +159,60 @@ int record_iterator(void *cls,
}
/**
+ * Iterator over all watcher clients
+ * to notify them of a new record
+ *
+ * @param cls closuer, a 'struct GNUNET_PEERSTORE_Record *'
+ * @param key hash of record key
+ * @param value the watcher client, a 'struct GNUNET_SERVER_Client *'
+ * @return #GNUNET_YES to continue iterating
+ */
+int watch_notifier_it(void *cls,
+ const struct GNUNET_HashCode *key,
+ void *value)
+{
+ struct GNUNET_PEERSTORE_Record *record = cls;
+ struct GNUNET_SERVER_Client *client = value;
+ struct StoreRecordMessage *srm;
+
+ GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "Found a watcher to update.\n");
+ if(NULL == value)
+ {
+ GNUNET_CONTAINER_multihashmap_remove(watchers, key, value);
+ return GNUNET_YES;
+ }
+ srm = PEERSTORE_create_record_message(record->sub_system,
+ record->peer,
+ record->key,
+ record->value,
+ record->value_size,
+ record->expiry,
+ GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH_RECORD);
+ GNUNET_SERVER_notification_context_unicast(nc, client,
+ (const struct GNUNET_MessageHeader *)srm, GNUNET_YES);
+ return GNUNET_YES;
+}
+
+/**
+ * Given a new record, notifies watchers
+ *
+ * @cls closure, a 'struct GNUNET_PEERSTORE_Record *'
+ * @tc unused
+ */
+void watch_notifier (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+ struct GNUNET_PEERSTORE_Record *record = cls;
+ struct GNUNET_HashCode keyhash;
+
+ GNUNET_log(GNUNET_ERROR_TYPE_INFO, "Sending update to any watchers.\n");
+ PEERSTORE_hash_key(record->sub_system,
+ record->peer,
+ record->key,
+ &keyhash);
+ GNUNET_CONTAINER_multihashmap_get_multiple(watchers, &keyhash, &watch_notifier_it, record);
+}
+
+/**
* Handle a watch cancel request from client
*
* @param cls unused
@@ -167,13 +226,6 @@ void handle_watch_cancel (void *cls,
struct StoreKeyHashMessage *hm;
GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "Received a watch cancel request from client.\n");
- if(NULL == watchers)
- {
- GNUNET_log(GNUNET_ERROR_TYPE_WARNING,
- "Received a watch cancel request when we don't have any watchers.\n");
- GNUNET_SERVER_receive_done(client, GNUNET_SYSERR);
- return;
- }
hm = (struct StoreKeyHashMessage *) message;
GNUNET_CONTAINER_multihashmap_remove(watchers, &hm->keyhash, client);
GNUNET_SERVER_receive_done(client, GNUNET_OK);
@@ -195,8 +247,7 @@ void handle_watch (void *cls,
GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "Received a watch request from client.\n");
hm = (struct StoreKeyHashMessage *) message;
GNUNET_SERVER_client_mark_monitor(client);
- if(NULL == watchers)
- watchers = GNUNET_CONTAINER_multihashmap_create(10, GNUNET_NO);
+ GNUNET_SERVER_notification_context_add(nc, client);
GNUNET_CONTAINER_multihashmap_put(watchers, &hm->keyhash,
client, GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE);
GNUNET_SERVER_receive_done(client, GNUNET_OK);
@@ -246,7 +297,7 @@ void handle_iterate (void *cls,
GNUNET_free(tc);
GNUNET_SERVER_receive_done(client, GNUNET_SYSERR);
}
- GNUNET_free(record);
+ GNUNET_free(record); /* FIXME: destroy record */
}
/**
@@ -261,7 +312,6 @@ void handle_store (void *cls,
const struct GNUNET_MessageHeader *message)
{
struct GNUNET_PEERSTORE_Record *record;
- uint16_t response_type;
struct GNUNET_SERVER_TransmitContext *tc;
record = PEERSTORE_parse_record_message(message);
@@ -275,6 +325,7 @@ void handle_store (void *cls,
|| NULL == record->peer
|| NULL == record->key)
{
+ /* FIXME: Destroy record */
GNUNET_log(GNUNET_ERROR_TYPE_ERROR, "Full key not supplied in client store request\n");
GNUNET_SERVER_receive_done(client, GNUNET_SYSERR);
return;
@@ -284,7 +335,7 @@ void handle_store (void *cls,
record->sub_system,
GNUNET_i2s (record->peer),
record->key);
- if(GNUNET_OK == db->store_record(db->cls,
+ if(GNUNET_OK != db->store_record(db->cls,
record->sub_system,
record->peer,
record->key,
@@ -292,18 +343,15 @@ void handle_store (void *cls,
record->value_size,
*record->expiry))
{
- response_type = GNUNET_MESSAGE_TYPE_PEERSTORE_STORE_RESULT_OK;
- }
- else
- {
+ /* FIXME: Destroy record */
GNUNET_log(GNUNET_ERROR_TYPE_ERROR, "Failed to store requested value, sqlite database error.");
- response_type = GNUNET_MESSAGE_TYPE_PEERSTORE_STORE_RESULT_FAIL;
+ GNUNET_SERVER_receive_done(client, GNUNET_SYSERR);
+ return;
}
-
tc = GNUNET_SERVER_transmit_context_create (client);
- GNUNET_SERVER_transmit_context_append_data(tc, NULL, 0, response_type);
+ GNUNET_SERVER_transmit_context_append_data(tc, NULL, 0, GNUNET_MESSAGE_TYPE_PEERSTORE_STORE_RESULT_OK);
GNUNET_SERVER_transmit_context_run (tc, GNUNET_TIME_UNIT_FOREVER_REL);
- //TODO: notify watchers, if a client is disconnected, remove its watch entry
+ GNUNET_SCHEDULER_add_continuation(&watch_notifier, record, -1);
}
/**
@@ -343,6 +391,8 @@ run (void *cls,
GNUNET_log(GNUNET_ERROR_TYPE_ERROR, "Could not load database backend `%s'\n", db_lib_name);
else
{
+ nc = GNUNET_SERVER_notification_context_create (server, 16);
+ watchers = GNUNET_CONTAINER_multihashmap_create(10, GNUNET_NO);
GNUNET_SCHEDULER_add_now(&cleanup_expired_records, NULL);
GNUNET_SERVER_add_handlers (server, handlers);
GNUNET_SERVER_disconnect_notify (server,