diff options
author | grothoff <grothoff@140774ce-b5e7-0310-ab8b-a85725594a96> | 2011-04-03 20:00:42 +0000 |
---|---|---|
committer | grothoff <grothoff@140774ce-b5e7-0310-ab8b-a85725594a96> | 2011-04-03 20:00:42 +0000 |
commit | 364381bcb0892c50678c7009a7d1b360f74b3de1 (patch) | |
tree | 7a056253bddc61b1e6a3258567e176af128c2b67 /src/datastore/plugin_datastore_sqlite.c | |
parent | 3c0cbb657a68aa2c114dbe507a3e690e8554b268 (diff) |
improving datastore API --- not working yet
git-svn-id: https://gnunet.org/svn/gnunet@14835 140774ce-b5e7-0310-ab8b-a85725594a96
Diffstat (limited to 'src/datastore/plugin_datastore_sqlite.c')
-rw-r--r-- | src/datastore/plugin_datastore_sqlite.c | 706 |
1 files changed, 282 insertions, 424 deletions
diff --git a/src/datastore/plugin_datastore_sqlite.c b/src/datastore/plugin_datastore_sqlite.c index b8661f46dd..b05a0a9c1f 100644 --- a/src/datastore/plugin_datastore_sqlite.c +++ b/src/datastore/plugin_datastore_sqlite.c @@ -38,43 +38,25 @@ */ #define LOG_SQLITE(db, msg, level, cmd) do { GNUNET_log_from (level, "sqlite", _("`%s' failed at %s:%d with error: %s\n"), cmd, __FILE__, __LINE__, sqlite3_errmsg(db->dbh)); if (msg != NULL) GNUNET_asprintf(msg, _("`%s' failed at %s:%u with error: %s"), cmd, __FILE__, __LINE__, sqlite3_errmsg(db->dbh)); } while(0) -#define SELECT_IT_LOW_PRIORITY_1 \ - "SELECT type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn090 WHERE (prio = ? AND hash > ?) "\ - "ORDER BY hash ASC LIMIT 1" - -#define SELECT_IT_LOW_PRIORITY_2 \ - "SELECT type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn090 WHERE (prio > ?) "\ - "ORDER BY prio ASC, hash ASC LIMIT 1" #define SELECT_IT_NON_ANONYMOUS_1 \ - "SELECT type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn090 WHERE (prio = ? AND hash < ? AND anonLevel = 0 AND expire > %llu) "\ + "SELECT type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn090 WHERE (prio = ?1 AND expire > %llu AND anonLevel = 0 AND hash < ?2) "\ " ORDER BY hash DESC LIMIT 1" #define SELECT_IT_NON_ANONYMOUS_2 \ - "SELECT type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn090 WHERE (prio < ? AND anonLevel = 0 AND expire > %llu)"\ + "SELECT type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn090 WHERE (prio < ?1 AND expire > %llu AND anonLevel = 0)"\ " ORDER BY prio DESC, hash DESC LIMIT 1" -#define SELECT_IT_EXPIRATION_TIME_1 \ - "SELECT type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn090 WHERE (expire = ? AND hash > ?) "\ - " ORDER BY hash ASC LIMIT 1" - -#define SELECT_IT_EXPIRATION_TIME_2 \ - "SELECT type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn090 WHERE (expire > ?) "\ - " ORDER BY expire ASC, hash ASC LIMIT 1" - -#define SELECT_IT_MIGRATION_ORDER_1 \ - "SELECT type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn090 WHERE (expire = ? AND hash < ?) "\ - " ORDER BY hash DESC LIMIT 1" - -#define SELECT_IT_MIGRATION_ORDER_2 \ - "SELECT type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn090 WHERE (expire < ? AND expire > %llu) "\ - " ORDER BY expire DESC, hash DESC LIMIT 1" - #define SELECT_IT_REPLICATION_ORDER \ - "SELECT type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn090 WHERE (expire > ?) "\ + "SELECT type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn090 WHERE (expire > ?1) "\ " ORDER BY repl DESC, Random() LIMIT 1" +#define SELECT_IT_EXPIRATION_ORDER \ + "SELECT type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn090 WHERE (expire < ?1) "\ + " OR NOT EXISTS (SELECT 1 from gn090 WHERE (expire < ?1)) "\ + " ORDER BY prio ASC LIMIT 1" + /** * After how many ms "busy" should a DB operation fail for good? @@ -126,11 +108,16 @@ struct Plugin sqlite3_stmt *updRepl; /** - * Precompiled SQL for replication decrement. + * Precompiled SQL for replication selection. */ sqlite3_stmt *selRepl; /** + * Precompiled SQL for expiration selection. + */ + sqlite3_stmt *selExpi; + + /** * Precompiled SQL for insertion. */ sqlite3_stmt *insertContent; @@ -162,18 +149,23 @@ struct Plugin * @return 0 on success */ static int -sq_prepare (sqlite3 * dbh, const char *zSql, +sq_prepare (sqlite3 * dbh, + const char *zSql, sqlite3_stmt ** ppStmt) { char *dummy; int result; result = sqlite3_prepare_v2 (dbh, zSql, - strlen (zSql), ppStmt, (const char **) &dummy); + strlen (zSql), + ppStmt, + (const char **) &dummy); #if DEBUG_SQLITE GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, "sqlite", - "Prepared %p: %d\n", *ppStmt, result); + "Prepared %p: %d\n", + *ppStmt, + result); #endif return result; } @@ -190,21 +182,15 @@ create_indices (sqlite3 * dbh) /* create indices */ sqlite3_exec (dbh, "CREATE INDEX idx_hash ON gn090 (hash)", NULL, NULL, NULL); - sqlite3_exec (dbh, - "CREATE INDEX idx_hash_vhash ON gn090 (hash,vhash)", NULL, - NULL, NULL); sqlite3_exec (dbh, "CREATE INDEX idx_prio ON gn090 (prio)", NULL, NULL, NULL); - sqlite3_exec (dbh, "CREATE INDEX idx_expire ON gn090 (expire)", NULL, NULL, + sqlite3_exec (dbh, "CREATE INDEX idx_expire_prio ON gn090 (expire,prio)", NULL, NULL, NULL); - sqlite3_exec (dbh, "CREATE INDEX idx_comb3 ON gn090 (prio,anonLevel)", NULL, + sqlite3_exec (dbh, + "CREATE INDEX idx_hash_vhash ON gn090 (hash,vhash)", NULL, NULL, NULL); - sqlite3_exec (dbh, "CREATE INDEX idx_comb4 ON gn090 (prio,hash,anonLevel)", + sqlite3_exec (dbh, "CREATE INDEX idx_comb ON gn090 (prio,expire,anonLevel,hash)", NULL, NULL, NULL); - sqlite3_exec (dbh, "CREATE INDEX idx_comb7 ON gn090 (expire,hash)", NULL, - NULL, NULL); - sqlite3_exec (dbh, "CREATE INDEX idx_comb8 ON gn090 (expire)", NULL, - NULL, NULL); } @@ -358,6 +344,9 @@ database_setup (const struct GNUNET_CONFIGURATION_Handle *cfg, SELECT_IT_REPLICATION_ORDER, &plugin->selRepl) != SQLITE_OK) || (sq_prepare (plugin->dbh, + SELECT_IT_EXPIRATION_ORDER, + &plugin->selExpi) != SQLITE_OK) || + (sq_prepare (plugin->dbh, "INSERT INTO gn090 (repl, type, prio, " "anonLevel, expire, hash, vhash, value) VALUES " "(?, ?, ?, ?, ?, ?, ?, ?)", @@ -396,6 +385,8 @@ database_shutdown (struct Plugin *plugin) sqlite3_finalize (plugin->updRepl); if (plugin->selRepl != NULL) sqlite3_finalize (plugin->selRepl); + if (plugin->selExpi != NULL) + sqlite3_finalize (plugin->selExpi); if (plugin->insertContent != NULL) sqlite3_finalize (plugin->insertContent); result = sqlite3_close(plugin->dbh); @@ -457,9 +448,9 @@ delete_by_rowid (struct Plugin* plugin, return GNUNET_SYSERR; } if (SQLITE_OK != sqlite3_reset (plugin->delRow)) - LOG_SQLITE (plugin, NULL, - GNUNET_ERROR_TYPE_ERROR | - GNUNET_ERROR_TYPE_BULK, "sqlite3_reset"); + LOG_SQLITE (plugin, NULL, + GNUNET_ERROR_TYPE_ERROR | + GNUNET_ERROR_TYPE_BULK, "sqlite3_reset"); return GNUNET_OK; } @@ -532,11 +523,6 @@ struct NextContext GNUNET_HashCode lastKey; /** - * Expiration time of the last value visited. - */ - struct GNUNET_TIME_Absolute lastExpiration; - - /** * Priority of the last value visited. */ unsigned int lastPriority; @@ -566,15 +552,14 @@ sqlite_next_request_cont (void *cls, struct NextContext * nc = cls; struct Plugin *plugin; unsigned long long rowid; - sqlite3_stmt *stmtd; int ret; - unsigned int type; unsigned int size; - unsigned int priority; - unsigned int anonymity; - struct GNUNET_TIME_Absolute expiration; + uint32_t anonymity; + uint32_t priority; + enum GNUNET_BLOCK_Type type; const GNUNET_HashCode *key; - const void *data; + struct GNUNET_TIME_Absolute expiration; + char data[GNUNET_SERVER_MAX_MESSAGE_SIZE]; plugin = nc->plugin; plugin->next_task = GNUNET_SCHEDULER_NO_TASK; @@ -592,90 +577,72 @@ sqlite_next_request_cont (void *cls, return; } - rowid = sqlite3_column_int64 (nc->stmt, 6); - nc->last_rowid = rowid; type = sqlite3_column_int (nc->stmt, 0); + priority = sqlite3_column_int (nc->stmt, 1); + anonymity = sqlite3_column_int (nc->stmt, 2); + expiration.abs_value = sqlite3_column_int64 (nc->stmt, 3); + key = sqlite3_column_blob (nc->stmt, 4); size = sqlite3_column_bytes (nc->stmt, 5); + memcpy (data, sqlite3_column_blob (nc->stmt, 5), size); + rowid = sqlite3_column_int64 (nc->stmt, 6); if (sqlite3_column_bytes (nc->stmt, 4) != sizeof (GNUNET_HashCode)) { GNUNET_log_from (GNUNET_ERROR_TYPE_WARNING, "sqlite", _("Invalid data in database. Trying to fix (by deletion).\n")); if (SQLITE_OK != sqlite3_reset (nc->stmt)) - LOG_SQLITE (nc->plugin, NULL, - GNUNET_ERROR_TYPE_ERROR | - GNUNET_ERROR_TYPE_BULK, "sqlite3_reset"); - if (sq_prepare - (nc->plugin->dbh, - "DELETE FROM gn090 WHERE NOT LENGTH(hash) = ?", - &stmtd) != SQLITE_OK) - { - LOG_SQLITE (nc->plugin, NULL, - GNUNET_ERROR_TYPE_ERROR | - GNUNET_ERROR_TYPE_BULK, - "sq_prepare"); - goto END; - } - - if (SQLITE_OK != sqlite3_bind_int (stmtd, 1, sizeof (GNUNET_HashCode))) - LOG_SQLITE (nc->plugin, NULL, - GNUNET_ERROR_TYPE_ERROR | - GNUNET_ERROR_TYPE_BULK, "sqlite3_bind_int"); - if (SQLITE_DONE != sqlite3_step (stmtd)) - LOG_SQLITE (nc->plugin, NULL, - GNUNET_ERROR_TYPE_ERROR | - GNUNET_ERROR_TYPE_BULK, "sqlite3_step"); - if (SQLITE_OK != sqlite3_finalize (stmtd)) - LOG_SQLITE (nc->plugin, NULL, - GNUNET_ERROR_TYPE_ERROR | - GNUNET_ERROR_TYPE_BULK, "sqlite3_finalize"); + LOG_SQLITE (plugin, NULL, + GNUNET_ERROR_TYPE_ERROR | + GNUNET_ERROR_TYPE_BULK, "sqlite3_reset"); + if (GNUNET_OK == delete_by_rowid (plugin, rowid)) + plugin->env->duc (plugin->env->cls, + - (size + GNUNET_DATASTORE_ENTRY_OVERHEAD)); goto END; } - - priority = sqlite3_column_int (nc->stmt, 1); - anonymity = sqlite3_column_int (nc->stmt, 2); - expiration.abs_value = sqlite3_column_int64 (nc->stmt, 3); - key = sqlite3_column_blob (nc->stmt, 4); - nc->lastPriority = priority; - nc->lastExpiration = expiration; - memcpy (&nc->lastKey, key, sizeof(GNUNET_HashCode)); - data = sqlite3_column_blob (nc->stmt, 5); nc->count++; - ret = nc->iter (nc->iter_cls, - nc, + nc->last_rowid = rowid; + nc->lastPriority = priority; + nc->lastKey = *key; + if (SQLITE_OK != sqlite3_reset (nc->stmt)) + LOG_SQLITE (plugin, NULL, + GNUNET_ERROR_TYPE_ERROR | + GNUNET_ERROR_TYPE_BULK, "sqlite3_reset"); + ret = nc->iter (nc->iter_cls, nc, key, - size, - data, - type, - priority, - anonymity, - expiration, + size, data, + type, priority, + anonymity, expiration, rowid); - if (ret == GNUNET_SYSERR) + switch (ret) { + case GNUNET_SYSERR: nc->end_it = GNUNET_YES; - return; - } -#if DEBUG_SQLITE - if (ret == GNUNET_NO) - GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, - "sqlite", - "Asked to remove entry %llu (%u bytes)\n", - (unsigned long long) rowid, - size + GNUNET_DATASTORE_ENTRY_OVERHEAD); -#endif - if ( (ret == GNUNET_NO) && - (GNUNET_OK == delete_by_rowid (plugin, rowid)) ) - { - plugin->env->duc (plugin->env->cls, - - (size + GNUNET_DATASTORE_ENTRY_OVERHEAD)); + break; + case GNUNET_NO: #if DEBUG_SQLITE GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, "sqlite", - "Removed entry %llu (%u bytes)\n", + "Asked to remove entry %llu (%u bytes)\n", (unsigned long long) rowid, size + GNUNET_DATASTORE_ENTRY_OVERHEAD); #endif + if (GNUNET_OK == delete_by_rowid (plugin, rowid)) + { + plugin->env->duc (plugin->env->cls, + - (size + GNUNET_DATASTORE_ENTRY_OVERHEAD)); +#if DEBUG_SQLITE + GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, + "sqlite", + "Removed entry %llu (%u bytes)\n", + (unsigned long long) rowid, + size + GNUNET_DATASTORE_ENTRY_OVERHEAD); +#endif + } + break; + case GNUNET_YES: + break; + default: + GNUNET_break (0); } } @@ -723,7 +690,7 @@ sqlite_next_request (void *next_cls, */ static int sqlite_plugin_put (void *cls, - const GNUNET_HashCode * key, + const GNUNET_HashCode *key, uint32_t size, const void *data, enum GNUNET_BLOCK_Type type, @@ -774,37 +741,39 @@ sqlite_plugin_put (void *cls, return GNUNET_SYSERR; } n = sqlite3_step (stmt); - if (n != SQLITE_DONE) + switch (n) { - if (n == SQLITE_BUSY) - { - LOG_SQLITE (plugin, msg, - GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, "sqlite3_step"); - sqlite3_reset (stmt); - GNUNET_break (0); - return GNUNET_NO; - } + case SQLITE_DONE: + if (SQLITE_OK != sqlite3_reset (stmt)) + LOG_SQLITE (plugin, NULL, + GNUNET_ERROR_TYPE_ERROR | + GNUNET_ERROR_TYPE_BULK, "sqlite3_reset"); + plugin->env->duc (plugin->env->cls, + size + GNUNET_DATASTORE_ENTRY_OVERHEAD); +#if DEBUG_SQLITE + GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, + "sqlite", + "Stored new entry (%u bytes)\n", + size + GNUNET_DATASTORE_ENTRY_OVERHEAD); +#endif + return GNUNET_OK; + case SQLITE_BUSY: + GNUNET_break (0); + LOG_SQLITE (plugin, msg, + GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, + "sqlite3_step"); + sqlite3_reset (stmt); + return GNUNET_SYSERR; + default: LOG_SQLITE (plugin, msg, - GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, "sqlite3_step"); + GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, + "sqlite3_step"); sqlite3_reset (stmt); database_shutdown (plugin); database_setup (plugin->env->cfg, plugin); - return GNUNET_SYSERR; + return GNUNET_SYSERR; } - if (SQLITE_OK != sqlite3_reset (stmt)) - LOG_SQLITE (plugin, NULL, - GNUNET_ERROR_TYPE_ERROR | - GNUNET_ERROR_TYPE_BULK, "sqlite3_reset"); - plugin->env->duc (plugin->env->cls, - size + GNUNET_DATASTORE_ENTRY_OVERHEAD); -#if DEBUG_SQLITE - GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, - "sqlite", - "Stored new entry (%u bytes)\n", - size + GNUNET_DATASTORE_ENTRY_OVERHEAD); -#endif - return GNUNET_OK; } @@ -844,21 +813,27 @@ sqlite_plugin_update (void *cls, sqlite3_bind_int64 (plugin->updPrio, 2, expire.abs_value); sqlite3_bind_int64 (plugin->updPrio, 3, uid); n = sqlite3_step (plugin->updPrio); - if (n != SQLITE_DONE) - LOG_SQLITE (plugin, msg, - GNUNET_ERROR_TYPE_WARNING | GNUNET_ERROR_TYPE_BULK, - "sqlite3_step"); + sqlite3_reset (plugin->updPrio); + switch (n) + { + case SQLITE_DONE: #if DEBUG_SQLITE - else - GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, - "sqlite", - "Block updated\n"); + GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, + "sqlite", + "Block updated\n"); #endif - sqlite3_reset (plugin->updPrio); - - if (n == SQLITE_BUSY) - return GNUNET_NO; - return n == SQLITE_DONE ? GNUNET_OK : GNUNET_SYSERR; + return GNUNET_OK; + case SQLITE_BUSY: + LOG_SQLITE (plugin, msg, + GNUNET_ERROR_TYPE_WARNING | GNUNET_ERROR_TYPE_BULK, + "sqlite3_step"); + return GNUNET_NO; + default: + LOG_SQLITE (plugin, msg, + GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, + "sqlite3_step"); + return GNUNET_SYSERR; + } } @@ -878,26 +853,6 @@ struct IterContext sqlite3_stmt *stmt_2; /** - * FIXME. - */ - int is_asc; - - /** - * FIXME. - */ - int is_prio; - - /** - * FIXME. - */ - int is_migr; - - /** - * FIXME. - */ - int limit_nonanonymous; - - /** * Desired type for blocks returned by this iterator. */ enum GNUNET_BLOCK_Type type; @@ -934,26 +889,13 @@ iter_next_prepare (void *cls, sqlite3_reset (ic->stmt_1); sqlite3_reset (ic->stmt_2); plugin = nc->plugin; - if (ic->is_prio) - { -#if DEBUG_SQLITE - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Restricting to results larger than the last priority %u\n", - nc->lastPriority); -#endif - sqlite3_bind_int (ic->stmt_1, 1, nc->lastPriority); - sqlite3_bind_int (ic->stmt_2, 1, nc->lastPriority); - } - else - { #if DEBUG_SQLITE - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Restricting to results larger than the last expiration %llu\n", - (unsigned long long) nc->lastExpiration.abs_value); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Restricting to results larger than the last priority %u\n", + nc->lastPriority); #endif - sqlite3_bind_int64 (ic->stmt_1, 1, nc->lastExpiration.abs_value); - sqlite3_bind_int64 (ic->stmt_2, 1, nc->lastExpiration.abs_value); - } + sqlite3_bind_int (ic->stmt_1, 1, nc->lastPriority); + sqlite3_bind_int (ic->stmt_2, 1, nc->lastPriority); #if DEBUG_SQLITE GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Restricting to results larger than the last key `%s'\n", @@ -1016,63 +958,56 @@ iter_next_prepare (void *cls, /** - * Call a method for each key in the database and - * call the callback method on it. + * Select a subset of the items in the datastore and call + * the given iterator for each of them. * - * @param plugin our plugin context + * @param cls our plugin context * @param type entries of which type should be considered? - * @param is_asc are we iterating in ascending order? - * @param is_prio are we iterating by priority (otherwise by expiration) - * @param is_migr are we iterating in migration order? - * @param limit_nonanonymous are we restricting results to those with anonymity - * level zero? - * @param stmt_str_1 first SQL statement to execute - * @param stmt_str_2 SQL statement to execute to get "more" results (inner iteration) + * Use 0 for any type. * @param iter function to call on each matching value; * will be called once with a NULL value at the end * @param iter_cls closure for iter */ static void -basic_iter (struct Plugin *plugin, - enum GNUNET_BLOCK_Type type, - int is_asc, - int is_prio, - int is_migr, - int limit_nonanonymous, - const char *stmt_str_1, - const char *stmt_str_2, - PluginIterator iter, - void *iter_cls) +sqlite_plugin_iter_zero_anonymity (void *cls, + enum GNUNET_BLOCK_Type type, + PluginIterator iter, + void *iter_cls) { + struct Plugin *plugin = cls; + struct GNUNET_TIME_Absolute now; struct NextContext *nc; struct IterContext *ic; sqlite3_stmt *stmt_1; sqlite3_stmt *stmt_2; + char *q; -#if DEBUG_SQLITE - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "At %llu, using queries `%s' and `%s'\n", - (unsigned long long) GNUNET_TIME_absolute_get ().abs_value, - stmt_str_1, - stmt_str_2); -#endif - if (sq_prepare (plugin->dbh, stmt_str_1, &stmt_1) != SQLITE_OK) + now = GNUNET_TIME_absolute_get (); + GNUNET_asprintf (&q, SELECT_IT_NON_ANONYMOUS_1, + (unsigned long long) now.abs_value); + if (sq_prepare (plugin->dbh, q, &stmt_1) != SQLITE_OK) { LOG_SQLITE (plugin, NULL, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, "sqlite3_prepare_v2"); iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0); + GNUNET_free (q); return; } - if (sq_prepare (plugin->dbh, stmt_str_2, &stmt_2) != SQLITE_OK) + GNUNET_free (q); + GNUNET_asprintf (&q, SELECT_IT_NON_ANONYMOUS_2, + (unsigned long long) now.abs_value); + if (sq_prepare (plugin->dbh, q, &stmt_2) != SQLITE_OK) { LOG_SQLITE (plugin, NULL, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, "sqlite3_prepare_v2"); sqlite3_finalize (stmt_1); iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0); + GNUNET_free (q); return; } + GNUNET_free (q); nc = GNUNET_malloc (sizeof(struct NextContext) + sizeof(struct IterContext)); nc->plugin = plugin; @@ -1083,166 +1018,15 @@ basic_iter (struct Plugin *plugin, ic->stmt_1 = stmt_1; ic->stmt_2 = stmt_2; ic->type = type; - ic->is_asc = is_asc; - ic->is_prio = is_prio; - ic->is_migr = is_migr; - ic->limit_nonanonymous = limit_nonanonymous; nc->prep = &iter_next_prepare; nc->prep_cls = ic; - if (is_asc) - { - nc->lastPriority = 0; - nc->lastExpiration.abs_value = 0; - memset (&nc->lastKey, 0, sizeof (GNUNET_HashCode)); - } - else - { - nc->lastPriority = 0x7FFFFFFF; - nc->lastExpiration.abs_value = 0x7FFFFFFFFFFFFFFFLL; - memset (&nc->lastKey, 255, sizeof (GNUNET_HashCode)); - } + nc->lastPriority = 0x7FFFFFFF; + memset (&nc->lastKey, 255, sizeof (GNUNET_HashCode)); sqlite_next_request (nc, GNUNET_NO); } /** - * Select a subset of the items in the datastore and call - * the given iterator for each of them. - * - * @param cls our plugin context - * @param type entries of which type should be considered? - * Use 0 for any type. - * @param iter function to call on each matching value; - * will be called once with a NULL value at the end - * @param iter_cls closure for iter - */ -static void -sqlite_plugin_iter_low_priority (void *cls, - enum GNUNET_BLOCK_Type type, - PluginIterator iter, - void *iter_cls) -{ - basic_iter (cls, - type, - GNUNET_YES, GNUNET_YES, - GNUNET_NO, GNUNET_NO, - SELECT_IT_LOW_PRIORITY_1, - SELECT_IT_LOW_PRIORITY_2, - iter, iter_cls); -} - - -/** - * Select a subset of the items in the datastore and call - * the given iterator for each of them. - * - * @param cls our plugin context - * @param type entries of which type should be considered? - * Use 0 for any type. - * @param iter function to call on each matching value; - * will be called once with a NULL value at the end - * @param iter_cls closure for iter - */ -static void -sqlite_plugin_iter_zero_anonymity (void *cls, - enum GNUNET_BLOCK_Type type, - PluginIterator iter, - void *iter_cls) -{ - struct GNUNET_TIME_Absolute now; - char *q1; - char *q2; - - now = GNUNET_TIME_absolute_get (); - GNUNET_asprintf (&q1, SELECT_IT_NON_ANONYMOUS_1, - (unsigned long long) now.abs_value); - GNUNET_asprintf (&q2, SELECT_IT_NON_ANONYMOUS_2, - (unsigned long long) now.abs_value); - basic_iter (cls, - type, - GNUNET_NO, GNUNET_YES, - GNUNET_NO, GNUNET_YES, - q1, - q2, - iter, iter_cls); - GNUNET_free (q1); - GNUNET_free (q2); -} - - - -/** - * Select a subset of the items in the datastore and call - * the given iterator for each of them. - * - * @param cls our plugin context - * @param type entries of which type should be considered? - * Use 0 for any type. - * @param iter function to call on each matching value; - * will be called once with a NULL value at the end - * @param iter_cls closure for iter - */ -static void -sqlite_plugin_iter_ascending_expiration (void *cls, - enum GNUNET_BLOCK_Type type, - PluginIterator iter, - void *iter_cls) -{ - struct GNUNET_TIME_Absolute now; - char *q1; - char *q2; - - now = GNUNET_TIME_absolute_get (); - GNUNET_asprintf (&q1, SELECT_IT_EXPIRATION_TIME_1, - (unsigned long long) 0*now.abs_value); - GNUNET_asprintf (&q2, SELECT_IT_EXPIRATION_TIME_2, - (unsigned long long) 0*now.abs_value); - basic_iter (cls, - type, - GNUNET_YES, GNUNET_NO, - GNUNET_NO, GNUNET_NO, - q1, q2, - iter, iter_cls); - GNUNET_free (q1); - GNUNET_free (q2); -} - - -/** - * Select a subset of the items in the datastore and call - * the given iterator for each of them. - * - * @param cls our plugin context - * @param type entries of which type should be considered? - * Use 0 for any type. - * @param iter function to call on each matching value; - * will be called once with a NULL value at the end - * @param iter_cls closure for iter - */ -static void -sqlite_plugin_iter_migration_order (void *cls, - enum GNUNET_BLOCK_Type type, - PluginIterator iter, - void *iter_cls) -{ - struct GNUNET_TIME_Absolute now; - char *q; - - now = GNUNET_TIME_absolute_get (); - GNUNET_asprintf (&q, SELECT_IT_MIGRATION_ORDER_2, - (unsigned long long) now.abs_value); - basic_iter (cls, - type, - GNUNET_NO, GNUNET_NO, - GNUNET_YES, GNUNET_NO, - SELECT_IT_MIGRATION_ORDER_1, - q, - iter, iter_cls); - GNUNET_free (q); -} - - -/** * Call sqlite using the already prepared query to get * the next result. * @@ -1271,19 +1055,20 @@ all_next_prepare (void *cls, return GNUNET_SYSERR; } plugin = nc->plugin; - if (SQLITE_ROW == (ret = sqlite3_step (nc->stmt))) - { - return GNUNET_OK; - } - if (ret != SQLITE_DONE) + ret = sqlite3_step (nc->stmt); + switch (ret) { + case SQLITE_ROW: + return GNUNET_OK; + case SQLITE_DONE: + return GNUNET_NO; + default: LOG_SQLITE (plugin, NULL, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, "sqlite3_step"); return GNUNET_SYSERR; } - return GNUNET_NO; } @@ -1466,7 +1251,7 @@ sqlite_plugin_get (void *cls, GNUNET_assert (iter != NULL); if (key == NULL) { - sqlite_plugin_iter_low_priority (cls, type, iter, iter_cls); + sqlite_plugin_iter_all_now (cls, type, iter, iter_cls); return; } GNUNET_snprintf (scratch, sizeof (scratch), @@ -1561,46 +1346,30 @@ sqlite_plugin_get (void *cls, /** - * Get a random item for replication. Returns a single, not expired, random item - * from those with the highest replication counters. The item's - * replication counter is decremented by one IF it was positive before. - * Call 'iter' with all values ZERO or NULL if the datastore is empty. + * Execute statement that gets a row and call the iterator + * with the result. Resets the statement afterwards. * - * @param cls closure - * @param iter function to call the value (once only). - * @param iter_cls closure for iter + * @param plugin the plugin + * @param stmt the statement + * @param iter iterator to call + * @param iter_cls closure for 'iter' */ static void -sqlite_plugin_replication_get (void *cls, - PluginIterator iter, void *iter_cls) +execute_get (struct Plugin *plugin, + sqlite3_stmt *stmt, + PluginIterator iter, void *iter_cls) { - struct Plugin *plugin = cls; int n; - sqlite3_stmt *stmt; struct GNUNET_TIME_Absolute expiration; unsigned long long rowid; + unsigned int size; + int ret; -#if DEBUG_SQLITE - GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, - "sqlite", - "Getting random block based on replication order.\n"); -#endif - stmt = plugin->selRepl; - if (SQLITE_OK != sqlite3_bind_int64 (stmt, 1, expiration.abs_value)) - { - LOG_SQLITE (plugin, NULL, - GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, "sqlite3_bind_XXXX"); - if (SQLITE_OK != sqlite3_reset (stmt)) - LOG_SQLITE (plugin, NULL, - GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, "sqlite3_reset"); - iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, - GNUNET_TIME_UNIT_ZERO_ABS, 0); - return; - } n = sqlite3_step (stmt); switch (n) { case SQLITE_ROW: + size = sqlite3_column_bytes (stmt, 5); rowid = sqlite3_column_int64 (stmt, 6); if (sqlite3_column_bytes (stmt, 4) != sizeof (GNUNET_HashCode)) { @@ -1611,24 +1380,30 @@ sqlite_plugin_replication_get (void *cls, LOG_SQLITE (plugin, NULL, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, "sqlite3_reset"); - delete_by_rowid (plugin, rowid); + if (GNUNET_OK == delete_by_rowid (plugin, rowid)) + plugin->env->duc (plugin->env->cls, + - (size + GNUNET_DATASTORE_ENTRY_OVERHEAD)); break; } expiration.abs_value = sqlite3_column_int64 (stmt, 3); - (void) iter (iter_cls, - NULL, - sqlite3_column_blob (stmt, 4) /* key */, - sqlite3_column_bytes (stmt, 5) /* size of data */, - sqlite3_column_blob (stmt, 5) /* data */, - sqlite3_column_int (stmt, 0) /* type */, - sqlite3_column_int (stmt, 1) /* priority */, - sqlite3_column_int (stmt, 2) /* anonymity */, - expiration, - rowid); + ret = iter (iter_cls, + NULL, + sqlite3_column_blob (stmt, 4) /* key */, + size, + sqlite3_column_blob (stmt, 5) /* data */, + sqlite3_column_int (stmt, 0) /* type */, + sqlite3_column_int (stmt, 1) /* priority */, + sqlite3_column_int (stmt, 2) /* anonymity */, + expiration, + rowid); if (SQLITE_OK != sqlite3_reset (stmt)) LOG_SQLITE (plugin, NULL, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, "sqlite3_reset"); + if ( (GNUNET_NO == ret) && + (GNUNET_OK == delete_by_rowid (plugin, rowid)) ) + plugin->env->duc (plugin->env->cls, + - (size + GNUNET_DATASTORE_ENTRY_OVERHEAD)); return; case SQLITE_DONE: /* database must be empty */ @@ -1657,6 +1432,85 @@ sqlite_plugin_replication_get (void *cls, /** + * Get a random item for replication. Returns a single, not expired, random item + * from those with the highest replication counters. The item's + * replication counter is decremented by one IF it was positive before. + * Call 'iter' with all values ZERO or NULL if the datastore is empty. + * + * @param cls closure + * @param iter function to call the value (once only). + * @param iter_cls closure for iter + */ +static void +sqlite_plugin_replication_get (void *cls, + PluginIterator iter, void *iter_cls) +{ + struct Plugin *plugin = cls; + sqlite3_stmt *stmt; + struct GNUNET_TIME_Absolute now; + +#if DEBUG_SQLITE + GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, + "sqlite", + "Getting random block based on replication order.\n"); +#endif + stmt = plugin->selRepl; + now = GNUNET_TIME_absolute_get (); + if (SQLITE_OK != sqlite3_bind_int64 (stmt, 1, now.abs_value)) + { + LOG_SQLITE (plugin, NULL, + GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, "sqlite3_bind_XXXX"); + if (SQLITE_OK != sqlite3_reset (stmt)) + LOG_SQLITE (plugin, NULL, + GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, "sqlite3_reset"); + iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, + GNUNET_TIME_UNIT_ZERO_ABS, 0); + return; + } + execute_get (plugin, stmt, iter, iter_cls); +} + + + +/** + * Get a random item that has expired or has low priority. + * Call 'iter' with all values ZERO or NULL if the datastore is empty. + * + * @param cls closure + * @param iter function to call the value (once only). + * @param iter_cls closure for iter + */ +static void +sqlite_plugin_expiration_get (void *cls, + PluginIterator iter, void *iter_cls) +{ + struct Plugin *plugin = cls; + sqlite3_stmt *stmt; + struct GNUNET_TIME_Absolute now; + +#if DEBUG_SQLITE + GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, + "sqlite", + "Getting random block based on expiration and priority order.\n"); +#endif + now = GNUNET_TIME_absolute_get (); + stmt = plugin->selExpi; + if (SQLITE_OK != sqlite3_bind_int64 (stmt, 1, now.abs_value)) + { + LOG_SQLITE (plugin, NULL, + GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, "sqlite3_bind_XXXX"); + if (SQLITE_OK != sqlite3_reset (stmt)) + LOG_SQLITE (plugin, NULL, + GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, "sqlite3_reset"); + iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, + GNUNET_TIME_UNIT_ZERO_ABS, 0); + return; + } + execute_get (plugin, stmt, iter, iter_cls); +} + + +/** * Drop database. * * @param cls our plugin context @@ -1669,6 +1523,12 @@ sqlite_plugin_drop (void *cls) } +/** + * FIXME. + * + * @param cls the 'struct Plugin' + * @return the size of the database on disk (estimate) + */ static unsigned long long sqlite_plugin_get_size (void *cls) { @@ -1749,11 +1609,9 @@ libgnunet_plugin_datastore_sqlite_init (void *cls) api->next_request = &sqlite_next_request; api->get = &sqlite_plugin_get; api->replication_get = &sqlite_plugin_replication_get; + api->expiration_get = &sqlite_plugin_expiration_get; api->update = &sqlite_plugin_update; - api->iter_low_priority = &sqlite_plugin_iter_low_priority; api->iter_zero_anonymity = &sqlite_plugin_iter_zero_anonymity; - api->iter_ascending_expiration = &sqlite_plugin_iter_ascending_expiration; - api->iter_migration_order = &sqlite_plugin_iter_migration_order; api->iter_all_now = &sqlite_plugin_iter_all_now; api->drop = &sqlite_plugin_drop; GNUNET_log_from (GNUNET_ERROR_TYPE_INFO, |