-add support for event notifications to exchangedb plugin

This commit is contained in:
Christian Grothoff 2021-08-13 22:35:13 +02:00
parent 756998a6d5
commit 41aa1ed41d
No known key found for this signature in database
GPG Key ID: 939E6BE1E29FC3CC
2 changed files with 2601 additions and 2237 deletions

View File

@ -27,7 +27,9 @@
#include "taler_pq_lib.h"
#include "taler_json_lib.h"
#include "taler_exchangedb_plugin.h"
#include <poll.h>
#include <pthread.h>
#include <sys/eventfd.h>
#include <libpq-fe.h>
#include "plugin_exchangedb_common.c"
@ -99,6 +101,11 @@ struct TALER_EXCHANGEDB_Session
*/
const char *transaction_name;
/**
* Did we initialize the prepared statements
* for this session?
*/
bool init;
};
@ -150,6 +157,34 @@ struct PostgresClosure
* Handle for the main() thread of the program.
*/
pthread_t main_self;
/**
* Thread responsible for processing database event
* notifications.
*/
pthread_t event_thread;
/**
* Lock for @e listener_count access.
*/
pthread_mutex_t event_lock;
/**
* Number of registered listerners. @e event_thread
* should terminate if this value reaches 0.
*/
uint64_t listener_count;
/**
* Additional FD to signal the @e event_thread
* (used to stop it).
*/
int event_fd;
/**
* Current Postges socket we watch on for notifications.
*/
int pg_sock;
};
@ -223,50 +258,15 @@ db_conn_destroy (void *cls)
/**
* Get the thread-local database-handle.
* Connect to the db if the connection does not exist yet.
* Initialize prepared statements for @a sess.
*
* @param cls the `struct PostgresClosure` with the plugin-specific state
* @return the database connection, or NULL on error
* @param[in,out] sess session to initialize
* @return #GNUNET_OK on success
*/
static struct TALER_EXCHANGEDB_Session *
postgres_get_session (void *cls)
static enum GNUNET_GenericReturnValue
init_session (struct TALER_EXCHANGEDB_Session *sess)
{
struct PostgresClosure *pc = cls;
struct GNUNET_PQ_Context *db_conn;
struct TALER_EXCHANGEDB_Session *session;
if (pthread_equal (pc->main_self,
pthread_self ()))
session = pc->main_session;
else
session = pthread_getspecific (pc->db_conn_threadlocal);
if (NULL != session)
{
if (NULL == session->transaction_name)
GNUNET_PQ_reconnect_if_down (session->conn);
return session;
}
{
#if AUTO_EXPLAIN
/* Enable verbose logging to see where queries do not
properly use indices */
struct GNUNET_PQ_ExecuteStatement es[] = {
GNUNET_PQ_make_try_execute ("LOAD 'auto_explain';"),
GNUNET_PQ_make_try_execute ("SET auto_explain.log_min_duration=50;"),
GNUNET_PQ_make_try_execute ("SET auto_explain.log_timing=TRUE;"),
GNUNET_PQ_make_try_execute ("SET auto_explain.log_analyze=TRUE;"),
/* https://wiki.postgresql.org/wiki/Serializable suggests to really
force the default to 'serializable' if SSI is to be used. */
GNUNET_PQ_make_try_execute (
"SET SESSION CHARACTERISTICS AS TRANSACTION ISOLATION LEVEL SERIALIZABLE;"),
GNUNET_PQ_make_try_execute ("SET enable_sort=OFF;"),
GNUNET_PQ_make_try_execute ("SET enable_seqscan=OFF;"),
GNUNET_PQ_EXECUTE_STATEMENT_END
};
#else
struct GNUNET_PQ_ExecuteStatement *es = NULL;
#endif
enum GNUNET_GenericReturnValue ret;
struct GNUNET_PQ_PreparedStatement ps[] = {
/* Used in #postgres_insert_denomination_info() and
#postgres_add_denomination_key() */
@ -2501,16 +2501,81 @@ postgres_get_session (void *cls)
GNUNET_PQ_PREPARED_STATEMENT_END
};
ret = GNUNET_PQ_prepare_statements (sess->conn,
ps);
if (GNUNET_OK != ret)
return ret;
sess->init = true;
return GNUNET_OK;
}
/**
* Get the thread-local database-handle.
* Connect to the db if the connection does not exist yet.
*
* @param pc the plugin-specific state
* @param skip_prepare true if we should skip prepared statement setup
* @return the database connection, or NULL on error
*/
static struct TALER_EXCHANGEDB_Session *
internal_get_session (struct PostgresClosure *pc,
bool skip_prepare)
{
struct GNUNET_PQ_Context *db_conn;
struct TALER_EXCHANGEDB_Session *session;
if (pthread_equal (pc->main_self,
pthread_self ()))
session = pc->main_session;
else
session = pthread_getspecific (pc->db_conn_threadlocal);
if (NULL != session)
{
if (NULL == session->transaction_name)
GNUNET_PQ_reconnect_if_down (session->conn);
return session;
}
{
#if AUTO_EXPLAIN
/* Enable verbose logging to see where queries do not
properly use indices */
struct GNUNET_PQ_ExecuteStatement es[] = {
GNUNET_PQ_make_try_execute ("LOAD 'auto_explain';"),
GNUNET_PQ_make_try_execute ("SET auto_explain.log_min_duration=50;"),
GNUNET_PQ_make_try_execute ("SET auto_explain.log_timing=TRUE;"),
GNUNET_PQ_make_try_execute ("SET auto_explain.log_analyze=TRUE;"),
/* https://wiki.postgresql.org/wiki/Serializable suggests to really
force the default to 'serializable' if SSI is to be used. */
GNUNET_PQ_make_try_execute (
"SET SESSION CHARACTERISTICS AS TRANSACTION ISOLATION LEVEL SERIALIZABLE;"),
GNUNET_PQ_make_try_execute ("SET enable_sort=OFF;"),
GNUNET_PQ_make_try_execute ("SET enable_seqscan=OFF;"),
GNUNET_PQ_EXECUTE_STATEMENT_END
};
#else
struct GNUNET_PQ_ExecuteStatement *es = NULL;
#endif
db_conn = GNUNET_PQ_connect_with_cfg (pc->cfg,
"exchangedb-postgres",
NULL,
es,
ps);
NULL);
}
if (NULL == db_conn)
return NULL;
session = GNUNET_new (struct TALER_EXCHANGEDB_Session);
session->conn = db_conn;
if ( (! skip_prepare) &&
(GNUNET_OK !=
init_session (session)) )
{
GNUNET_break (0);
GNUNET_PQ_disconnect (db_conn);
GNUNET_free (session);
return NULL;
}
if (pthread_equal (pc->main_self,
pthread_self ()))
{
@ -2531,6 +2596,34 @@ postgres_get_session (void *cls)
}
/**
* Get the thread-local database-handle.
* Connect to the db if the connection does not exist yet.
*
* @param cls the `struct PostgresClosure` with the plugin-specific state
* @return the database connection, or NULL on error
*/
static struct TALER_EXCHANGEDB_Session *
postgres_get_session (void *cls)
{
struct PostgresClosure *pc = cls;
struct TALER_EXCHANGEDB_Session *sess;
sess = internal_get_session (pc,
false);
if (! sess->init)
{
if (GNUNET_OK !=
init_session (sess))
{
GNUNET_break (0);
return NULL;
}
}
return sess;
}
/**
* Do a pre-flight check that we are not in an uncommitted transaction.
* If we are, try to commit the previous transaction and output a warning.
@ -2719,6 +2812,194 @@ postgres_preflight (void *cls,
}
/**
* Main function of the thread that processes events.
*
* @param cls a `struct PostgresClosure *`
*/
static void *
handle_events (void *cls)
{
struct PostgresClosure *pg = cls;
struct pollfd pfds[] = {
{
.fd = pg->event_fd,
.events = POLLIN
},
{
.fd = pg->pg_sock,
.events = POLLIN
}
};
nfds_t nfds = (-1 == pg->pg_sock) ? 1 : 2;
GNUNET_assert (0 ==
pthread_mutex_lock (&pg->event_lock));
while (0 != pg->listener_count)
{
int ret;
GNUNET_assert (0 ==
pthread_mutex_unlock (&pg->event_lock));
ret = poll (pfds,
nfds,
-1 /* no timeout */);
if (-1 == ret)
GNUNET_log_strerror (GNUNET_ERROR_TYPE_WARNING,
"poll");
for (int i = 0; i<ret; i++)
{
if ( (pg->event_fd == pfds[i].fd) &&
(0 != (POLLIN & pfds[i].revents)) )
{
/* consume signal */
uint64_t val;
GNUNET_break (sizeof (uint64_t) ==
read (pg->event_fd,
&val,
sizeof (val)));
}
if ( (pg->pg_sock == pfds[i].fd) &&
(0 != (POLLIN & pfds[i].revents)) )
{
GNUNET_assert (NULL != pg->main_session);
GNUNET_PQ_event_do_poll (pg->main_session->conn);
}
}
GNUNET_assert (0 ==
pthread_mutex_lock (&pg->event_lock));
}
GNUNET_assert (0 ==
pthread_mutex_unlock (&pg->event_lock));
return NULL;
}
/**
* Function called whenever the socket needed for
* notifications from postgres changes.
*
* @param cls closure
* @param fd socket to listen on, -1 for none
*/
static void
pq_socket_cb (void *cls,
int fd)
{
struct PostgresClosure *pg = cls;
uint64_t val = 1;
pg->pg_sock = fd;
GNUNET_break (sizeof (uint64_t) ==
write (pg->event_fd,
&val,
sizeof (val)));
}
/**
* Register callback to be invoked on events of type @a es.
*
* @param cls database context to use
* @param session connection to use
* @param es specification of the event to listen for
* @param cb function to call when the event happens, possibly
* multiple times (until cancel is invoked)
* @param cb_cls closure for @a cb
* @return handle useful to cancel the listener
*/
static struct GNUNET_DB_EventHandler *
postgres_event_listen (void *cls,
struct TALER_EXCHANGEDB_Session *session,
const struct GNUNET_DB_EventHeaderP *es,
GNUNET_DB_EventCallback cb,
void *cb_cls)
{
struct PostgresClosure *pg = cls;
struct GNUNET_DB_EventHandler *eh;
GNUNET_assert (0 ==
pthread_mutex_lock (&pg->event_lock));
pg->listener_count++;
if (1 == pg->listener_count)
{
GNUNET_assert (0 ==
pthread_create (&pg->event_thread,
NULL,
&handle_events,
pg));
}
GNUNET_assert (0 ==
pthread_mutex_unlock (&pg->event_lock));
eh = GNUNET_PQ_event_listen (session->conn,
es,
cb,
cb_cls);
GNUNET_assert (NULL != eh);
return eh;
}
/**
* Stop notifications.
*
* @param eh handle to unregister.
*/
static void
postgres_event_listen_cancel (void *cls,
struct GNUNET_DB_EventHandler *eh)
{
struct PostgresClosure *pg = cls;
GNUNET_assert (0 ==
pthread_mutex_lock (&pg->event_lock));
pg->listener_count--;
if (0 == pg->listener_count)
{
uint64_t val = 1;
void *ret;
GNUNET_break (sizeof (uint64_t) ==
write (pg->event_fd,
&val,
sizeof (val)));
GNUNET_break (0 ==
pthread_join (pg->event_thread,
&ret));
}
GNUNET_assert (0 ==
pthread_mutex_unlock (&pg->event_lock));
GNUNET_PQ_event_listen_cancel (eh);
}
/**
* Notify all that listen on @a es of an event.
*
* @param cls database context to use
* @param session connection to use
* @param es specification of the event to generate
* @param extra additional event data provided
* @param extra_size number of bytes in @a extra
*/
static void
postgres_event_notify (void *cls,
struct TALER_EXCHANGEDB_Session *session,
const struct GNUNET_DB_EventHeaderP *es,
const void *extra,
size_t extra_size)
{
struct PostgresClosure *pg = cls;
(void) pg;
return GNUNET_PQ_event_notify (session->conn,
es,
extra,
extra_size);
}
/**
* Insert a denomination key's public information into the database for
* reference by auditors and other consistency checks.
@ -10682,6 +10963,36 @@ libtaler_plugin_exchangedb_postgres_init (void *cls)
GNUNET_free (pg);
return NULL;
}
{
struct TALER_EXCHANGEDB_Session *session;
session = internal_get_session (pg,
true);
if (NULL == session)
{
GNUNET_free (pg->currency);
GNUNET_free (pg->sql_dir);
GNUNET_free (pg);
return NULL;
}
pg->event_fd = eventfd (0, 0);
if (-1 == pg->event_fd)
{
GNUNET_log_strerror (GNUNET_ERROR_TYPE_ERROR,
"eventfd");
GNUNET_free (pg->currency);
GNUNET_free (pg->sql_dir);
GNUNET_free (pg);
return NULL;
}
GNUNET_assert (0 ==
pthread_mutex_init (&pg->event_lock,
NULL));
GNUNET_PQ_event_set_socket_callback (session->conn,
&pq_socket_cb,
pg);
}
plugin = GNUNET_new (struct TALER_EXCHANGEDB_Plugin);
plugin->cls = pg;
plugin->get_session = &postgres_get_session;
@ -10692,6 +11003,9 @@ libtaler_plugin_exchangedb_postgres_init (void *cls)
plugin->commit = &postgres_commit;
plugin->preflight = &postgres_preflight;
plugin->rollback = &postgres_rollback;
plugin->event_listen = &postgres_event_listen;
plugin->event_listen_cancel = &postgres_event_listen_cancel;
plugin->event_notify = &postgres_event_notify;
plugin->insert_denomination_info = &postgres_insert_denomination_info;
plugin->get_denomination_info = &postgres_get_denomination_info;
plugin->iterate_denomination_info = &postgres_iterate_denomination_info;
@ -10845,7 +11159,11 @@ libtaler_plugin_exchangedb_postgres_done (void *cls)
/* If we launched a session for the main thread,
kill it here before we unload */
GNUNET_assert (0 == pg->listener_count);
db_conn_destroy (pg->main_session);
GNUNET_break (0 ==
close (pg->event_fd));
pthread_mutex_destroy (&pg->event_lock);
GNUNET_free (pg->sql_dir);
GNUNET_free (pg->currency);
GNUNET_free (pg);

View File

@ -2145,6 +2145,52 @@ struct TALER_EXCHANGEDB_Plugin
struct TALER_EXCHANGEDB_Session *session);
/**
* Register callback to be invoked on events of type @a es.
*
* @param cls database context to use
* @param session connection to use
* @param es specification of the event to listen for
* @param cb function to call when the event happens, possibly
* multiple times (until cancel is invoked)
* @param cb_cls closure for @a cb
* @return handle useful to cancel the listener
*/
struct GNUNET_DB_EventHandler *
(*event_listen)(void *cls,
struct TALER_EXCHANGEDB_Session *session,
const struct GNUNET_DB_EventHeaderP *es,
GNUNET_DB_EventCallback cb,
void *cb_cls);
/**
* Stop notifications.
*
* @param cls database context to use
* @param eh handle to unregister.
*/
void
(*event_listen_cancel)(void *cls,
struct GNUNET_DB_EventHandler *eh);
/**
* Notify all that listen on @a es of an event.
*
* @param cls database context to use
* @param session connection to use
* @param es specification of the event to generate
* @param extra additional event data provided
* @param extra_size number of bytes in @a extra
*/
void
(*event_notify)(void *cls,
struct TALER_EXCHANGEDB_Session *session,
const struct GNUNET_DB_EventHeaderP *es,
const void *extra,
size_t extra_size);
/**
* Insert information about a denomination key and in particular
* the properties (value, fees, expiration times) the coins signed