diff options
author | David Barksdale <amatus.amongus@gmail.com> | 2014-12-15 00:32:17 +0000 |
---|---|---|
committer | David Barksdale <amatus.amongus@gmail.com> | 2014-12-15 00:32:17 +0000 |
commit | bd94aa6fe80a7687c3727ebcdb3ba5185d3b8b11 (patch) | |
tree | 02c23cc57494a495c9da5f8e88e282a376ba4016 | |
parent | 6c8fa85819a2b02b3c4a175a08c1779283eda209 (diff) |
Implement asynchronous peerstore plugin API
Resolves #3506
-rw-r--r-- | src/include/gnunet_peerstore_plugin.h | 25 | ||||
-rw-r--r-- | src/include/gnunet_peerstore_service.h | 4 | ||||
-rw-r--r-- | src/peerstore/gnunet-service-peerstore.c | 96 | ||||
-rw-r--r-- | src/peerstore/plugin_peerstore_sqlite.c | 41 |
4 files changed, 127 insertions, 39 deletions
diff --git a/src/include/gnunet_peerstore_plugin.h b/src/include/gnunet_peerstore_plugin.h index 8eea796bb7..4ea68f31d1 100644 --- a/src/include/gnunet_peerstore_plugin.h +++ b/src/include/gnunet_peerstore_plugin.h @@ -59,7 +59,11 @@ struct GNUNET_PEERSTORE_PluginFunctions * @param peer peer identity * @param value value to be stored * @param size size of value to be stored - * @return #GNUNET_OK on success, else #GNUNET_SYSERR + * @param expiry absolute time after which the record is (possibly) deleted + * @param options options related to the store operation + * @param cont continuation called when record is stored + * @param cont_cls continuation closure + * @return #GNUNET_OK on success, else #GNUNET_SYSERR and cont is not called */ int (*store_record) (void *cls, @@ -69,7 +73,9 @@ struct GNUNET_PEERSTORE_PluginFunctions const void *value, size_t size, struct GNUNET_TIME_Absolute expiry, - enum GNUNET_PEERSTORE_StoreOption options); + enum GNUNET_PEERSTORE_StoreOption options, + GNUNET_PEERSTORE_Continuation cont, + void *cont_cls); /** * Iterate over the records given an optional peer id @@ -79,9 +85,11 @@ struct GNUNET_PEERSTORE_PluginFunctions * @param sub_system name of sub system * @param peer Peer identity (can be NULL) * @param key entry key string (can be NULL) - * @param iter function to call with the result + * @param iter function to call asynchronously with the results, terminated + * by a NULL result * @param iter_cls closure for @a iter - * @return #GNUNET_OK on success, #GNUNET_SYSERR on error + * @return #GNUNET_OK on success, #GNUNET_SYSERR on error and iter is not + * called */ int (*iterate_records) (void *cls, @@ -95,11 +103,16 @@ struct GNUNET_PEERSTORE_PluginFunctions * * @param cls closure (internal context for the plugin) * @param now time to use as reference - * @return number of records deleted + * @param cont continuation called with the number of records expired + * @param cont_cls continuation closure + * @return #GNUNET_OK on success, #GNUNET_SYSERR on error and cont is not + * called */ int (*expire_records) (void *cls, - struct GNUNET_TIME_Absolute now); + struct GNUNET_TIME_Absolute now, + GNUNET_PEERSTORE_Continuation cont, + void *cont_cls); }; diff --git a/src/include/gnunet_peerstore_service.h b/src/include/gnunet_peerstore_service.h index 73ecadc8b9..a3fed7065c 100644 --- a/src/include/gnunet_peerstore_service.h +++ b/src/include/gnunet_peerstore_service.h @@ -102,6 +102,10 @@ struct GNUNET_PEERSTORE_Record */ struct GNUNET_TIME_Absolute *expiry; + /** + * Client from which this record originated + */ + struct GNUNET_SERVER_Client *client; }; /** diff --git a/src/peerstore/gnunet-service-peerstore.c b/src/peerstore/gnunet-service-peerstore.c index ed5b14eb9b..f8ec631b93 100644 --- a/src/peerstore/gnunet-service-peerstore.c +++ b/src/peerstore/gnunet-service-peerstore.c @@ -136,6 +136,10 @@ shutdown_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) } +/* Forward declaration */ +static void expire_records_continuation (void *cls, int success); + + /** * Deletes any expired records from storage */ @@ -143,14 +147,34 @@ static void cleanup_expired_records (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) { - int deleted; + int ret; if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN)) return; GNUNET_assert (NULL != db); - deleted = db->expire_records (db->cls, GNUNET_TIME_absolute_get ()); - if (deleted > 0) - GNUNET_log (GNUNET_ERROR_TYPE_INFO, "%d records expired.\n", deleted); + ret = db->expire_records (db->cls, GNUNET_TIME_absolute_get (), + expire_records_continuation, NULL); + if (GNUNET_OK != ret) + { + GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply + (GNUNET_TIME_UNIT_SECONDS, + EXPIRED_RECORDS_CLEANUP_INTERVAL), + &cleanup_expired_records, NULL); + } +} + + +/** + * Continuation to expire_records called by the peerstore plugin + * + * @param cls unused + * @param success count of records deleted or #GNUNET_SYSERR + */ +static void +expire_records_continuation(void *cls, int success) +{ + if (success > 0) + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "%d records expired.\n", success); GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, EXPIRED_RECORDS_CLEANUP_INTERVAL), @@ -217,15 +241,32 @@ handle_client_disconnect (void *cls, struct GNUNET_SERVER_Client *client) static int record_iterator (void *cls, struct GNUNET_PEERSTORE_Record *record, char *emsg) { - struct GNUNET_SERVER_Client *client = cls; + struct GNUNET_PEERSTORE_Record *cls_record = cls; struct StoreRecordMessage *srm; + if (NULL == record) + { + /* No more records */ + struct GNUNET_MessageHeader *endmsg; + + endmsg = GNUNET_new (struct GNUNET_MessageHeader); + endmsg->size = htons (sizeof (struct GNUNET_MessageHeader)); + endmsg->type = htons (GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_END); + GNUNET_SERVER_notification_context_unicast (nc, cls_record->client, endmsg, + GNUNET_NO); + GNUNET_free (endmsg); + GNUNET_SERVER_receive_done (cls_record->client, + NULL == emsg ? GNUNET_OK : GNUNET_SYSERR); + PEERSTORE_destroy_record (cls_record); + return GNUNET_NO; + } + srm = PEERSTORE_create_record_message (record->sub_system, record->peer, record->key, record->value, record->value_size, record->expiry, GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_RECORD); - GNUNET_SERVER_notification_context_unicast (nc, client, + GNUNET_SERVER_notification_context_unicast (nc, cls_record->client, (struct GNUNET_MessageHeader *) srm, GNUNET_NO); GNUNET_free (srm); @@ -334,7 +375,6 @@ handle_iterate (void *cls, struct GNUNET_SERVER_Client *client, const struct GNUNET_MessageHeader *message) { struct GNUNET_PEERSTORE_Record *record; - struct GNUNET_MessageHeader *endmsg; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Received an iterate request.\n"); record = PEERSTORE_parse_record_message (message); @@ -358,21 +398,32 @@ handle_iterate (void *cls, struct GNUNET_SERVER_Client *client, (NULL == record->peer) ? "NULL" : GNUNET_i2s (record->peer), (NULL == record->key) ? "NULL" : record->key); GNUNET_SERVER_notification_context_add (nc, client); - if (GNUNET_OK == + record->client = client; + if (GNUNET_OK != db->iterate_records (db->cls, record->sub_system, record->peer, - record->key, &record_iterator, client)) + record->key, &record_iterator, record)) { - endmsg = GNUNET_new (struct GNUNET_MessageHeader); - - endmsg->size = htons (sizeof (struct GNUNET_MessageHeader)); - endmsg->type = htons (GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_END); - GNUNET_SERVER_notification_context_unicast (nc, client, endmsg, GNUNET_NO); - GNUNET_free (endmsg); - GNUNET_SERVER_receive_done (client, GNUNET_OK); + GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); + PEERSTORE_destroy_record (record); } - else +} + + +/** + * Continuation of store_record called by the peerstore plugin + * + * @param cls closure + * @param success result + */ +static void +store_record_continuation (void *cls, int success) +{ + struct GNUNET_PEERSTORE_Record *record = cls; + + GNUNET_SERVER_receive_done (record->client, success); + if (GNUNET_OK == success) { - GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); + watch_notifier (record); } PEERSTORE_destroy_record (record); } @@ -418,20 +469,19 @@ handle_store (void *cls, struct GNUNET_SERVER_Client *client, " Options: %d.\n", record->value_size, record->sub_system, GNUNET_i2s (record->peer), record->key, record->value_size, ntohl (srm->options)); + record->client = client; if (GNUNET_OK != db->store_record (db->cls, record->sub_system, record->peer, record->key, record->value, record->value_size, *record->expiry, - ntohl (srm->options))) + ntohl (srm->options), store_record_continuation, + record)) { GNUNET_log (GNUNET_ERROR_TYPE_ERROR, - _("Failed to store requested value, sqlite database error.")); + _("Failed to store requested value, database error.")); PEERSTORE_destroy_record (record); GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); return; } - GNUNET_SERVER_receive_done (client, GNUNET_OK); - watch_notifier (record); - PEERSTORE_destroy_record (record); } diff --git a/src/peerstore/plugin_peerstore_sqlite.c b/src/peerstore/plugin_peerstore_sqlite.c index fc644d9b7d..cd402aaae8 100644 --- a/src/peerstore/plugin_peerstore_sqlite.c +++ b/src/peerstore/plugin_peerstore_sqlite.c @@ -160,10 +160,15 @@ peerstore_sqlite_delete_records (void *cls, const char *sub_system, * * @param cls closure (internal context for the plugin) * @param now time to use as reference - * @return number of records deleted + * @param cont continuation called with the number of records expired + * @param cont_cls continuation closure + * @return #GNUNET_OK on success, #GNUNET_SYSERR on error and cont is not + * called */ static int -peerstore_sqlite_expire_records (void *cls, struct GNUNET_TIME_Absolute now) +peerstore_sqlite_expire_records (void *cls, struct GNUNET_TIME_Absolute now, + GNUNET_PEERSTORE_Continuation cont, + void *cont_cls) { struct Plugin *plugin = cls; sqlite3_stmt *stmt = plugin->expire_peerstoredata; @@ -183,9 +188,13 @@ peerstore_sqlite_expire_records (void *cls, struct GNUNET_TIME_Absolute now) { LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, "sqlite3_reset"); - return 0; + return GNUNET_SYSERR; } - return sqlite3_changes (plugin->dbh); + if (NULL != cont) + { + cont (cont_cls, sqlite3_changes (plugin->dbh)); + } + return GNUNET_OK; } @@ -197,9 +206,11 @@ peerstore_sqlite_expire_records (void *cls, struct GNUNET_TIME_Absolute now) * @param sub_system name of sub system * @param peer Peer identity (can be NULL) * @param key entry key string (can be NULL) - * @param iter function to call with the result + * @param iter function to call asynchronously with the results, terminated + * by a NULL result * @param iter_cls closure for @a iter - * @return #GNUNET_OK on success, #GNUNET_SYSERR on error + * @return #GNUNET_OK on success, #GNUNET_SYSERR on error and iter is not + * called */ static int peerstore_sqlite_iterate_records (void *cls, const char *sub_system, @@ -296,8 +307,10 @@ peerstore_sqlite_iterate_records (void *cls, const char *sub_system, "sqlite3_reset"); err = 1; } - if (err) - return GNUNET_SYSERR; + if (NULL != iter) + { + iter (iter_cls, NULL, err ? "sqlite error" : NULL); + } return GNUNET_OK; } @@ -315,14 +328,18 @@ peerstore_sqlite_iterate_records (void *cls, const char *sub_system, * @param size size of value to be stored * @param expiry absolute time after which the record is (possibly) deleted * @param options options related to the store operation - * @return #GNUNET_OK on success, else #GNUNET_SYSERR + * @param cont continuation called when record is stored + * @param cont_cls continuation closure + * @return #GNUNET_OK on success, else #GNUNET_SYSERR and cont is not called */ static int peerstore_sqlite_store_record (void *cls, const char *sub_system, const struct GNUNET_PeerIdentity *peer, const char *key, const void *value, size_t size, struct GNUNET_TIME_Absolute expiry, - enum GNUNET_PEERSTORE_StoreOption options) + enum GNUNET_PEERSTORE_StoreOption options, + GNUNET_PEERSTORE_Continuation cont, + void *cont_cls) { struct Plugin *plugin = cls; sqlite3_stmt *stmt = plugin->insert_peerstoredata; @@ -355,6 +372,10 @@ peerstore_sqlite_store_record (void *cls, const char *sub_system, "sqlite3_reset"); return GNUNET_SYSERR; } + if (NULL != cont) + { + cont (cont_cls, GNUNET_OK); + } return GNUNET_OK; } |