From ce44b4a02849a4f9f3e9cf3fd2e76da4ab2b0e64 Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Sat, 14 Mar 2020 22:56:14 +0100 Subject: [PATCH] clean up aggregator logic, make it more robust against invariant failures --- doc/prebuilt | 2 +- src/exchange/taler-exchange-aggregator.c | 223 ++++++++++++----------- 2 files changed, 117 insertions(+), 108 deletions(-) diff --git a/doc/prebuilt b/doc/prebuilt index 934a6a183..ca53235cc 160000 --- a/doc/prebuilt +++ b/doc/prebuilt @@ -1 +1 @@ -Subproject commit 934a6a18301e81c4fd1b3a8cda2dc13dca4741cc +Subproject commit ca53235ccfa0458ebf11c204888ca370e20ec3f5 diff --git a/src/exchange/taler-exchange-aggregator.c b/src/exchange/taler-exchange-aggregator.c index 59db4daef..c3b94b3d8 100644 --- a/src/exchange/taler-exchange-aggregator.c +++ b/src/exchange/taler-exchange-aggregator.c @@ -16,7 +16,7 @@ /** * @file taler-exchange-aggregator.c - * @brief Process that aggregates outgoing transactions and executes them + * @brief Process that aggregates outgoing transactions and prepares their execution * @author Christian Grothoff */ #include "platform.h" @@ -70,7 +70,7 @@ struct AggregationUnit /** * Row ID of the transaction that started it all. */ - unsigned long long row_id; + uint64_t row_id; /** * The current time (which triggered the aggregation and @@ -100,10 +100,9 @@ struct AggregationUnit struct TALER_BANK_PrepareHandle *ph; /** - * Array of #TALER_EXCHANGEDB_MATCHING_DEPOSITS_LIMIT row_ids from the - * aggregation. + * Array of row_ids from the aggregation. */ - unsigned long long *additional_rows; + uint64_t additional_rows[TALER_EXCHANGEDB_MATCHING_DEPOSITS_LIMIT]; /** * Offset specifying how many @e additional_rows are in use. @@ -123,11 +122,6 @@ struct AggregationUnit }; -/** - * Which currency is used by this exchange? - */ -static char *exchange_currency_string; - /** * What is the smallest unit we support for wire transfers? * We will need to round down to a multiple of this amount. @@ -162,10 +156,23 @@ static struct GNUNET_SCHEDULER_Task *task; static struct GNUNET_TIME_Relative aggregator_idle_sleep_interval; /** - * Value to return from main(). #GNUNET_OK on success, #GNUNET_SYSERR - * on serious errors. + * Value to return from main(). 0 on success, non-zero on erorrs. */ -static int global_ret; +static enum +{ + GR_SUCCESS = 0, + GR_DATABASE_SESSION_FAIL = 1, + GR_DATABASE_TRANSACTION_BEGIN_FAIL = 2, + GR_DATABASE_READY_DEPOSIT_HARD_FAIL = 3, + GR_DATABASE_ITERATE_DEPOSIT_HARD_FAIL = 4, + GR_DATABASE_TINY_MARK_HARD_FAIL = 5, + GR_DATABASE_PREPARE_HARD_FAIL = 6, + GR_DATABASE_PREPARE_COMMIT_HARD_FAIL = 7, + GR_INVARIANT_FAILURE = 8, + GR_CONFIGURATION_INVALID = 9, + GR_CMD_LINE_UTF8_ERROR = 9, + GR_CMD_LINE_OPTIONS_WRONG = 10, +} global_ret; /** * #GNUNET_YES if we are in test mode and should exit when idle. @@ -192,7 +199,6 @@ static void cleanup_au (struct AggregationUnit *au) { GNUNET_assert (NULL != au); - GNUNET_free_non_null (au->additional_rows); if (NULL != au->wire) json_decref (au->wire); memset (au, @@ -230,7 +236,7 @@ shutdown_task (void *cls) * @return #GNUNET_OK on success */ static int -parse_wirewatch_config () +parse_wirewatch_config (void) { if (GNUNET_OK != GNUNET_CONFIGURATION_get_value_string (cfg, @@ -254,33 +260,11 @@ parse_wirewatch_config () "AGGREGATOR_IDLE_SLEEP_INTERVAL"); return GNUNET_SYSERR; } - if (GNUNET_OK != - GNUNET_CONFIGURATION_get_value_string (cfg, - "taler", - "CURRENCY", - &exchange_currency_string)) - { - GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR, - "taler", - "CURRENCY"); - return GNUNET_SYSERR; - } - if (strlen (exchange_currency_string) >= TALER_CURRENCY_LEN) - { - GNUNET_log (GNUNET_ERROR_TYPE_ERROR, - "Currency `%s' longer than the allowed limit of %u characters.", - exchange_currency_string, - (unsigned int) TALER_CURRENCY_LEN); - return GNUNET_SYSERR; - } - if ( (GNUNET_OK != TALER_config_get_amount (cfg, "taler", "CURRENCY_ROUND_UNIT", ¤cy_round_unit)) || - (0 != strcasecmp (exchange_currency_string, - currency_round_unit.currency)) || ( (0 != currency_round_unit.fraction) && (0 != currency_round_unit.value) ) ) { @@ -396,19 +380,29 @@ deposit_cb (void *cls, } if (GNUNET_NO == au->have_refund) { + struct TALER_Amount ntotal; + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Non-refunded transaction, subtracting deposit fee %s\n", TALER_amount2s (deposit_fee)); if (GNUNET_SYSERR == - TALER_amount_subtract (&au->total_amount, + TALER_amount_subtract (&ntotal, amount_with_fee, deposit_fee)) { + /* This should never happen, issue a warning, but continue processing + with an amount of zero, least we hang here for good. */ GNUNET_log (GNUNET_ERROR_TYPE_ERROR, - "Fatally malformed record at row %llu over %s\n", + "Fatally malformed record at row %llu over %s (deposit fee exceeds deposited value)\n", (unsigned long long) row_id, TALER_amount2s (amount_with_fee)); - return GNUNET_DB_STATUS_HARD_ERROR; + GNUNET_assert (GNUNET_OK == + TALER_amount_get_zero (au->total_amount.currency, + &au->total_amount)); + } + else + { + au->total_amount = ntotal; } } @@ -440,13 +434,16 @@ deposit_cb (void *cls, url = TALER_JSON_wire_to_payto (au->wire); au->wa = TALER_EXCHANGEDB_find_account_by_payto_uri (url); + if (NULL == au->wa) + { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "No exchange account configured for `%s', please fix your setup to continue!\n", + url); + GNUNET_free (url); + return GNUNET_DB_STATUS_HARD_ERROR; + } GNUNET_free (url); } - if (NULL == au->wa) - { - GNUNET_break (0); - return GNUNET_DB_STATUS_HARD_ERROR; - } /* make sure we have current fees */ au->execution_time = GNUNET_TIME_absolute_get (); @@ -462,7 +459,8 @@ deposit_cb (void *cls, if (NULL == af) { GNUNET_log (GNUNET_ERROR_TYPE_ERROR, - "Could not get or persist wire fees. Aborting run.\n"); + "Could not get or persist wire fees for %s. Aborting run.\n", + GNUNET_STRINGS_absolute_time_to_string (au->execution_time)); return GNUNET_DB_STATUS_HARD_ERROR; } au->wire_fee = af->wire_fee; @@ -549,17 +547,6 @@ aggregate_cb (void *cls, "Adding transaction amount %s from row %llu to aggregation\n", TALER_amount2s (amount_with_fee), (unsigned long long) row_id); - if (GNUNET_OK != - TALER_amount_add (&au->total_amount, - &au->total_amount, - amount_with_fee)) - { - GNUNET_log (GNUNET_ERROR_TYPE_ERROR, - "Overflow or currency incompatibility during aggregation at %llu\n", - (unsigned long long) row_id); - /* Skip this one, but keep going! */ - return GNUNET_DB_STATUS_SUCCESS_ONE_RESULT; - } au->have_refund = GNUNET_NO; qs = db_plugin->select_refunds_by_coin (db_plugin->cls, au->session, @@ -580,22 +567,43 @@ aggregate_cb (void *cls, TALER_amount2s (deposit_fee)); if (GNUNET_SYSERR == TALER_amount_subtract (&delta, - &au->total_amount, + amount_with_fee, deposit_fee)) { GNUNET_log (GNUNET_ERROR_TYPE_ERROR, - "Fatally malformed record at %llu over amount %s\n", + "Fatally malformed record at %llu over amount %s (deposit fee exceeds deposited value)\n", (unsigned long long) row_id, TALER_amount2s (&au->total_amount)); - return GNUNET_DB_STATUS_HARD_ERROR; } - au->total_amount = delta; + else + { + GNUNET_assert (GNUNET_OK == + TALER_amount_get_zero (au->total_amount.currency, + &delta)); + } + } + else + { + delta = *amount_with_fee; + } + + { + struct TALER_Amount tmp; + + if (GNUNET_OK != + TALER_amount_add (&tmp, + &au->total_amount, + &delta)) + { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Overflow or currency incompatibility during aggregation at %llu\n", + (unsigned long long) row_id); + /* Skip this one, but keep going! */ + return GNUNET_DB_STATUS_SUCCESS_ONE_RESULT; + } + au->total_amount = tmp; } - if (NULL == au->additional_rows) - au->additional_rows = GNUNET_new_array ( - TALER_EXCHANGEDB_MATCHING_DEPOSITS_LIMIT, - unsigned long long); /* "append" to our list of rows */ au->additional_rows[au->rows_offset++] = row_id; /* insert into aggregation tracking table */ @@ -659,22 +667,16 @@ run_aggregation (void *cls) struct AggregationUnit au_active; struct TALER_EXCHANGEDB_Session *session; enum GNUNET_DB_QueryStatus qs; - const struct GNUNET_SCHEDULER_TaskContext *tc; - void *buf; - size_t buf_size; (void) cls; task = NULL; - tc = GNUNET_SCHEDULER_get_task_context (); - if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN)) - return; GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Checking for ready deposits to aggregate\n"); if (NULL == (session = db_plugin->get_session (db_plugin->cls))) { GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Failed to obtain database session!\n"); - global_ret = GNUNET_SYSERR; + global_ret = GR_DATABASE_SESSION_FAIL; GNUNET_SCHEDULER_shutdown (); return; } @@ -684,7 +686,7 @@ run_aggregation (void *cls) { GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Failed to start database transaction!\n"); - global_ret = GNUNET_SYSERR; + global_ret = GR_DATABASE_TRANSACTION_BEGIN_FAIL; GNUNET_SCHEDULER_shutdown (); return; } @@ -705,7 +707,7 @@ run_aggregation (void *cls) { GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Failed to execute deposit iteration!\n"); - global_ret = GNUNET_SYSERR; + global_ret = GR_DATABASE_READY_DEPOSIT_HARD_FAIL; GNUNET_SCHEDULER_shutdown (); return; } @@ -754,7 +756,7 @@ run_aggregation (void *cls) cleanup_au (&au_active); db_plugin->rollback (db_plugin->cls, session); - global_ret = GNUNET_SYSERR; + global_ret = GR_DATABASE_ITERATE_DEPOSIT_HARD_FAIL; GNUNET_SCHEDULER_shutdown (); return; } @@ -803,7 +805,7 @@ run_aggregation (void *cls) { GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Failed to start database transaction!\n"); - global_ret = GNUNET_SYSERR; + global_ret = GR_DATABASE_TRANSACTION_BEGIN_FAIL; cleanup_au (&au_active); GNUNET_SCHEDULER_shutdown (); return; @@ -841,6 +843,7 @@ run_aggregation (void *cls) db_plugin->rollback (db_plugin->cls, session); cleanup_au (&au_active); + global_ret = GR_DATABASE_TINY_MARK_HARD_FAIL; GNUNET_SCHEDULER_shutdown (); return; } @@ -864,30 +867,35 @@ run_aggregation (void *cls) TALER_B2S (&au_active.merchant_pub)); GNUNET_free (amount_s); } - { - char *url; - url = TALER_JSON_wire_to_payto (au_active.wire); - TALER_BANK_prepare_transfer (url, - &au_active.final_amount, - exchange_base_url, - &au_active.wtid, - &buf, - &buf_size); - GNUNET_free (url); + { + void *buf; + size_t buf_size; + + { + char *url; + + url = TALER_JSON_wire_to_payto (au_active.wire); + TALER_BANK_prepare_transfer (url, + &au_active.final_amount, + exchange_base_url, + &au_active.wtid, + &buf, + &buf_size); + GNUNET_free (url); + } + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Storing %u bytes of wire prepare data\n", + (unsigned int) buf_size); + /* Commit our intention to execute the wire transfer! */ + qs = db_plugin->wire_prepare_data_insert (db_plugin->cls, + session, + au_active.wa->method, + buf, + buf_size); + GNUNET_free (buf); } - GNUNET_free_non_null (au_active.additional_rows); - au_active.additional_rows = NULL; - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Storing %u bytes of wire prepare data\n", - (unsigned int) buf_size); - /* Commit our intention to execute the wire transfer! */ - qs = db_plugin->wire_prepare_data_insert (db_plugin->cls, - session, - au_active.wa->method, - buf, - buf_size); - GNUNET_free (buf); /* Commit the WTID data to 'wire_out' to finally satisfy aggregation table constraints */ if (qs >= 0) @@ -918,7 +926,7 @@ run_aggregation (void *cls) db_plugin->rollback (db_plugin->cls, session); /* die hard */ - global_ret = GNUNET_SYSERR; + global_ret = GR_DATABASE_PREPARE_HARD_FAIL; GNUNET_SCHEDULER_shutdown (); return; } @@ -940,7 +948,7 @@ run_aggregation (void *cls) return; case GNUNET_DB_STATUS_HARD_ERROR: GNUNET_break (0); - global_ret = GNUNET_SYSERR; + global_ret = GR_DATABASE_PREPARE_COMMIT_HARD_FAIL; GNUNET_SCHEDULER_shutdown (); return; case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: @@ -952,7 +960,7 @@ run_aggregation (void *cls) return; default: GNUNET_break (0); - global_ret = GNUNET_SYSERR; + global_ret = GR_INVARIANT_FAILURE; GNUNET_SCHEDULER_shutdown (); return; } @@ -981,7 +989,7 @@ run (void *cls, if (GNUNET_OK != parse_wirewatch_config ()) { cfg = NULL; - global_ret = 1; + global_ret = GR_CONFIGURATION_INVALID; return; } GNUNET_assert (NULL == task); @@ -997,7 +1005,7 @@ run (void *cls, * * @param argc number of arguments from the command line * @param argv command line arguments - * @return 0 ok, 1 on error + * @return 0 ok, non-zero on error, see #global_ret */ int main (int argc, @@ -1014,9 +1022,10 @@ main (int argc, GNUNET_GETOPT_OPTION_END }; - if (GNUNET_OK != GNUNET_STRINGS_get_utf8_args (argc, argv, - &argc, &argv)) - return 2; + if (GNUNET_OK != + GNUNET_STRINGS_get_utf8_args (argc, argv, + &argc, &argv)) + return GR_CMD_LINE_UTF8_ERROR; if (GNUNET_OK != GNUNET_PROGRAM_run (argc, argv, "taler-exchange-aggregator", @@ -1026,7 +1035,7 @@ main (int argc, &run, NULL)) { GNUNET_free ((void *) argv); - return 1; + return GR_CMD_LINE_OPTIONS_WRONG; } GNUNET_free ((void *) argv); return global_ret;