aboutsummaryrefslogtreecommitdiff
path: root/src/datastore/plugin_datastore_postgres.c
diff options
context:
space:
mode:
authorgrothoff <grothoff@140774ce-b5e7-0310-ab8b-a85725594a96>2011-04-15 13:08:23 +0000
committergrothoff <grothoff@140774ce-b5e7-0310-ab8b-a85725594a96>2011-04-15 13:08:23 +0000
commit3cbd42f75a28a6ae238279094574b589f411f978 (patch)
tree58b40b99d1ee97a19fe47c9590b352473736e17d /src/datastore/plugin_datastore_postgres.c
parentccb949b55d7009fa95626ae530982db9cbccf1cb (diff)
fixes
git-svn-id: https://gnunet.org/svn/gnunet@14992 140774ce-b5e7-0310-ab8b-a85725594a96
Diffstat (limited to 'src/datastore/plugin_datastore_postgres.c')
-rw-r--r--src/datastore/plugin_datastore_postgres.c611
1 files changed, 286 insertions, 325 deletions
diff --git a/src/datastore/plugin_datastore_postgres.c b/src/datastore/plugin_datastore_postgres.c
index 1ff56da315..2cecfa9a11 100644
--- a/src/datastore/plugin_datastore_postgres.c
+++ b/src/datastore/plugin_datastore_postgres.c
@@ -1,6 +1,6 @@
/*
This file is part of GNUnet
- (C) 2009, 2010 Christian Grothoff (and other contributing authors)
+ (C) 2009, 2010, 2011 Christian Grothoff (and other contributing authors)
GNUnet is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published
@@ -30,45 +30,6 @@
#define DEBUG_POSTGRES GNUNET_NO
-#define SELECT_IT_LOW_PRIORITY "(SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "\
- "WHERE (prio = $1 AND oid > $2) " \
- "ORDER BY prio ASC,oid ASC LIMIT 1) "\
- "UNION "\
- "(SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "\
- "WHERE (prio > $1 AND oid != $2)"\
- "ORDER BY prio ASC,oid ASC LIMIT 1)"\
- "ORDER BY prio ASC,oid ASC LIMIT 1"
-
-#define SELECT_IT_NON_ANONYMOUS "(SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "\
- "WHERE (prio = $1 AND oid < $2)"\
- " AND anonLevel=0 ORDER BY prio DESC,oid DESC LIMIT 1) "\
- "UNION "\
- "(SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "\
- "WHERE (prio < $1 AND oid != $2)"\
- " AND anonLevel=0 ORDER BY prio DESC,oid DESC LIMIT 1) "\
- "ORDER BY prio DESC,oid DESC LIMIT 1"
-
-#define SELECT_IT_EXPIRATION_TIME "(SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "\
- "WHERE (expire = $1 AND oid > $2) "\
- "ORDER BY expire ASC,oid ASC LIMIT 1) "\
- "UNION "\
- "(SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "\
- "WHERE (expire > $1 AND oid != $2) " \
- "ORDER BY expire ASC,oid ASC LIMIT 1)"\
- "ORDER BY expire ASC,oid ASC LIMIT 1"
-
-
-#define SELECT_IT_MIGRATION_ORDER "(SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "\
- "WHERE (expire = $1 AND oid < $2)"\
- " AND expire > $3 AND type!=3"\
- " ORDER BY expire DESC,oid DESC LIMIT 1) "\
- "UNION "\
- "(SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "\
- "WHERE (expire < $1 AND oid != $2)" \
- " AND expire > $3 AND type!=3"\
- " ORDER BY expire DESC,oid DESC LIMIT 1)"\
- "ORDER BY expire DESC,oid DESC LIMIT 1"
-
/**
* After how many ms "busy" should a DB operation fail for good?
* A low value makes sure that we are more responsive to requests
@@ -140,7 +101,7 @@ struct NextRequestClosure
/**
* Number of entries found so far
*/
- long long count;
+ unsigned long long count;
/**
* Offset this iteration starts at.
@@ -153,24 +114,14 @@ struct NextRequestClosure
uint64_t blimit_off;
/**
- * Overall number of matching entries.
- */
- unsigned long long total;
-
- /**
- * Expiration value of previous result (possible parameter), big-endian.
+ * Current total number of entries found so far, big-endian.
*/
- uint64_t blast_expire;
+ uint64_t bcount;
/**
- * Row ID of last result (possible paramter), big-endian.
- */
- uint32_t blast_rowid;
-
- /**
- * Priority of last result (possible parameter), big-endian.
+ * Overall number of matching entries.
*/
- uint32_t blast_prio;
+ unsigned long long total;
/**
* Type of block (possible paramter), big-endian.
@@ -181,6 +132,11 @@ struct NextRequestClosure
* Flag set to GNUNET_YES to stop iteration.
*/
int end_it;
+
+ /**
+ * Flag to indicate that there should only be one result.
+ */
+ int one_shot;
};
@@ -336,6 +292,7 @@ init_connection (struct Plugin *plugin)
GNUNET_free_non_null (conninfo);
ret = PQexec (plugin->dbh,
"CREATE TABLE gn090 ("
+ " repl INTEGER NOT NULL DEFAULT 0,"
" type INTEGER NOT NULL DEFAULT 0,"
" prio INTEGER NOT NULL DEFAULT 0,"
" anonLevel INTEGER NOT NULL DEFAULT 0,"
@@ -385,7 +342,6 @@ init_connection (struct Plugin *plugin)
}
}
PQclear (ret);
-#if 1
ret = PQexec (plugin->dbh,
"ALTER TABLE gn090 ALTER value SET STORAGE EXTERNAL");
if (GNUNET_OK !=
@@ -421,44 +377,43 @@ init_connection (struct Plugin *plugin)
return GNUNET_SYSERR;
}
PQclear (ret);
-#endif
if ((GNUNET_OK !=
pq_prepare (plugin,
"getvt",
"SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "
"WHERE hash=$1 AND vhash=$2 AND type=$3 "
- "AND oid > $4 ORDER BY oid ASC LIMIT 1 OFFSET $5",
- 5,
+ "ORDER BY oid ASC LIMIT 1 OFFSET $4",
+ 4,
__LINE__)) ||
(GNUNET_OK !=
pq_prepare (plugin,
"gett",
"SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "
- "WHERE hash=$1 AND type=$2"
- "AND oid > $3 ORDER BY oid ASC LIMIT 1 OFFSET $4",
- 4,
+ "WHERE hash=$1 AND type=$2 "
+ "ORDER BY oid ASC LIMIT 1 OFFSET $3",
+ 3,
__LINE__)) ||
(GNUNET_OK !=
pq_prepare (plugin,
"getv",
"SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "
- "WHERE hash=$1 AND vhash=$2"
- "AND oid > $3 ORDER BY oid ASC LIMIT 1 OFFSET $4",
- 4,
+ "WHERE hash=$1 AND vhash=$2 "
+ "ORDER BY oid ASC LIMIT 1 OFFSET $3",
+ 3,
__LINE__)) ||
(GNUNET_OK !=
pq_prepare (plugin,
"get",
"SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "
- "WHERE hash=$1"
- "AND oid > $2 ORDER BY oid ASC LIMIT 1 OFFSET $3",
- 3,
+ "WHERE hash=$1 "
+ "ORDER BY oid ASC LIMIT 1 OFFSET $2",
+ 2,
__LINE__)) ||
(GNUNET_OK !=
pq_prepare (plugin,
"put",
- "INSERT INTO gn090 (type, prio, anonLevel, expire, hash, vhash, value) "
- "VALUES ($1, $2, $3, $4, $5, $6, $7)",
+ "INSERT INTO gn090 (repl, type, prio, anonLevel, expire, hash, vhash, value) "
+ "VALUES ($1, $2, $3, $4, $5, $6, $7, $8)",
8,
__LINE__)) ||
(GNUNET_OK !=
@@ -470,32 +425,42 @@ init_connection (struct Plugin *plugin)
__LINE__)) ||
(GNUNET_OK !=
pq_prepare (plugin,
- "select_low_priority",
- SELECT_IT_LOW_PRIORITY,
- 2,
+ "decrepl",
+ "UPDATE gn090 SET repl = GREATEST (repl - 1, 0) "
+ "WHERE oid = $1",
+ 1,
__LINE__)) ||
(GNUNET_OK !=
pq_prepare (plugin,
"select_non_anonymous",
- SELECT_IT_NON_ANONYMOUS,
- 2,
+ "SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "
+ "WHERE anonLevel = 0 ORDER BY oid DESC LIMIT 1 OFFSET $1",
+ 1,
__LINE__)) ||
(GNUNET_OK !=
pq_prepare (plugin,
- "select_expiration_time",
- SELECT_IT_EXPIRATION_TIME,
- 2,
+ "select_expiration_order",
+ "(SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "
+ "WHERE expire < $1 ORDER BY prio ASC LIMIT 1) "
+ "UNION "
+ "(SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "
+ "ORDER BY prio ASC LIMIT 1) "
+ "ORDER BY expire ASC LIMIT 1",
+ 1,
__LINE__)) ||
(GNUNET_OK !=
pq_prepare (plugin,
- "select_migration_order",
- SELECT_IT_MIGRATION_ORDER,
- 3,
+ "select_replication_order",
+ "SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 " \
+ "ORDER BY repl DESC,RANDOM() LIMIT 1",
+ 0,
__LINE__)) ||
(GNUNET_OK !=
pq_prepare (plugin,
"delrow",
- "DELETE FROM gn090 " "WHERE oid=$1", 1, __LINE__)))
+ "DELETE FROM gn090 " "WHERE oid=$1",
+ 1,
+ __LINE__)))
{
PQfinish (plugin->dbh);
plugin->dbh = NULL;
@@ -610,8 +575,10 @@ postgres_plugin_put (void *cls,
uint32_t btype = htonl (type);
uint32_t bprio = htonl (priority);
uint32_t banon = htonl (anonymity);
+ uint32_t brepl = htonl (replication);
uint64_t bexpi = GNUNET_TIME_absolute_hton (expiration).abs_value__;
const char *paramValues[] = {
+ (const char *) &brepl,
(const char *) &btype,
(const char *) &bprio,
(const char *) &banon,
@@ -621,6 +588,7 @@ postgres_plugin_put (void *cls,
(const char *) data
};
int paramLengths[] = {
+ sizeof (brepl),
sizeof (btype),
sizeof (bprio),
sizeof (banon),
@@ -629,11 +597,11 @@ postgres_plugin_put (void *cls,
sizeof (GNUNET_HashCode),
size
};
- const int paramFormats[] = { 1, 1, 1, 1, 1, 1, 1 };
+ const int paramFormats[] = { 1, 1, 1, 1, 1, 1, 1, 1 };
GNUNET_CRYPTO_hash (data, size, &vhash);
ret = PQexecPrepared (plugin->dbh,
- "put", 7, paramValues, paramLengths, paramFormats, 1);
+ "put", 8, paramValues, paramLengths, paramFormats, 1);
if (GNUNET_OK != check_result (plugin, ret,
PGRES_COMMAND_OK,
"PQexecPrepared", "put", __LINE__))
@@ -649,6 +617,7 @@ postgres_plugin_put (void *cls,
return GNUNET_OK;
}
+
/**
* Function invoked on behalf of a "PluginIterator"
* asking the database plugin to call the iterator
@@ -690,15 +659,11 @@ postgres_next_request_cont (void *next_cls,
GNUNET_TIME_UNIT_ZERO_ABS, 0);
GNUNET_free (nrc);
return;
- }
-
- if (nrc->count == 0)
- nrc->blimit_off = GNUNET_htonll (nrc->off);
- else
- nrc->blimit_off = GNUNET_htonll (0);
- if (nrc->count + nrc->off == nrc->total)
- nrc->blast_rowid = htonl (0); /* back to start */
-
+ }
+ if (nrc->off == nrc->total)
+ nrc->off = 0;
+ nrc->blimit_off = GNUNET_htonll (nrc->off);
+ nrc->bcount = GNUNET_htonll ((uint64_t) nrc->count);
res = PQexecPrepared (plugin->dbh,
nrc->pname,
nrc->nparams,
@@ -773,14 +738,10 @@ postgres_next_request_cont (void *next_cls,
priority = ntohl (*(uint32_t *) PQgetvalue (res, 0, 1));
anonymity = ntohl ( *(uint32_t *) PQgetvalue (res, 0, 2));
expiration_time.abs_value = GNUNET_ntohll (*(uint64_t *) PQgetvalue (res, 0, 3));
- memcpy (&key, PQgetvalue (res, 0, 4), sizeof (GNUNET_HashCode));
+ memcpy (&key,
+ PQgetvalue (res, 0, 4),
+ sizeof (GNUNET_HashCode));
size = PQgetlength (res, 0, 5);
-
- nrc->blast_prio = htonl (priority);
- nrc->blast_expire = GNUNET_htonll (expiration_time.abs_value);
- nrc->blast_rowid = htonl (rowid);
- nrc->count++;
-
#if DEBUG_POSTGRES
GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
"datastore-postgres",
@@ -789,7 +750,7 @@ postgres_next_request_cont (void *next_cls,
(unsigned int) type);
#endif
iret = nrc->iter (nrc->iter_cls,
- nrc,
+ (nrc->one_shot == GNUNET_YES) ? NULL : nrc,
&key,
size,
PQgetvalue (res, 0, 5),
@@ -799,6 +760,11 @@ postgres_next_request_cont (void *next_cls,
expiration_time,
rowid);
PQclear (res);
+ if (iret != GNUNET_NO)
+ {
+ nrc->count++;
+ nrc->off++;
+ }
if (iret == GNUNET_SYSERR)
{
#if DEBUG_POSTGRES
@@ -828,6 +794,8 @@ postgres_next_request_cont (void *next_cls,
#endif
}
}
+ if (nrc->one_shot == GNUNET_YES)
+ GNUNET_free (nrc);
}
@@ -858,183 +826,6 @@ postgres_plugin_next_request (void *next_cls,
/**
- * Update the priority for a particular key in the datastore. If
- * the expiration time in value is different than the time found in
- * the datastore, the higher value should be kept. For the
- * anonymity level, the lower value is to be used. The specified
- * priority should be added to the existing priority, ignoring the
- * priority in value.
- *
- * Note that it is possible for multiple values to match this put.
- * In that case, all of the respective values are updated.
- *
- * @param cls our "struct Plugin*"
- * @param uid unique identifier of the datum
- * @param delta by how much should the priority
- * change? If priority + delta < 0 the
- * priority should be set to 0 (never go
- * negative).
- * @param expire new expiration time should be the
- * MAX of any existing expiration time and
- * this value
- * @param msg set to error message
- * @return GNUNET_OK on success
- */
-static int
-postgres_plugin_update (void *cls,
- uint64_t uid,
- int delta, struct GNUNET_TIME_Absolute expire,
- char **msg)
-{
- struct Plugin *plugin = cls;
- PGresult *ret;
- int32_t bdelta = (int32_t) htonl ((uint32_t) delta);
- uint32_t boid = htonl ( (uint32_t) uid);
- uint64_t bexpire = GNUNET_TIME_absolute_hton (expire).abs_value__;
- const char *paramValues[] = {
- (const char *) &bdelta,
- (const char *) &bexpire,
- (const char *) &boid,
- };
- int paramLengths[] = {
- sizeof (bdelta),
- sizeof (bexpire),
- sizeof (boid),
- };
- const int paramFormats[] = { 1, 1, 1 };
-
- ret = PQexecPrepared (plugin->dbh,
- "update",
- 3, paramValues, paramLengths, paramFormats, 1);
- if (GNUNET_OK != check_result (plugin,
- ret,
- PGRES_COMMAND_OK,
- "PQexecPrepared", "update", __LINE__))
- return GNUNET_SYSERR;
- PQclear (ret);
- return GNUNET_OK;
-}
-
-
-/**
- * Call a method for each key in the database and
- * call the callback method on it.
- *
- * @param plugin global context
- * @param type entries of which type should be considered?
- * @param is_asc ascending or descending iteration?
- * @param iter_select which SELECT method should be used?
- * @param iter maybe NULL (to just count); iter
- * should return GNUNET_SYSERR to abort the
- * iteration, GNUNET_NO to delete the entry and
- * continue and GNUNET_OK to continue iterating
- * @param iter_cls closure for 'iter'
- */
-static void
-postgres_iterate (struct Plugin *plugin,
- unsigned int type,
- int is_asc,
- unsigned int iter_select,
- PluginIterator iter, void *iter_cls)
-{
- struct NextRequestClosure *nrc;
-
- nrc = GNUNET_malloc (sizeof (struct NextRequestClosure));
- nrc->count = UINT32_MAX;
- nrc->plugin = plugin;
- nrc->iter = iter;
- nrc->iter_cls = iter_cls;
- if (is_asc)
- {
- nrc->blast_prio = htonl (0);
- nrc->blast_rowid = htonl (0);
- nrc->blast_expire = htonl (0);
- }
- else
- {
- nrc->blast_prio = htonl (0x7FFFFFFFL);
- nrc->blast_rowid = htonl (0xFFFFFFFF);
- nrc->blast_expire = GNUNET_htonll (0x7FFFFFFFFFFFFFFFLL);
- }
- switch (iter_select)
- {
- case 0:
- nrc->pname = "select_low_priority";
- nrc->nparams = 2;
- nrc->paramValues[0] = (const char *) &nrc->blast_prio;
- nrc->paramValues[1] = (const char *) &nrc->blast_rowid;
- nrc->paramLengths[0] = sizeof (nrc->blast_prio);
- nrc->paramLengths[1] = sizeof (nrc->blast_rowid);
- break;
- case 1:
- nrc->pname = "select_non_anonymous";
- nrc->nparams = 2;
- nrc->paramValues[0] = (const char *) &nrc->blast_prio;
- nrc->paramValues[1] = (const char *) &nrc->blast_rowid;
- nrc->paramLengths[0] = sizeof (nrc->blast_prio);
- nrc->paramLengths[1] = sizeof (nrc->blast_rowid);
- break;
- case 2:
- nrc->pname = "select_expiration_time";
- nrc->nparams = 2;
- nrc->paramValues[0] = (const char *) &nrc->blast_expire;
- nrc->paramValues[1] = (const char *) &nrc->blast_rowid;
- nrc->paramLengths[0] = sizeof (nrc->blast_expire);
- nrc->paramLengths[1] = sizeof (nrc->blast_rowid);
- break;
- case 3:
- nrc->pname = "select_migration_order";
- nrc->nparams = 3;
- nrc->paramValues[0] = (const char *) &nrc->blast_expire;
- nrc->paramValues[1] = (const char *) &nrc->blast_rowid;
- nrc->paramValues[2] = (const char *) &nrc->bnow;
- nrc->paramLengths[0] = sizeof (nrc->blast_expire);
- nrc->paramLengths[1] = sizeof (nrc->blast_rowid);
- nrc->paramLengths[2] = sizeof (nrc->bnow);
- break;
- default:
- GNUNET_break (0);
- iter (iter_cls,
- NULL, NULL, 0, NULL, 0, 0, 0,
- GNUNET_TIME_UNIT_ZERO_ABS, 0);
- GNUNET_free (nrc);
- return;
- }
- nrc->bnow = GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get ()).abs_value__;
- postgres_plugin_next_request (nrc,
- GNUNET_NO);
-}
-
-
-/**
- * Select a subset of the items in the datastore and call
- * the given iterator for each of them.
- *
- * @param cls our "struct Plugin*"
- * @param type entries of which type should be considered?
- * Use 0 for any type.
- * @param iter function to call on each matching value;
- * will be called once with a NULL value at the end
- * @param iter_cls closure for iter
- */
-static void
-postgres_plugin_iter_low_priority (void *cls,
- enum GNUNET_BLOCK_Type type,
- PluginIterator iter,
- void *iter_cls)
-{
- struct Plugin *plugin = cls;
-
- postgres_iterate (plugin,
- type,
- GNUNET_YES, 0,
- iter, iter_cls);
-}
-
-
-
-
-/**
* Iterate over the results for a particular key
* in the datastore.
*
@@ -1063,12 +854,7 @@ postgres_plugin_get (void *cls,
const int paramFormats[] = { 1, 1, 1, 1, 1 };
PGresult *ret;
- if (key == NULL)
- {
- postgres_plugin_iter_low_priority (plugin, type,
- iter, iter_cls);
- return;
- }
+ GNUNET_assert (key != NULL);
nrc = GNUNET_malloc (sizeof (struct NextRequestClosure));
nrc->plugin = plugin;
nrc->iter = iter;
@@ -1087,11 +873,9 @@ postgres_plugin_get (void *cls,
nrc->paramLengths[1] = sizeof (nrc->vhash);
nrc->paramValues[2] = (const char *) &nrc->btype;
nrc->paramLengths[2] = sizeof (nrc->btype);
- nrc->paramValues[3] = (const char *) &nrc->blast_rowid;
- nrc->paramLengths[3] = sizeof (nrc->blast_rowid);
- nrc->paramValues[4] = (const char *) &nrc->blimit_off;
- nrc->paramLengths[4] = sizeof (nrc->blimit_off);
- nrc->nparams = 5;
+ nrc->paramValues[3] = (const char *) &nrc->blimit_off;
+ nrc->paramLengths[3] = sizeof (nrc->blimit_off);
+ nrc->nparams = 4;
nrc->pname = "getvt";
ret = PQexecParams (plugin->dbh,
"SELECT count(*) FROM gn090 WHERE hash=$1 AND vhash=$2 AND type=$3",
@@ -1105,11 +889,9 @@ postgres_plugin_get (void *cls,
{
nrc->paramValues[1] = (const char *) &nrc->btype;
nrc->paramLengths[1] = sizeof (nrc->btype);
- nrc->paramValues[2] = (const char *) &nrc->blast_rowid;
- nrc->paramLengths[2] = sizeof (nrc->blast_rowid);
- nrc->paramValues[3] = (const char *) &nrc->blimit_off;
- nrc->paramLengths[3] = sizeof (nrc->blimit_off);
- nrc->nparams = 4;
+ nrc->paramValues[2] = (const char *) &nrc->blimit_off;
+ nrc->paramLengths[2] = sizeof (nrc->blimit_off);
+ nrc->nparams = 3;
nrc->pname = "gett";
ret = PQexecParams (plugin->dbh,
"SELECT count(*) FROM gn090 WHERE hash=$1 AND type=$2",
@@ -1126,11 +908,9 @@ postgres_plugin_get (void *cls,
{
nrc->paramValues[1] = (const char *) &nrc->vhash;
nrc->paramLengths[1] = sizeof (nrc->vhash);
- nrc->paramValues[2] = (const char *) &nrc->blast_rowid;
- nrc->paramLengths[2] = sizeof (nrc->blast_rowid);
- nrc->paramValues[3] = (const char *) &nrc->blimit_off;
- nrc->paramLengths[3] = sizeof (nrc->blimit_off);
- nrc->nparams = 4;
+ nrc->paramValues[2] = (const char *) &nrc->blimit_off;
+ nrc->paramLengths[2] = sizeof (nrc->blimit_off);
+ nrc->nparams = 3;
nrc->pname = "getv";
ret = PQexecParams (plugin->dbh,
"SELECT count(*) FROM gn090 WHERE hash=$1 AND vhash=$2",
@@ -1142,11 +922,9 @@ postgres_plugin_get (void *cls,
}
else
{
- nrc->paramValues[1] = (const char *) &nrc->blast_rowid;
- nrc->paramLengths[1] = sizeof (nrc->blast_rowid);
- nrc->paramValues[2] = (const char *) &nrc->blimit_off;
- nrc->paramLengths[2] = sizeof (nrc->blimit_off);
- nrc->nparams = 3;
+ nrc->paramValues[1] = (const char *) &nrc->blimit_off;
+ nrc->paramLengths[1] = sizeof (nrc->blimit_off);
+ nrc->nparams = 2;
nrc->pname = "get";
ret = PQexecParams (plugin->dbh,
"SELECT count(*) FROM gn090 WHERE hash=$1",
@@ -1200,6 +978,131 @@ postgres_plugin_get (void *cls,
/**
+ * Select a subset of the items in the datastore and call
+ * the given iterator for each of them.
+ *
+ * @param cls our "struct Plugin*"
+ * @param type entries of which type should be considered?
+ * Use 0 for any type.
+ * @param iter function to call on each matching value;
+ * will be called once with a NULL value at the end
+ * @param iter_cls closure for iter
+ */
+static void
+postgres_plugin_iter_zero_anonymity (void *cls,
+ enum GNUNET_BLOCK_Type type,
+ PluginIterator iter,
+ void *iter_cls)
+{
+ struct Plugin *plugin = cls;
+ struct NextRequestClosure *nrc;
+
+ nrc = GNUNET_malloc (sizeof (struct NextRequestClosure));
+ nrc->btype = htonl ((uint32_t) type);
+ nrc->plugin = plugin;
+ nrc->iter = iter;
+ nrc->iter_cls = iter_cls;
+ nrc->pname = "select_non_anonymous";
+ nrc->nparams = 1;
+ nrc->paramLengths[0] = sizeof (nrc->bcount);
+ nrc->paramValues[0] = (const char*) &nrc->bcount;
+ postgres_plugin_next_request (nrc,
+ GNUNET_NO);
+}
+
+/**
+ * Context for 'repl_iter' function.
+ */
+struct ReplCtx
+{
+
+ /**
+ * Plugin handle.
+ */
+ struct Plugin *plugin;
+
+ /**
+ * Function to call for the result (or the NULL).
+ */
+ PluginIterator iter;
+
+ /**
+ * Closure for iter.
+ */
+ void *iter_cls;
+};
+
+
+/**
+ * Wrapper for the iterator for 'sqlite_plugin_replication_get'.
+ * Decrements the replication counter and calls the original
+ * iterator.
+ *
+ * @param cls closure
+ * @param next_cls closure to pass to the "next" function.
+ * @param key key for the content
+ * @param size number of bytes in data
+ * @param data content stored
+ * @param type type of the content
+ * @param priority priority of the content
+ * @param anonymity anonymity-level for the content
+ * @param expiration expiration time for the content
+ * @param uid unique identifier for the datum;
+ * maybe 0 if no unique identifier is available
+ *
+ * @return GNUNET_SYSERR to abort the iteration, GNUNET_OK to continue
+ * (continue on call to "next", of course),
+ * GNUNET_NO to delete the item and continue (if supported)
+ */
+static int
+repl_iter (void *cls,
+ void *next_cls,
+ const GNUNET_HashCode *key,
+ uint32_t size,
+ const void *data,
+ enum GNUNET_BLOCK_Type type,
+ uint32_t priority,
+ uint32_t anonymity,
+ struct GNUNET_TIME_Absolute expiration,
+ uint64_t uid)
+{
+ struct ReplCtx *rc = cls;
+ struct Plugin *plugin = rc->plugin;
+ int ret;
+ PGresult *qret;
+ uint32_t boid;
+
+ ret = rc->iter (rc->iter_cls,
+ next_cls, key,
+ size, data,
+ type, priority, anonymity, expiration,
+ uid);
+ if (NULL != key)
+ {
+ boid = htonl ( (uint32_t) uid);
+ const char *paramValues[] = {
+ (const char *) &boid,
+ };
+ int paramLengths[] = {
+ sizeof (boid),
+ };
+ const int paramFormats[] = { 1 };
+ qret = PQexecPrepared (plugin->dbh,
+ "decrepl",
+ 1, paramValues, paramLengths, paramFormats, 1);
+ if (GNUNET_OK != check_result (plugin,
+ qret,
+ PGRES_COMMAND_OK,
+ "PQexecPrepared",
+ "decrepl", __LINE__))
+ return GNUNET_SYSERR;
+ PQclear (qret);
+ }
+ return ret;
+}
+
+
+/**
* Get a random item for replication. Returns a single, not expired, random item
* from those with the highest replication counters. The item's
* replication counter is decremented by one IF it was positive before.
@@ -1213,9 +1116,21 @@ static void
postgres_plugin_replication_get (void *cls,
PluginIterator iter, void *iter_cls)
{
- /* FIXME: not implemented! */
- iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0,
- GNUNET_TIME_UNIT_ZERO_ABS, 0);
+ struct Plugin *plugin = cls;
+ struct NextRequestClosure *nrc;
+ struct ReplCtx rc;
+
+ rc.plugin = plugin;
+ rc.iter = iter;
+ rc.iter_cls = iter_cls;
+ nrc = GNUNET_malloc (sizeof (struct NextRequestClosure));
+ nrc->one_shot = GNUNET_YES;
+ nrc->plugin = plugin;
+ nrc->iter = &repl_iter;
+ nrc->iter_cls = &rc;
+ nrc->pname = "select_replication_order";
+ nrc->nparams = 0;
+ postgres_next_request_cont (nrc, NULL);
}
@@ -1231,34 +1146,80 @@ static void
postgres_plugin_expiration_get (void *cls,
PluginIterator iter, void *iter_cls)
{
- /* FIXME: not implemented! */
- iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0,
- GNUNET_TIME_UNIT_ZERO_ABS, 0);
+ struct Plugin *plugin = cls;
+ struct NextRequestClosure *nrc;
+ uint64_t btime;
+
+ btime = GNUNET_htonll (GNUNET_TIME_absolute_get ().abs_value);
+ nrc = GNUNET_malloc (sizeof (struct NextRequestClosure));
+ nrc->one_shot = GNUNET_YES;
+ nrc->plugin = plugin;
+ nrc->iter = iter;
+ nrc->iter_cls = iter_cls;
+ nrc->pname = "select_expiration_order";
+ nrc->nparams = 1;
+ nrc->paramValues[0] = (const char *) &btime;
+ nrc->paramLengths[0] = sizeof (btime);
+ postgres_next_request_cont (nrc, NULL);
}
/**
- * Select a subset of the items in the datastore and call
- * the given iterator for each of them.
+ * Update the priority for a particular key in the datastore. If
+ * the expiration time in value is different than the time found in
+ * the datastore, the higher value should be kept. For the
+ * anonymity level, the lower value is to be used. The specified
+ * priority should be added to the existing priority, ignoring the
+ * priority in value.
+ *
+ * Note that it is possible for multiple values to match this put.
+ * In that case, all of the respective values are updated.
*
* @param cls our "struct Plugin*"
- * @param type entries of which type should be considered?
- * Use 0 for any type.
- * @param iter function to call on each matching value;
- * will be called once with a NULL value at the end
- * @param iter_cls closure for iter
+ * @param uid unique identifier of the datum
+ * @param delta by how much should the priority
+ * change? If priority + delta < 0 the
+ * priority should be set to 0 (never go
+ * negative).
+ * @param expire new expiration time should be the
+ * MAX of any existing expiration time and
+ * this value
+ * @param msg set to error message
+ * @return GNUNET_OK on success
*/
-static void
-postgres_plugin_iter_zero_anonymity (void *cls,
- enum GNUNET_BLOCK_Type type,
- PluginIterator iter,
- void *iter_cls)
+static int
+postgres_plugin_update (void *cls,
+ uint64_t uid,
+ int delta, struct GNUNET_TIME_Absolute expire,
+ char **msg)
{
struct Plugin *plugin = cls;
+ PGresult *ret;
+ int32_t bdelta = (int32_t) htonl ((uint32_t) delta);
+ uint32_t boid = htonl ( (uint32_t) uid);
+ uint64_t bexpire = GNUNET_TIME_absolute_hton (expire).abs_value__;
+ const char *paramValues[] = {
+ (const char *) &bdelta,
+ (const char *) &bexpire,
+ (const char *) &boid,
+ };
+ int paramLengths[] = {
+ sizeof (bdelta),
+ sizeof (bexpire),
+ sizeof (boid),
+ };
+ const int paramFormats[] = { 1, 1, 1 };
- postgres_iterate (plugin,
- type, GNUNET_NO, 1,
- iter, iter_cls);
+ ret = PQexecPrepared (plugin->dbh,
+ "update",
+ 3, paramValues, paramLengths, paramFormats, 1);
+ if (GNUNET_OK != check_result (plugin,
+ ret,
+ PGRES_COMMAND_OK,
+ "PQexecPrepared", "update", __LINE__))
+ return GNUNET_SYSERR;
+ PQclear (ret);
+ return GNUNET_OK;
}