exchange/src/auditordb/plugin_auditordb_postgres.c

570 lines
15 KiB
C
Raw Normal View History

2016-10-06 15:17:10 +02:00
/*
This file is part of TALER
Copyright (C) 2014-2016 GNUnet e.V.
TALER is free software; you can redistribute it and/or modify it under the
terms of the GNU General Public License as published by the Free Software
Foundation; either version 3, or (at your option) any later version.
TALER is distributed in the hope that it will be useful, but WITHOUT ANY
WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
A PARTICULAR PURPOSE. See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License along with
TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/>
*/
/**
* @file plugin_auditordb_postgres.c
* @brief Low-level (statement-level) Postgres database access for the auditor
* @author Florian Dold
* @author Christian Grothoff
* @author Sree Harsha Totakura
*/
#include "platform.h"
#include "taler_pq_lib.h"
#include "taler_auditordb_plugin.h"
#include <pthread.h>
#include <libpq-fe.h>
/**
* Log a query error.
*
* @param result PQ result object of the query that failed
*/
#define QUERY_ERR(result) \
GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Query failed at %s:%u: %s (%s)\n", __FILE__, __LINE__, PQresultErrorMessage (result), PQresStatus (PQresultStatus (result)))
/**
* Log a really unexpected PQ error.
*
* @param result PQ result object of the PQ operation that failed
*/
#define BREAK_DB_ERR(result) do { \
GNUNET_break (0); \
GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Database failure: %s (%s)\n", PQresultErrorMessage (result), PQresStatus (PQresultStatus (result))); \
} while (0)
/**
* Shorthand for exit jumps. Logs the current line number
* and jumps to the "EXITIF_exit" label.
*
* @param cond condition that must be TRUE to exit with an error
*/
#define EXITIF(cond) \
do { \
if (cond) { GNUNET_break (0); goto EXITIF_exit; } \
} while (0)
/**
* Execute an SQL statement and log errors on failure. Must be
* run in a function that has an "SQLEXEC_fail" label to jump
* to in case the SQL statement failed.
*
* @param conn database connection
* @param sql SQL statement to run
*/
#define SQLEXEC_(conn, sql) \
do { \
PGresult *result = PQexec (conn, sql); \
if (PGRES_COMMAND_OK != PQresultStatus (result)) \
{ \
BREAK_DB_ERR (result); \
PQclear (result); \
goto SQLEXEC_fail; \
} \
PQclear (result); \
} while (0)
/**
* Run an SQL statement, ignoring errors and clearing the result.
*
* @param conn database connection
* @param sql SQL statement to run
*/
#define SQLEXEC_IGNORE_ERROR_(conn, sql) \
do { \
PGresult *result = PQexec (conn, sql); \
PQclear (result); \
} while (0)
/**
* Handle for a database session (per-thread, for transactions).
*/
struct TALER_AUDITORDB_Session
{
/**
* Postgres connection handle.
*/
PGconn *conn;
};
/**
* Type of the "cls" argument given to each of the functions in
* our API.
*/
struct PostgresClosure
{
/**
* Thread-local database connection.
* Contains a pointer to `PGconn` or NULL.
*/
pthread_key_t db_conn_threadlocal;
/**
* Database connection string, as read from
* the configuration.
*/
char *connection_cfg_str;
};
/**
* Function called by libpq whenever it wants to log something.
* We already log whenever we care, so this function does nothing
* and merely exists to silence the libpq logging.
*
* @param arg NULL
* @param res information about some libpq event
*/
static void
pq_notice_receiver_cb (void *arg,
const PGresult *res)
{
/* do nothing, intentionally */
}
/**
* Function called by libpq whenever it wants to log something.
* We log those using the Taler logger.
*
* @param arg NULL
* @param message information about some libpq event
*/
static void
pq_notice_processor_cb (void *arg,
const char *message)
{
GNUNET_log_from (GNUNET_ERROR_TYPE_INFO,
"pq",
"%s",
message);
}
/**
* Establish connection to the Postgres database
* and initialize callbacks for logging.
*
* @param pc configuration to use
* @return NULL on error
*/
static PGconn *
connect_to_postgres (struct PostgresClosure *pc)
{
PGconn *conn;
conn = PQconnectdb (pc->connection_cfg_str);
if (CONNECTION_OK !=
PQstatus (conn))
{
TALER_LOG_ERROR ("Database connection failed: %s\n",
PQerrorMessage (conn));
GNUNET_break (0);
return NULL;
}
PQsetNoticeReceiver (conn,
&pq_notice_receiver_cb,
NULL);
PQsetNoticeProcessor (conn,
&pq_notice_processor_cb,
NULL);
return conn;
}
/**
* Drop all Taler tables. This should only be used by testcases.
*
* @param cls the `struct PostgresClosure` with the plugin-specific state
* @return #GNUNET_OK upon success; #GNUNET_SYSERR upon failure
*/
static int
postgres_drop_tables (void *cls)
{
struct PostgresClosure *pc = cls;
PGconn *conn;
conn = connect_to_postgres (pc);
if (NULL == conn)
return GNUNET_SYSERR;
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"Dropping ALL tables\n");
SQLEXEC_ (conn,
"DROP TABLE IF EXISTS test;");
PQfinish (conn);
return GNUNET_OK;
SQLEXEC_fail:
PQfinish (conn);
return GNUNET_SYSERR;
}
/**
* Create the necessary tables if they are not present
*
* @param cls the `struct PostgresClosure` with the plugin-specific state
* @return #GNUNET_OK upon success; #GNUNET_SYSERR upon failure
*/
static int
postgres_create_tables (void *cls)
{
struct PostgresClosure *pc = cls;
PGconn *conn;
conn = connect_to_postgres (pc);
if (NULL == conn)
return GNUNET_SYSERR;
#define SQLEXEC(sql) SQLEXEC_(conn, sql);
#define SQLEXEC_INDEX(sql) SQLEXEC_IGNORE_ERROR_(conn, sql);
SQLEXEC ("CREATE TABLE IF NOT EXISTS test"
"(test_pub BYTEA NOT NULL CHECK (LENGTH(master_pub)=32"
")");
SQLEXEC_INDEX("CREATE INDEX testx "
"ON test(test_pub)");
#undef SQLEXEC
#undef SQLEXEC_INDEX
PQfinish (conn);
return GNUNET_OK;
SQLEXEC_fail:
PQfinish (conn);
return GNUNET_SYSERR;
}
/**
* Setup prepared statements.
*
* @param db_conn connection handle to initialize
* @return #GNUNET_OK on success, #GNUNET_SYSERR on failure
*/
static int
postgres_prepare (PGconn *db_conn)
{
PGresult *result;
#define PREPARE(name, sql, ...) \
do { \
result = PQprepare (db_conn, name, sql, __VA_ARGS__); \
if (PGRES_COMMAND_OK != PQresultStatus (result)) \
{ \
BREAK_DB_ERR (result); \
PQclear (result); result = NULL; \
return GNUNET_SYSERR; \
} \
PQclear (result); result = NULL; \
} while (0);
/* Used in #postgres_XXX() */
PREPARE ("test_insert",
"INSERT INTO test "
"(test_pub"
") VALUES "
"($1);",
1, NULL);
return GNUNET_OK;
#undef PREPARE
}
/**
* Close thread-local database connection when a thread is destroyed.
*
* @param cls closure we get from pthreads (the db handle)
*/
static void
db_conn_destroy (void *cls)
{
struct TALER_AUDITORDB_Session *session = cls;
PGconn *db_conn = session->conn;
if (NULL != db_conn)
PQfinish (db_conn);
GNUNET_free (session);
}
/**
* Get the thread-local database-handle.
* Connect to the db if the connection does not exist yet.
*
* @param cls the `struct PostgresClosure` with the plugin-specific state
* @return the database connection, or NULL on error
*/
static struct TALER_AUDITORDB_Session *
postgres_get_session (void *cls)
{
struct PostgresClosure *pc = cls;
PGconn *db_conn;
struct TALER_AUDITORDB_Session *session;
if (NULL != (session = pthread_getspecific (pc->db_conn_threadlocal)))
return session;
db_conn = connect_to_postgres (pc);
if (NULL == db_conn)
return NULL;
if (GNUNET_OK !=
postgres_prepare (db_conn))
{
GNUNET_break (0);
PQfinish (db_conn);
return NULL;
}
session = GNUNET_new (struct TALER_AUDITORDB_Session);
session->conn = db_conn;
if (0 != pthread_setspecific (pc->db_conn_threadlocal,
session))
{
GNUNET_break (0);
PQfinish (db_conn);
GNUNET_free (session);
return NULL;
}
return session;
}
/**
* Start a transaction.
*
* @param cls the `struct PostgresClosure` with the plugin-specific state
* @param session the database connection
* @return #GNUNET_OK on success
*/
static int
postgres_start (void *cls,
struct TALER_AUDITORDB_Session *session)
{
PGresult *result;
result = PQexec (session->conn,
"START TRANSACTION ISOLATION LEVEL SERIALIZABLE");
if (PGRES_COMMAND_OK !=
PQresultStatus (result))
{
TALER_LOG_ERROR ("Failed to start transaction: %s\n",
PQresultErrorMessage (result));
GNUNET_break (0);
PQclear (result);
return GNUNET_SYSERR;
}
PQclear (result);
return GNUNET_OK;
}
/**
* Roll back the current transaction of a database connection.
*
* @param cls the `struct PostgresClosure` with the plugin-specific state
* @param session the database connection
* @return #GNUNET_OK on success
*/
static void
postgres_rollback (void *cls,
struct TALER_AUDITORDB_Session *session)
{
PGresult *result;
result = PQexec (session->conn,
"ROLLBACK");
GNUNET_break (PGRES_COMMAND_OK ==
PQresultStatus (result));
PQclear (result);
}
/**
* Commit the current transaction of a database connection.
*
* @param cls the `struct PostgresClosure` with the plugin-specific state
* @param session the database connection
* @return #GNUNET_OK on success
*/
static int
postgres_commit (void *cls,
struct TALER_AUDITORDB_Session *session)
{
PGresult *result;
result = PQexec (session->conn,
"COMMIT");
if (PGRES_COMMAND_OK !=
PQresultStatus (result))
{
const char *sqlstate;
sqlstate = PQresultErrorField (result,
PG_DIAG_SQLSTATE);
if (NULL == sqlstate)
{
/* very unexpected... */
GNUNET_break (0);
PQclear (result);
return GNUNET_SYSERR;
}
/* 40P01: deadlock, 40001: serialization failure */
if ( (0 == strcmp (sqlstate,
"40P01")) ||
(0 == strcmp (sqlstate,
"40001")) )
{
/* These two can be retried and have a fair chance of working
the next time */
PQclear (result);
return GNUNET_NO;
}
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
"Database commit failure: %s\n",
sqlstate);
PQclear (result);
return GNUNET_SYSERR;
}
PQclear (result);
return GNUNET_OK;
}
/**
* Function called to perform "garbage collection" on the
* database, expiring records we no longer require.
*
* @param cls closure
* @return #GNUNET_OK on success,
* #GNUNET_SYSERR on DB errors
*/
static int
postgres_gc (void *cls)
{
struct PostgresClosure *pc = cls;
struct GNUNET_TIME_Absolute now;
struct GNUNET_PQ_QueryParam params_time[] = {
GNUNET_PQ_query_param_absolute_time (&now),
GNUNET_PQ_query_param_end
};
PGconn *conn;
PGresult *result;
now = GNUNET_TIME_absolute_get ();
conn = connect_to_postgres (pc);
if (NULL == conn)
return GNUNET_SYSERR;
if (GNUNET_OK !=
postgres_prepare (conn))
{
PQfinish (conn);
return GNUNET_SYSERR;
}
result = GNUNET_PQ_exec_prepared (conn,
"gc_auditor",
params_time);
if (PGRES_COMMAND_OK != PQresultStatus (result))
{
BREAK_DB_ERR (result);
PQclear (result);
PQfinish (conn);
return GNUNET_SYSERR;
}
PQclear (result);
PQfinish (conn);
return GNUNET_OK;
}
/**
* Initialize Postgres database subsystem.
*
* @param cls a configuration instance
* @return NULL on error, otherwise a `struct TALER_AUDITORDB_Plugin`
*/
void *
libtaler_plugin_auditordb_postgres_init (void *cls)
{
struct GNUNET_CONFIGURATION_Handle *cfg = cls;
struct PostgresClosure *pg;
struct TALER_AUDITORDB_Plugin *plugin;
const char *ec;
pg = GNUNET_new (struct PostgresClosure);
if (0 != pthread_key_create (&pg->db_conn_threadlocal,
&db_conn_destroy))
{
TALER_LOG_ERROR ("Cannnot create pthread key.\n");
GNUNET_free (pg);
return NULL;
}
ec = getenv ("TALER_AUDITORDB_POSTGRES_CONFIG");
if (NULL != ec)
{
pg->connection_cfg_str = GNUNET_strdup (ec);
}
else
{
if (GNUNET_OK !=
GNUNET_CONFIGURATION_get_value_string (cfg,
"auditordb-postgres",
"db_conn_str",
&pg->connection_cfg_str))
{
GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
"auditordb-postgres",
"db_conn_str");
GNUNET_free (pg);
return NULL;
}
}
plugin = GNUNET_new (struct TALER_AUDITORDB_Plugin);
plugin->cls = pg;
plugin->get_session = &postgres_get_session;
plugin->drop_tables = &postgres_drop_tables;
plugin->create_tables = &postgres_create_tables;
plugin->start = &postgres_start;
plugin->commit = &postgres_commit;
plugin->rollback = &postgres_rollback;
plugin->gc = &postgres_gc;
return plugin;
}
/**
* Shutdown Postgres database subsystem.
*
* @param cls a `struct TALER_AUDITORDB_Plugin`
* @return NULL (always)
*/
void *
libtaler_plugin_auditordb_postgres_done (void *cls)
{
struct TALER_AUDITORDB_Plugin *plugin = cls;
struct PostgresClosure *pg = plugin->cls;
GNUNET_free (pg->connection_cfg_str);
GNUNET_free (pg);
GNUNET_free (plugin);
return NULL;
}
/* end of plugin_auditordb_postgres.c */