clean up aggregator logic, make it more robust against invariant failures

This commit is contained in:
Christian Grothoff 2020-03-14 22:56:14 +01:00
parent 6aca928cf8
commit ce44b4a028
No known key found for this signature in database
GPG Key ID: 939E6BE1E29FC3CC
2 changed files with 117 additions and 108 deletions

@ -1 +1 @@
Subproject commit 934a6a18301e81c4fd1b3a8cda2dc13dca4741cc Subproject commit ca53235ccfa0458ebf11c204888ca370e20ec3f5

View File

@ -16,7 +16,7 @@
/** /**
* @file taler-exchange-aggregator.c * @file taler-exchange-aggregator.c
* @brief Process that aggregates outgoing transactions and executes them * @brief Process that aggregates outgoing transactions and prepares their execution
* @author Christian Grothoff * @author Christian Grothoff
*/ */
#include "platform.h" #include "platform.h"
@ -70,7 +70,7 @@ struct AggregationUnit
/** /**
* Row ID of the transaction that started it all. * Row ID of the transaction that started it all.
*/ */
unsigned long long row_id; uint64_t row_id;
/** /**
* The current time (which triggered the aggregation and * The current time (which triggered the aggregation and
@ -100,10 +100,9 @@ struct AggregationUnit
struct TALER_BANK_PrepareHandle *ph; struct TALER_BANK_PrepareHandle *ph;
/** /**
* Array of #TALER_EXCHANGEDB_MATCHING_DEPOSITS_LIMIT row_ids from the * Array of row_ids from the aggregation.
* aggregation.
*/ */
unsigned long long *additional_rows; uint64_t additional_rows[TALER_EXCHANGEDB_MATCHING_DEPOSITS_LIMIT];
/** /**
* Offset specifying how many @e additional_rows are in use. * Offset specifying how many @e additional_rows are in use.
@ -123,11 +122,6 @@ struct AggregationUnit
}; };
/**
* Which currency is used by this exchange?
*/
static char *exchange_currency_string;
/** /**
* What is the smallest unit we support for wire transfers? * What is the smallest unit we support for wire transfers?
* We will need to round down to a multiple of this amount. * We will need to round down to a multiple of this amount.
@ -162,10 +156,23 @@ static struct GNUNET_SCHEDULER_Task *task;
static struct GNUNET_TIME_Relative aggregator_idle_sleep_interval; static struct GNUNET_TIME_Relative aggregator_idle_sleep_interval;
/** /**
* Value to return from main(). #GNUNET_OK on success, #GNUNET_SYSERR * Value to return from main(). 0 on success, non-zero on errors.
* on serious errors.
*/ */
static int global_ret; static enum
{
GR_SUCCESS = 0,
GR_DATABASE_SESSION_FAIL = 1,
GR_DATABASE_TRANSACTION_BEGIN_FAIL = 2,
GR_DATABASE_READY_DEPOSIT_HARD_FAIL = 3,
GR_DATABASE_ITERATE_DEPOSIT_HARD_FAIL = 4,
GR_DATABASE_TINY_MARK_HARD_FAIL = 5,
GR_DATABASE_PREPARE_HARD_FAIL = 6,
GR_DATABASE_PREPARE_COMMIT_HARD_FAIL = 7,
GR_INVARIANT_FAILURE = 8,
GR_CONFIGURATION_INVALID = 9,
GR_CMD_LINE_UTF8_ERROR = 9,
GR_CMD_LINE_OPTIONS_WRONG = 10,
} global_ret;
/** /**
* #GNUNET_YES if we are in test mode and should exit when idle. * #GNUNET_YES if we are in test mode and should exit when idle.
@ -192,7 +199,6 @@ static void
cleanup_au (struct AggregationUnit *au) cleanup_au (struct AggregationUnit *au)
{ {
GNUNET_assert (NULL != au); GNUNET_assert (NULL != au);
GNUNET_free_non_null (au->additional_rows);
if (NULL != au->wire) if (NULL != au->wire)
json_decref (au->wire); json_decref (au->wire);
memset (au, memset (au,
@ -230,7 +236,7 @@ shutdown_task (void *cls)
* @return #GNUNET_OK on success * @return #GNUNET_OK on success
*/ */
static int static int
parse_wirewatch_config () parse_wirewatch_config (void)
{ {
if (GNUNET_OK != if (GNUNET_OK !=
GNUNET_CONFIGURATION_get_value_string (cfg, GNUNET_CONFIGURATION_get_value_string (cfg,
@ -254,33 +260,11 @@ parse_wirewatch_config ()
"AGGREGATOR_IDLE_SLEEP_INTERVAL"); "AGGREGATOR_IDLE_SLEEP_INTERVAL");
return GNUNET_SYSERR; return GNUNET_SYSERR;
} }
if (GNUNET_OK !=
GNUNET_CONFIGURATION_get_value_string (cfg,
"taler",
"CURRENCY",
&exchange_currency_string))
{
GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
"taler",
"CURRENCY");
return GNUNET_SYSERR;
}
if (strlen (exchange_currency_string) >= TALER_CURRENCY_LEN)
{
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
"Currency `%s' longer than the allowed limit of %u characters.",
exchange_currency_string,
(unsigned int) TALER_CURRENCY_LEN);
return GNUNET_SYSERR;
}
if ( (GNUNET_OK != if ( (GNUNET_OK !=
TALER_config_get_amount (cfg, TALER_config_get_amount (cfg,
"taler", "taler",
"CURRENCY_ROUND_UNIT", "CURRENCY_ROUND_UNIT",
&currency_round_unit)) || &currency_round_unit)) ||
(0 != strcasecmp (exchange_currency_string,
currency_round_unit.currency)) ||
( (0 != currency_round_unit.fraction) && ( (0 != currency_round_unit.fraction) &&
(0 != currency_round_unit.value) ) ) (0 != currency_round_unit.value) ) )
{ {
@ -396,19 +380,29 @@ deposit_cb (void *cls,
} }
if (GNUNET_NO == au->have_refund) if (GNUNET_NO == au->have_refund)
{ {
struct TALER_Amount ntotal;
GNUNET_log (GNUNET_ERROR_TYPE_INFO, GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"Non-refunded transaction, subtracting deposit fee %s\n", "Non-refunded transaction, subtracting deposit fee %s\n",
TALER_amount2s (deposit_fee)); TALER_amount2s (deposit_fee));
if (GNUNET_SYSERR == if (GNUNET_SYSERR ==
TALER_amount_subtract (&au->total_amount, TALER_amount_subtract (&ntotal,
amount_with_fee, amount_with_fee,
deposit_fee)) deposit_fee))
{ {
/* This should never happen, issue a warning, but continue processing
with an amount of zero, lest we hang here for good. */
GNUNET_log (GNUNET_ERROR_TYPE_ERROR, GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
"Fatally malformed record at row %llu over %s\n", "Fatally malformed record at row %llu over %s (deposit fee exceeds deposited value)\n",
(unsigned long long) row_id, (unsigned long long) row_id,
TALER_amount2s (amount_with_fee)); TALER_amount2s (amount_with_fee));
return GNUNET_DB_STATUS_HARD_ERROR; GNUNET_assert (GNUNET_OK ==
TALER_amount_get_zero (au->total_amount.currency,
&au->total_amount));
}
else
{
au->total_amount = ntotal;
} }
} }
@ -440,13 +434,16 @@ deposit_cb (void *cls,
url = TALER_JSON_wire_to_payto (au->wire); url = TALER_JSON_wire_to_payto (au->wire);
au->wa = TALER_EXCHANGEDB_find_account_by_payto_uri (url); au->wa = TALER_EXCHANGEDB_find_account_by_payto_uri (url);
GNUNET_free (url);
}
if (NULL == au->wa) if (NULL == au->wa)
{ {
GNUNET_break (0); GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
"No exchange account configured for `%s', please fix your setup to continue!\n",
url);
GNUNET_free (url);
return GNUNET_DB_STATUS_HARD_ERROR; return GNUNET_DB_STATUS_HARD_ERROR;
} }
GNUNET_free (url);
}
/* make sure we have current fees */ /* make sure we have current fees */
au->execution_time = GNUNET_TIME_absolute_get (); au->execution_time = GNUNET_TIME_absolute_get ();
@ -462,7 +459,8 @@ deposit_cb (void *cls,
if (NULL == af) if (NULL == af)
{ {
GNUNET_log (GNUNET_ERROR_TYPE_ERROR, GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
"Could not get or persist wire fees. Aborting run.\n"); "Could not get or persist wire fees for %s. Aborting run.\n",
GNUNET_STRINGS_absolute_time_to_string (au->execution_time));
return GNUNET_DB_STATUS_HARD_ERROR; return GNUNET_DB_STATUS_HARD_ERROR;
} }
au->wire_fee = af->wire_fee; au->wire_fee = af->wire_fee;
@ -549,17 +547,6 @@ aggregate_cb (void *cls,
"Adding transaction amount %s from row %llu to aggregation\n", "Adding transaction amount %s from row %llu to aggregation\n",
TALER_amount2s (amount_with_fee), TALER_amount2s (amount_with_fee),
(unsigned long long) row_id); (unsigned long long) row_id);
if (GNUNET_OK !=
TALER_amount_add (&au->total_amount,
&au->total_amount,
amount_with_fee))
{
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
"Overflow or currency incompatibility during aggregation at %llu\n",
(unsigned long long) row_id);
/* Skip this one, but keep going! */
return GNUNET_DB_STATUS_SUCCESS_ONE_RESULT;
}
au->have_refund = GNUNET_NO; au->have_refund = GNUNET_NO;
qs = db_plugin->select_refunds_by_coin (db_plugin->cls, qs = db_plugin->select_refunds_by_coin (db_plugin->cls,
au->session, au->session,
@ -580,22 +567,43 @@ aggregate_cb (void *cls,
TALER_amount2s (deposit_fee)); TALER_amount2s (deposit_fee));
if (GNUNET_SYSERR == if (GNUNET_SYSERR ==
TALER_amount_subtract (&delta, TALER_amount_subtract (&delta,
&au->total_amount, amount_with_fee,
deposit_fee)) deposit_fee))
{ {
GNUNET_log (GNUNET_ERROR_TYPE_ERROR, GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
"Fatally malformed record at %llu over amount %s\n", "Fatally malformed record at %llu over amount %s (deposit fee exceeds deposited value)\n",
(unsigned long long) row_id, (unsigned long long) row_id,
TALER_amount2s (&au->total_amount)); TALER_amount2s (&au->total_amount));
return GNUNET_DB_STATUS_HARD_ERROR;
} }
au->total_amount = delta; else
{
GNUNET_assert (GNUNET_OK ==
TALER_amount_get_zero (au->total_amount.currency,
&delta));
}
}
else
{
delta = *amount_with_fee;
}
{
struct TALER_Amount tmp;
if (GNUNET_OK !=
TALER_amount_add (&tmp,
&au->total_amount,
&delta))
{
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
"Overflow or currency incompatibility during aggregation at %llu\n",
(unsigned long long) row_id);
/* Skip this one, but keep going! */
return GNUNET_DB_STATUS_SUCCESS_ONE_RESULT;
}
au->total_amount = tmp;
} }
if (NULL == au->additional_rows)
au->additional_rows = GNUNET_new_array (
TALER_EXCHANGEDB_MATCHING_DEPOSITS_LIMIT,
unsigned long long);
/* "append" to our list of rows */ /* "append" to our list of rows */
au->additional_rows[au->rows_offset++] = row_id; au->additional_rows[au->rows_offset++] = row_id;
/* insert into aggregation tracking table */ /* insert into aggregation tracking table */
@ -659,22 +667,16 @@ run_aggregation (void *cls)
struct AggregationUnit au_active; struct AggregationUnit au_active;
struct TALER_EXCHANGEDB_Session *session; struct TALER_EXCHANGEDB_Session *session;
enum GNUNET_DB_QueryStatus qs; enum GNUNET_DB_QueryStatus qs;
const struct GNUNET_SCHEDULER_TaskContext *tc;
void *buf;
size_t buf_size;
(void) cls; (void) cls;
task = NULL; task = NULL;
tc = GNUNET_SCHEDULER_get_task_context ();
if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
return;
GNUNET_log (GNUNET_ERROR_TYPE_INFO, GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"Checking for ready deposits to aggregate\n"); "Checking for ready deposits to aggregate\n");
if (NULL == (session = db_plugin->get_session (db_plugin->cls))) if (NULL == (session = db_plugin->get_session (db_plugin->cls)))
{ {
GNUNET_log (GNUNET_ERROR_TYPE_ERROR, GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
"Failed to obtain database session!\n"); "Failed to obtain database session!\n");
global_ret = GNUNET_SYSERR; global_ret = GR_DATABASE_SESSION_FAIL;
GNUNET_SCHEDULER_shutdown (); GNUNET_SCHEDULER_shutdown ();
return; return;
} }
@ -684,7 +686,7 @@ run_aggregation (void *cls)
{ {
GNUNET_log (GNUNET_ERROR_TYPE_ERROR, GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
"Failed to start database transaction!\n"); "Failed to start database transaction!\n");
global_ret = GNUNET_SYSERR; global_ret = GR_DATABASE_TRANSACTION_BEGIN_FAIL;
GNUNET_SCHEDULER_shutdown (); GNUNET_SCHEDULER_shutdown ();
return; return;
} }
@ -705,7 +707,7 @@ run_aggregation (void *cls)
{ {
GNUNET_log (GNUNET_ERROR_TYPE_ERROR, GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
"Failed to execute deposit iteration!\n"); "Failed to execute deposit iteration!\n");
global_ret = GNUNET_SYSERR; global_ret = GR_DATABASE_READY_DEPOSIT_HARD_FAIL;
GNUNET_SCHEDULER_shutdown (); GNUNET_SCHEDULER_shutdown ();
return; return;
} }
@ -754,7 +756,7 @@ run_aggregation (void *cls)
cleanup_au (&au_active); cleanup_au (&au_active);
db_plugin->rollback (db_plugin->cls, db_plugin->rollback (db_plugin->cls,
session); session);
global_ret = GNUNET_SYSERR; global_ret = GR_DATABASE_ITERATE_DEPOSIT_HARD_FAIL;
GNUNET_SCHEDULER_shutdown (); GNUNET_SCHEDULER_shutdown ();
return; return;
} }
@ -803,7 +805,7 @@ run_aggregation (void *cls)
{ {
GNUNET_log (GNUNET_ERROR_TYPE_ERROR, GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
"Failed to start database transaction!\n"); "Failed to start database transaction!\n");
global_ret = GNUNET_SYSERR; global_ret = GR_DATABASE_TRANSACTION_BEGIN_FAIL;
cleanup_au (&au_active); cleanup_au (&au_active);
GNUNET_SCHEDULER_shutdown (); GNUNET_SCHEDULER_shutdown ();
return; return;
@ -841,6 +843,7 @@ run_aggregation (void *cls)
db_plugin->rollback (db_plugin->cls, db_plugin->rollback (db_plugin->cls,
session); session);
cleanup_au (&au_active); cleanup_au (&au_active);
global_ret = GR_DATABASE_TINY_MARK_HARD_FAIL;
GNUNET_SCHEDULER_shutdown (); GNUNET_SCHEDULER_shutdown ();
return; return;
} }
@ -864,6 +867,11 @@ run_aggregation (void *cls)
TALER_B2S (&au_active.merchant_pub)); TALER_B2S (&au_active.merchant_pub));
GNUNET_free (amount_s); GNUNET_free (amount_s);
} }
{
void *buf;
size_t buf_size;
{ {
char *url; char *url;
@ -876,8 +884,7 @@ run_aggregation (void *cls)
&buf_size); &buf_size);
GNUNET_free (url); GNUNET_free (url);
} }
GNUNET_free_non_null (au_active.additional_rows);
au_active.additional_rows = NULL;
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Storing %u bytes of wire prepare data\n", "Storing %u bytes of wire prepare data\n",
(unsigned int) buf_size); (unsigned int) buf_size);
@ -888,6 +895,7 @@ run_aggregation (void *cls)
buf, buf,
buf_size); buf_size);
GNUNET_free (buf); GNUNET_free (buf);
}
/* Commit the WTID data to 'wire_out' to finally satisfy aggregation /* Commit the WTID data to 'wire_out' to finally satisfy aggregation
table constraints */ table constraints */
if (qs >= 0) if (qs >= 0)
@ -918,7 +926,7 @@ run_aggregation (void *cls)
db_plugin->rollback (db_plugin->cls, db_plugin->rollback (db_plugin->cls,
session); session);
/* die hard */ /* die hard */
global_ret = GNUNET_SYSERR; global_ret = GR_DATABASE_PREPARE_HARD_FAIL;
GNUNET_SCHEDULER_shutdown (); GNUNET_SCHEDULER_shutdown ();
return; return;
} }
@ -940,7 +948,7 @@ run_aggregation (void *cls)
return; return;
case GNUNET_DB_STATUS_HARD_ERROR: case GNUNET_DB_STATUS_HARD_ERROR:
GNUNET_break (0); GNUNET_break (0);
global_ret = GNUNET_SYSERR; global_ret = GR_DATABASE_PREPARE_COMMIT_HARD_FAIL;
GNUNET_SCHEDULER_shutdown (); GNUNET_SCHEDULER_shutdown ();
return; return;
case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
@ -952,7 +960,7 @@ run_aggregation (void *cls)
return; return;
default: default:
GNUNET_break (0); GNUNET_break (0);
global_ret = GNUNET_SYSERR; global_ret = GR_INVARIANT_FAILURE;
GNUNET_SCHEDULER_shutdown (); GNUNET_SCHEDULER_shutdown ();
return; return;
} }
@ -981,7 +989,7 @@ run (void *cls,
if (GNUNET_OK != parse_wirewatch_config ()) if (GNUNET_OK != parse_wirewatch_config ())
{ {
cfg = NULL; cfg = NULL;
global_ret = 1; global_ret = GR_CONFIGURATION_INVALID;
return; return;
} }
GNUNET_assert (NULL == task); GNUNET_assert (NULL == task);
@ -997,7 +1005,7 @@ run (void *cls,
* *
* @param argc number of arguments from the command line * @param argc number of arguments from the command line
* @param argv command line arguments * @param argv command line arguments
* @return 0 ok, 1 on error * @return 0 ok, non-zero on error, see #global_ret
*/ */
int int
main (int argc, main (int argc,
@ -1014,9 +1022,10 @@ main (int argc,
GNUNET_GETOPT_OPTION_END GNUNET_GETOPT_OPTION_END
}; };
if (GNUNET_OK != GNUNET_STRINGS_get_utf8_args (argc, argv, if (GNUNET_OK !=
GNUNET_STRINGS_get_utf8_args (argc, argv,
&argc, &argv)) &argc, &argv))
return 2; return GR_CMD_LINE_UTF8_ERROR;
if (GNUNET_OK != if (GNUNET_OK !=
GNUNET_PROGRAM_run (argc, argv, GNUNET_PROGRAM_run (argc, argv,
"taler-exchange-aggregator", "taler-exchange-aggregator",
@ -1026,7 +1035,7 @@ main (int argc,
&run, NULL)) &run, NULL))
{ {
GNUNET_free ((void *) argv); GNUNET_free ((void *) argv);
return 1; return GR_CMD_LINE_OPTIONS_WRONG;
} }
GNUNET_free ((void *) argv); GNUNET_free ((void *) argv);
return global_ret; return global_ret;