diff options
Diffstat (limited to 'src/datacache/plugin_datacache_mysql.c')
-rw-r--r-- | src/datacache/plugin_datacache_mysql.c | 1005 |
1 files changed, 1005 insertions, 0 deletions
diff --git a/src/datacache/plugin_datacache_mysql.c b/src/datacache/plugin_datacache_mysql.c new file mode 100644 index 0000000..9185c5c --- /dev/null +++ b/src/datacache/plugin_datacache_mysql.c @@ -0,0 +1,1005 @@ +/* + This file is part of GNUnet + (C) 2006, 2009, 2010 Christian Grothoff (and other contributing authors) + + GNUnet is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published + by the Free Software Foundation; either version 3, or (at your + option) any later version. + + GNUnet is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + General Public License for more details. + + You should have received a copy of the GNU General Public License + along with GNUnet; see the file COPYING. If not, write to the + Free Software Foundation, Inc., 59 Temple Place - Suite 330, + Boston, MA 02111-1307, USA. +*/ + +/** + * @file datacache/plugin_datacache_mysql.c + * @brief mysql for an implementation of a database backend for the datacache + * @author Christian Grothoff + * + * SETUP INSTRUCTIONS: + * + * 1) Access mysql as root, + * <pre> + * + * $ mysql -u root -p + * + * </pre> + * and do the following. [You should replace $USER with the username + * that will be running the gnunetd process]. + * @verbatim + CREATE DATABASE gnunet; + GRANT select,insert,update,delete,create,alter,drop,create temporary tables + ON gnunet.* TO $USER@localhost; + SET PASSWORD FOR $USER@localhost=PASSWORD('$the_password_you_like'); + FLUSH PRIVILEGES; + @endverbatim + * 2) In the $HOME directory of $USER, create a ".my.cnf" file + * with the following lines + * @verbatim + [client] + user=$USER + password=$the_password_you_like + @endverbatim + * + * Thats it -- now you can configure your datastores in GNUnet to + * use MySQL. Note that .my.cnf file is a security risk unless its on + * a safe partition etc. The $HOME/.my.cnf can of course be a symbolic + * link. Even greater security risk can be achieved by setting no + * password for $USER. Luckily $USER has only priviledges to mess + * up GNUnet's tables, nothing else (unless you give him more, + * of course).<p> + * + * 3) Still, perhaps you should briefly try if the DB connection + * works. First, login as $USER. Then use, + * @verbatim + $ mysql -u $USER -p $the_password_you_like + mysql> use gnunet; + @endverbatim + * + * If you get the message "Database changed" it probably works. + * + * [If you get "ERROR 2002: Can't connect to local MySQL server + * through socket '/tmp/mysql.sock' (2)" it may be resolvable by + * "ln -s /var/run/mysqld/mysqld.sock /tmp/mysql.sock" + * so there may be some additional trouble depending on your mysql setup.] + * + * PROBLEMS? + * + * If you have problems related to the mysql module, your best + * friend is probably the mysql manual. The first thing to check + * is that mysql is basically operational, that you can connect + * to it, create tables, issue queries etc. + */ +#include "platform.h" +#include "gnunet_util_lib.h" +#include "gnunet_datacache_plugin.h" +#include <mysql/mysql.h> + +#define DEBUG_DATACACHE_MYSQL GNUNET_EXTRA_LOGGING + +/** + * Estimate of the per-entry overhead (including indices). + */ +#define OVERHEAD ((4*2+4*2+8*2+8*2+sizeof(GNUNET_HashCode)*5+8)) + +/** + * Maximum number of supported parameters for a prepared + * statement. Increase if needed. + */ +#define MAX_PARAM 16 + +/** + * Die with an error message that indicates + * a failure of the command 'cmd' with the message given + * by strerror(errno). + */ +#define DIE_MYSQL(cmd, dbh) do { GNUNET_log(GNUNET_ERROR_TYPE__ERROR, _("`%s' failed at %s:%d with error: %s\n"), cmd, __FILE__, __LINE__, mysql_error((dbh)->dbf)); GNUNET_abort(); } while(0); + +/** + * Log an error message at log-level 'level' that indicates + * a failure of the command 'cmd' on file 'filename' + * with the message given by strerror(errno). + */ +#define LOG_MYSQL(level, cmd, dbh) do { GNUNET_log(level, _("`%s' failed at %s:%d with error: %s\n"), cmd, __FILE__, __LINE__, mysql_error((dbh)->dbf)); } while(0); + +struct GNUNET_MysqlStatementHandle +{ + struct GNUNET_MysqlStatementHandle *next; + + struct GNUNET_MysqlStatementHandle *prev; + + char *query; + + MYSQL_STMT *statement; + + int valid; + +}; + + +/** + * Context for all functions in this plugin. + */ +struct Plugin +{ + /** + * Our execution environment. + */ + struct GNUNET_DATACACHE_PluginEnvironment *env; + + /** + * Handle to the mysql database. + */ + MYSQL *dbf; + + struct GNUNET_MysqlStatementHandle *shead; + + struct GNUNET_MysqlStatementHandle *stail; + + /** + * Filename of "my.cnf" (msyql configuration). + */ + char *cnffile; + +#define SELECT_VALUE_STMT "SELECT value,expire FROM gn080dstore FORCE INDEX (hashidx) WHERE hash=? AND type=? AND expire >= ? LIMIT 1 OFFSET ?" + struct GNUNET_MysqlStatementHandle *select_value; + +#define COUNT_VALUE_STMT "SELECT count(*) FROM gn080dstore FORCE INDEX (hashidx) WHERE hash=? AND type=? AND expire >= ?" + struct GNUNET_MysqlStatementHandle *count_value; + +#define SELECT_OLD_VALUE_STMT "SELECT hash, vhash, type, value FROM gn080dstore FORCE INDEX (expireidx) ORDER BY puttime ASC LIMIT 1" + struct GNUNET_MysqlStatementHandle *select_old_value; + +#define DELETE_VALUE_STMT "DELETE FROM gn080dstore WHERE hash = ? AND vhash = ? AND type = ? AND value = ?" + struct GNUNET_MysqlStatementHandle *delete_value; + +#define INSERT_VALUE_STMT "INSERT INTO gn080dstore (type, puttime, expire, hash, vhash, value) "\ + "VALUES (?, ?, ?, ?, ?, ?)" + struct GNUNET_MysqlStatementHandle *insert_value; + +#define UPDATE_VALUE_STMT "UPDATE gn080dstore FORCE INDEX (allidx) SET puttime=?, expire=? "\ + "WHERE hash=? AND vhash=? AND type=?" + struct GNUNET_MysqlStatementHandle *update_value; + +}; + + +/** + * Obtain the location of ".my.cnf". + * + * @param cfg our configuration + * @return NULL on error + */ +static char * +get_my_cnf_path (const struct GNUNET_CONFIGURATION_Handle *cfg) +{ + char *cnffile; + char *home_dir; + struct stat st; + +#ifndef WINDOWS + struct passwd *pw; +#endif + int configured; + +#ifndef WINDOWS + pw = getpwuid (getuid ()); + if (!pw) + { + GNUNET_log_strerror (GNUNET_ERROR_TYPE_ERROR, "getpwuid"); + return NULL; + } + if (GNUNET_YES == + GNUNET_CONFIGURATION_have_value (cfg, "datacache-mysql", "CONFIG")) + { + GNUNET_assert (GNUNET_OK == + GNUNET_CONFIGURATION_get_value_filename (cfg, + "datacache-mysql", + "CONFIG", + &cnffile)); + configured = GNUNET_YES; + } + else + { + home_dir = GNUNET_strdup (pw->pw_dir); + GNUNET_asprintf (&cnffile, "%s/.my.cnf", home_dir); + GNUNET_free (home_dir); + configured = GNUNET_NO; + } +#else + home_dir = (char *) GNUNET_malloc (_MAX_PATH + 1); + plibc_conv_to_win_path ("~/", home_dir); + GNUNET_asprintf (&cnffile, "%s/.my.cnf", home_dir); + GNUNET_free (home_dir); + configured = GNUNET_NO; +#endif + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + _("Trying to use file `%s' for MySQL configuration.\n"), cnffile); + if ((0 != STAT (cnffile, &st)) || (0 != ACCESS (cnffile, R_OK)) || + (!S_ISREG (st.st_mode))) + { + if (configured == GNUNET_YES) + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + _("Could not access file `%s': %s\n"), cnffile, + STRERROR (errno)); + GNUNET_free (cnffile); + return NULL; + } + return cnffile; +} + + +/** + * Free a prepared statement. + * + * @param plugin plugin context + * @param s prepared statement + */ +static void +prepared_statement_destroy (struct Plugin *plugin, + struct GNUNET_MysqlStatementHandle *s) +{ + GNUNET_CONTAINER_DLL_remove (plugin->shead, plugin->stail, s); + if (s->valid) + mysql_stmt_close (s->statement); + GNUNET_free (s->query); + GNUNET_free (s); +} + + +/** + * Close database connection and all prepared statements (we got a DB + * disconnect error). + */ +static int +iclose (struct Plugin *plugin) +{ + while (NULL != plugin->shead) + prepared_statement_destroy (plugin, plugin->shead); + if (plugin->dbf != NULL) + { + mysql_close (plugin->dbf); + plugin->dbf = NULL; + } + return GNUNET_OK; +} + + +/** + * Open the connection with the database (and initialize + * our default options). + * + * @return GNUNET_OK on success + */ +static int +iopen (struct Plugin *ret) +{ + char *mysql_dbname; + char *mysql_server; + char *mysql_user; + char *mysql_password; + unsigned long long mysql_port; + my_bool reconnect; + unsigned int timeout; + + ret->dbf = mysql_init (NULL); + if (ret->dbf == NULL) + return GNUNET_SYSERR; + if (ret->cnffile != NULL) + mysql_options (ret->dbf, MYSQL_READ_DEFAULT_FILE, ret->cnffile); + mysql_options (ret->dbf, MYSQL_READ_DEFAULT_GROUP, "client"); + reconnect = 0; + mysql_options (ret->dbf, MYSQL_OPT_RECONNECT, &reconnect); + mysql_options (ret->dbf, MYSQL_OPT_CONNECT_TIMEOUT, (const void *) &timeout); + mysql_options (ret->dbf, MYSQL_SET_CHARSET_NAME, "UTF8"); + timeout = 60; /* in seconds */ + mysql_options (ret->dbf, MYSQL_OPT_READ_TIMEOUT, (const void *) &timeout); + mysql_options (ret->dbf, MYSQL_OPT_WRITE_TIMEOUT, (const void *) &timeout); + mysql_dbname = NULL; + if (GNUNET_YES == + GNUNET_CONFIGURATION_have_value (ret->env->cfg, "datacache-mysql", + "DATABASE")) + GNUNET_assert (GNUNET_OK == + GNUNET_CONFIGURATION_get_value_string (ret->env->cfg, + "datacache-mysql", + "DATABASE", + &mysql_dbname)); + else + mysql_dbname = GNUNET_strdup ("gnunet"); + mysql_user = NULL; + if (GNUNET_YES == + GNUNET_CONFIGURATION_have_value (ret->env->cfg, "datacache-mysql", + "USER")) + { + GNUNET_assert (GNUNET_OK == + GNUNET_CONFIGURATION_get_value_string (ret->env->cfg, + "datacache-mysql", + "USER", &mysql_user)); + } + mysql_password = NULL; + if (GNUNET_YES == + GNUNET_CONFIGURATION_have_value (ret->env->cfg, "datacache-mysql", + "PASSWORD")) + { + GNUNET_assert (GNUNET_OK == + GNUNET_CONFIGURATION_get_value_string (ret->env->cfg, + "datacache-mysql", + "PASSWORD", + &mysql_password)); + } + mysql_server = NULL; + if (GNUNET_YES == + GNUNET_CONFIGURATION_have_value (ret->env->cfg, "datacache-mysql", + "HOST")) + { + GNUNET_assert (GNUNET_OK == + GNUNET_CONFIGURATION_get_value_string (ret->env->cfg, + "datacache-mysql", + "HOST", + &mysql_server)); + } + mysql_port = 0; + if (GNUNET_YES == + GNUNET_CONFIGURATION_have_value (ret->env->cfg, "datacache-mysql", + "PORT")) + { + GNUNET_assert (GNUNET_OK == + GNUNET_CONFIGURATION_get_value_number (ret->env->cfg, + "datacache-mysql", + "PORT", &mysql_port)); + } + + GNUNET_assert (mysql_dbname != NULL); + mysql_real_connect (ret->dbf, mysql_server, mysql_user, mysql_password, + mysql_dbname, (unsigned int) mysql_port, NULL, + CLIENT_IGNORE_SIGPIPE); + GNUNET_free_non_null (mysql_server); + GNUNET_free_non_null (mysql_user); + GNUNET_free_non_null (mysql_password); + GNUNET_free (mysql_dbname); + if (mysql_error (ret->dbf)[0]) + { + LOG_MYSQL (GNUNET_ERROR_TYPE_ERROR, "mysql_real_connect", ret); + return GNUNET_SYSERR; + } + return GNUNET_OK; +} + + +/** + * Run the given MySQL statement. + * + * @return GNUNET_OK on success, GNUNET_SYSERR on error + */ +static int +run_statement (struct Plugin *plugin, const char *statement) +{ + if ((NULL == plugin->dbf) && (GNUNET_OK != iopen (plugin))) + return GNUNET_SYSERR; + mysql_query (plugin->dbf, statement); + if (mysql_error (plugin->dbf)[0]) + { + LOG_MYSQL (GNUNET_ERROR_TYPE_ERROR, "mysql_query", plugin); + iclose (plugin); + return GNUNET_SYSERR; + } + return GNUNET_OK; +} + +/** + * Create a prepared statement. + * + * @return NULL on error + */ +static struct GNUNET_MysqlStatementHandle * +prepared_statement_create (struct Plugin *plugin, const char *statement) +{ + struct GNUNET_MysqlStatementHandle *ret; + + ret = GNUNET_malloc (sizeof (struct GNUNET_MysqlStatementHandle)); + ret->query = GNUNET_strdup (statement); + GNUNET_CONTAINER_DLL_insert (plugin->shead, plugin->stail, ret); + return ret; +} + + +/** + * Prepare a statement for running. + * + * @return GNUNET_OK on success + */ +static int +prepare_statement (struct Plugin *plugin, + struct GNUNET_MysqlStatementHandle *ret) +{ + if (GNUNET_YES == ret->valid) + return GNUNET_OK; + if ((NULL == plugin->dbf) && (GNUNET_OK != iopen (plugin))) + return GNUNET_SYSERR; + ret->statement = mysql_stmt_init (plugin->dbf); + if (ret->statement == NULL) + { + iclose (plugin); + return GNUNET_SYSERR; + } + if (mysql_stmt_prepare (ret->statement, ret->query, strlen (ret->query))) + { + LOG_MYSQL (GNUNET_ERROR_TYPE_ERROR, "mysql_stmt_prepare", plugin); + mysql_stmt_close (ret->statement); + ret->statement = NULL; + iclose (plugin); + return GNUNET_SYSERR; + } + ret->valid = GNUNET_YES; + return GNUNET_OK; + +} + + +/** + * Bind the parameters for the given MySQL statement + * and run it. + * + * @param plugin plugin context + * @param s statement to bind and run + * @param ap arguments for the binding + * @return GNUNET_SYSERR on error, GNUNET_OK on success + */ +static int +init_params (struct Plugin *plugin, struct GNUNET_MysqlStatementHandle *s, + va_list ap) +{ + MYSQL_BIND qbind[MAX_PARAM]; + unsigned int pc; + unsigned int off; + enum enum_field_types ft; + + pc = mysql_stmt_param_count (s->statement); + if (pc > MAX_PARAM) + { + /* increase internal constant! */ + GNUNET_break (0); + return GNUNET_SYSERR; + } + memset (qbind, 0, sizeof (qbind)); + off = 0; + ft = 0; + while ((pc > 0) && (-1 != (int) (ft = va_arg (ap, enum enum_field_types)))) + { + qbind[off].buffer_type = ft; + switch (ft) + { + case MYSQL_TYPE_FLOAT: + qbind[off].buffer = va_arg (ap, float *); + + break; + case MYSQL_TYPE_LONGLONG: + qbind[off].buffer = va_arg (ap, unsigned long long *); + qbind[off].is_unsigned = va_arg (ap, int); + + break; + case MYSQL_TYPE_LONG: + qbind[off].buffer = va_arg (ap, unsigned int *); + qbind[off].is_unsigned = va_arg (ap, int); + + break; + case MYSQL_TYPE_VAR_STRING: + case MYSQL_TYPE_STRING: + case MYSQL_TYPE_BLOB: + qbind[off].buffer = va_arg (ap, void *); + qbind[off].buffer_length = va_arg (ap, unsigned long); + qbind[off].length = va_arg (ap, unsigned long *); + + break; + default: + /* unsupported type */ + GNUNET_break (0); + return GNUNET_SYSERR; + } + pc--; + off++; + } + if (!((pc == 0) && (-1 != (int) ft) && (va_arg (ap, int) == -1))) + { + GNUNET_break (0); + return GNUNET_SYSERR; + } + if (mysql_stmt_bind_param (s->statement, qbind)) + { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + _("`%s' failed at %s:%d with error: %s\n"), + "mysql_stmt_bind_param", __FILE__, __LINE__, + mysql_stmt_error (s->statement)); + iclose (plugin); + return GNUNET_SYSERR; + } + if (mysql_stmt_execute (s->statement)) + { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + _("`%s' failed at %s:%d with error: %s\n"), + "mysql_stmt_execute", __FILE__, __LINE__, + mysql_stmt_error (s->statement)); + iclose (plugin); + return GNUNET_SYSERR; + } + return GNUNET_OK; +} + +/** + * Type of a callback that will be called for each + * data set returned from MySQL. + * + * @param cls user-defined argument + * @param num_values number of elements in values + * @param values values returned by MySQL + * @return GNUNET_OK to continue iterating, GNUNET_SYSERR to abort + */ +typedef int (*GNUNET_MysqlDataProcessor) (void *cls, unsigned int num_values, + MYSQL_BIND * values); + + +/** + * Run a prepared SELECT statement. + * + * @param plugin plugin context + * @param s handle to SELECT statment + * @param result_size number of elements in results array + * @param results pointer to already initialized MYSQL_BIND + * array (of sufficient size) for passing results + * @param processor function to call on each result + * @param processor_cls extra argument to processor + * @param ... pairs and triplets of "MYSQL_TYPE_XXX" keys and their respective + * values (size + buffer-reference for pointers); terminated + * with "-1" + * @return GNUNET_SYSERR on error, otherwise + * the number of successfully affected (or queried) rows + */ +static int +prepared_statement_run_select (struct Plugin *plugin, + struct GNUNET_MysqlStatementHandle *s, + unsigned int result_size, MYSQL_BIND * results, + GNUNET_MysqlDataProcessor processor, + void *processor_cls, ...) +{ + va_list ap; + int ret; + unsigned int rsize; + int total; + + if (GNUNET_OK != prepare_statement (plugin, s)) + { + GNUNET_break (0); + return GNUNET_SYSERR; + } + va_start (ap, processor_cls); + if (GNUNET_OK != init_params (plugin, s, ap)) + { + GNUNET_break (0); + va_end (ap); + return GNUNET_SYSERR; + } + va_end (ap); + rsize = mysql_stmt_field_count (s->statement); + if (rsize > result_size) + { + GNUNET_break (0); + return GNUNET_SYSERR; + } + if (mysql_stmt_bind_result (s->statement, results)) + { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + _("`%s' failed at %s:%d with error: %s\n"), + "mysql_stmt_bind_result", __FILE__, __LINE__, + mysql_stmt_error (s->statement)); + iclose (plugin); + return GNUNET_SYSERR; + } + + total = 0; + while (1) + { + ret = mysql_stmt_fetch (s->statement); + if (ret == MYSQL_NO_DATA) + break; + if (ret != 0) + { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + _("`%s' failed at %s:%d with error: %s\n"), + "mysql_stmt_fetch", __FILE__, __LINE__, + mysql_stmt_error (s->statement)); + iclose (plugin); + return GNUNET_SYSERR; + } + if (processor != NULL) + if (GNUNET_OK != processor (processor_cls, rsize, results)) + break; + total++; + } + mysql_stmt_reset (s->statement); + return total; +} + + + +/** + * Run a prepared statement that does NOT produce results. + * + * @param plugin plugin context + * @param s handle to SELECT statment + * @param ... pairs and triplets of "MYSQL_TYPE_XXX" keys and their respective + * values (size + buffer-reference for pointers); terminated + * with "-1" + * @param insert_id NULL or address where to store the row ID of whatever + * was inserted (only for INSERT statements!) + * @return GNUNET_SYSERR on error, otherwise + * the number of successfully affected rows + */ +static int +prepared_statement_run (struct Plugin *plugin, + struct GNUNET_MysqlStatementHandle *s, + unsigned long long *insert_id, ...) +{ + va_list ap; + int affected; + + if (GNUNET_OK != prepare_statement (plugin, s)) + return GNUNET_SYSERR; + va_start (ap, insert_id); + if (GNUNET_OK != init_params (plugin, s, ap)) + { + va_end (ap); + return GNUNET_SYSERR; + } + va_end (ap); + affected = mysql_stmt_affected_rows (s->statement); + if (NULL != insert_id) + *insert_id = (unsigned long long) mysql_stmt_insert_id (s->statement); + mysql_stmt_reset (s->statement); + return affected; +} + + +/** + * Create temporary table and prepare statements. + * + * @param plugin plugin context + * @return GNUNET_OK on success + */ +static int +itable (struct Plugin *plugin) +{ +#define MRUNS(a) (GNUNET_OK != run_statement (plugin, a) ) + if (MRUNS + ("CREATE TEMPORARY TABLE gn080dstore (" + " type INT(11) UNSIGNED NOT NULL DEFAULT 0," + " puttime BIGINT UNSIGNED NOT NULL DEFAULT 0," + " expire BIGINT UNSIGNED NOT NULL DEFAULT 0," + " hash BINARY(64) NOT NULL DEFAULT ''," + " vhash BINARY(64) NOT NULL DEFAULT ''," + " value BLOB NOT NULL DEFAULT ''," + " INDEX hashidx (hash(64),type,expire)," + " INDEX allidx (hash(64),vhash(64),type)," " INDEX expireidx (puttime)" + ") ENGINE=InnoDB") || MRUNS ("SET AUTOCOMMIT = 1")) + return GNUNET_SYSERR; +#undef MRUNS +#define PINIT(a,b) (NULL == (a = prepared_statement_create(plugin, b))) + if (PINIT (plugin->select_value, SELECT_VALUE_STMT) || + PINIT (plugin->count_value, COUNT_VALUE_STMT) || + PINIT (plugin->select_old_value, SELECT_OLD_VALUE_STMT) || + PINIT (plugin->delete_value, DELETE_VALUE_STMT) || + PINIT (plugin->insert_value, INSERT_VALUE_STMT) || + PINIT (plugin->update_value, UPDATE_VALUE_STMT)) + return GNUNET_SYSERR; +#undef PINIT + return GNUNET_OK; +} + + +/** + * Store an item in the datastore. + * + * @param cls closure (our "struct Plugin") + * @param key key to store data under + * @param size number of bytes in data + * @param data data to store + * @param type type of the value + * @param discard_time when to discard the value in any case + * @return 0 on error, number of bytes used otherwise + */ +static size_t +mysql_plugin_put (void *cls, const GNUNET_HashCode * key, size_t size, + const char *data, enum GNUNET_BLOCK_Type type, + struct GNUNET_TIME_Absolute discard_time) +{ + struct Plugin *plugin = cls; + struct GNUNET_TIME_Absolute now; + unsigned long k_length; + unsigned long h_length; + unsigned long v_length; + unsigned long long v_now; + unsigned long long v_discard_time; + unsigned int v_type; + GNUNET_HashCode vhash; + int ret; + + if (size > GNUNET_SERVER_MAX_MESSAGE_SIZE) + return GNUNET_SYSERR; + GNUNET_CRYPTO_hash (data, size, &vhash); + now = GNUNET_TIME_absolute_get (); + + /* first try UPDATE */ + h_length = sizeof (GNUNET_HashCode); + k_length = sizeof (GNUNET_HashCode); + v_length = size; + v_type = type; + v_now = (unsigned long long) now.abs_value; + v_discard_time = (unsigned long long) discard_time.abs_value; + if (GNUNET_OK == + prepared_statement_run (plugin, plugin->update_value, NULL, + MYSQL_TYPE_LONGLONG, &v_now, GNUNET_YES, + MYSQL_TYPE_LONGLONG, &v_discard_time, GNUNET_YES, + MYSQL_TYPE_BLOB, key, sizeof (GNUNET_HashCode), + &k_length, MYSQL_TYPE_BLOB, &vhash, + sizeof (GNUNET_HashCode), &h_length, + MYSQL_TYPE_LONG, &v_type, GNUNET_YES, -1)) + return GNUNET_OK; + + /* now try INSERT */ + h_length = sizeof (GNUNET_HashCode); + k_length = sizeof (GNUNET_HashCode); + v_length = size; + if (GNUNET_OK != + (ret = + prepared_statement_run (plugin, plugin->insert_value, NULL, + MYSQL_TYPE_LONG, &type, GNUNET_YES, + MYSQL_TYPE_LONGLONG, &v_now, GNUNET_YES, + MYSQL_TYPE_LONGLONG, &v_discard_time, GNUNET_YES, + MYSQL_TYPE_BLOB, key, sizeof (GNUNET_HashCode), + &k_length, MYSQL_TYPE_BLOB, &vhash, + sizeof (GNUNET_HashCode), &h_length, + MYSQL_TYPE_BLOB, data, (unsigned long) size, + &v_length, -1))) + { + if (ret == GNUNET_SYSERR) + itable (plugin); + return GNUNET_SYSERR; + } + return size + OVERHEAD; +} + + +static int +return_ok (void *cls, unsigned int num_values, MYSQL_BIND * values) +{ + return GNUNET_OK; +} + + +/** + * Iterate over the results for a particular key + * in the datastore. + * + * @param cls closure (our "struct Plugin") + * @param key + * @param type entries of which type are relevant? + * @param iter maybe NULL (to just count) + * @param iter_cls closure for iter + * @return the number of results found + */ +static unsigned int +mysql_plugin_get (void *cls, const GNUNET_HashCode * key, + enum GNUNET_BLOCK_Type type, GNUNET_DATACACHE_Iterator iter, + void *iter_cls) +{ + struct Plugin *plugin = cls; + MYSQL_BIND rbind[3]; + unsigned long h_length; + unsigned long v_length; + unsigned long long v_expire; + struct GNUNET_TIME_Absolute now; + struct GNUNET_TIME_Absolute expire; + unsigned int cnt; + unsigned long long total; + unsigned long long v_now; + unsigned int off; + unsigned int v_type; + int ret; + char buffer[GNUNET_SERVER_MAX_MESSAGE_SIZE]; + + now = GNUNET_TIME_absolute_get (); + h_length = sizeof (GNUNET_HashCode); + v_length = sizeof (buffer); + total = -1; + memset (rbind, 0, sizeof (rbind)); + rbind[0].buffer_type = MYSQL_TYPE_LONGLONG; + rbind[0].buffer = &total; + rbind[0].is_unsigned = GNUNET_YES; + v_type = type; + v_now = (unsigned long long) now.abs_value; + if ((GNUNET_OK != + (ret = + prepared_statement_run_select (plugin, plugin->count_value, 1, rbind, + return_ok, NULL, MYSQL_TYPE_BLOB, key, + sizeof (GNUNET_HashCode), &h_length, + MYSQL_TYPE_LONG, &v_type, GNUNET_YES, + MYSQL_TYPE_LONGLONG, &v_now, GNUNET_YES, + -1))) || (-1 == total)) + { + if (ret == GNUNET_SYSERR) + itable (plugin); + return GNUNET_SYSERR; + } + if ((iter == NULL) || (total == 0)) + return (int) total; + + off = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, total); + cnt = 0; + while (cnt < total) + { + memset (rbind, 0, sizeof (rbind)); + rbind[0].buffer_type = MYSQL_TYPE_BLOB; + rbind[0].buffer_length = sizeof (buffer); + rbind[0].length = &v_length; + rbind[0].buffer = buffer; + rbind[1].buffer_type = MYSQL_TYPE_LONGLONG; + rbind[1].is_unsigned = 1; + rbind[1].buffer = &v_expire; + off = (off + 1) % total; + if (GNUNET_OK != + (ret = + prepared_statement_run_select (plugin, plugin->select_value, 2, rbind, + return_ok, NULL, MYSQL_TYPE_BLOB, key, + sizeof (GNUNET_HashCode), &h_length, + MYSQL_TYPE_LONG, &v_type, GNUNET_YES, + MYSQL_TYPE_LONGLONG, &v_now, GNUNET_YES, + MYSQL_TYPE_LONG, &off, GNUNET_YES, -1))) + { + if (ret == GNUNET_SYSERR) + itable (plugin); + return GNUNET_SYSERR; + } + cnt++; + expire.abs_value = v_expire; + if (GNUNET_OK != iter (iter_cls, expire, key, v_length, buffer, type)) + break; + } + return cnt; +} + + +/** + * Delete the entry with the lowest expiration value + * from the datacache right now. + * + * @param cls closure (our "struct Plugin") + * @return GNUNET_OK on success, GNUNET_SYSERR on error + */ +static int +mysql_plugin_del (void *cls) +{ + struct Plugin *plugin = cls; + + MYSQL_BIND rbind[5]; + unsigned int v_type; + GNUNET_HashCode v_key; + GNUNET_HashCode vhash; + unsigned long k_length; + unsigned long h_length; + unsigned long v_length; + int ret; + char buffer[GNUNET_SERVER_MAX_MESSAGE_SIZE]; + + k_length = sizeof (GNUNET_HashCode); + h_length = sizeof (GNUNET_HashCode); + v_length = sizeof (buffer); + memset (rbind, 0, sizeof (rbind)); + rbind[0].buffer_type = MYSQL_TYPE_BLOB; + rbind[0].buffer_length = sizeof (GNUNET_HashCode); + rbind[0].length = &k_length; + rbind[0].buffer = &v_key; + rbind[1].buffer_type = MYSQL_TYPE_BLOB; + rbind[1].buffer_length = sizeof (GNUNET_HashCode); + rbind[1].length = &h_length; + rbind[1].buffer = &vhash; + rbind[2].buffer_type = MYSQL_TYPE_LONG; + rbind[2].is_unsigned = 1; + rbind[2].buffer = &v_type; + rbind[3].buffer_type = MYSQL_TYPE_BLOB; + rbind[3].buffer_length = sizeof (buffer); + rbind[3].length = &v_length; + rbind[3].buffer = buffer; + if ((GNUNET_OK != + (ret = + prepared_statement_run_select (plugin, plugin->select_old_value, 4, + rbind, return_ok, NULL, -1))) || + (GNUNET_OK != + (ret = + prepared_statement_run (plugin, plugin->delete_value, NULL, + MYSQL_TYPE_BLOB, &v_key, + sizeof (GNUNET_HashCode), &k_length, + MYSQL_TYPE_BLOB, &vhash, + sizeof (GNUNET_HashCode), &h_length, + MYSQL_TYPE_LONG, &v_type, GNUNET_YES, + MYSQL_TYPE_BLOB, buffer, + (unsigned long) sizeof (buffer), &v_length, + -1)))) + { + if (ret == GNUNET_SYSERR) + itable (plugin); + return GNUNET_SYSERR; + } + plugin->env->delete_notify (plugin->env->cls, &v_key, v_length + OVERHEAD); + + return GNUNET_OK; +} + + +/** + * Entry point for the plugin. + * + * @param cls closure (the "struct GNUNET_DATACACHE_PluginEnvironmnet") + * @return the plugin's closure (our "struct Plugin") + */ +void * +libgnunet_plugin_datacache_mysql_init (void *cls) +{ + struct GNUNET_DATACACHE_PluginEnvironment *env = cls; + struct GNUNET_DATACACHE_PluginFunctions *api; + struct Plugin *plugin; + + plugin = GNUNET_malloc (sizeof (struct Plugin)); + plugin->env = env; + plugin->cnffile = get_my_cnf_path (env->cfg); + if (GNUNET_OK != iopen (plugin)) + { + GNUNET_free_non_null (plugin->cnffile); + GNUNET_free (plugin); + return NULL; + } + if (GNUNET_OK != itable (plugin)) + { + iclose (plugin); + GNUNET_free_non_null (plugin->cnffile); + GNUNET_free (plugin); + return NULL; + } + api = GNUNET_malloc (sizeof (struct GNUNET_DATACACHE_PluginFunctions)); + api->cls = plugin; + api->get = &mysql_plugin_get; + api->put = &mysql_plugin_put; + api->del = &mysql_plugin_del; + GNUNET_log_from (GNUNET_ERROR_TYPE_INFO, "mysql", + _("MySQL datacache running\n")); + return api; +} + + +/** + * Exit point from the plugin. + * + * @param cls closure (our "struct Plugin") + * @return NULL + */ +void * +libgnunet_plugin_datacache_mysql_done (void *cls) +{ + struct GNUNET_DATACACHE_PluginFunctions *api = cls; + struct Plugin *plugin = api->cls; + + iclose (plugin); + GNUNET_free_non_null (plugin->cnffile); + GNUNET_free (plugin); + GNUNET_free (api); + mysql_library_end (); + return NULL; +} + + +/* end of plugin_datacache_mysql.c */ |