exchange/src/exchange/taler-exchange-wirewatch.c
2021-12-11 14:03:08 +01:00

878 lines
24 KiB
C

/*
This file is part of TALER
Copyright (C) 2016--2021 Taler Systems SA
TALER is free software; you can redistribute it and/or modify it under the
terms of the GNU Affero General Public License as published by the Free Software
Foundation; either version 3, or (at your option) any later version.
TALER is distributed in the hope that it will be useful, but WITHOUT ANY
WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License along with
TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/>
*/
/**
* @file taler-exchange-wirewatch.c
* @brief Process that watches for wire transfers to the exchange's bank account
* @author Christian Grothoff
*/
#include "platform.h"
#include <gnunet/gnunet_util_lib.h>
#include <jansson.h>
#include <pthread.h>
#include <microhttpd.h>
#include "taler_exchangedb_lib.h"
#include "taler_exchangedb_plugin.h"
#include "taler_json_lib.h"
#include "taler_bank_service.h"
/**
* How long to wait for an HTTP reply if there
* are no transactions pending at the server?
*/
#define LONGPOLL_TIMEOUT GNUNET_TIME_UNIT_MINUTES
/**
* What is the maximum batch size we use for credit history
* requests with the bank. See `batch_size` below.
*/
#define MAXIMUM_BATCH_SIZE 1024
/**
* Information we keep for each supported account.
*/
struct WireAccount
{
/**
* Accounts are kept in a DLL.
*/
struct WireAccount *next;
/**
* Plugins are kept in a DLL.
*/
struct WireAccount *prev;
/**
* Information about this account.
*/
const struct TALER_EXCHANGEDB_AccountInfo *ai;
/**
* Active request for history.
*/
struct TALER_BANK_CreditHistoryHandle *hh;
/**
* Until when is processing this wire plugin delayed?
*/
struct GNUNET_TIME_Absolute delayed_until;
/**
* Encoded offset in the wire transfer list from where
* to start the next query with the bank.
*/
uint64_t batch_start;
/**
* Latest row offset seen in this transaction, becomes
* the new #batch_start upon commit.
*/
uint64_t latest_row_off;
/**
* Offset where our current shard begins (inclusive).
*/
uint64_t shard_start;
/**
* Offset where our current shard ends (exclusive).
*/
uint64_t shard_end;
/**
* When did we start with the shard?
*/
struct GNUNET_TIME_Absolute shard_start_time;
/**
* Name of our job in the shard table.
*/
char *job_name;
/**
* How many transactions do we retrieve per batch?
*/
unsigned int batch_size;
/**
* How much do we incremnt @e batch_size on success?
*/
unsigned int batch_thresh;
/**
* How many transactions did we see in the current batch?
*/
unsigned int current_batch_size;
/**
* Should we delay the next request to the wire plugin a bit? Set to
* false if we actually did some work.
*/
bool delay;
/**
* Did we start a transaction yet?
*/
bool started_transaction;
};
/**
* Head of list of loaded wire plugins.
*/
static struct WireAccount *wa_head;
/**
* Tail of list of loaded wire plugins.
*/
static struct WireAccount *wa_tail;
/**
* Wire account we are currently processing. This would go away
* if we ever start processing all accounts in parallel.
*/
static struct WireAccount *wa_pos;
/**
* Handle to the context for interacting with the bank.
*/
static struct GNUNET_CURL_Context *ctx;
/**
* Scheduler context for running the @e ctx.
*/
static struct GNUNET_CURL_RescheduleContext *rc;
/**
* The exchange's configuration (global)
*/
static const struct GNUNET_CONFIGURATION_Handle *cfg;
/**
* Our DB plugin.
*/
static struct TALER_EXCHANGEDB_Plugin *db_plugin;
/**
* How long should we sleep when idle before trying to find more work?
* Also used for how long we wait to grab a shard before trying it again.
* The value should be set to a bit above the average time it takes to
* process a shard.
*/
static struct GNUNET_TIME_Relative wirewatch_idle_sleep_interval;
/**
* How long did we take to finish the last shard?
*/
static struct GNUNET_TIME_Relative shard_delay;
/**
* Modulus to apply to group shards. The shard size must ultimately be a
* multiple of the batch size. Thus, if this is not a multiple of the
* #MAXIMUM_BATCH_SIZE, the batch size will be set to the #shard_size.
*/
static unsigned int shard_size = MAXIMUM_BATCH_SIZE;
/**
* How many workers should we plan our scheduling with?
*/
static unsigned int max_workers = 16;
/**
* Value to return from main(). 0 on success, non-zero on
* on serious errors.
*/
static int global_ret;
/**
* Are we run in testing mode and should only do one pass?
*/
static int test_mode;
/**
* Current task waiting for execution, if any.
*/
static struct GNUNET_SCHEDULER_Task *task;
/**
* We're being aborted with CTRL-C (or SIGTERM). Shut down.
*
* @param cls closure
*/
static void
shutdown_task (void *cls)
{
(void) cls;
{
struct WireAccount *wa;
while (NULL != (wa = wa_head))
{
if (NULL != wa->hh)
{
TALER_BANK_credit_history_cancel (wa->hh);
wa->hh = NULL;
}
GNUNET_CONTAINER_DLL_remove (wa_head,
wa_tail,
wa);
if (wa->started_transaction)
{
db_plugin->rollback (db_plugin->cls);
wa->started_transaction = false;
}
// FIXME: delete shard lock here (#7124)
GNUNET_free (wa->job_name);
GNUNET_free (wa);
}
}
wa_pos = NULL;
if (NULL != ctx)
{
GNUNET_CURL_fini (ctx);
ctx = NULL;
}
if (NULL != rc)
{
GNUNET_CURL_gnunet_rc_destroy (rc);
rc = NULL;
}
if (NULL != task)
{
GNUNET_SCHEDULER_cancel (task);
task = NULL;
}
TALER_EXCHANGEDB_plugin_unload (db_plugin);
db_plugin = NULL;
TALER_EXCHANGEDB_unload_accounts ();
cfg = NULL;
}
/**
* Function called with information about a wire account. Adds the
* account to our list (if it is enabled and we can load the plugin).
*
* @param cls closure, NULL
* @param ai account information
*/
static void
add_account_cb (void *cls,
const struct TALER_EXCHANGEDB_AccountInfo *ai)
{
struct WireAccount *wa;
(void) cls;
if (! ai->credit_enabled)
return; /* not enabled for us, skip */
wa = GNUNET_new (struct WireAccount);
wa->ai = ai;
GNUNET_asprintf (&wa->job_name,
"wirewatch-%s",
ai->section_name);
wa->batch_size = MAXIMUM_BATCH_SIZE;
if (0 != shard_size % wa->batch_size)
wa->batch_size = shard_size;
GNUNET_CONTAINER_DLL_insert (wa_head,
wa_tail,
wa);
}
/**
* Parse configuration parameters for the exchange server into the
* corresponding global variables.
*
* @return #GNUNET_OK on success
*/
static enum GNUNET_GenericReturnValue
exchange_serve_process_config (void)
{
if (GNUNET_OK !=
GNUNET_CONFIGURATION_get_value_time (cfg,
"exchange",
"WIREWATCH_IDLE_SLEEP_INTERVAL",
&wirewatch_idle_sleep_interval))
{
GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
"exchange",
"WIREWATCH_IDLE_SLEEP_INTERVAL");
return GNUNET_SYSERR;
}
if (NULL ==
(db_plugin = TALER_EXCHANGEDB_plugin_load (cfg)))
{
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
"Failed to initialize DB subsystem\n");
return GNUNET_SYSERR;
}
if (GNUNET_OK !=
TALER_EXCHANGEDB_load_accounts (cfg,
TALER_EXCHANGEDB_ALO_CREDIT
| TALER_EXCHANGEDB_ALO_AUTHDATA))
{
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
"No wire accounts configured for credit!\n");
TALER_EXCHANGEDB_plugin_unload (db_plugin);
db_plugin = NULL;
return GNUNET_SYSERR;
}
TALER_EXCHANGEDB_find_accounts (&add_account_cb,
NULL);
GNUNET_assert (NULL != wa_head);
return GNUNET_OK;
}
/**
* Query for incoming wire transfers.
*
* @param cls NULL
*/
static void
find_transfers (void *cls);
/**
* We encountered a serialization error.
* Rollback the transaction and try again
*
* @param wa account we are transacting on
*/
static void
handle_soft_error (struct WireAccount *wa)
{
db_plugin->rollback (db_plugin->cls);
wa->started_transaction = false;
if (1 < wa->batch_size)
{
wa->batch_thresh = wa->batch_size;
wa->batch_size /= 2;
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"Reduced batch size to %llu due to serialization issue\n",
(unsigned long long) wa->batch_size);
}
GNUNET_assert (NULL == task);
task = GNUNET_SCHEDULER_add_now (&find_transfers,
NULL);
}
/**
* We are done with a shard, move on to the next one.
*
* @param wa wire account for which we completed a shard
*/
static void
shard_completed (struct WireAccount *wa)
{
/* transaction success, update #last_row_off */
wa->batch_start = wa->latest_row_off;
if (wa->batch_size < MAXIMUM_BATCH_SIZE)
{
int delta;
delta = ((int) wa->batch_thresh - (int) wa->batch_size) / 4;
if (delta < 0)
delta = -delta;
wa->batch_size = GNUNET_MIN (MAXIMUM_BATCH_SIZE,
wa->batch_size + delta + 1);
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Increasing batch size to %llu\n",
(unsigned long long) wa->batch_size);
}
if (wa->delay)
{
wa->delayed_until
= GNUNET_TIME_relative_to_absolute (wirewatch_idle_sleep_interval);
wa_pos = wa_pos->next;
if (NULL == wa_pos)
wa_pos = wa_head;
GNUNET_assert (NULL != wa_pos);
}
GNUNET_assert (NULL == task);
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"Will look for more transfers in %s\n",
GNUNET_STRINGS_relative_time_to_string (
GNUNET_TIME_absolute_get_remaining (wa_pos->delayed_until),
GNUNET_YES));
task = GNUNET_SCHEDULER_add_at (wa_pos->delayed_until,
&find_transfers,
NULL);
}
/**
* We are finished with the current shard. Update the database, marking the
* shard as finished.
*
* @param wa wire account to commit for
* @return true on success
*/
static bool
mark_shard_done (struct WireAccount *wa)
{
enum GNUNET_DB_QueryStatus qs;
if (wa->shard_end > wa->latest_row_off)
return false; /* actually, not done! */
/* shard is complete, mark this as well */
qs = db_plugin->complete_shard (db_plugin->cls,
wa->job_name,
wa->shard_start,
wa->shard_end);
switch (qs)
{
case GNUNET_DB_STATUS_HARD_ERROR:
GNUNET_break (0);
db_plugin->rollback (db_plugin->cls);
GNUNET_SCHEDULER_shutdown ();
return false;
case GNUNET_DB_STATUS_SOFT_ERROR:
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"Got DB soft error for complete_shard. Rolling back.\n");
handle_soft_error (wa);
return false;
case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
/* already existed, ok, let's just continue */
break;
case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
/* normal case */
shard_delay = GNUNET_TIME_absolute_get_duration (wa->shard_start_time);
break;
}
return true;
}
/**
* We are finished with the current transaction, try
* to commit and then schedule the next iteration.
*
* @param wa wire account to commit for
*/
static void
do_commit (struct WireAccount *wa)
{
enum GNUNET_DB_QueryStatus qs;
wa->started_transaction = false;
mark_shard_done (wa);
qs = db_plugin->commit (db_plugin->cls);
switch (qs)
{
case GNUNET_DB_STATUS_HARD_ERROR:
GNUNET_break (0);
GNUNET_SCHEDULER_shutdown ();
return;
case GNUNET_DB_STATUS_SOFT_ERROR:
/* reduce transaction size to reduce rollback probability */
handle_soft_error (wa);
return;
case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
/* normal case */
break;
}
shard_completed (wa);
}
/**
* Callbacks of this type are used to serve the result of asking
* the bank for the transaction history.
*
* @param cls closure with the `struct WioreAccount *` we are processing
* @param http_status HTTP status code from the server
* @param ec taler error code
* @param serial_id identification of the position at which we are querying
* @param details details about the wire transfer
* @param json raw JSON response
* @return #GNUNET_OK to continue, #GNUNET_SYSERR to abort iteration
*/
static enum GNUNET_GenericReturnValue
history_cb (void *cls,
unsigned int http_status,
enum TALER_ErrorCode ec,
uint64_t serial_id,
const struct TALER_BANK_CreditDetails *details,
const json_t *json)
{
struct WireAccount *wa = cls;
enum GNUNET_DB_QueryStatus qs;
(void) json;
if (NULL == details)
{
wa->hh = NULL;
if (TALER_EC_NONE != ec)
{
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
"Error fetching history: ec=%u, http_status=%u\n",
(unsigned int) ec,
http_status);
}
else
{
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"History response complete\n");
}
if (wa->started_transaction)
{
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"End of list. Committing progress!\n");
do_commit (wa);
}
else
{
if ( (wa->delay) &&
(test_mode) &&
(NULL == wa->next) )
{
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"Shutdown due to test mode!\n");
GNUNET_SCHEDULER_shutdown ();
return GNUNET_OK;
}
else
{
shard_completed (wa);
}
}
return GNUNET_OK; /* will be ignored anyway */
}
if (serial_id < wa->latest_row_off)
{
/* we are done with the current shard, commit and stop this iteration! */
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
"Serial ID %llu not monotonic (got %llu before). Failing!\n",
(unsigned long long) serial_id,
(unsigned long long) wa->latest_row_off);
if (wa->started_transaction)
{
wa->started_transaction = false;
db_plugin->rollback (db_plugin->cls);
}
GNUNET_SCHEDULER_shutdown ();
wa->hh = NULL;
return GNUNET_SYSERR;
}
if (serial_id > wa->shard_end)
{
/* we are done with the current shard, commit and stop this iteration! */
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"Serial ID %llu past shard end at %llu, ending iteration early!\n",
(unsigned long long) serial_id,
(unsigned long long) wa->shard_end);
wa->latest_row_off = serial_id - 1;
wa->delay = false;
if (wa->started_transaction)
{
do_commit (wa);
}
else
{
if (mark_shard_done (wa))
shard_completed (wa);
}
wa->hh = NULL;
return GNUNET_SYSERR;
}
if (! wa->started_transaction)
{
if (GNUNET_OK !=
db_plugin->start_read_committed (db_plugin->cls,
"wirewatch check for incoming wire transfers"))
{
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
"Failed to start database transaction!\n");
global_ret = EXIT_FAILURE;
GNUNET_SCHEDULER_shutdown ();
wa->hh = NULL;
return GNUNET_SYSERR;
}
wa_pos->shard_start_time = GNUNET_TIME_absolute_get ();
wa->started_transaction = true;
}
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Adding wire transfer over %s with (hashed) subject `%s'\n",
TALER_amount2s (&details->amount),
TALER_B2S (&details->reserve_pub));
/* FIXME-PERFORMANCE: Consider using Postgres multi-valued insert here,
for up to 15x speed-up according to
https://dba.stackexchange.com/questions/224989/multi-row-insert-vs-transactional-single-row-inserts#225006
(Note: this may require changing both the
plugin API as well as modifying how this function is called.) */
qs = db_plugin->reserves_in_insert (db_plugin->cls,
&details->reserve_pub,
&details->amount,
details->execution_date,
details->debit_account_uri,
wa->ai->section_name,
serial_id);
switch (qs)
{
case GNUNET_DB_STATUS_HARD_ERROR:
GNUNET_break (0);
db_plugin->rollback (db_plugin->cls);
wa->started_transaction = false;
GNUNET_SCHEDULER_shutdown ();
wa->hh = NULL;
return GNUNET_SYSERR;
case GNUNET_DB_STATUS_SOFT_ERROR:
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"Got DB soft error for reserves_in_insert. Rolling back.\n");
handle_soft_error (wa);
wa->hh = NULL;
return GNUNET_SYSERR;
case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
/* already existed, ok, let's just continue */
break;
case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
/* normal case */
break;
}
wa->delay = false;
wa->latest_row_off = serial_id;
return GNUNET_OK;
}
/**
* Query for incoming wire transfers.
*
* @param cls NULL
*/
static void
find_transfers (void *cls)
{
enum GNUNET_DB_QueryStatus qs;
unsigned int limit;
(void) cls;
task = NULL;
if (GNUNET_SYSERR ==
db_plugin->preflight (db_plugin->cls))
{
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
"Failed to obtain database connection!\n");
global_ret = EXIT_FAILURE;
GNUNET_SCHEDULER_shutdown ();
return;
}
wa_pos->delay = true;
wa_pos->current_batch_size = 0; /* reset counter */
if (wa_pos->shard_end <= wa_pos->batch_start)
{
uint64_t start;
uint64_t end;
struct GNUNET_TIME_Relative delay;
/* advance to next shard */
if (0 == max_workers)
delay = GNUNET_TIME_UNIT_ZERO;
else
delay.rel_value_us = GNUNET_CRYPTO_random_u64 (
GNUNET_CRYPTO_QUALITY_WEAK,
4 * GNUNET_TIME_relative_max (
wirewatch_idle_sleep_interval,
GNUNET_TIME_relative_multiply (shard_delay,
max_workers)).rel_value_us);
qs = db_plugin->begin_shard (db_plugin->cls,
wa_pos->job_name,
delay,
shard_size,
&start,
&end);
switch (qs)
{
case GNUNET_DB_STATUS_HARD_ERROR:
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
"Failed to obtain starting point for montoring from database!\n");
global_ret = EXIT_FAILURE;
GNUNET_SCHEDULER_shutdown ();
return;
case GNUNET_DB_STATUS_SOFT_ERROR:
/* try again */
GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
"Serialization error tying to obtain shard, will try again in %s!\n",
GNUNET_STRINGS_relative_time_to_string (
wirewatch_idle_sleep_interval,
GNUNET_YES));
task = GNUNET_SCHEDULER_add_delayed (wirewatch_idle_sleep_interval,
&find_transfers,
NULL);
return;
case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
GNUNET_break (0);
GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
"No shard available, will try again in %s!\n",
GNUNET_STRINGS_relative_time_to_string (
wirewatch_idle_sleep_interval,
GNUNET_YES));
task = GNUNET_SCHEDULER_add_delayed (wirewatch_idle_sleep_interval,
&find_transfers,
NULL);
return;
case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
wa_pos->shard_start = start;
wa_pos->shard_end = end;
wa_pos->batch_start = start;
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"Starting with shard at [%llu,%llu) locked for %s\n",
(unsigned long long) start,
(unsigned long long) end,
GNUNET_STRINGS_relative_time_to_string (delay,
GNUNET_YES));
break;
}
}
limit = GNUNET_MIN (wa_pos->batch_size,
wa_pos->shard_end - wa_pos->batch_start);
GNUNET_assert (NULL == wa_pos->hh);
wa_pos->latest_row_off = wa_pos->batch_start;
wa_pos->hh = TALER_BANK_credit_history (ctx,
wa_pos->ai->auth,
wa_pos->batch_start,
limit,
test_mode
? GNUNET_TIME_UNIT_ZERO
: LONGPOLL_TIMEOUT,
&history_cb,
wa_pos);
if (NULL == wa_pos->hh)
{
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
"Failed to start request for account history!\n");
if (wa_pos->started_transaction)
{
db_plugin->rollback (db_plugin->cls);
wa_pos->started_transaction = false;
}
global_ret = EXIT_FAILURE;
GNUNET_SCHEDULER_shutdown ();
return;
}
}
/**
* First task.
*
* @param cls closure, NULL
* @param args remaining command-line arguments
* @param cfgfile name of the configuration file used (for saving, can be NULL!)
* @param c configuration
*/
static void
run (void *cls,
char *const *args,
const char *cfgfile,
const struct GNUNET_CONFIGURATION_Handle *c)
{
(void) cls;
(void) args;
(void) cfgfile;
cfg = c;
if (GNUNET_OK !=
exchange_serve_process_config ())
{
global_ret = EXIT_NOTCONFIGURED;
return;
}
wa_pos = wa_head;
GNUNET_assert (NULL != wa_pos);
GNUNET_SCHEDULER_add_shutdown (&shutdown_task,
cls);
ctx = GNUNET_CURL_init (&GNUNET_CURL_gnunet_scheduler_reschedule,
&rc);
rc = GNUNET_CURL_gnunet_rc_create (ctx);
if (NULL == ctx)
{
GNUNET_break (0);
return;
}
task = GNUNET_SCHEDULER_add_now (&find_transfers,
NULL);
}
/**
* The main function of taler-exchange-wirewatch
*
* @param argc number of arguments from the command line
* @param argv command line arguments
* @return 0 ok, non-zero on error
*/
int
main (int argc,
char *const *argv)
{
struct GNUNET_GETOPT_CommandLineOption options[] = {
GNUNET_GETOPT_option_uint ('S',
"size",
"SIZE",
"Size to process per shard (default: 1024)",
&shard_size),
GNUNET_GETOPT_option_timetravel ('T',
"timetravel"),
GNUNET_GETOPT_option_flag ('t',
"test",
"run in test mode and exit when idle",
&test_mode),
GNUNET_GETOPT_option_uint ('w',
"workers",
"COUNT",
"Plan work load with up to COUNT worker processes (default: 16)",
&max_workers),
GNUNET_GETOPT_option_version (VERSION "-" VCS_VERSION),
GNUNET_GETOPT_OPTION_END
};
enum GNUNET_GenericReturnValue ret;
if (GNUNET_OK !=
GNUNET_STRINGS_get_utf8_args (argc, argv,
&argc, &argv))
return EXIT_INVALIDARGUMENT;
TALER_OS_init ();
ret = GNUNET_PROGRAM_run (
argc, argv,
"taler-exchange-wirewatch",
gettext_noop (
"background process that watches for incoming wire transfers from customers"),
options,
&run, NULL);
GNUNET_free_nz ((void *) argv);
if (GNUNET_SYSERR == ret)
return EXIT_INVALIDARGUMENT;
if (GNUNET_NO == ret)
return EXIT_SUCCESS;
return global_ret;
}
/* end of taler-exchange-wirewatch.c */