-payment router skeleton
This commit is contained in:
parent
31e554ad46
commit
65943dcb13
1
src/exchange/.gitignore
vendored
1
src/exchange/.gitignore
vendored
@ -9,3 +9,4 @@ test_taler_exchange_wirewatch-postgres
|
||||
test_taler_exchange_httpd_home/.config/taler/account-1.json
|
||||
taler-exchange-closer
|
||||
taler-exchange-transfer
|
||||
taler-exchange-router
|
||||
|
@ -20,6 +20,7 @@ bin_PROGRAMS = \
|
||||
taler-exchange-aggregator \
|
||||
taler-exchange-closer \
|
||||
taler-exchange-httpd \
|
||||
taler-exchange-router \
|
||||
taler-exchange-transfer \
|
||||
taler-exchange-wirewatch
|
||||
|
||||
@ -50,9 +51,9 @@ taler_exchange_closer_LDADD = \
|
||||
-lgnunetutil \
|
||||
$(XLIB)
|
||||
|
||||
taler_exchange_wirewatch_SOURCES = \
|
||||
taler-exchange-wirewatch.c
|
||||
taler_exchange_wirewatch_LDADD = \
|
||||
taler_exchange_router_SOURCES = \
|
||||
taler-exchange-router.c
|
||||
taler_exchange_router_LDADD = \
|
||||
$(LIBGCRYPT_LIBS) \
|
||||
$(top_builddir)/src/json/libtalerjson.la \
|
||||
$(top_builddir)/src/util/libtalerutil.la \
|
||||
@ -76,6 +77,20 @@ taler_exchange_transfer_LDADD = \
|
||||
-lgnunetutil \
|
||||
$(XLIB)
|
||||
|
||||
taler_exchange_wirewatch_SOURCES = \
|
||||
taler-exchange-wirewatch.c
|
||||
taler_exchange_wirewatch_LDADD = \
|
||||
$(LIBGCRYPT_LIBS) \
|
||||
$(top_builddir)/src/json/libtalerjson.la \
|
||||
$(top_builddir)/src/util/libtalerutil.la \
|
||||
$(top_builddir)/src/bank-lib/libtalerbank.la \
|
||||
$(top_builddir)/src/exchangedb/libtalerexchangedb.la \
|
||||
-ljansson \
|
||||
-lgnunetcurl \
|
||||
-lgnunetutil \
|
||||
$(XLIB)
|
||||
|
||||
|
||||
taler_exchange_httpd_SOURCES = \
|
||||
taler-exchange-httpd.c taler-exchange-httpd.h \
|
||||
taler-exchange-httpd_auditors.c taler-exchange-httpd_auditors.h \
|
||||
|
@ -47,6 +47,8 @@ BASE_URL = http://localhost:8081/
|
||||
# How long should the aggregator sleep if it has nothing to do?
|
||||
AGGREGATOR_IDLE_SLEEP_INTERVAL = 60 s
|
||||
|
||||
ROUTER_IDLE_SLEEP_INTERVAL = 60 s
|
||||
|
||||
# How long should the transfer tool
|
||||
# sleep if it has nothing to do?
|
||||
TRANSFER_IDLE_SLEEP_INTERVAL = 60 s
|
||||
@ -66,6 +68,17 @@ CLOSER_IDLE_SLEEP_INTERVAL = 60 s
|
||||
# aggregation logic will break badly!
|
||||
AGGREGATOR_SHARD_SIZE = 2147483648
|
||||
|
||||
# Values of 0 or above 2^31 disable sharding, which
|
||||
# is a sane default for most use-cases.
|
||||
# When changing this value, you MUST stop all
|
||||
# aggregators and manually run
|
||||
#
|
||||
# $ taler-exchange-dbinit -s
|
||||
#
|
||||
# against the exchange's database. Otherwise, the
|
||||
# aggregation logic will break badly!
|
||||
ROUTER_SHARD_SIZE = 2147483648
|
||||
|
||||
# How long should wirewatch sleep if it has nothing to do?
|
||||
# (Set very aggressively here for the demonstrators to be
|
||||
# super fast.)
|
||||
|
448
src/exchange/taler-exchange-router.c
Normal file
448
src/exchange/taler-exchange-router.c
Normal file
@ -0,0 +1,448 @@
|
||||
/*
|
||||
This file is part of TALER
|
||||
Copyright (C) 2022 Taler Systems SA
|
||||
|
||||
TALER is free software; you can redistribute it and/or modify it under the
|
||||
terms of the GNU Affero General Public License as published by the Free Software
|
||||
Foundation; either version 3, or (at your option) any later version.
|
||||
|
||||
TALER is distributed in the hope that it will be useful, but WITHOUT ANY
|
||||
WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
|
||||
A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU Affero General Public License along with
|
||||
TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/>
|
||||
*/
|
||||
|
||||
/**
|
||||
* @file taler-exchange-router.c
|
||||
* @brief Process that routes P2P payments. Responsible for
|
||||
* (1) refunding coins in unmerged purses, (2) merging purses into local reserves;
|
||||
* (3) aggregating remote payments into the respective wad transfers.
|
||||
* Execution of actual wad transfers is still to be done by taler-exchange-transfer,
|
||||
* and watching for incoming wad transfers is done by taler-exchange-wirewatch.
|
||||
* @author Christian Grothoff
|
||||
*/
|
||||
#include "platform.h"
|
||||
#include <gnunet/gnunet_util_lib.h>
|
||||
#include <jansson.h>
|
||||
#include <pthread.h>
|
||||
#include "taler_exchangedb_lib.h"
|
||||
#include "taler_exchangedb_plugin.h"
|
||||
#include "taler_json_lib.h"
|
||||
#include "taler_bank_service.h"
|
||||
|
||||
|
||||
/**
|
||||
* Work shard we are processing.
|
||||
*/
|
||||
struct Shard
|
||||
{
|
||||
|
||||
/**
|
||||
* When did we start processing the shard?
|
||||
*/
|
||||
struct GNUNET_TIME_Timestamp start_time;
|
||||
|
||||
/**
|
||||
* Starting row of the shard.
|
||||
*/
|
||||
uint32_t shard_start;
|
||||
|
||||
/**
|
||||
* Inclusive end row of the shard.
|
||||
*/
|
||||
uint32_t shard_end;
|
||||
|
||||
/**
|
||||
* Number of starting points found in the shard.
|
||||
*/
|
||||
uint64_t work_counter;
|
||||
|
||||
};
|
||||
|
||||
|
||||
/**
|
||||
* What is the smallest unit we support for wire transfers?
|
||||
* We will need to round down to a multiple of this amount.
|
||||
*/
|
||||
static struct TALER_Amount currency_round_unit;
|
||||
|
||||
/**
|
||||
* What is the base URL of this exchange? Used in the
|
||||
* wire transfer subjects so that merchants and governments
|
||||
* can ask for the list of aggregated deposits.
|
||||
*/
|
||||
static char *exchange_base_url;
|
||||
|
||||
/**
|
||||
* Set to #GNUNET_YES if this exchange does not support KYC checks
|
||||
* and thus P2P transfers are to be made regardless of the
|
||||
* KYC status of the target reserve.
|
||||
*/
|
||||
static int kyc_off;
|
||||
|
||||
/**
|
||||
* The exchange's configuration.
|
||||
*/
|
||||
static const struct GNUNET_CONFIGURATION_Handle *cfg;
|
||||
|
||||
/**
|
||||
* Our database plugin.
|
||||
*/
|
||||
static struct TALER_EXCHANGEDB_Plugin *db_plugin;
|
||||
|
||||
/**
|
||||
* Next task to run, if any.
|
||||
*/
|
||||
static struct GNUNET_SCHEDULER_Task *task;
|
||||
|
||||
/**
|
||||
* How long should we sleep when idle before trying to find more work?
|
||||
*/
|
||||
static struct GNUNET_TIME_Relative router_idle_sleep_interval;
|
||||
|
||||
/**
|
||||
* How big are the shards we are processing? Is an inclusive offset, so every
|
||||
* shard ranges from [X,X+shard_size) exclusive. So a shard covers
|
||||
* shard_size slots. The maximum value for shard_size is INT32_MAX+1.
|
||||
*/
|
||||
static uint32_t shard_size;
|
||||
|
||||
/**
|
||||
* Value to return from main(). 0 on success, non-zero on errors.
|
||||
*/
|
||||
static int global_ret;
|
||||
|
||||
/**
|
||||
* #GNUNET_YES if we are in test mode and should exit when idle.
|
||||
*/
|
||||
static int test_mode;
|
||||
|
||||
|
||||
/**
|
||||
* Select a shard to work on.
|
||||
*
|
||||
* @param cls NULL
|
||||
*/
|
||||
static void
|
||||
run_shard (void *cls);
|
||||
|
||||
|
||||
/**
|
||||
* We're being aborted with CTRL-C (or SIGTERM). Shut down.
|
||||
*
|
||||
* @param cls closure
|
||||
*/
|
||||
static void
|
||||
shutdown_task (void *cls)
|
||||
{
|
||||
(void) cls;
|
||||
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
|
||||
"Running shutdown\n");
|
||||
if (NULL != task)
|
||||
{
|
||||
GNUNET_SCHEDULER_cancel (task);
|
||||
task = NULL;
|
||||
}
|
||||
TALER_EXCHANGEDB_plugin_unload (db_plugin);
|
||||
db_plugin = NULL;
|
||||
TALER_EXCHANGEDB_unload_accounts ();
|
||||
cfg = NULL;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Parse the configuration for wirewatch.
|
||||
*
|
||||
* @return #GNUNET_OK on success
|
||||
*/
|
||||
static enum GNUNET_GenericReturnValue
|
||||
parse_wirewatch_config (void)
|
||||
{
|
||||
if (GNUNET_OK !=
|
||||
GNUNET_CONFIGURATION_get_value_string (cfg,
|
||||
"exchange",
|
||||
"BASE_URL",
|
||||
&exchange_base_url))
|
||||
{
|
||||
GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
|
||||
"exchange",
|
||||
"BASE_URL");
|
||||
return GNUNET_SYSERR;
|
||||
}
|
||||
if (GNUNET_OK !=
|
||||
GNUNET_CONFIGURATION_get_value_time (cfg,
|
||||
"exchange",
|
||||
"ROUTER_IDLE_SLEEP_INTERVAL",
|
||||
&router_idle_sleep_interval))
|
||||
{
|
||||
GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
|
||||
"exchange",
|
||||
"ROUTER_IDLE_SLEEP_INTERVAL");
|
||||
return GNUNET_SYSERR;
|
||||
}
|
||||
if ( (GNUNET_OK !=
|
||||
TALER_config_get_amount (cfg,
|
||||
"taler",
|
||||
"CURRENCY_ROUND_UNIT",
|
||||
¤cy_round_unit)) ||
|
||||
( (0 != currency_round_unit.fraction) &&
|
||||
(0 != currency_round_unit.value) ) )
|
||||
{
|
||||
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
|
||||
"Need non-zero value in section `TALER' under `CURRENCY_ROUND_UNIT'\n");
|
||||
return GNUNET_SYSERR;
|
||||
}
|
||||
|
||||
if (NULL ==
|
||||
(db_plugin = TALER_EXCHANGEDB_plugin_load (cfg)))
|
||||
{
|
||||
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
|
||||
"Failed to initialize DB subsystem\n");
|
||||
return GNUNET_SYSERR;
|
||||
}
|
||||
if (GNUNET_OK !=
|
||||
TALER_EXCHANGEDB_load_accounts (cfg,
|
||||
TALER_EXCHANGEDB_ALO_DEBIT))
|
||||
{
|
||||
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
|
||||
"No wire accounts configured for debit!\n");
|
||||
TALER_EXCHANGEDB_plugin_unload (db_plugin);
|
||||
db_plugin = NULL;
|
||||
return GNUNET_SYSERR;
|
||||
}
|
||||
return GNUNET_OK;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Perform a database commit. If it fails, print a warning.
|
||||
*
|
||||
* @return status of commit
|
||||
*/
|
||||
static enum GNUNET_DB_QueryStatus
|
||||
commit_or_warn (void)
|
||||
{
|
||||
enum GNUNET_DB_QueryStatus qs;
|
||||
|
||||
qs = db_plugin->commit (db_plugin->cls);
|
||||
if (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS == qs)
|
||||
return qs;
|
||||
GNUNET_log ((GNUNET_DB_STATUS_SOFT_ERROR == qs)
|
||||
? GNUNET_ERROR_TYPE_INFO
|
||||
: GNUNET_ERROR_TYPE_ERROR,
|
||||
"Failed to commit database transaction!\n");
|
||||
return qs;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Release lock on shard @a s in the database.
|
||||
* On error, terminates this process.
|
||||
*
|
||||
* @param[in] s shard to free (and memory to release)
|
||||
*/
|
||||
static void
|
||||
release_shard (struct Shard *s)
|
||||
{
|
||||
enum GNUNET_DB_QueryStatus qs;
|
||||
|
||||
qs = db_plugin->release_revolving_shard (
|
||||
db_plugin->cls,
|
||||
"router",
|
||||
s->shard_start,
|
||||
s->shard_end);
|
||||
GNUNET_free (s);
|
||||
switch (qs)
|
||||
{
|
||||
case GNUNET_DB_STATUS_HARD_ERROR:
|
||||
case GNUNET_DB_STATUS_SOFT_ERROR:
|
||||
GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR != qs);
|
||||
GNUNET_break (0);
|
||||
GNUNET_SCHEDULER_shutdown ();
|
||||
return;
|
||||
case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
|
||||
/* Strange, but let's just continue */
|
||||
break;
|
||||
case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
|
||||
/* normal case */
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
static void
|
||||
run_routing (void *cls)
|
||||
{
|
||||
struct Shard *s = cls;
|
||||
|
||||
task = NULL;
|
||||
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
|
||||
"Checking for ready P2P transfers to route\n");
|
||||
// FIXME: do actual work here!
|
||||
commit_or_warn ();
|
||||
release_shard (s);
|
||||
task = GNUNET_SCHEDULER_add_now (&run_shard,
|
||||
NULL);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Select a shard to work on.
|
||||
*
|
||||
* @param cls NULL
|
||||
*/
|
||||
static void
|
||||
run_shard (void *cls)
|
||||
{
|
||||
struct Shard *s;
|
||||
enum GNUNET_DB_QueryStatus qs;
|
||||
|
||||
(void) cls;
|
||||
task = NULL;
|
||||
if (GNUNET_SYSERR ==
|
||||
db_plugin->preflight (db_plugin->cls))
|
||||
{
|
||||
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
|
||||
"Failed to obtain database connection!\n");
|
||||
global_ret = EXIT_FAILURE;
|
||||
GNUNET_SCHEDULER_shutdown ();
|
||||
return;
|
||||
}
|
||||
s = GNUNET_new (struct Shard);
|
||||
s->start_time = GNUNET_TIME_timestamp_get ();
|
||||
qs = db_plugin->begin_revolving_shard (db_plugin->cls,
|
||||
"router",
|
||||
shard_size,
|
||||
1U + INT32_MAX,
|
||||
&s->shard_start,
|
||||
&s->shard_end);
|
||||
if (0 >= qs)
|
||||
{
|
||||
if (GNUNET_DB_STATUS_SOFT_ERROR == qs)
|
||||
{
|
||||
static struct GNUNET_TIME_Relative delay;
|
||||
|
||||
GNUNET_free (s);
|
||||
delay = GNUNET_TIME_randomized_backoff (delay,
|
||||
GNUNET_TIME_UNIT_SECONDS);
|
||||
task = GNUNET_SCHEDULER_add_delayed (delay,
|
||||
&run_shard,
|
||||
NULL);
|
||||
return;
|
||||
}
|
||||
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
|
||||
"Failed to begin shard (%d)!\n",
|
||||
qs);
|
||||
GNUNET_break (GNUNET_DB_STATUS_HARD_ERROR != qs);
|
||||
global_ret = EXIT_FAILURE;
|
||||
GNUNET_SCHEDULER_shutdown ();
|
||||
return;
|
||||
}
|
||||
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
|
||||
"Starting shard [%u:%u]!\n",
|
||||
(unsigned int) s->shard_start,
|
||||
(unsigned int) s->shard_end);
|
||||
task = GNUNET_SCHEDULER_add_now (&run_routing,
|
||||
s);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* First task.
|
||||
*
|
||||
* @param cls closure, NULL
|
||||
* @param args remaining command-line arguments
|
||||
* @param cfgfile name of the configuration file used (for saving, can be NULL!)
|
||||
* @param c configuration
|
||||
*/
|
||||
static void
|
||||
run (void *cls,
|
||||
char *const *args,
|
||||
const char *cfgfile,
|
||||
const struct GNUNET_CONFIGURATION_Handle *c)
|
||||
{
|
||||
unsigned long long ass;
|
||||
(void) cls;
|
||||
(void) args;
|
||||
(void) cfgfile;
|
||||
|
||||
cfg = c;
|
||||
if (GNUNET_OK != parse_wirewatch_config ())
|
||||
{
|
||||
cfg = NULL;
|
||||
global_ret = EXIT_NOTCONFIGURED;
|
||||
return;
|
||||
}
|
||||
if (GNUNET_OK !=
|
||||
GNUNET_CONFIGURATION_get_value_number (cfg,
|
||||
"exchange",
|
||||
"ROUTER_SHARD_SIZE",
|
||||
&ass))
|
||||
{
|
||||
cfg = NULL;
|
||||
global_ret = EXIT_NOTCONFIGURED;
|
||||
return;
|
||||
}
|
||||
if ( (0 == ass) ||
|
||||
(ass > INT32_MAX) )
|
||||
shard_size = 1U + INT32_MAX;
|
||||
else
|
||||
shard_size = (uint32_t) ass;
|
||||
GNUNET_assert (NULL == task);
|
||||
task = GNUNET_SCHEDULER_add_now (&run_shard,
|
||||
NULL);
|
||||
GNUNET_SCHEDULER_add_shutdown (&shutdown_task,
|
||||
cls);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* The main function of the taler-exchange-router.
|
||||
*
|
||||
* @param argc number of arguments from the command line
|
||||
* @param argv command line arguments
|
||||
* @return 0 ok, non-zero on error, see #global_ret
|
||||
*/
|
||||
int
|
||||
main (int argc,
|
||||
char *const *argv)
|
||||
{
|
||||
struct GNUNET_GETOPT_CommandLineOption options[] = {
|
||||
GNUNET_GETOPT_option_timetravel ('T',
|
||||
"timetravel"),
|
||||
GNUNET_GETOPT_option_flag ('t',
|
||||
"test",
|
||||
"run in test mode and exit when idle",
|
||||
&test_mode),
|
||||
GNUNET_GETOPT_option_flag ('y',
|
||||
"kyc-off",
|
||||
"perform wire transfers without KYC checks",
|
||||
&kyc_off),
|
||||
GNUNET_GETOPT_OPTION_END
|
||||
};
|
||||
enum GNUNET_GenericReturnValue ret;
|
||||
|
||||
if (GNUNET_OK !=
|
||||
GNUNET_STRINGS_get_utf8_args (argc, argv,
|
||||
&argc, &argv))
|
||||
return EXIT_INVALIDARGUMENT;
|
||||
TALER_OS_init ();
|
||||
ret = GNUNET_PROGRAM_run (
|
||||
argc, argv,
|
||||
"taler-exchange-router",
|
||||
gettext_noop (
|
||||
"background process that routes P2P transfers"),
|
||||
options,
|
||||
&run, NULL);
|
||||
GNUNET_free_nz ((void *) argv);
|
||||
if (GNUNET_SYSERR == ret)
|
||||
return EXIT_INVALIDARGUMENT;
|
||||
if (GNUNET_NO == ret)
|
||||
return EXIT_SUCCESS;
|
||||
return global_ret;
|
||||
}
|
||||
|
||||
|
||||
/* end of taler-exchange-router.c */
|
Loading…
Reference in New Issue
Block a user