diff options
Diffstat (limited to 'src/datastore/plugin_datastore_mysql.c')
-rw-r--r-- | src/datastore/plugin_datastore_mysql.c | 1786 |
1 files changed, 1786 insertions, 0 deletions
diff --git a/src/datastore/plugin_datastore_mysql.c b/src/datastore/plugin_datastore_mysql.c new file mode 100644 index 0000000000..03fe06b933 --- /dev/null +++ b/src/datastore/plugin_datastore_mysql.c @@ -0,0 +1,1786 @@ +/* + This file is part of GNUnet + (C) 2009, 2010 Christian Grothoff (and other contributing authors) + + GNUnet is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published + by the Free Software Foundation; either version 3, or (at your + option) any later version. + + GNUnet is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + General Public License for more details. + + You should have received a copy of the GNU General Public License + along with GNUnet; see the file COPYING. If not, write to the + Free Software Foundation, Inc., 59 Temple Place - Suite 330, + Boston, MA 02111-1307, USA. +*/ + +/** + * @file datastore/plugin_datastore_mysql.c + * @brief mysql-based datastore backend + * @author Igor Wronsky + * @author Christian Grothoff + * + * NOTE: This db module does NOT work with mysql prior to 4.1 since + * it uses prepared statements. MySQL 5.0.46 promises to fix a bug + * in MyISAM that is causing us grief. At the time of this writing, + * that version is yet to be released. In anticipation, the code + * will use MyISAM with 5.0.46 (and higher). If you run such a + * version, please run "make check" to verify that the MySQL bug + * was actually fixed in your version (and if not, change the + * code below to use MyISAM for gn071). + * + * HIGHLIGHTS + * + * Pros + * + On up-to-date hardware where mysql can be used comfortably, this + * module will have better performance than the other db choices + * (according to our tests). + * + Its often possible to recover the mysql database from internal + * inconsistencies. The other db choices do not support repair! + * Cons + * - Memory usage (Comment: "I have 1G and it never caused me trouble") + * - Manual setup + * + * MANUAL SETUP INSTRUCTIONS + * + * 1) in /etc/gnunet.conf, set + * <pre> + * + * sqstore = "sqstore_mysql" + * + * </pre> + * 2) Then access mysql as root, + * <pre> + * + * $ mysql -u root -p + * + * </pre> + * and do the following. [You should replace $USER with the username + * that will be running the gnunetd process]. + * <pre> + * + CREATE DATABASE gnunet; + GRANT select,insert,update,delete,create,alter,drop,create temporary tables + ON gnunet.* TO $USER@localhost; + SET PASSWORD FOR $USER@localhost=PASSWORD('$the_password_you_like'); + FLUSH PRIVILEGES; + * + * </pre> + * 3) In the $HOME directory of $USER, create a ".my.cnf" file + * with the following lines + * <pre> + + [client] + user=$USER + password=$the_password_you_like + + * </pre> + * + * Thats it. Note that .my.cnf file is a security risk unless its on + * a safe partition etc. The $HOME/.my.cnf can of course be a symbolic + * link. Even greater security risk can be achieved by setting no + * password for $USER. Luckily $USER has only priviledges to mess + * up GNUnet's tables, nothing else (unless you give him more, + * of course).<p> + * + * 4) Still, perhaps you should briefly try if the DB connection + * works. First, login as $USER. Then use, + * + * <pre> + * $ mysql -u $USER -p $the_password_you_like + * mysql> use gnunet; + * </pre> + * + * If you get the message "Database changed" it probably works. + * + * [If you get "ERROR 2002: Can't connect to local MySQL server + * through socket '/tmp/mysql.sock' (2)" it may be resolvable by + * "ln -s /var/run/mysqld/mysqld.sock /tmp/mysql.sock" + * so there may be some additional trouble depending on your mysql setup.] + * + * REPAIRING TABLES + * + * - Its probably healthy to check your tables for inconsistencies + * every now and then. + * - If you get odd SEGVs on gnunetd startup, it might be that the mysql + * databases have been corrupted. + * - The tables can be verified/fixed in two ways; + * 1) by running mysqlcheck -A, or + * 2) by executing (inside of mysql using the GNUnet database): + * mysql> REPAIR TABLE gn080; + * mysql> REPAIR TABLE gn072; + * + * PROBLEMS? + * + * If you have problems related to the mysql module, your best + * friend is probably the mysql manual. The first thing to check + * is that mysql is basically operational, that you can connect + * to it, create tables, issue queries etc. + * + */ + +#include "platform.h" +#include "plugin_datastore.h" + +#define DEBUG_MYSQL GNUNET_NO + +#define MAX_DATUM_SIZE 65536 + +/** + * Maximum number of supported parameters for a prepared + * statement. Increase if needed. + */ +#define MAX_PARAM 16 + +/** + * Die with an error message that indicates + * a failure of the command 'cmd' with the message given + * by strerror(errno). + */ +#define DIE_MYSQL(cmd, dbh) do { GNUNET_log(GNUNET_ERROR_TYPE_ERROR, _("`%s' failed at %s:%d with error: %s\n"), cmd, __FILE__, __LINE__, mysql_error((dbh)->dbf)); abort(); } while(0); + +/** + * Log an error message at log-level 'level' that indicates + * a failure of the command 'cmd' on file 'filename' + * with the message given by strerror(errno). + */ +#define LOG_MYSQL(level, cmd, dbh) do { GNUNET_log(level, _("`%s' failed at %s:%d with error: %s\n"), cmd, __FILE__, __LINE__, mysql_error((dbh)->dbf)); } while(0); + + +/* warning, slighly crazy mysql statements ahead. Essentially, MySQL does not handle + "OR" very well, so we need to use UNION instead. And UNION does not + automatically apply a LIMIT on the outermost clause, so we need to + repeat ourselves quite a bit. All hail the performance gods (and thanks + to #mysql on freenode) */ +#define SELECT_IT_LOW_PRIORITY "(SELECT size,type,prio,anonLevel,expire,hash,vkey FROM gn080 FORCE INDEX(prio) WHERE (prio = ? AND vkey > ?) "\ + "ORDER BY prio ASC,vkey ASC LIMIT 1) "\ + "UNION "\ + "(SELECT size,type,prio,anonLevel,expire,hash,vkey FROM gn080 FORCE INDEX(prio) WHERE (prio > ? AND vkey != ?)"\ + "ORDER BY prio ASC,vkey ASC LIMIT 1)"\ + "ORDER BY prio ASC,vkey ASC LIMIT 1" + +#define SELECT_IT_NON_ANONYMOUS "(SELECT size,type,prio,anonLevel,expire,hash,vkey FROM gn080 FORCE INDEX(prio) WHERE (prio = ? AND vkey < ?)"\ + " AND anonLevel=0 ORDER BY prio DESC,vkey DESC LIMIT 1) "\ + "UNION "\ + "(SELECT size,type,prio,anonLevel,expire,hash,vkey FROM gn080 FORCE INDEX(prio) WHERE (prio < ? AND vkey != ?)"\ + " AND anonLevel=0 ORDER BY prio DESC,vkey DESC LIMIT 1) "\ + "ORDER BY prio DESC,vkey DESC LIMIT 1" + +#define SELECT_IT_EXPIRATION_TIME "(SELECT size,type,prio,anonLevel,expire,hash,vkey FROM gn080 FORCE INDEX(expire) WHERE (expire = ? AND vkey > ?) "\ + "ORDER BY expire ASC,vkey ASC LIMIT 1) "\ + "UNION "\ + "(SELECT size,type,prio,anonLevel,expire,hash,vkey FROM gn080 FORCE INDEX(expire) WHERE (expire > ? AND vkey != ?) "\ + "ORDER BY expire ASC,vkey ASC LIMIT 1)"\ + "ORDER BY expire ASC,vkey ASC LIMIT 1" + + +#define SELECT_IT_MIGRATION_ORDER "(SELECT size,type,prio,anonLevel,expire,hash,vkey FROM gn080 FORCE INDEX(expire) WHERE (expire = ? AND vkey < ?)"\ + " AND expire > ? AND type!=3"\ + " ORDER BY expire DESC,vkey DESC LIMIT 1) "\ + "UNION "\ + "(SELECT size,type,prio,anonLevel,expire,hash,vkey FROM gn080 FORCE INDEX(expire) WHERE (expire < ? AND vkey != ?)"\ + " AND expire > ? AND type!=3"\ + " ORDER BY expire DESC,vkey DESC LIMIT 1)"\ + "ORDER BY expire DESC,vkey DESC LIMIT 1" + +#define SELECT_SIZE "SELECT sum(size) FROM gn080" + + +struct GNUNET_MysqlStatementHandle +{ + struct GNUNET_MysqlStatementHandle *next; + + struct GNUNET_MysqlStatementHandle *prev; + + char *query; + + MYSQL_STMT *statement; + + int valid; + +}; + + +/** + * Context for all functions in this plugin. + */ +struct Plugin +{ + /** + * Our execution environment. + */ + struct GNUNET_DATASTORE_PluginEnvironment *env; + + MYSQL *dbf; + + struct GNUNET_MysqlStatementHandle *shead; + + struct GNUNET_MysqlStatementHandle *stail; + + /** + * Filename of "my.cnf" (msyql configuration). + */ + char *cnffile; + + /** + * Statements dealing with gn072 table + */ +#define SELECT_VALUE "SELECT value FROM gn072 WHERE vkey=?" + struct GNUNET_MysqlStatementHandle *select_value; + +#define DELETE_VALUE "DELETE FROM gn072 WHERE vkey=?"o + struct GNUNET_MysqlStatementHandle *delete_value; + +#define INSERT_VALUE "INSERT INTO gn072 (value) VALUES (?)" + struct GNUNET_MysqlStatementHandle *insert_value; + + /** + * Statements dealing with gn080 table + */ +#define INSERT_ENTRY "INSERT INTO gn080 (size,type,prio,anonLevel,expire,hash,vhash,vkey) VALUES (?,?,?,?,?,?,?,?)" + struct GNUNET_MysqlStatementHandle *insert_entry; + +#define DELETE_ENTRY_BY_VKEY "DELETE FROM gn080 WHERE vkey=?" + struct GNUNET_MysqlStatementHandle *delete_entry_by_vkey; + +#define SELECT_ENTRY_BY_HASH "SELECT size,type,prio,anonLevel,expire,hash,vkey FROM gn080 FORCE INDEX (hash_vkey) WHERE hash=? AND vkey > ? ORDER BY vkey ASC LIMIT 1 OFFSET ?" + struct GNUNET_MysqlStatementHandle *select_entry_by_hash; + +#define SELECT_ENTRY_BY_HASH_AND_VHASH "SELECT size,type,prio,anonLevel,expire,hash,vkey FROM gn080 FORCE INDEX (hash_vhash_vkey) WHERE hash=? AND vhash=? AND vkey > ? ORDER BY vkey ASC LIMIT 1 OFFSET ?" + struct GNUNET_MysqlStatementHandle *select_entry_by_hash_and_vhash; + +#define SELECT_ENTRY_BY_HASH_AND_TYPE "SELECT size,type,prio,anonLevel,expire,hash,vkey FROM gn080 FORCE INDEX (hash_vkey) WHERE hash=? AND vkey > ? AND type=? ORDER BY vkey ASC LIMIT 1 OFFSET ?" + struct GNUNET_MysqlStatementHandle *select_entry_by_hash_and_type; + +#define SELECT_ENTRY_BY_HASH_VHASH_AND_TYPE "SELECT size,type,prio,anonLevel,expire,hash,vkey FROM gn080 FORCE INDEX (hash_vhash_vkey) WHERE hash=? AND vhash=? AND vkey > ? AND type=? ORDER BY vkey ASC LIMIT 1 OFFSET ?" + struct GNUNET_MysqlStatementHandle *select_entry_by_hash_vhash_and_type; + +#define COUNT_ENTRY_BY_HASH "SELECT count(*) FROM gn080 FORCE INDEX (hash) WHERE hash=?" + struct GNUNET_MysqlStatementHandle *count_entry_by_hash; + +#define COUNT_ENTRY_BY_HASH_AND_VHASH "SELECT count(*) FROM gn080 FORCE INDEX (hash_vhash_vkey) WHERE hash=? AND vhash=?" + struct GNUNET_MysqlStatementHandle *count_entry_by_hash_and_vhash; + +#define COUNT_ENTRY_BY_HASH_AND_TYPE "SELECT count(*) FROM gn080 FORCE INDEX (hash) WHERE hash=? AND type=?" + struct GNUNET_MysqlStatementHandle *count_entry_by_hash_and_type; + +#define COUNT_ENTRY_BY_HASH_VHASH_AND_TYPE "SELECT count(*) FROM gn080 FORCE INDEX (hash_vhash) WHERE hash=? AND vhash=? AND type=?" + struct GNUNET_MysqlStatementHandle *count_entry_by_hash_vhash_and_type; + +#define UPDATE_ENTRY "UPDATE gn080 SET prio=prio+?,expire=IF(expire>=?,expire,?) WHERE vkey=?" + struct GNUNET_MysqlStatementHandle *update_entry; + + struct GNUNET_MysqlStatementHandle *iter[4]; + + //static unsigned int stat_size; + + /** + * Size of the mysql database on disk. + */ + unsigned long long content_size; + +}; + + +/** + * Obtain the location of ".my.cnf". + * @return NULL on error + */ +static char * +get_my_cnf_path (struct GNUNET_ConfigurationHandle *cfg) +{ + char *cnffile; + char *home_dir; + struct stat st; +#ifndef WINDOWS + struct passwd *pw; +#endif + +#ifndef WINDOWS + pw = getpwuid (getuid ()); + if (!pw) + { + GNUNET_log_strerror (GNUNET_ERROR_TYPE_ERROR, + "getpwuid"); + return NULL; + } + home_dir = GNUNET_strdup (pw->pw_dir); +#else + home_dir = (char *) GNUNET_malloc (_MAX_PATH + 1); + plibc_conv_to_win_path ("~/", home_dir); +#endif + GNUNET_asprintf (&cnffile, "%s/.my.cnf", home_dir); + GNUNET_free (home_dir); + GNUNET_CONFIUGRATION_get_value_filename (cfg, + "MYSQL", "CONFIG", cnffile, + &home_dir); + GNUNET_free (cnffile); + cnffile = home_dir; + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + _("Trying to use file `%s' for MySQL configuration.\n"), + cnffile); + if ((0 != STAT (cnffile, &st)) || + (0 != ACCESS (cnffile, R_OK)) || (!S_ISREG (st.st_mode))) + { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + _("Could not access file `%s': %s\n"), cnffile, + STRERROR (errno)); + GNUNET_free (cnffile); + return NULL; + } + return cnffile; +} + + +/** + * Close database connection and all prepared statements (we got a DB + * disconnect error). + */ +static int +iclose (struct Plugin *plugin) +{ + struct GNUNET_MysqlStatementHandle *spos; + + spos = plugin->shead; + while (spos != NULL) + { + if (spos->statement != NULL) + { + mysql_stmt_close (spos->statement); + spos->statement = NULL; + } + spos->valid = GNUNET_NO; + spos = spos->next; + } + if (plugin->dbf != NULL) + { + mysql_close (plugin->dbf); + plugin->dbf = NULL; + } + return GNUNET_OK; +} + + +/** + * Open the connection with the database (and initialize + * our default options). + * + * @return GNUNET_OK on success + */ +static int +iopen (struct Plugin *ret) +{ + char *mysql_dbname; + char *mysql_server; + char *mysql_user; + char *mysql_password; + unsigned long long mysql_port; + my_bool reconnect; + unsigned int timeout; + + ret->dbf = mysql_init (NULL); + if (ret->dbf == NULL) + return GNUNET_SYSERR; + if (ret->cnffile != NULL) + mysql_options (ret->dbf, MYSQL_READ_DEFAULT_FILE, ret->cnffile); + mysql_options (ret->dbf, MYSQL_READ_DEFAULT_GROUP, "client"); + reconnect = 0; + mysql_options (ret->dbf, MYSQL_OPT_RECONNECT, &reconnect); + mysql_options (ret->dbf, + MYSQL_OPT_CONNECT_TIMEOUT, (const void *) &timeout); + timeout = 60; /* in seconds */ + mysql_options (ret->dbf, MYSQL_OPT_READ_TIMEOUT, (const void *) &timeout); + mysql_options (ret->dbf, MYSQL_OPT_WRITE_TIMEOUT, (const void *) &timeout); + mysql_dbname = NULL; + if (GNUNET_YES == GNUNET_CONFIGURATION_have_value (ret->env->cfg, + "MYSQL", "DATABASE")) + GNUNET_assert (GNUNET_OK == + GNUNET_CONFIGURATION_get_value_string (ret->env->cfg, + "MYSQL", "DATABASE", + &mysql_dbname)); + else + mysql_dbname = GNUNET_strdup ("gnunet"); + mysql_user = NULL; + if (GNUNET_YES == GNUNET_CONFIGURATION_have_value (ret->env->cfg, + "MYSQL", "USER")) + { + GNUNET_break (GNUNET_OK == + GNUNET_CONFIGURATION_get_value_string (ret->env->cfg, + "MYSQL", "USER", + &mysql_user)); + } + mysql_password = NULL; + if (GNUNET_YES == GNUNET_CONFIGURATION_have_value (ret->env->cfg, + "MYSQL", "PASSWORD")) + { + GNUNET_break (GNUNET_OK == + GNUNET_CONFIGURATION_get_value_string (ret->cfg, + "MYSQL", "PASSWORD", + &mysql_password)); + } + mysql_server = NULL; + if (GNUNET_YES == GNUNET_CONFIGURATION_have_value (ret->cfg, + "MYSQL", "HOST")) + { + GNUNET_break (GNUNET_OK == + GNUNET_CONFIGURATION_get_value_string (ret->cfg, + "MYSQL", "HOST", "", + &mysql_server)); + } + mysql_port = 0; + if (GNUNET_YES == GNUNET_CONFIGURATION_have_value (ret->cfg, + "MYSQL", "PORT")) + { + GNUNET_break (GNUNET_OK == + GNUNET_CONFIGURATION_get_value_number (ret->cfg, "MYSQL", + "PORT", &mysql_port)); + } + + GNUNET_assert (mysql_dbname != NULL); + mysql_real_connect (ret->dbf, mysql_server, mysql_user, mysql_password, + mysql_dbname, (unsigned int) mysql_port, NULL, 0); + GNUNET_free (mysql_dbname); + if (mysql_error (ret->dbf)[0]) + { + LOG_MYSQL (GNUNET_ERROR_TYPE_ERROR, + "mysql_real_connect", ret); + return GNUNET_SYSERR; + } + ret->valid = GNUNET_YES; + return GNUNET_OK; +} + + +/** + * Run the given MySQL statement. + * + * @return GNUNET_OK on success, GNUNET_SYSERR on error + */ +static int +run_statement (struct Plugin *plugin, + const char *statement) +{ + if ((NULL == plugin->dbh) && (GNUNET_OK != iopen (plugin))) + return GNUNET_SYSERR; + mysql_query (plugin->dbf, statement); + if (mysql_error (plugin->dbf)[0]) + { + LOG_MYSQL (GNUNET_ERROR_TYPE_ERROR, + "mysql_query", plugin); + iclose (plugin); + return GNUNET_SYSERR; + } + return GNUNET_OK; +} + + +/** + * Run the given MySQL SELECT statement. The statement + * must have only a single result (one column, one row). + * + * @return result on success, NULL on error + */ +static char * +run_statement_select (struct Plugin *plugin, + const char *statement) +{ + MYSQL_RES *sql_res; + MYSQL_ROW sql_row; + char *ret; + + if ((NULL == plugin->dbh) && (GNUNET_OK != iopen (plugin))) + return NULL; + mysql_query (plugin->dbf, statement); + if ((mysql_error (plugin->dbf)[0]) || + (!(sql_res = mysql_use_result (plugin->dbf))) || + (!(sql_row = mysql_fetch_row (sql_res)))) + { + LOG_MYSQL (GNUNET_ERROR_TYPE_ERROR, + "mysql_query", plugin); + return NULL; + } + if ((mysql_num_fields (sql_res) != 1) || (sql_row[0] == NULL)) + { + GNUNET_break (mysql_num_fields (sql_res) == 1); + if (sql_res != NULL) + mysql_free_result (sql_res); + return NULL; + } + ret = GNUNET_strdup (sql_row[0]); + mysql_free_result (sql_res); + return ret; +} + + +/** + * Create a prepared statement. + * + * @return NULL on error + */ +static struct GNUNET_MysqlStatementHandle * +prepared_statement_create (struct Plugin *plugin, + const char *statement) +{ + struct GNUNET_MysqlStatementHandle *ret; + + ret = GNUNET_malloc (sizeof (struct GNUNET_MysqlStatementHandle)); + ret->query = GNUNET_strdup (statement); + GNUNET_CONTAINER_DLL_insert (plugin->shead, + plugin->stail, + ret); + return ret; +} + + +/** + * Prepare a statement for running. + * + * @return GNUNET_OK on success + */ +static int +prepare_statement (struct Plugin *plugin, + struct GNUNET_MysqlStatementHandle *ret) +{ + if (GNUNET_YES == ret->valid) + return GNUNET_OK; + if ((NULL == plugin->dbh) && + (GNUNET_OK != iopen (plugin))) + return GNUNET_SYSERR; + ret->statement = mysql_stmt_init (plugin->dbf); + if (ret->statement == NULL) + { + iclose (plugin); + return GNUNET_SYSERR; + } + if (mysql_stmt_prepare (ret->statement, + ret->query, + strlen (ret->query))) + { + LOG_MYSQL (GNUNET_ERROR_TYPE_ERROR, + "mysql_stmt_prepare", + plugin); + mysql_stmt_close (ret->statement); + ret->statement = NULL; + iclose (plugin); + return GNUNET_SYSERR; + } + ret->valid = GNUNET_YES; + return GNUNET_OK; +} + + +/** + * Free a prepared statement. + */ +static void +prepared_statement_destroy (struct Plugin *plugin, + struct GNUNET_MysqlStatementHandle + *s) +{ + GNUNET_CONTAINER_DLL_remove (plugin->shead, + plugin->stail, + s); + if (s->valid) + mysql_stmt_close (s->statement); + GNUNET_free (s->query); + GNUNET_free (s); +} + + +/** + * Bind the parameters for the given MySQL statement + * and run it. + * + * @param s statement to bind and run + * @param ap arguments for the binding + * @return GNUNET_SYSERR on error, GNUNET_OK on success + */ +static int +init_params (struct Plugin *plugin, + struct GNUNET_MysqlStatementHandle *s, + va_list ap) +{ + MYSQL_BIND qbind[MAX_PARAM]; + unsigned int pc; + unsigned int off; + enum enum_field_types ft; + + pc = mysql_stmt_param_count (s->statement); + if (pc > MAX_PARAM) + { + /* increase internal constant! */ + GNUNET_break (0); + return GNUNET_SYSERR; + } + memset (qbind, 0, sizeof (qbind)); + off = 0; + ft = 0; + while ((pc > 0) && (-1 != (ft = va_arg (ap, enum enum_field_types)))) + { + qbind[off].buffer_type = ft; + switch (ft) + { + case MYSQL_TYPE_FLOAT: + qbind[off].buffer = va_arg (ap, float *); + break; + case MYSQL_TYPE_LONGLONG: + qbind[off].buffer = va_arg (ap, unsigned long long *); + qbind[off].is_unsigned = va_arg (ap, int); + break; + case MYSQL_TYPE_LONG: + qbind[off].buffer = va_arg (ap, unsigned int *); + qbind[off].is_unsigned = va_arg (ap, int); + break; + case MYSQL_TYPE_VAR_STRING: + case MYSQL_TYPE_STRING: + case MYSQL_TYPE_BLOB: + qbind[off].buffer = va_arg (ap, void *); + qbind[off].buffer_length = va_arg (ap, unsigned long); + qbind[off].length = va_arg (ap, unsigned long *); + break; + default: + /* unsupported type */ + GNUNET_break (0); + return GNUNET_SYSERR; + } + pc--; + off++; + } + if (!((pc == 0) && (ft != -1) && (va_arg (ap, int) == -1))) + { + GNUNET_break (0); + return GNUNET_SYSERR; + } + if (mysql_stmt_bind_param (s->statement, qbind)) + { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + _("`%s' failed at %s:%d with error: %s\n"), + "mysql_stmt_bind_param", + __FILE__, __LINE__, mysql_stmt_error (s->statement)); + iclose (plugin); + return GNUNET_SYSERR; + } + if (mysql_stmt_execute (s->statement)) + { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + _("`%s' failed at %s:%d with error: %s\n"), + "mysql_stmt_execute", + __FILE__, __LINE__, mysql_stmt_error (s->statement)); + iclose (plugin); + return GNUNET_SYSERR; + } + return GNUNET_OK; +} + +/** + * Type of a callback that will be called for each + * data set returned from MySQL. + * + * @param cls user-defined argument + * @param num_values number of elements in values + * @param values values returned by MySQL + * @return GNUNET_OK to continue iterating, GNUNET_SYSERR to abort + */ +typedef int (*GNUNET_MysqlDataProcessor) (void *cls, + unsigned int num_values, + MYSQL_BIND * values); + + +/** + * Run a prepared SELECT statement. + * + * @param result_size number of elements in results array + * @param results pointer to already initialized MYSQL_BIND + * array (of sufficient size) for passing results + * @param processor function to call on each result + * @param processor_cls extra argument to processor + * @param ... pairs and triplets of "MYSQL_TYPE_XXX" keys and their respective + * values (size + buffer-reference for pointers); terminated + * with "-1" + * @return GNUNET_SYSERR on error, otherwise + * the number of successfully affected (or queried) rows + */ +static int +prepared_statement_run_select (struct Plugin *plugin, + struct GNUNET_MysqlStatementHandle + *s, + unsigned int result_size, + MYSQL_BIND * results, + GNUNET_MysqlDataProcessor + processor, void *processor_cls, + ...) +{ + va_list ap; + int ret; + unsigned int rsize; + int total; + + if (GNUNET_OK != prepare_statement (plugin, s)) + { + GNUNET_break (0); + return GNUNET_SYSERR; + } + va_start (ap, processor_cls); + if (GNUNET_OK != init_params (plugin, s, ap)) + { + GNUNET_break (0); + va_end (ap); + return GNUNET_SYSERR; + } + va_end (ap); + rsize = mysql_stmt_field_count (s->statement); + if (rsize > result_size) + { + GNUNET_break (0); + return GNUNET_SYSERR; + } + if (mysql_stmt_bind_result (s->statement, results)) + { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + _("`%s' failed at %s:%d with error: %s\n"), + "mysql_stmt_bind_result", + __FILE__, __LINE__, mysql_stmt_error (s->statement)); + iclose (plugin); + return GNUNET_SYSERR; + } + + total = 0; + while (1) + { + ret = mysql_stmt_fetch (s->statement); + if (ret == MYSQL_NO_DATA) + break; + if (ret != 0) + { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + _("`%s' failed at %s:%d with error: %s\n"), + "mysql_stmt_fetch", + __FILE__, __LINE__, mysql_stmt_error (s->statement)); + iclose (plugin); + return GNUNET_SYSERR; + } + if (processor != NULL) + if (GNUNET_OK != processor (processor_cls, rsize, results)) + break; + total++; + } + mysql_stmt_reset (s->statement); + return total; +} + + +/** + * Run a prepared statement that does NOT produce results. + * + * @param ... pairs and triplets of "MYSQL_TYPE_XXX" keys and their respective + * values (size + buffer-reference for pointers); terminated + * with "-1" + * @param insert_id NULL or address where to store the row ID of whatever + * was inserted (only for INSERT statements!) + * @return GNUNET_SYSERR on error, otherwise + * the number of successfully affected rows + */ +static int +prepared_statement_run (struct Plugin *plugin, + struct GNUNET_MysqlStatementHandle *s, + unsigned long long *insert_id, ...) +{ + va_list ap; + int affected; + + if (GNUNET_OK != prepare_statement (plugin, s)) + return GNUNET_SYSERR; + va_start (ap, insert_id); + if (GNUNET_OK != init_params (plugin, s, ap)) + { + va_end (ap); + return GNUNET_SYSERR; + } + va_end (ap); + affected = mysql_stmt_affected_rows (s->statement); + if (NULL != insert_id) + *insert_id = (unsigned long long) mysql_stmt_insert_id (s->statement); + mysql_stmt_reset (s->statement); + return affected; +} + + + + +/** + * Delete an value from the gn072 table. + * + * @param vkey vkey identifying the value to delete + * @return GNUNET_OK on success, GNUNET_NO if no such value exists, GNUNET_SYSERR on error + */ +static int +do_delete_value (unsigned long long vkey) +{ + int ret; + + ret = GNUNET_MYSQL_prepared_statement_run (delete_value, + NULL, + MYSQL_TYPE_LONGLONG, + &vkey, GNUNET_YES, -1); + if (ret > 0) + ret = GNUNET_OK; + return ret; +} + +/** + * Insert a value into the gn072 table. + * + * @param value the value to insert + * @param size size of the value + * @param vkey vkey identifying the value henceforth (set) + * @return GNUNET_OK on success, GNUNET_SYSERR on error + */ +static int +do_insert_value (const void *value, unsigned int size, + unsigned long long *vkey) +{ + unsigned long length = size; + + return GNUNET_MYSQL_prepared_statement_run (insert_value, + vkey, + MYSQL_TYPE_BLOB, + value, length, &length, -1); +} + +/** + * Delete an entry from the gn080 table. + * + * @param vkey vkey identifying the entry to delete + * @return GNUNET_OK on success, GNUNET_NO if no such value exists, GNUNET_SYSERR on error + */ +static int +do_delete_entry_by_vkey (unsigned long long vkey) +{ + int ret; + + ret = GNUNET_MYSQL_prepared_statement_run (delete_entry_by_vkey, + NULL, + MYSQL_TYPE_LONGLONG, + &vkey, GNUNET_YES, -1); + if (ret > 0) + ret = GNUNET_OK; + return ret; +} + +static int +return_ok (void *cls, unsigned int num_values, MYSQL_BIND * values) +{ + return GNUNET_OK; +} + +/** + * Given a full (SELECT *) result set from gn080 table, + * assemble it into a GNUNET_DatastoreValue representation. + * + * Call *without* holding the lock, but while within + * mysql_thread_start/end. + * + * @param result location where mysql_stmt_fetch stored the results + * @return NULL on error + */ +static GNUNET_DatastoreValue * +assembleDatum (MYSQL_BIND * result) +{ + GNUNET_DatastoreValue *datum; + unsigned int contentSize; + unsigned int type; + unsigned int prio; + unsigned int level; + unsigned long long exp; + unsigned long long vkey; + unsigned long length; + MYSQL_BIND rbind[1]; + int ret; + + if ((result[0].buffer_type != MYSQL_TYPE_LONG) || + (!result[0].is_unsigned) || + (result[1].buffer_type != MYSQL_TYPE_LONG) || + (!result[1].is_unsigned) || + (result[2].buffer_type != MYSQL_TYPE_LONG) || + (!result[2].is_unsigned) || + (result[3].buffer_type != MYSQL_TYPE_LONG) || + (!result[3].is_unsigned) || + (result[4].buffer_type != MYSQL_TYPE_LONGLONG) || + (!result[4].is_unsigned) || + (result[5].buffer_type != MYSQL_TYPE_BLOB) || + (result[5].buffer_length != sizeof (GNUNET_HashCode)) || + (*result[5].length != sizeof (GNUNET_HashCode)) || + (result[6].buffer_type != MYSQL_TYPE_LONGLONG) || + (!result[6].is_unsigned)) + { + GNUNET_break (0); + return NULL; /* error */ + } + + contentSize = *(unsigned int *) result[0].buffer; + if (contentSize < sizeof (GNUNET_DatastoreValue)) + return NULL; /* error */ + if (contentSize > GNUNET_MAX_BUFFER_SIZE) + { + GNUNET_break (0); /* far too big */ + return NULL; + } + contentSize -= sizeof (GNUNET_DatastoreValue); + type = *(unsigned int *) result[1].buffer; + prio = *(unsigned int *) result[2].buffer; + level = *(unsigned int *) result[3].buffer; + exp = *(unsigned long long *) result[4].buffer; + vkey = *(unsigned long long *) result[6].buffer; + datum = GNUNET_malloc (sizeof (GNUNET_DatastoreValue) + contentSize); + datum->size = htonl (contentSize + sizeof (GNUNET_DatastoreValue)); + datum->type = htonl (type); + datum->priority = htonl (prio); + datum->anonymity_level = htonl (level); + datum->expiration_time = GNUNET_htonll (exp); + + /* now do query on gn072 */ + length = contentSize; + memset (rbind, 0, sizeof (rbind)); + rbind[0].buffer_type = MYSQL_TYPE_BLOB; + rbind[0].buffer_length = contentSize; + rbind[0].length = &length; + rbind[0].buffer = &datum[1]; + ret = GNUNET_MYSQL_prepared_statement_run_select (select_value, + 1, + rbind, + &return_ok, + NULL, + MYSQL_TYPE_LONGLONG, + &vkey, GNUNET_YES, -1); + GNUNET_break (ret <= 1); /* should only have one result! */ + if (ret > 0) + ret = GNUNET_OK; + if ((ret != GNUNET_OK) || + (rbind[0].buffer_length != contentSize) || (length != contentSize)) + { + GNUNET_break (ret != 0); /* should have one result! */ + GNUNET_break (length == contentSize); /* length should match! */ + GNUNET_break (rbind[0].buffer_length == contentSize); /* length should be internally consistent! */ + do_delete_value (vkey); + if (ret != 0) + do_delete_entry_by_vkey (vkey); + content_size -= ntohl (datum->size); + GNUNET_free (datum); + return NULL; + } + return datum; +} + + +/** + * Iterate over the items in the datastore + * using the given query to select and order + * the items. + * + * @param type entries of which type should be considered? + * Use 0 for any type. + * @param iter never NULL + * @param is_asc are we using ascending order? + * @return the number of results, GNUNET_SYSERR if the + * iter is non-NULL and aborted the iteration + */ +static int +iterateHelper (struct Plugin *plugin, + unsigned int type, + int is_asc, + unsigned int iter_select, GNUNET_DatastoreValueIterator dviter, + void *closure) +{ + GNUNET_DatastoreValue *datum; + int count; + int ret; + unsigned int last_prio; + unsigned long long last_expire; + unsigned long long last_vkey; + unsigned int size; + unsigned int rtype; + unsigned int prio; + unsigned int level; + unsigned long long expiration; + unsigned long long vkey; + unsigned long hashSize; + GNUNET_HashCode key; + GNUNET_CronTime now; + MYSQL_BIND rbind[7]; + + if (is_asc) + { + last_prio = 0; + last_vkey = 0; + last_expire = 0; + } + else + { + last_prio = 0x7FFFFFFFL; + last_vkey = 0x7FFFFFFFFFFFFFFFLL; /* MySQL only supports 63 bits */ + last_expire = 0x7FFFFFFFFFFFFFFFLL; /* MySQL only supports 63 bits */ + } + hashSize = sizeof (GNUNET_HashCode); + memset (rbind, 0, sizeof (rbind)); + rbind[0].buffer_type = MYSQL_TYPE_LONG; + rbind[0].buffer = &size; + rbind[0].is_unsigned = 1; + rbind[1].buffer_type = MYSQL_TYPE_LONG; + rbind[1].buffer = &rtype; + rbind[1].is_unsigned = 1; + rbind[2].buffer_type = MYSQL_TYPE_LONG; + rbind[2].buffer = &prio; + rbind[2].is_unsigned = 1; + rbind[3].buffer_type = MYSQL_TYPE_LONG; + rbind[3].buffer = &level; + rbind[3].is_unsigned = 1; + rbind[4].buffer_type = MYSQL_TYPE_LONGLONG; + rbind[4].buffer = &expiration; + rbind[4].is_unsigned = 1; + rbind[5].buffer_type = MYSQL_TYPE_BLOB; + rbind[5].buffer = &key; + rbind[5].buffer_length = hashSize; + rbind[5].length = &hashSize; + rbind[6].buffer_type = MYSQL_TYPE_LONGLONG; + rbind[6].buffer = &vkey; + rbind[6].is_unsigned = GNUNET_YES; + + now = GNUNET_get_time (); + count = 0; + while (1) + { + switch (iter_select) + { + case 0: + case 1: + ret = prepared_statement_run_select (iter[iter_select], + 7, + rbind, + &return_ok, + NULL, + MYSQL_TYPE_LONG, + &last_prio, + GNUNET_YES, + MYSQL_TYPE_LONGLONG, + &last_vkey, + GNUNET_YES, + MYSQL_TYPE_LONG, + &last_prio, + GNUNET_YES, + MYSQL_TYPE_LONGLONG, + &last_vkey, + GNUNET_YES, -1); + break; + case 2: + ret = prepared_statement_run_select (iter[iter_select], + 7, + rbind, + &return_ok, + NULL, + MYSQL_TYPE_LONGLONG, + &last_expire, + GNUNET_YES, + MYSQL_TYPE_LONGLONG, + &last_vkey, + GNUNET_YES, + MYSQL_TYPE_LONGLONG, + &last_expire, + GNUNET_YES, + MYSQL_TYPE_LONGLONG, + &last_vkey, + GNUNET_YES, -1); + break; + case 3: + ret = prepared_statement_run_select (iter[iter_select], + 7, |