clean up wirewatch logic

This commit is contained in:
Christian Grothoff 2020-03-15 21:20:56 +01:00
parent c898a1e13b
commit d3f7cc1184
No known key found for this signature in database
GPG Key ID: 939E6BE1E29FC3CC
2 changed files with 154 additions and 109 deletions

View File

@ -54,9 +54,15 @@ PORT = 8081
BASE_URL = http://localhost:8081/ BASE_URL = http://localhost:8081/
# How long should the aggregator sleep if it has nothing to do? # How long should the aggregator (and closer, and transfer)
# sleep if it has nothing to do?
AGGREGATOR_IDLE_SLEEP_INTERVAL = 60 s AGGREGATOR_IDLE_SLEEP_INTERVAL = 60 s
# How long should wirewatch sleep if it has nothing to do?
# (Set very aggressively here for the demonstrators to be
# super fast.)
WIREWATCH_IDLE_SLEEP_INTERVAL = 1 s
# how long is one signkey valid? # how long is one signkey valid?
SIGNKEY_DURATION = 4 weeks SIGNKEY_DURATION = 4 weeks

View File

@ -1,6 +1,6 @@
/* /*
This file is part of TALER This file is part of TALER
Copyright (C) 2016, 2017, 2018 Taler Systems SA Copyright (C) 2016--2020 Taler Systems SA
TALER is free software; you can redistribute it and/or modify it under the TALER is free software; you can redistribute it and/or modify it under the
terms of the GNU Affero General Public License as published by the Free Software terms of the GNU Affero General Public License as published by the Free Software
@ -29,12 +29,13 @@
#include "taler_json_lib.h" #include "taler_json_lib.h"
#include "taler_bank_service.h" #include "taler_bank_service.h"
/** #define DEBUG_LOGGING 0
* How long do we sleep before trying again if there
* are no transactions returned by the wire plugin?
*/
#define DELAY GNUNET_TIME_UNIT_SECONDS
/**
* What is the initial batch size we use for credit history
* requests with the bank. See `batch_size` below.
*/
#define INITIAL_BATCH_SIZE 1024
/** /**
* Information we keep for each supported account. * Information we keep for each supported account.
@ -56,11 +57,48 @@ struct WireAccount
*/ */
char *section_name; char *section_name;
/**
* Database session we are using for the current transaction.
*/
struct TALER_EXCHANGEDB_Session *session;
/**
* Active request for history.
*/
struct TALER_BANK_CreditHistoryHandle *hh;
/** /**
* Authentication data. * Authentication data.
*/ */
struct TALER_BANK_AuthenticationData auth; struct TALER_BANK_AuthenticationData auth;
/**
* Until when is processing this wire plugin delayed?
*/
struct GNUNET_TIME_Absolute delayed_until;
/**
* Encoded offset in the wire transfer list from where
* to start the next query with the bank.
*/
uint64_t last_row_off;
/**
* Latest row offset seen in this transaction, becomes
* the new #last_row_off upon commit.
*/
uint64_t latest_row_off;
/**
* How many transactions do we retrieve per batch?
*/
unsigned int batch_size;
/**
* How many transactions did we see in the current batch?
*/
unsigned int current_batch_size;
/** /**
* Are we running from scratch and should re-process all transactions * Are we running from scratch and should re-process all transactions
* for this account? * for this account?
@ -68,9 +106,10 @@ struct WireAccount
int reset_mode; int reset_mode;
/** /**
* Until when is processing this wire plugin delayed? * Should we delay the next request to the wire plugin a bit? Set to
* #GNUNET_NO if we actually did some work.
*/ */
struct GNUNET_TIME_Absolute delayed_until; int delay;
}; };
@ -86,7 +125,8 @@ static struct WireAccount *wa_head;
static struct WireAccount *wa_tail; static struct WireAccount *wa_tail;
/** /**
* Wire plugin we are currently using. * Wire account we are currently processing. This would go away
* if we ever start processing all accounts in parallel.
*/ */
static struct WireAccount *wa_pos; static struct WireAccount *wa_pos;
@ -111,27 +151,25 @@ static const struct GNUNET_CONFIGURATION_Handle *cfg;
static struct TALER_EXCHANGEDB_Plugin *db_plugin; static struct TALER_EXCHANGEDB_Plugin *db_plugin;
/** /**
* Value to return from main(). #GNUNET_OK on success, #GNUNET_SYSERR * How long should we sleep when idle before trying to find more work?
*/
static struct GNUNET_TIME_Relative wirewatch_idle_sleep_interval;
/**
* Value to return from main(). 0 on success, non-zero on
* on serious errors. * on serious errors.
*/ */
static int global_ret; static enum
{
/** GR_SUCCESS = 0,
* Encoded offset in the wire transfer list from where GR_DATABASE_SESSION_FAIL = 1,
* to start the next query with the bank. GR_DATABASE_TRANSACTION_BEGIN_FAIL = 2,
*/ GR_DATABASE_SELECT_LATEST_HARD_FAIL = 3,
static uint64_t last_row_off; GR_BANK_REQUEST_HISTORY_FAIL = 4,
GR_CONFIGURATION_INVALID = 5,
/** GR_CMD_LINE_UTF8_ERROR = 6,
* Latest row offset seen in this transaction, becomes GR_CMD_LINE_OPTIONS_WRONG = 7,
* the new #last_row_off upon commit. } global_ret;
*/
static uint64_t latest_row_off;
/**
* Should we delay the next request to the wire plugin a bit?
*/
static int delay;
/** /**
* Are we run in testing mode and should only do one pass? * Are we run in testing mode and should only do one pass?
@ -144,25 +182,10 @@ static int test_mode;
static int reset_mode; static int reset_mode;
/** /**
* How many transactions do we retrieve per batch? * Current task waiting for execution, if any.
*/
static unsigned int batch_size = 1024;
/**
* How many transactions did we see in the current batch?
*/
static unsigned int current_batch_size;
/**
* Next task to run, if any.
*/ */
static struct GNUNET_SCHEDULER_Task *task; static struct GNUNET_SCHEDULER_Task *task;
/**
* Active request for history.
*/
static struct TALER_BANK_CreditHistoryHandle *hh;
/** /**
* We're being aborted with CTRL-C (or SIGTERM). Shut down. * We're being aborted with CTRL-C (or SIGTERM). Shut down.
@ -173,11 +196,26 @@ static void
shutdown_task (void *cls) shutdown_task (void *cls)
{ {
(void) cls; (void) cls;
if (NULL != hh)
{ {
TALER_BANK_credit_history_cancel (hh); struct WireAccount *wa;
hh = NULL;
while (NULL != (wa = wa_head))
{
if (NULL != wa->hh)
{
TALER_BANK_credit_history_cancel (wa->hh);
wa->hh = NULL;
} }
GNUNET_CONTAINER_DLL_remove (wa_head,
wa_tail,
wa);
TALER_BANK_auth_free (&wa->auth);
GNUNET_free (wa->section_name);
GNUNET_free (wa);
}
}
wa_pos = NULL;
if (NULL != ctx) if (NULL != ctx)
{ {
GNUNET_CURL_fini (ctx); GNUNET_CURL_fini (ctx);
@ -195,21 +233,6 @@ shutdown_task (void *cls)
} }
TALER_EXCHANGEDB_plugin_unload (db_plugin); TALER_EXCHANGEDB_plugin_unload (db_plugin);
db_plugin = NULL; db_plugin = NULL;
{
struct WireAccount *wa;
while (NULL != (wa = wa_head))
{
GNUNET_CONTAINER_DLL_remove (wa_head,
wa_tail,
wa);
TALER_BANK_auth_free (&wa->auth);
GNUNET_free (wa->section_name);
GNUNET_free (wa);
}
}
wa_pos = NULL;
last_row_off = 0;
} }
@ -243,6 +266,7 @@ add_account_cb (void *cls,
return; return;
} }
wa->section_name = GNUNET_strdup (ai->section_name); wa->section_name = GNUNET_strdup (ai->section_name);
wa->batch_size = INITIAL_BATCH_SIZE;
GNUNET_CONTAINER_DLL_insert (wa_head, GNUNET_CONTAINER_DLL_insert (wa_head,
wa_tail, wa_tail,
wa); wa);
@ -258,6 +282,17 @@ add_account_cb (void *cls,
static int static int
exchange_serve_process_config (void) exchange_serve_process_config (void)
{ {
if (GNUNET_OK !=
GNUNET_CONFIGURATION_get_value_time (cfg,
"exchange",
"WIREWATCH_IDLE_SLEEP_INTERVAL",
&wirewatch_idle_sleep_interval))
{
GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
"exchange",
"WIREWATCH_IDLE_SLEEP_INTERVAL");
return GNUNET_SYSERR;
}
if (NULL == if (NULL ==
(db_plugin = TALER_EXCHANGEDB_plugin_load (cfg))) (db_plugin = TALER_EXCHANGEDB_plugin_load (cfg)))
{ {
@ -292,7 +327,7 @@ find_transfers (void *cls);
* Callbacks of this type are used to serve the result of asking * Callbacks of this type are used to serve the result of asking
* the bank for the transaction history. * the bank for the transaction history.
* *
* @param cls closure with the `struct TALER_EXCHANGEDB_Session *` * @param cls closure with the `struct WioreAccount *` we are processing
* @param http_status HTTP status code from the server * @param http_status HTTP status code from the server
* @param ec taler error code * @param ec taler error code
* @param serial_id identification of the position at which we are querying * @param serial_id identification of the position at which we are querying
@ -308,13 +343,14 @@ history_cb (void *cls,
const struct TALER_BANK_CreditDetails *details, const struct TALER_BANK_CreditDetails *details,
const json_t *json) const json_t *json)
{ {
struct TALER_EXCHANGEDB_Session *session = cls; struct WireAccount *wa = cls;
struct TALER_EXCHANGEDB_Session *session = wa->session;
enum GNUNET_DB_QueryStatus qs; enum GNUNET_DB_QueryStatus qs;
(void) json; (void) json;
if (NULL == details) if (NULL == details)
{ {
hh = NULL; wa->hh = NULL;
if (TALER_EC_NONE != ec) if (TALER_EC_NONE != ec)
{ {
GNUNET_log (GNUNET_ERROR_TYPE_ERROR, GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
@ -331,8 +367,8 @@ history_cb (void *cls,
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Got DB soft error for commit\n"); "Got DB soft error for commit\n");
/* reduce transaction size to reduce rollback probability */ /* reduce transaction size to reduce rollback probability */
if (2 > current_batch_size) if (2 > wa->current_batch_size)
current_batch_size /= 2; wa->current_batch_size /= 2;
/* try again */ /* try again */
GNUNET_assert (NULL == task); GNUNET_assert (NULL == task);
task = GNUNET_SCHEDULER_add_now (&find_transfers, task = GNUNET_SCHEDULER_add_now (&find_transfers,
@ -342,27 +378,28 @@ history_cb (void *cls,
if (0 < qs) if (0 < qs)
{ {
/* transaction success, update #last_row_off */ /* transaction success, update #last_row_off */
last_row_off = latest_row_off; wa->last_row_off = wa->latest_row_off;
latest_row_off = 0; wa->latest_row_off = 0; /* should not be needed */
wa->session = NULL; /* should not be needed */
/* if successful at limit, try increasing transaction batch size (AIMD) */ /* if successful at limit, try increasing transaction batch size (AIMD) */
if (current_batch_size == batch_size) if ( (wa->current_batch_size == wa->batch_size) &&
batch_size++; (UINT_MAX > wa->batch_size) )
wa->batch_size++;
} }
GNUNET_break (0 <= qs); GNUNET_break (0 <= qs);
if ( (GNUNET_YES == delay) && if ( (GNUNET_YES == wa->delay) &&
(test_mode) && (test_mode) &&
(NULL == wa_pos->next) ) (NULL == wa->next) )
{ {
GNUNET_log (GNUNET_ERROR_TYPE_INFO, GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"Shutdown due to test mode!\n"); "Shutdown due to test mode!\n");
GNUNET_SCHEDULER_shutdown (); GNUNET_SCHEDULER_shutdown ();
return GNUNET_OK; return GNUNET_OK;
} }
if (GNUNET_YES == delay) if (GNUNET_YES == wa->delay)
{ {
wa_pos->delayed_until wa->delayed_until
= GNUNET_TIME_relative_to_absolute (DELAY); = GNUNET_TIME_relative_to_absolute (wirewatch_idle_sleep_interval);
wa_pos = wa_pos->next; wa_pos = wa_pos->next;
if (NULL == wa_pos) if (NULL == wa_pos)
wa_pos = wa_head; wa_pos = wa_head;
@ -381,13 +418,14 @@ history_cb (void *cls,
/** /**
* Debug block. * Debug block.
*/ */
#if DEBUG_LOGGING
{ {
/* Should be 53, give 80 just to be redundant. */ /** Should be 53, give 80 just to be extra conservative (and aligned). */
#define PUBSIZE 80 #define PUBSIZE 80
char wtid_s[PUBSIZE]; char wtid_s[PUBSIZE];
GNUNET_break GNUNET_break (NULL !=
(NULL != GNUNET_STRINGS_data_to_string (&details->reserve_pub, GNUNET_STRINGS_data_to_string (&details->reserve_pub,
sizeof (details->reserve_pub), sizeof (details->reserve_pub),
&wtid_s[0], &wtid_s[0],
PUBSIZE)); PUBSIZE));
@ -395,15 +433,17 @@ history_cb (void *cls,
"Plain text subject (= reserve_pub): %s\n", "Plain text subject (= reserve_pub): %s\n",
wtid_s); wtid_s);
} }
#endif
current_batch_size++; if (wa->current_batch_size < UINT_MAX)
wa->current_batch_size++;
qs = db_plugin->reserves_in_insert (db_plugin->cls, qs = db_plugin->reserves_in_insert (db_plugin->cls,
session, session,
&details->reserve_pub, &details->reserve_pub,
&details->amount, &details->amount,
details->execution_date, details->execution_date,
details->debit_account_url, details->debit_account_url,
wa_pos->section_name, wa->section_name,
serial_id); serial_id);
if (GNUNET_DB_STATUS_HARD_ERROR == qs) if (GNUNET_DB_STATUS_HARD_ERROR == qs)
{ {
@ -425,8 +465,8 @@ history_cb (void *cls,
NULL); NULL);
return GNUNET_SYSERR; return GNUNET_SYSERR;
} }
wa->delay = GNUNET_NO;
latest_row_off = serial_id; wa->latest_row_off = serial_id;
return GNUNET_OK; return GNUNET_OK;
} }
@ -446,12 +486,11 @@ find_transfers (void *cls)
task = NULL; task = NULL;
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Checking for incoming wire transfers\n"); "Checking for incoming wire transfers\n");
if (NULL == (session = db_plugin->get_session (db_plugin->cls))) if (NULL == (session = db_plugin->get_session (db_plugin->cls)))
{ {
GNUNET_log (GNUNET_ERROR_TYPE_ERROR, GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
"Failed to obtain database session!\n"); "Failed to obtain database session!\n");
global_ret = GNUNET_SYSERR; global_ret = GR_DATABASE_SESSION_FAIL;
GNUNET_SCHEDULER_shutdown (); GNUNET_SCHEDULER_shutdown ();
return; return;
} }
@ -464,7 +503,7 @@ find_transfers (void *cls)
{ {
GNUNET_log (GNUNET_ERROR_TYPE_ERROR, GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
"Failed to start database transaction!\n"); "Failed to start database transaction!\n");
global_ret = GNUNET_SYSERR; global_ret = GR_DATABASE_TRANSACTION_BEGIN_FAIL;
GNUNET_SCHEDULER_shutdown (); GNUNET_SCHEDULER_shutdown ();
return; return;
} }
@ -473,14 +512,14 @@ find_transfers (void *cls)
qs = db_plugin->get_latest_reserve_in_reference (db_plugin->cls, qs = db_plugin->get_latest_reserve_in_reference (db_plugin->cls,
session, session,
wa_pos->section_name, wa_pos->section_name,
&last_row_off); &wa_pos->last_row_off);
if (GNUNET_DB_STATUS_HARD_ERROR == qs) if (GNUNET_DB_STATUS_HARD_ERROR == qs)
{ {
GNUNET_log (GNUNET_ERROR_TYPE_ERROR, GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
"Failed to obtain starting point for montoring from database!\n"); "Failed to obtain starting point for montoring from database!\n");
db_plugin->rollback (db_plugin->cls, db_plugin->rollback (db_plugin->cls,
session); session);
global_ret = GNUNET_SYSERR; global_ret = GR_DATABASE_SELECT_LATEST_HARD_FAIL;
GNUNET_SCHEDULER_shutdown (); GNUNET_SCHEDULER_shutdown ();
return; return;
} }
@ -493,28 +532,28 @@ find_transfers (void *cls)
NULL); NULL);
return; return;
} }
}
wa_pos->reset_mode = GNUNET_NO; wa_pos->reset_mode = GNUNET_NO;
delay = GNUNET_YES; }
current_batch_size = 0; wa_pos->delay = GNUNET_YES;
wa_pos->current_batch_size = 0; /* reset counter */
GNUNET_log (GNUNET_ERROR_TYPE_INFO, GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"wirewatch: requesting incoming history from %s\n", "wirewatch: requesting incoming history from %s\n",
wa_pos->auth.wire_gateway_url); wa_pos->auth.wire_gateway_url);
wa_pos->session = session;
hh = TALER_BANK_credit_history (ctx, wa_pos->hh = TALER_BANK_credit_history (ctx,
&wa_pos->auth, &wa_pos->auth,
last_row_off, wa_pos->last_row_off,
batch_size, wa_pos->batch_size,
&history_cb, &history_cb,
session); wa_pos);
if (NULL == hh) if (NULL == wa_pos->hh)
{ {
GNUNET_log (GNUNET_ERROR_TYPE_ERROR, GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
"Failed to start request for account history!\n"); "Failed to start request for account history!\n");
db_plugin->rollback (db_plugin->cls, db_plugin->rollback (db_plugin->cls,
session); session);
global_ret = GNUNET_SYSERR; global_ret = GR_BANK_REQUEST_HISTORY_FAIL;
GNUNET_SCHEDULER_shutdown (); GNUNET_SCHEDULER_shutdown ();
return; return;
} }
@ -542,7 +581,7 @@ run (void *cls,
if (GNUNET_OK != if (GNUNET_OK !=
exchange_serve_process_config ()) exchange_serve_process_config ())
{ {
global_ret = 1; global_ret = GR_CONFIGURATION_INVALID;
return; return;
} }
wa_pos = wa_head; wa_pos = wa_head;
@ -567,7 +606,7 @@ run (void *cls,
* *
* @param argc number of arguments from the command line * @param argc number of arguments from the command line
* @param argv command line arguments * @param argv command line arguments
* @return 0 ok, 1 on error * @return 0 ok, non-zero on error
*/ */
int int
main (int argc, main (int argc,
@ -590,7 +629,7 @@ main (int argc,
if (GNUNET_OK != if (GNUNET_OK !=
GNUNET_STRINGS_get_utf8_args (argc, argv, GNUNET_STRINGS_get_utf8_args (argc, argv,
&argc, &argv)) &argc, &argv))
return 2; return GR_CMD_LINE_UTF8_ERROR;
if (GNUNET_OK != if (GNUNET_OK !=
GNUNET_PROGRAM_run (argc, argv, GNUNET_PROGRAM_run (argc, argv,
"taler-exchange-wirewatch", "taler-exchange-wirewatch",
@ -600,7 +639,7 @@ main (int argc,
&run, NULL)) &run, NULL))
{ {
GNUNET_free ((void *) argv); GNUNET_free ((void *) argv);
return 1; return GR_CMD_LINE_OPTIONS_WRONG;
} }
GNUNET_free ((void *) argv); GNUNET_free ((void *) argv);
return global_ret; return global_ret;