diff options
Diffstat (limited to 'src/peerstore/gnunet-service-peerstore.c')
-rw-r--r-- | src/peerstore/gnunet-service-peerstore.c | 94 |
1 files changed, 72 insertions, 22 deletions
diff --git a/src/peerstore/gnunet-service-peerstore.c b/src/peerstore/gnunet-service-peerstore.c index 70d79ea5e2..706fcaaae3 100644 --- a/src/peerstore/gnunet-service-peerstore.c +++ b/src/peerstore/gnunet-service-peerstore.c @@ -73,6 +73,11 @@ static struct GNUNET_PEERSTORE_PluginFunctions *db; static struct GNUNET_CONTAINER_MultiHashMap *watchers; /** + * Our notification context. + */ +static struct GNUNET_SERVER_NotificationContext *nc; + +/** * Task run during shutdown. * * @param cls unused @@ -88,8 +93,8 @@ shutdown_task (void *cls, GNUNET_free (db_lib_name); db_lib_name = NULL; } - if(NULL != watchers) - GNUNET_CONTAINER_multihashmap_destroy(watchers); + GNUNET_SERVER_notification_context_destroy(nc); + GNUNET_CONTAINER_multihashmap_destroy(watchers); GNUNET_SCHEDULER_shutdown(); } @@ -154,6 +159,60 @@ int record_iterator(void *cls, } /** + * Iterator over all watcher clients + * to notify them of a new record + * + * @param cls closuer, a 'struct GNUNET_PEERSTORE_Record *' + * @param key hash of record key + * @param value the watcher client, a 'struct GNUNET_SERVER_Client *' + * @return #GNUNET_YES to continue iterating + */ +int watch_notifier_it(void *cls, + const struct GNUNET_HashCode *key, + void *value) +{ + struct GNUNET_PEERSTORE_Record *record = cls; + struct GNUNET_SERVER_Client *client = value; + struct StoreRecordMessage *srm; + + GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "Found a watcher to update.\n"); + if(NULL == value) + { + GNUNET_CONTAINER_multihashmap_remove(watchers, key, value); + return GNUNET_YES; + } + srm = PEERSTORE_create_record_message(record->sub_system, + record->peer, + record->key, + record->value, + record->value_size, + record->expiry, + GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH_RECORD); + GNUNET_SERVER_notification_context_unicast(nc, client, + (const struct GNUNET_MessageHeader *)srm, GNUNET_YES); + return GNUNET_YES; +} + +/** + * Given a new record, notifies watchers + * + * @cls closure, a 'struct GNUNET_PEERSTORE_Record *' + * @tc unused + */ +void watch_notifier (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) +{ + struct GNUNET_PEERSTORE_Record *record = cls; + struct GNUNET_HashCode keyhash; + + GNUNET_log(GNUNET_ERROR_TYPE_INFO, "Sending update to any watchers.\n"); + PEERSTORE_hash_key(record->sub_system, + record->peer, + record->key, + &keyhash); + GNUNET_CONTAINER_multihashmap_get_multiple(watchers, &keyhash, &watch_notifier_it, record); +} + +/** * Handle a watch cancel request from client * * @param cls unused @@ -167,13 +226,6 @@ void handle_watch_cancel (void *cls, struct StoreKeyHashMessage *hm; GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "Received a watch cancel request from client.\n"); - if(NULL == watchers) - { - GNUNET_log(GNUNET_ERROR_TYPE_WARNING, - "Received a watch cancel request when we don't have any watchers.\n"); - GNUNET_SERVER_receive_done(client, GNUNET_SYSERR); - return; - } hm = (struct StoreKeyHashMessage *) message; GNUNET_CONTAINER_multihashmap_remove(watchers, &hm->keyhash, client); GNUNET_SERVER_receive_done(client, GNUNET_OK); @@ -195,8 +247,7 @@ void handle_watch (void *cls, GNUNET_log(GNUNET_ERROR_TYPE_DEBUG, "Received a watch request from client.\n"); hm = (struct StoreKeyHashMessage *) message; GNUNET_SERVER_client_mark_monitor(client); - if(NULL == watchers) - watchers = GNUNET_CONTAINER_multihashmap_create(10, GNUNET_NO); + GNUNET_SERVER_notification_context_add(nc, client); GNUNET_CONTAINER_multihashmap_put(watchers, &hm->keyhash, client, GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); GNUNET_SERVER_receive_done(client, GNUNET_OK); @@ -246,7 +297,7 @@ void handle_iterate (void *cls, GNUNET_free(tc); GNUNET_SERVER_receive_done(client, GNUNET_SYSERR); } - GNUNET_free(record); + GNUNET_free(record); /* FIXME: destroy record */ } /** @@ -261,7 +312,6 @@ void handle_store (void *cls, const struct GNUNET_MessageHeader *message) { struct GNUNET_PEERSTORE_Record *record; - uint16_t response_type; struct GNUNET_SERVER_TransmitContext *tc; record = PEERSTORE_parse_record_message(message); @@ -275,6 +325,7 @@ void handle_store (void *cls, || NULL == record->peer || NULL == record->key) { + /* FIXME: Destroy record */ GNUNET_log(GNUNET_ERROR_TYPE_ERROR, "Full key not supplied in client store request\n"); GNUNET_SERVER_receive_done(client, GNUNET_SYSERR); return; @@ -284,7 +335,7 @@ void handle_store (void *cls, record->sub_system, GNUNET_i2s (record->peer), record->key); - if(GNUNET_OK == db->store_record(db->cls, + if(GNUNET_OK != db->store_record(db->cls, record->sub_system, record->peer, record->key, @@ -292,18 +343,15 @@ void handle_store (void *cls, record->value_size, *record->expiry)) { - response_type = GNUNET_MESSAGE_TYPE_PEERSTORE_STORE_RESULT_OK; - } - else - { + /* FIXME: Destroy record */ GNUNET_log(GNUNET_ERROR_TYPE_ERROR, "Failed to store requested value, sqlite database error."); - response_type = GNUNET_MESSAGE_TYPE_PEERSTORE_STORE_RESULT_FAIL; + GNUNET_SERVER_receive_done(client, GNUNET_SYSERR); + return; } - tc = GNUNET_SERVER_transmit_context_create (client); - GNUNET_SERVER_transmit_context_append_data(tc, NULL, 0, response_type); + GNUNET_SERVER_transmit_context_append_data(tc, NULL, 0, GNUNET_MESSAGE_TYPE_PEERSTORE_STORE_RESULT_OK); GNUNET_SERVER_transmit_context_run (tc, GNUNET_TIME_UNIT_FOREVER_REL); - //TODO: notify watchers, if a client is disconnected, remove its watch entry + GNUNET_SCHEDULER_add_continuation(&watch_notifier, record, -1); } /** @@ -343,6 +391,8 @@ run (void *cls, GNUNET_log(GNUNET_ERROR_TYPE_ERROR, "Could not load database backend `%s'\n", db_lib_name); else { + nc = GNUNET_SERVER_notification_context_create (server, 16); + watchers = GNUNET_CONTAINER_multihashmap_create(10, GNUNET_NO); GNUNET_SCHEDULER_add_now(&cleanup_expired_records, NULL); GNUNET_SERVER_add_handlers (server, handlers); GNUNET_SERVER_disconnect_notify (server, |