From 02716c4084c76630f35a5fcc4d2ef4e17d7e1b00 Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Mon, 16 May 2022 15:43:40 +0200 Subject: [PATCH] -add skeleton logic for purse expiration --- src/exchange/.gitignore | 1 + src/exchange/Makefile.am | 14 + src/exchange/exchange.conf | 7 + src/exchange/taler-exchange-expire.c | 479 ++++++++++++++++++++ src/exchange/taler-exchange-router.c | 3 +- src/exchangedb/plugin_exchangedb_postgres.c | 21 + src/include/taler_exchangedb_plugin.h | 15 + 7 files changed, 538 insertions(+), 2 deletions(-) create mode 100644 src/exchange/taler-exchange-expire.c diff --git a/src/exchange/.gitignore b/src/exchange/.gitignore index c12ee0117..bcfdb7e82 100644 --- a/src/exchange/.gitignore +++ b/src/exchange/.gitignore @@ -10,3 +10,4 @@ test_taler_exchange_httpd_home/.config/taler/account-1.json taler-exchange-closer taler-exchange-transfer taler-exchange-router +taler-exchange-expire diff --git a/src/exchange/Makefile.am b/src/exchange/Makefile.am index 21cc12282..24fb7e3d7 100644 --- a/src/exchange/Makefile.am +++ b/src/exchange/Makefile.am @@ -19,6 +19,7 @@ pkgcfg_DATA = \ bin_PROGRAMS = \ taler-exchange-aggregator \ taler-exchange-closer \ + taler-exchange-expire \ taler-exchange-httpd \ taler-exchange-router \ taler-exchange-transfer \ @@ -51,6 +52,19 @@ taler_exchange_closer_LDADD = \ -lgnunetutil \ $(XLIB) +taler_exchange_expire_SOURCES = \ + taler-exchange-expire.c +taler_exchange_expire_LDADD = \ + $(LIBGCRYPT_LIBS) \ + $(top_builddir)/src/json/libtalerjson.la \ + $(top_builddir)/src/util/libtalerutil.la \ + $(top_builddir)/src/bank-lib/libtalerbank.la \ + $(top_builddir)/src/exchangedb/libtalerexchangedb.la \ + -ljansson \ + -lgnunetcurl \ + -lgnunetutil \ + $(XLIB) + taler_exchange_router_SOURCES = \ taler-exchange-router.c taler_exchange_router_LDADD = \ diff --git a/src/exchange/exchange.conf b/src/exchange/exchange.conf index 9c68208ac..df136d9eb 100644 --- a/src/exchange/exchange.conf +++ b/src/exchange/exchange.conf @@ -47,8 +47,15 @@ BASE_URL = http://localhost:8081/ # How long should the aggregator sleep if it has nothing to do? AGGREGATOR_IDLE_SLEEP_INTERVAL = 60 s +# FIXME: document! ROUTER_IDLE_SLEEP_INTERVAL = 60 s +# How big is an individual shard to be processed +# by taler-exchange-expire (in time). It may take +# this much time for an expired purse to be really +# cleaned up and the coins refunded. +EXPIRE_SHARD_SIZE = 5 m + # How long should the transfer tool # sleep if it has nothing to do? TRANSFER_IDLE_SLEEP_INTERVAL = 60 s diff --git a/src/exchange/taler-exchange-expire.c b/src/exchange/taler-exchange-expire.c new file mode 100644 index 000000000..c7691930b --- /dev/null +++ b/src/exchange/taler-exchange-expire.c @@ -0,0 +1,479 @@ +/* + This file is part of TALER + Copyright (C) 2022 Taler Systems SA + + TALER is free software; you can redistribute it and/or modify it under the + terms of the GNU Affero General Public License as published by the Free Software + Foundation; either version 3, or (at your option) any later version. + + TALER is distributed in the hope that it will be useful, but WITHOUT ANY + WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR + A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. + + You should have received a copy of the GNU Affero General Public License along with + TALER; see the file COPYING. If not, see +*/ + +/** + * @file taler-exchange-expire.c + * @brief Process that cleans up expired purses + * @author Christian Grothoff + */ +#include "platform.h" +#include +#include +#include +#include "taler_exchangedb_lib.h" +#include "taler_exchangedb_plugin.h" +#include "taler_json_lib.h" +#include "taler_bank_service.h" + + +/** + * Work shard we are processing. + */ +struct Shard +{ + + /** + * When did we start processing the shard? + */ + struct GNUNET_TIME_Timestamp start_time; + + /** + * Starting row of the shard. + */ + struct GNUNET_TIME_Absolute shard_start; + + /** + * Inclusive end row of the shard. + */ + struct GNUNET_TIME_Absolute shard_end; + + /** + * Number of starting points found in the shard. + */ + uint64_t work_counter; + +}; + + +/** + * The exchange's configuration. + */ +static const struct GNUNET_CONFIGURATION_Handle *cfg; + +/** + * Our database plugin. + */ +static struct TALER_EXCHANGEDB_Plugin *db_plugin; + +/** + * Next task to run, if any. + */ +static struct GNUNET_SCHEDULER_Task *task; + +/** + * How long should we sleep when idle before trying to find more work? + */ +static struct GNUNET_TIME_Relative expire_idle_sleep_interval; + +/** + * How big are the shards we are processing? Is an inclusive offset, so every + * shard ranges from [X,X+shard_size) exclusive. So a shard covers + * shard_size slots. + */ +static struct GNUNET_TIME_Relative shard_size; + +/** + * Value to return from main(). 0 on success, non-zero on errors. + */ +static int global_ret; + +/** + * #GNUNET_YES if we are in test mode and should exit when idle. + */ +static int test_mode; + + +/** + * Select a shard to work on. + * + * @param cls NULL + */ +static void +run_shard (void *cls); + + +/** + * We're being aborted with CTRL-C (or SIGTERM). Shut down. + * + * @param cls closure + */ +static void +shutdown_task (void *cls) +{ + (void) cls; + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Running shutdown\n"); + if (NULL != task) + { + GNUNET_SCHEDULER_cancel (task); + task = NULL; + } + TALER_EXCHANGEDB_plugin_unload (db_plugin); + db_plugin = NULL; + cfg = NULL; +} + + +/** + * Parse the configuration for expire. + * + * @return #GNUNET_OK on success + */ +static enum GNUNET_GenericReturnValue +parse_expire_config (void) +{ + if (GNUNET_OK != + GNUNET_CONFIGURATION_get_value_time (cfg, + "exchange", + "EXPIRE_IDLE_SLEEP_INTERVAL", + &expire_idle_sleep_interval)) + { + GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR, + "exchange", + "EXPIRE_IDLE_SLEEP_INTERVAL"); + return GNUNET_SYSERR; + } + if (NULL == + (db_plugin = TALER_EXCHANGEDB_plugin_load (cfg))) + { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Failed to initialize DB subsystem\n"); + return GNUNET_SYSERR; + } + return GNUNET_OK; +} + + +/** + * Perform a database commit. If it fails, print a warning. + * + * @return status of commit + */ +static enum GNUNET_DB_QueryStatus +commit_or_warn (void) +{ + enum GNUNET_DB_QueryStatus qs; + + qs = db_plugin->commit (db_plugin->cls); + if (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS == qs) + return qs; + GNUNET_log ((GNUNET_DB_STATUS_SOFT_ERROR == qs) + ? GNUNET_ERROR_TYPE_INFO + : GNUNET_ERROR_TYPE_ERROR, + "Failed to commit database transaction!\n"); + return qs; +} + + +/** + * Release lock on shard @a s in the database. + * On error, terminates this process. + * + * @param[in] s shard to free (and memory to release) + */ +static void +release_shard (struct Shard *s) +{ + enum GNUNET_DB_QueryStatus qs; + + qs = db_plugin->complete_shard ( + db_plugin->cls, + "expire", + s->shard_start.abs_value_us, + s->shard_end.abs_value_us); + GNUNET_free (s); + switch (qs) + { + case GNUNET_DB_STATUS_HARD_ERROR: + case GNUNET_DB_STATUS_SOFT_ERROR: + GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR != qs); + GNUNET_break (0); + GNUNET_SCHEDULER_shutdown (); + return; + case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: + /* Strange, but let's just continue */ + break; + case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT: + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Purse expiration shard completed with %llu purses\n", + (unsigned long long) s->work_counter); + /* normal case */ + break; + } +} + + +/** + * Release lock on shard @a s in the database due to an abort of the + * operation. On error, terminates this process. + * + * @param[in] s shard to free (and memory to release) + */ +static void +abort_shard (struct Shard *s) +{ + enum GNUNET_DB_QueryStatus qs; + + qs = db_plugin->abort_shard (db_plugin->cls, + "expire", + s->shard_start.abs_value_us, + s->shard_end.abs_value_us); + if (0 >= qs) + { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Failed to abort shard (%d)!\n", + qs); + global_ret = EXIT_FAILURE; + GNUNET_SCHEDULER_shutdown (); + return; + } +} + + +/** + * Main function that processes the work in one shard. + * + * @param[in] cls a `struct Shard` to process + */ +static void +run_expire (void *cls) +{ + struct Shard *s = cls; + enum GNUNET_DB_QueryStatus qs; + + task = NULL; + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Checking for expired purses\n"); + if (GNUNET_SYSERR == + db_plugin->preflight (db_plugin->cls)) + { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Failed to obtain database connection!\n"); + global_ret = EXIT_FAILURE; + GNUNET_SCHEDULER_shutdown (); + return; + } + if (db_plugin->start (db_plugin->cls, + "expire-purse")) + { + global_ret = EXIT_FAILURE; + db_plugin->rollback (db_plugin->cls); + abort_shard (s); + GNUNET_SCHEDULER_shutdown (); + return; + } + qs = db_plugin->expire_purse (db_plugin->cls, + s->shard_start, + s->shard_end); + switch (qs) + { + case GNUNET_DB_STATUS_HARD_ERROR: + GNUNET_break (0); + global_ret = EXIT_FAILURE; + db_plugin->rollback (db_plugin->cls); + abort_shard (s); + GNUNET_SCHEDULER_shutdown (); + return; + case GNUNET_DB_STATUS_SOFT_ERROR: + db_plugin->rollback (db_plugin->cls); + abort_shard (s); + task = GNUNET_SCHEDULER_add_now (&run_shard, + NULL); + return; + case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: + if (0 > commit_or_warn ()) + { + db_plugin->rollback (db_plugin->cls); + abort_shard (s); + } + else + { + release_shard (s); + } + task = GNUNET_SCHEDULER_add_now (&run_shard, + NULL); + return; + case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT: + /* commit, and go again immediately */ + s->work_counter++; + (void) commit_or_warn (); + task = GNUNET_SCHEDULER_add_now (&run_expire, + s); + } +} + + +/** + * Select a shard to work on. + * + * @param cls NULL + */ +static void +run_shard (void *cls) +{ + struct Shard *s; + enum GNUNET_DB_QueryStatus qs; + + (void) cls; + task = NULL; + if (GNUNET_SYSERR == + db_plugin->preflight (db_plugin->cls)) + { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Failed to obtain database connection!\n"); + global_ret = EXIT_FAILURE; + GNUNET_SCHEDULER_shutdown (); + return; + } + s = GNUNET_new (struct Shard); + s->start_time = GNUNET_TIME_timestamp_get (); + qs = db_plugin->begin_shard (db_plugin->cls, + "expire", + shard_size, + shard_size.rel_value_us, + &s->shard_start.abs_value_us, + &s->shard_end.abs_value_us); + if (0 >= qs) + { + if (GNUNET_DB_STATUS_SOFT_ERROR == qs) + { + static struct GNUNET_TIME_Relative delay; + + GNUNET_free (s); + delay = GNUNET_TIME_randomized_backoff (delay, + GNUNET_TIME_UNIT_SECONDS); + task = GNUNET_SCHEDULER_add_delayed (delay, + &run_shard, + NULL); + return; + } + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Failed to begin shard (%d)!\n", + qs); + GNUNET_break (GNUNET_DB_STATUS_HARD_ERROR != qs); + global_ret = EXIT_FAILURE; + GNUNET_SCHEDULER_shutdown (); + return; + } + if (GNUNET_TIME_absolute_is_future (s->shard_end)) + { + task = GNUNET_SCHEDULER_add_at (s->shard_end, + &run_shard, + NULL); + abort_shard (s); + return; + } + /* If this is a first-time run, we immediately + try to catch up with the present */ + if (GNUNET_TIME_absolute_is_zero (s->shard_start)) + s->shard_end = GNUNET_TIME_absolute_get (); + + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Starting shard [%llu:%llu]!\n", + (unsigned long long) s->shard_start.abs_value_us, + (unsigned long long) s->shard_end.abs_value_us); + task = GNUNET_SCHEDULER_add_now (&run_expire, + s); +} + + +/** + * First task. + * + * @param cls closure, NULL + * @param args remaining command-line arguments + * @param cfgfile name of the configuration file used (for saving, can be NULL!) + * @param c configuration + */ +static void +run (void *cls, + char *const *args, + const char *cfgfile, + const struct GNUNET_CONFIGURATION_Handle *c) +{ + (void) cls; + (void) args; + (void) cfgfile; + + cfg = c; + if (GNUNET_OK != parse_expire_config ()) + { + cfg = NULL; + global_ret = EXIT_NOTCONFIGURED; + return; + } + if (GNUNET_OK != + GNUNET_CONFIGURATION_get_value_time (cfg, + "exchange", + "EXPIRE_SHARD_SIZE", + &shard_size)) + { + cfg = NULL; + global_ret = EXIT_NOTCONFIGURED; + return; + } + GNUNET_assert (NULL == task); + task = GNUNET_SCHEDULER_add_now (&run_shard, + NULL); + GNUNET_SCHEDULER_add_shutdown (&shutdown_task, + cls); +} + + +/** + * The main function of the taler-exchange-expire. + * + * @param argc number of arguments from the command line + * @param argv command line arguments + * @return 0 ok, non-zero on error, see #global_ret + */ +int +main (int argc, + char *const *argv) +{ + struct GNUNET_GETOPT_CommandLineOption options[] = { + GNUNET_GETOPT_option_timetravel ('T', + "timetravel"), + GNUNET_GETOPT_option_flag ('t', + "test", + "run in test mode and exit when idle", + &test_mode), + GNUNET_GETOPT_OPTION_END + }; + enum GNUNET_GenericReturnValue ret; + + if (GNUNET_OK != + GNUNET_STRINGS_get_utf8_args (argc, argv, + &argc, &argv)) + return EXIT_INVALIDARGUMENT; + TALER_OS_init (); + ret = GNUNET_PROGRAM_run ( + argc, argv, + "taler-exchange-expire", + gettext_noop ( + "background process that expires purses"), + options, + &run, NULL); + GNUNET_free_nz ((void *) argv); + if (GNUNET_SYSERR == ret) + return EXIT_INVALIDARGUMENT; + if (GNUNET_NO == ret) + return EXIT_SUCCESS; + return global_ret; +} + + +/* end of taler-exchange-expire.c */ diff --git a/src/exchange/taler-exchange-router.c b/src/exchange/taler-exchange-router.c index ca4499e3e..0816dfdba 100644 --- a/src/exchange/taler-exchange-router.c +++ b/src/exchange/taler-exchange-router.c @@ -17,8 +17,7 @@ /** * @file taler-exchange-router.c * @brief Process that routes P2P payments. Responsible for - * (1) refunding coins in unmerged purses, (2) merging purses into local reserves; - * (3) aggregating remote payments into the respective wad transfers. + * aggregating remote payments into the respective wad transfers. * Execution of actual wad transfers is still to be done by taler-exchange-transfer, * and watching for incoming wad transfers is done by taler-exchange-wirewatch. * @author Christian Grothoff diff --git a/src/exchangedb/plugin_exchangedb_postgres.c b/src/exchangedb/plugin_exchangedb_postgres.c index 4d5efb9c8..ab282f4ff 100644 --- a/src/exchangedb/plugin_exchangedb_postgres.c +++ b/src/exchangedb/plugin_exchangedb_postgres.c @@ -13554,6 +13554,25 @@ postgres_insert_purse_request ( } +/** + * Function called to clean up one expired purse. + * + * @param cls the @e cls of this struct with the plugin-specific state + * @param start_time select purse expired after this time + * @param end_time select purse expired before this time + * @return transaction status code (#GNUNET_DB_STATUS_SUCCESS_NO_RESULTS if no purse expired in the given time interval). + */ +static enum GNUNET_DB_QueryStatus +postgres_expire_purse ( + void *cls, + struct GNUNET_TIME_Absolute start_time, + struct GNUNET_TIME_Absolute end_time) +{ + GNUNET_break (0); + return GNUNET_DB_STATUS_HARD_ERROR; +} + + /** * Function called to obtain information about a purse. * @@ -14283,6 +14302,8 @@ libtaler_plugin_exchangedb_postgres_init (void *cls) = &postgres_insert_purse_request; plugin->select_purse_request = &postgres_select_purse_request; + plugin->expire_purse + = &postgres_expire_purse; plugin->select_purse = &postgres_select_purse; plugin->select_purse_by_merge_pub diff --git a/src/include/taler_exchangedb_plugin.h b/src/include/taler_exchangedb_plugin.h index 52e684f62..213fe114d 100644 --- a/src/include/taler_exchangedb_plugin.h +++ b/src/include/taler_exchangedb_plugin.h @@ -4603,6 +4603,21 @@ struct TALER_EXCHANGEDB_Plugin bool *in_conflict); + /** + * Function called to clean up one expired purse. + * + * @param cls the @e cls of this struct with the plugin-specific state + * @param start_time select purse expired after this time + * @param end_time select purse expired before this time + * @return transaction status code (#GNUNET_DB_STATUS_SUCCESS_NO_RESULTS if no purse expired in the given time interval). + */ + enum GNUNET_DB_QueryStatus + (*expire_purse)( + void *cls, + struct GNUNET_TIME_Absolute start_time, + struct GNUNET_TIME_Absolute end_time); + + /** * Function called to obtain information about a purse. *