diff options
author | grothoff <grothoff@140774ce-b5e7-0310-ab8b-a85725594a96> | 2011-04-15 13:08:23 +0000 |
---|---|---|
committer | grothoff <grothoff@140774ce-b5e7-0310-ab8b-a85725594a96> | 2011-04-15 13:08:23 +0000 |
commit | 3cbd42f75a28a6ae238279094574b589f411f978 (patch) | |
tree | 58b40b99d1ee97a19fe47c9590b352473736e17d /src/datastore/plugin_datastore_mysql.c | |
parent | ccb949b55d7009fa95626ae530982db9cbccf1cb (diff) |
fixes
git-svn-id: https://gnunet.org/svn/gnunet@14992 140774ce-b5e7-0310-ab8b-a85725594a96
Diffstat (limited to 'src/datastore/plugin_datastore_mysql.c')
-rw-r--r-- | src/datastore/plugin_datastore_mysql.c | 434 |
1 files changed, 269 insertions, 165 deletions
diff --git a/src/datastore/plugin_datastore_mysql.c b/src/datastore/plugin_datastore_mysql.c index 1658aa51a2..deef46af03 100644 --- a/src/datastore/plugin_datastore_mysql.c +++ b/src/datastore/plugin_datastore_mysql.c @@ -208,6 +208,8 @@ struct NextRequestClosure unsigned int count; int end_it; + + int one_shot; }; @@ -284,9 +286,12 @@ struct Plugin #define SELECT_ENTRY_BY_HASH_VHASH_AND_TYPE "SELECT type,prio,anonLevel,expire,hash,value,uid FROM gn090 FORCE INDEX (idx_hash_vhash) WHERE hash=? AND vhash=? AND type=? ORDER BY uid ASC LIMIT 1 OFFSET ?" struct GNUNET_MysqlStatementHandle *select_entry_by_hash_vhash_and_type; -#define UPDATE_ENTRY "UPDATE gn090 SET prio=prio+?,expire=IF(expire>=?,expire,?) WHERE uid=? LIMIT 1" +#define UPDATE_ENTRY "UPDATE gn090 SET prio=prio+?,expire=IF(expire>=?,expire,?) WHERE uid=?" struct GNUNET_MysqlStatementHandle *update_entry; +#define DEC_REPL "UPDATE gn090 SET repl=GREATEST (0, repl - 1) WHERE uid=?" + struct GNUNET_MysqlStatementHandle *dec_repl; + #define SELECT_SIZE "SELECT SUM(BIT_LENGTH(value) DIV 8) FROM gn090" struct GNUNET_MysqlStatementHandle *get_size; @@ -866,144 +871,6 @@ return_ok (void *cls, /** - * Continuation of "mysql_next_request". - * - * @param next_cls the next context - * @param tc the task context (unused) - */ -static void -mysql_next_request_cont (void *next_cls, - const struct GNUNET_SCHEDULER_TaskContext *tc) -{ - struct NextRequestClosure *nrc = next_cls; - struct Plugin *plugin; - int ret; - unsigned int type; - unsigned int priority; - unsigned int anonymity; - unsigned long long exp; - unsigned long hashSize; - unsigned long size; - unsigned long long uid; - char value[GNUNET_DATASTORE_MAX_VALUE_SIZE]; - GNUNET_HashCode key; - struct GNUNET_TIME_Absolute expiration; - MYSQL_BIND *rbind = nrc->rbind; - - plugin = nrc->plugin; - plugin->next_task = GNUNET_SCHEDULER_NO_TASK; - plugin->next_task_nc = NULL; - - if (GNUNET_YES == nrc->end_it) - goto END_SET; - GNUNET_assert (nrc->plugin->next_task == GNUNET_SCHEDULER_NO_TASK); - nrc->now = GNUNET_TIME_absolute_get (); - hashSize = sizeof (GNUNET_HashCode); - memset (nrc->rbind, 0, sizeof (nrc->rbind)); - rbind = nrc->rbind; - rbind[0].buffer_type = MYSQL_TYPE_LONG; - rbind[0].buffer = &type; - rbind[0].is_unsigned = 1; - rbind[1].buffer_type = MYSQL_TYPE_LONG; - rbind[1].buffer = &priority; - rbind[1].is_unsigned = 1; - rbind[2].buffer_type = MYSQL_TYPE_LONG; - rbind[2].buffer = &anonymity; - rbind[2].is_unsigned = 1; - rbind[3].buffer_type = MYSQL_TYPE_LONGLONG; - rbind[3].buffer = &exp; - rbind[3].is_unsigned = 1; - rbind[4].buffer_type = MYSQL_TYPE_BLOB; - rbind[4].buffer = &key; - rbind[4].buffer_length = hashSize; - rbind[4].length = &hashSize; - rbind[5].buffer_type = MYSQL_TYPE_BLOB; - rbind[5].buffer = value; - rbind[5].buffer_length = size = sizeof (value); - rbind[5].length = &size; - rbind[6].buffer_type = MYSQL_TYPE_LONGLONG; - rbind[6].buffer = &uid; - rbind[6].is_unsigned = 1; - - if (GNUNET_OK != nrc->prep (nrc->prep_cls, - nrc)) - goto END_SET; - GNUNET_assert (nrc->plugin->next_task == GNUNET_SCHEDULER_NO_TASK); - GNUNET_assert (size <= sizeof(value)); - if ( (rbind[4].buffer_length != sizeof (GNUNET_HashCode)) || - (hashSize != sizeof (GNUNET_HashCode)) ) - { - GNUNET_break (0); - goto END_SET; - } -#if DEBUG_MYSQL - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Found %u-byte value under key `%s' with prio %u, anon %u, expire %llu selecting from gn090 table\n", - (unsigned int) size, - GNUNET_h2s (&key), - priority, - anonymity, - exp); -#endif - expiration.abs_value = exp; - ret = nrc->dviter (nrc->dviter_cls, (nrc->end_it == GNUNET_YES) ? NULL : nrc, - &key, - size, value, - type, priority, anonymity, expiration, - uid); - if (ret == GNUNET_SYSERR) - { - nrc->end_it = GNUNET_YES; - return; - } - if (ret == GNUNET_NO) - { - do_delete_entry (plugin, uid); - if (size != 0) - plugin->env->duc (plugin->env->cls, - - size); - } - return; - END_SET: - /* call dviter with "end of set" */ - GNUNET_assert (nrc->plugin->next_task == GNUNET_SCHEDULER_NO_TASK); - nrc->dviter (nrc->dviter_cls, - NULL, NULL, 0, NULL, 0, 0, 0, - GNUNET_TIME_UNIT_ZERO_ABS, 0); - GNUNET_assert (nrc->plugin->next_task == GNUNET_SCHEDULER_NO_TASK); - nrc->prep (nrc->prep_cls, NULL); - GNUNET_assert (nrc->plugin->next_task == GNUNET_SCHEDULER_NO_TASK); - GNUNET_free (nrc); -} - - -/** - * Function invoked on behalf of a "PluginIterator" - * asking the database plugin to call the iterator - * with the next item. - * - * @param next_cls whatever argument was given - * to the PluginIterator as "next_cls". - * @param end_it set to GNUNET_YES if we - * should terminate the iteration early - * (iterator should be still called once more - * to signal the end of the iteration). - */ -static void -mysql_plugin_next_request (void *next_cls, - int end_it) -{ - struct NextRequestClosure *nrc = next_cls; - - if (GNUNET_YES == end_it) - nrc->end_it = GNUNET_YES; - nrc->plugin->next_task_nc = nrc; - nrc->plugin->next_task = GNUNET_SCHEDULER_add_now (&mysql_next_request_cont, - nrc); -} - - -/** * Get an estimate of how much space the database is * currently using. * @@ -1167,6 +1034,152 @@ mysql_plugin_update (void *cls, } + + +/** + * Continuation of "mysql_next_request". + * + * @param next_cls the next context + * @param tc the task context (unused) + */ +static void +mysql_next_request_cont (void *next_cls, + const struct GNUNET_SCHEDULER_TaskContext *tc) +{ + struct NextRequestClosure *nrc = next_cls; + struct Plugin *plugin; + int ret; + unsigned int type; + unsigned int priority; + unsigned int anonymity; + unsigned long long exp; + unsigned long hashSize; + unsigned long size; + unsigned long long uid; + char value[GNUNET_DATASTORE_MAX_VALUE_SIZE]; + GNUNET_HashCode key; + struct GNUNET_TIME_Absolute expiration; + MYSQL_BIND *rbind = nrc->rbind; + + plugin = nrc->plugin; + plugin->next_task = GNUNET_SCHEDULER_NO_TASK; + plugin->next_task_nc = NULL; + + if (GNUNET_YES == nrc->end_it) + goto END_SET; + GNUNET_assert (nrc->plugin->next_task == GNUNET_SCHEDULER_NO_TASK); + nrc->now = GNUNET_TIME_absolute_get (); + hashSize = sizeof (GNUNET_HashCode); + memset (nrc->rbind, 0, sizeof (nrc->rbind)); + rbind = nrc->rbind; + rbind[0].buffer_type = MYSQL_TYPE_LONG; + rbind[0].buffer = &type; + rbind[0].is_unsigned = 1; + rbind[1].buffer_type = MYSQL_TYPE_LONG; + rbind[1].buffer = &priority; + rbind[1].is_unsigned = 1; + rbind[2].buffer_type = MYSQL_TYPE_LONG; + rbind[2].buffer = &anonymity; + rbind[2].is_unsigned = 1; + rbind[3].buffer_type = MYSQL_TYPE_LONGLONG; + rbind[3].buffer = &exp; + rbind[3].is_unsigned = 1; + rbind[4].buffer_type = MYSQL_TYPE_BLOB; + rbind[4].buffer = &key; + rbind[4].buffer_length = hashSize; + rbind[4].length = &hashSize; + rbind[5].buffer_type = MYSQL_TYPE_BLOB; + rbind[5].buffer = value; + rbind[5].buffer_length = size = sizeof (value); + rbind[5].length = &size; + rbind[6].buffer_type = MYSQL_TYPE_LONGLONG; + rbind[6].buffer = &uid; + rbind[6].is_unsigned = 1; + + if (GNUNET_OK != nrc->prep (nrc->prep_cls, + nrc)) + goto END_SET; + GNUNET_assert (nrc->plugin->next_task == GNUNET_SCHEDULER_NO_TASK); + GNUNET_assert (size <= sizeof(value)); + if ( (rbind[4].buffer_length != sizeof (GNUNET_HashCode)) || + (hashSize != sizeof (GNUNET_HashCode)) ) + { + GNUNET_break (0); + goto END_SET; + } +#if DEBUG_MYSQL + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Found %u-byte value under key `%s' with prio %u, anon %u, expire %llu selecting from gn090 table\n", + (unsigned int) size, + GNUNET_h2s (&key), + priority, + anonymity, + exp); +#endif + expiration.abs_value = exp; + ret = nrc->dviter (nrc->dviter_cls, + (nrc->one_shot == GNUNET_YES) ? NULL : nrc, + &key, + size, value, + type, priority, anonymity, expiration, + uid); + if (ret == GNUNET_SYSERR) + { + nrc->end_it = GNUNET_YES; + return; + } + if (ret == GNUNET_NO) + { + do_delete_entry (plugin, uid); + if (size != 0) + plugin->env->duc (plugin->env->cls, + - size); + } + if (nrc->one_shot == GNUNET_YES) + GNUNET_free (nrc); + return; + END_SET: + /* call dviter with "end of set" */ + GNUNET_assert (nrc->plugin->next_task == GNUNET_SCHEDULER_NO_TASK); + nrc->dviter (nrc->dviter_cls, + NULL, NULL, 0, NULL, 0, 0, 0, + GNUNET_TIME_UNIT_ZERO_ABS, 0); + GNUNET_assert (nrc->plugin->next_task == GNUNET_SCHEDULER_NO_TASK); + nrc->prep (nrc->prep_cls, NULL); + GNUNET_assert (nrc->plugin->next_task == GNUNET_SCHEDULER_NO_TASK); + GNUNET_free (nrc); +} + + +/** + * Function invoked on behalf of a "PluginIterator" + * asking the database plugin to call the iterator + * with the next item. + * + * @param next_cls whatever argument was given + * to the PluginIterator as "next_cls". + * @param end_it set to GNUNET_YES if we + * should terminate the iteration early + * (iterator should be still called once more + * to signal the end of the iteration). + */ +static void +mysql_plugin_next_request (void *next_cls, + int end_it) +{ + struct NextRequestClosure *nrc = next_cls; + + if (GNUNET_YES == end_it) + nrc->end_it = GNUNET_YES; + nrc->plugin->next_task_nc = nrc; + nrc->plugin->next_task = GNUNET_SCHEDULER_add_now (&mysql_next_request_cont, + nrc); +} + + +/** + * Context for 'get_statement_prepare'. + */ struct GetContext { GNUNET_HashCode key; @@ -1466,7 +1479,6 @@ replication_prepare (void *cls, { struct Plugin *plugin = cls; - nrc->end_it = GNUNET_YES; return prepared_statement_run_select (plugin, plugin->select_replication, 7, nrc->rbind, @@ -1475,6 +1487,92 @@ replication_prepare (void *cls, } + +/** + * Context for 'repl_iter' function. + */ +struct ReplCtx +{ + + /** + * Plugin handle. + */ + struct Plugin *plugin; + + /** + * Function to call for the result (or the NULL). + */ + PluginIterator iter; + + /** + * Closure for iter. + */ + void *iter_cls; +}; + + +/** + * Wrapper for the iterator for 'sqlite_plugin_replication_get'. + * Decrements the replication counter and calls the original + * iterator. + * + * @param cls closure + * @param next_cls closure to pass to the "next" function. + * @param key key for the content + * @param size number of bytes in data + * @param data content stored + * @param type type of the content + * @param priority priority of the content + * @param anonymity anonymity-level for the content + * @param expiration expiration time for the content + * @param uid unique identifier for the datum; + * maybe 0 if no unique identifier is available + * + * @return GNUNET_SYSERR to abort the iteration, GNUNET_OK to continue + * (continue on call to "next", of course), + * GNUNET_NO to delete the item and continue (if supported) + */ +static int +repl_iter (void *cls, + void *next_cls, + const GNUNET_HashCode *key, + uint32_t size, + const void *data, + enum GNUNET_BLOCK_Type type, + uint32_t priority, + uint32_t anonymity, + struct GNUNET_TIME_Absolute expiration, + uint64_t uid) +{ + struct ReplCtx *rc = cls; + struct Plugin *plugin = rc->plugin; + unsigned long long oid; + int ret; + + ret = rc->iter (rc->iter_cls, + next_cls, key, + size, data, + type, priority, anonymity, expiration, + uid); + if (NULL != key) + { + oid = (unsigned long long) uid; + ret = prepared_statement_run (plugin, + plugin->dec_repl, + NULL, + MYSQL_TYPE_LONGLONG, &oid, GNUNET_YES, + -1); + if (ret == GNUNET_SYSERR) + { + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "Failed to reduce replication counter\n"); + return GNUNET_SYSERR; + } + } + return ret; +} + + /** * Get a random item for replication. Returns a single, not expired, random item * from those with the highest replication counters. The item's @@ -1490,18 +1588,23 @@ mysql_plugin_replication_get (void *cls, PluginIterator iter, void *iter_cls) { struct Plugin *plugin = cls; - struct NextRequestClosure nrc; - - memset (&nrc, 0, sizeof (nrc)); - nrc.plugin = plugin; - nrc.now = GNUNET_TIME_absolute_get (); - nrc.prep = &replication_prepare; - nrc.prep_cls = plugin; - nrc.type = 0; - nrc.dviter = iter; - nrc.dviter_cls = iter_cls; - nrc.end_it = GNUNET_NO; - mysql_next_request_cont (&nrc, NULL); + struct NextRequestClosure *nrc; + struct ReplCtx rc; + + rc.plugin = plugin; + rc.iter = iter; + rc.iter_cls = iter_cls; + nrc = GNUNET_malloc (sizeof (struct NextRequestClosure)); + nrc->plugin = plugin; + nrc->now = GNUNET_TIME_absolute_get (); + nrc->prep = &replication_prepare; + nrc->prep_cls = plugin; + nrc->type = 0; + nrc->dviter = &repl_iter; + nrc->dviter_cls = &rc; + nrc->end_it = GNUNET_NO; + nrc->one_shot = GNUNET_YES; + mysql_next_request_cont (nrc, NULL); } @@ -1522,7 +1625,6 @@ expiration_prepare (void *cls, if (NULL == nrc) return GNUNET_NO; - nrc->end_it = GNUNET_YES; nt = (long long) nrc->now.abs_value; return prepared_statement_run_select (plugin, @@ -1547,18 +1649,19 @@ mysql_plugin_expiration_get (void *cls, PluginIterator iter, void *iter_cls) { struct Plugin *plugin = cls; - struct NextRequestClosure nrc; - - memset (&nrc, 0, sizeof (nrc)); - nrc.plugin = plugin; - nrc.now = GNUNET_TIME_absolute_get (); - nrc.prep = &expiration_prepare; - nrc.prep_cls = plugin; - nrc.type = 0; - nrc.dviter = iter; - nrc.dviter_cls = iter_cls; - nrc.end_it = GNUNET_NO; - mysql_next_request_cont (&nrc, NULL); + struct NextRequestClosure *nrc; + + nrc = GNUNET_malloc (sizeof (struct NextRequestClosure)); + nrc->plugin = plugin; + nrc->now = GNUNET_TIME_absolute_get (); + nrc->prep = &expiration_prepare; + nrc->prep_cls = plugin; + nrc->type = 0; + nrc->dviter = iter; + nrc->dviter_cls = iter_cls; + nrc->end_it = GNUNET_NO; + nrc->one_shot = GNUNET_YES; + mysql_next_request_cont (nrc, NULL); } @@ -1639,6 +1742,7 @@ libgnunet_plugin_datastore_mysql_init (void *cls) || PINIT (plugin->count_entry_by_hash_vhash_and_type, COUNT_ENTRY_BY_HASH_VHASH_AND_TYPE) || PINIT (plugin->update_entry, UPDATE_ENTRY) + || PINIT (plugin->dec_repl, DEC_REPL) || PINIT (plugin->zero_iter, SELECT_IT_NON_ANONYMOUS) || PINIT (plugin->select_expiration, SELECT_IT_EXPIRATION) || PINIT (plugin->select_replication, SELECT_IT_REPLICATION) ) |