diff options
| author | Christian Grothoff <christian@grothoff.org> | 2020-03-12 06:11:48 +0100 | 
|---|---|---|
| committer | Christian Grothoff <christian@grothoff.org> | 2020-03-12 08:17:46 +0100 | 
| commit | a1db41e09a618c3a9797242ee593da1331175c14 (patch) | |
| tree | 7f0f123c7dcdb521df82eb8b18074a9c73e461f5 /src/exchange/taler-exchange-aggregator.c | |
| parent | 1896c1dfb58b9e11bd2b4d3822823a623de7004a (diff) | |
aggregator clean up
Diffstat (limited to 'src/exchange/taler-exchange-aggregator.c')
| -rw-r--r-- | src/exchange/taler-exchange-aggregator.c | 269 | 
1 files changed, 143 insertions, 126 deletions
| diff --git a/src/exchange/taler-exchange-aggregator.c b/src/exchange/taler-exchange-aggregator.c index 90f9f28c..5f99a472 100644 --- a/src/exchange/taler-exchange-aggregator.c +++ b/src/exchange/taler-exchange-aggregator.c @@ -18,6 +18,19 @@   * @file taler-exchange-aggregator.c   * @brief Process that aggregates outgoing transactions and executes them   * @author Christian Grothoff + * + * Note: + * It might be simpler and theoretically more performant to split up + * this process into three: + * - one that runs the 'pending' wire transfers + * - one that performs aggregation + * - one that closes (expired) reserves + * + * They would have some (minor) code duplication to load the database and wire + * plugins and account data, and this would also slightly complicate + * operations by having to launch three processes. OTOH, those processes could + * then fail independently, which might also be a good thing.  In any case, + * doing this is not expected to be complicated.   */  #include "platform.h"  #include <gnunet/gnunet_util_lib.h> @@ -30,7 +43,7 @@  /** - * Information we keep for each supported account. + * Information we keep for each supported account of the exchange.   */  struct WireAccount  { @@ -70,6 +83,8 @@ struct WireAccount  /**   * Data we keep to #run_transfers().  There is at most   * one of these around at any given point in time. + * Note that this limits parallelism, and we might want + * to revise this decision at a later point.   */  struct WirePrepareData  { @@ -100,6 +115,8 @@ struct WirePrepareData  /**   * Information about one aggregation process to be executed.  There is   * at most one of these around at any given point in time. + * Note that this limits parallelism, and we might want + * to revise this decision at a later point.   */  struct AggregationUnit  { @@ -139,7 +156,8 @@ struct AggregationUnit    unsigned long long row_id;    /** -   * The current time. +   * The current time (which triggered the aggregation and +   * defines the wire fee).     */    struct GNUNET_TIME_Absolute execution_time; @@ -149,7 +167,8 @@ struct AggregationUnit    json_t *wire;    /** -   * Wire account to be used for the preparation. +   * Exchange wire account to be used for the preparation and +   * eventual execution of the aggregate wire transfer.     */    struct WireAccount *wa; @@ -164,13 +183,13 @@ struct AggregationUnit    struct TALER_BANK_PrepareHandle *ph;    /** -   * Array of #aggregation_limit row_ids from the +   * Array of #TALER_EXCHANGEDB_MATCHING_DEPOSITS_LIMIT row_ids from the     * aggregation.     */    unsigned long long *additional_rows;    /** -   * Offset specifying how many #additional_rows are in use. +   * Offset specifying how many @e additional_rows are in use.     */    unsigned int rows_offset; @@ -222,32 +241,35 @@ static struct CloseTransferContext *ctc;  static char *exchange_currency_string;  /** - * How many fractional digits does the currency use? + * What is the smallest unit we support for wire transfers? + * We will need to round down to a multiple of this amount.   */  static struct TALER_Amount currency_round_unit;  /** - * What is the base URL of this exchange? + * What is the base URL of this exchange?  Used in the + * wire transfer subjects to that merchants and governments + * can ask for the list of aggregated deposits.   */  static char *exchange_base_url;  /** - * The exchange's configuration (global) + * The exchange's configuration.   */ -static struct GNUNET_CONFIGURATION_Handle *cfg; +static const struct GNUNET_CONFIGURATION_Handle *cfg;  /** - * Our DB plugin. + * Our database plugin.   */  static struct TALER_EXCHANGEDB_Plugin *db_plugin;  /** - * Head of list wire accounts of the exchange. + * Head of list of wire accounts of the exchange.   */  static struct WireAccount *wa_head;  /** - * Head of list wire accounts of the exchange. + * Tail of list of wire accounts of the exchange.   */  static struct WireAccount *wa_tail; @@ -263,13 +285,7 @@ static struct GNUNET_SCHEDULER_Task *task;  static struct WirePrepareData *wpd;  /** - * If we are currently aggregating transactions, information about the - * active aggregation is here. Otherwise, this variable is NULL. - */ -static struct AggregationUnit *au; - -/** - * Handle to the context for interacting with the bank. + * Handle to the context for interacting with the bank / wire gateway.   */  static struct GNUNET_CURL_Context *ctx; @@ -296,21 +312,11 @@ static int test_mode;  /**   * Did #run_reserve_closures() have any work during its last run? + * Used to detect when we should go to sleep for a while to avoid + * busy waiting.   */  static int reserves_idle; -/** - * Limit on the number of transactions we aggregate at once.  Note - * that the limit must be big enough to ensure that when transactions - * of the smallest possible unit are aggregated, they do surpass the - * "tiny" threshold beyond which we never trigger a wire transaction! - * - * Note: do not change here, Postgres requires us to hard-code the - * LIMIT in the prepared statement. - */ -static unsigned int aggregation_limit = -  TALER_EXCHANGEDB_MATCHING_DEPOSITS_LIMIT; -  /**   * Main work function that finds and triggers transfers for reserves @@ -336,15 +342,14 @@ run_aggregation (void *cls);   * Execute the wire transfers that we have committed to   * do.   * - * @param cls pointer to an `int` which we will return from main() + * @param cls NULL   */  static void  run_transfers (void *cls);  /** - * Find the record valid at time @a now in the fee - * structure. + * Find the record valid at time @a now in the fee structure.   *   * @param wa wire transfer fee data structure to update   * @param now timestamp to update fees to @@ -356,7 +361,6 @@ advance_fees (struct WireAccount *wa,  {    struct TALER_EXCHANGEDB_AggregateFees *af; -  /* First, try to see if we have current fee information in memory */    af = wa->af;    while ( (NULL != af) &&            (af->end_date.abs_value_us < now.abs_value_us) ) @@ -416,8 +420,9 @@ update_fees (struct WireAccount *wa,    if (NULL != af)      return af;    GNUNET_log (GNUNET_ERROR_TYPE_ERROR, -              "Failed to find current wire transfer fees for `%s'\n", -              wa->method); +              "Failed to find current wire transfer fees for `%s' at %s\n", +              wa->method, +              GNUNET_STRINGS_absolute_time_to_string (now));    return NULL;  } @@ -435,6 +440,9 @@ find_account_by_method (const char *method)      if (0 == strcmp (method,                       wa->method))        return wa; +  GNUNET_log (GNUNET_ERROR_TYPE_ERROR, +              "No wire account known for method `%s'\n", +              method);    return NULL;  } @@ -454,9 +462,9 @@ find_account_by_payto_uri (const char *url)    method = TALER_payto_get_method (url);    if (NULL == method)    { -    fprintf (stderr, -             "Invalid payto:// URL `%s'\n", -             url); +    GNUNET_log (GNUNET_ERROR_TYPE_ERROR, +                "Invalid payto:// URL `%s'\n", +                url);      return NULL;    }    wa = find_account_by_method (method); @@ -496,6 +504,15 @@ add_account_cb (void *cls,      return;    }    wa->method = TALER_payto_get_method (payto_uri); +  if (NULL == wa->method) +  { +    GNUNET_log_config_invalid (GNUNET_ERROR_TYPE_ERROR, +                               ai->section_name, +                               "PAYTO_URI", +                               "could not obtain wire method from URI"); +    GNUNET_free (wa); +    return; +  }    GNUNET_free (payto_uri);    if (GNUNET_OK !=        TALER_BANK_auth_parse_cfg (cfg, @@ -517,21 +534,20 @@ add_account_cb (void *cls,  /** - * Free data stored in #au. + * Free data stored in @a au, but not @a au itself (stack allocated). + * + * @param au aggreation unit to clean up   */  static void -cleanup_au (void) +cleanup_au (struct AggregationUnit *au)  { -  if (NULL == au) -    return; +  GNUNET_assert (NULL != au);    GNUNET_free_non_null (au->additional_rows);    if (NULL != au->wire) -  {      json_decref (au->wire); -    au->wire = NULL; -  } -  GNUNET_free (au); -  au = NULL; +  memset (au, +          0, +          sizeof (*au));  } @@ -573,12 +589,6 @@ shutdown_task (void *cls)      GNUNET_free (wpd);      wpd = NULL;    } -  if (NULL != au) -  { -    db_plugin->rollback (db_plugin->cls, -                         au->session); -    cleanup_au (); -  }    if (NULL != ctc)    {      db_plugin->rollback (db_plugin->cls, @@ -605,7 +615,6 @@ shutdown_task (void *cls)        GNUNET_free (wa);      }    } -  GNUNET_CONFIGURATION_destroy (cfg);    cfg = NULL;  } @@ -643,20 +652,20 @@ parse_wirewatch_config ()    if (GNUNET_OK !=        GNUNET_CONFIGURATION_get_value_string (cfg,                                               "taler", -                                             "currency", +                                             "CURRENCY",                                               &exchange_currency_string))    {      GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,                                 "taler", -                               "currency"); +                               "CURRENCY");      return GNUNET_SYSERR;    }    if (strlen (exchange_currency_string) >= TALER_CURRENCY_LEN)    { -    fprintf (stderr, -             "Currency `%s' longer than the allowed limit of %u characters.", -             exchange_currency_string, -             (unsigned int) TALER_CURRENCY_LEN); +    GNUNET_log (GNUNET_ERROR_TYPE_ERROR, +                "Currency `%s' longer than the allowed limit of %u characters.", +                exchange_currency_string, +                (unsigned int) TALER_CURRENCY_LEN);      return GNUNET_SYSERR;    } @@ -678,8 +687,8 @@ parse_wirewatch_config ()    if (NULL ==        (db_plugin = TALER_EXCHANGEDB_plugin_load (cfg)))    { -    fprintf (stderr, -             "Failed to initialize DB subsystem\n"); +    GNUNET_log (GNUNET_ERROR_TYPE_ERROR, +                "Failed to initialize DB subsystem\n");      return GNUNET_SYSERR;    }    TALER_EXCHANGEDB_find_accounts (cfg, @@ -687,8 +696,8 @@ parse_wirewatch_config ()                                    NULL);    if (NULL == wa_head)    { -    fprintf (stderr, -             "No wire accounts configured for debit!\n"); +    GNUNET_log (GNUNET_ERROR_TYPE_ERROR, +                "No wire accounts configured for debit!\n");      TALER_EXCHANGEDB_plugin_unload (db_plugin);      db_plugin = NULL;      return GNUNET_SYSERR; @@ -732,7 +741,7 @@ refund_by_coin_cb (void *cls,   * Function called with details about deposits that have been made,   * with the goal of executing the corresponding wire transaction.   * - * @param cls NULL + * @param cls a `struct AggregationUnit`   * @param row_id identifies database entry   * @param merchant_pub public key of the merchant   * @param coin_pub public key of the coin @@ -755,6 +764,7 @@ deposit_cb (void *cls,              struct GNUNET_TIME_Absolute wire_deadline,              const json_t *wire)  { +  struct AggregationUnit *au = cls;    enum GNUNET_DB_QueryStatus qs;    (void) cls; @@ -886,7 +896,7 @@ deposit_cb (void *cls,   * Function called with details about another deposit we   * can aggregate into an existing aggregation unit.   * - * @param cls NULL + * @param cls a `struct AggregationUnit`   * @param row_id identifies database entry   * @param merchant_pub public key of the merchant   * @param coin_pub public key of the coin @@ -909,16 +919,25 @@ aggregate_cb (void *cls,                struct GNUNET_TIME_Absolute wire_deadline,                const json_t *wire)  { +  struct AggregationUnit *au = cls;    struct TALER_Amount delta;    enum GNUNET_DB_QueryStatus qs; -  (void) cls;    /* NOTE: potential optimization: use custom SQL API to not       fetch these: */    (void) wire_deadline; /* checked by SQL */    (void) wire; /* must match */    GNUNET_break (0 == GNUNET_memcmp (&au->merchant_pub,                                      merchant_pub)); + +  if (au->rows_offset >= TALER_EXCHANGEDB_MATCHING_DEPOSITS_LIMIT) +  { +    /* Bug: we asked for at most #TALER_EXCHANGEDB_MATCHING_DEPOSITS_LIMIT results! */ +    GNUNET_break (0); +    /* Skip this one, but keep going with the overall transaction */ +    return GNUNET_DB_STATUS_SUCCESS_ONE_RESULT; +  } +    /* compute contribution of this coin after fees */    /* add to total */    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, @@ -968,16 +987,10 @@ aggregate_cb (void *cls,      au->total_amount = delta;    } -  if (au->rows_offset >= aggregation_limit) -  { -    /* Bug: we asked for at most #aggregation_limit results! */ -    GNUNET_break (0); -    /* Skip this one, but keep going. */ -    return GNUNET_DB_STATUS_SUCCESS_ONE_RESULT; -  }    if (NULL == au->additional_rows) -    au->additional_rows = GNUNET_new_array (aggregation_limit, -                                            unsigned long long); +    au->additional_rows = GNUNET_new_array ( +      TALER_EXCHANGEDB_MATCHING_DEPOSITS_LIMIT, +      unsigned long long);    /* "append" to our list of rows */    au->additional_rows[au->rows_offset++] = row_id;    /* insert into aggregation tracking table */ @@ -990,9 +1003,6 @@ aggregate_cb (void *cls,      GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs);      return qs;    } -  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, -              "Aggregator marks aggregated deposit %llu as DONE\n", -              (unsigned long long) row_id);    qs = db_plugin->mark_deposit_done (db_plugin->cls,                                       au->session,                                       row_id); @@ -1002,7 +1012,7 @@ aggregate_cb (void *cls,      return qs;    }    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, -              "Added row %llu with %s to aggregation\n", +              "Aggregator marked deposit %llu over %s as DONE\n",                (unsigned long long) row_id,                TALER_amount2s (&delta));    return GNUNET_DB_STATUS_SUCCESS_ONE_RESULT; @@ -1097,7 +1107,9 @@ expired_reserve_cb (void *cls,    wa = find_account_by_payto_uri (account_payto_uri);    if (NULL == wa)    { -    GNUNET_break (0); +    GNUNET_log (GNUNET_ERROR_TYPE_ERROR, +                "No wire account configured to deal with target URI `%s'\n", +                account_payto_uri);      global_ret = GNUNET_SYSERR;      GNUNET_SCHEDULER_shutdown ();      return GNUNET_DB_STATUS_HARD_ERROR; @@ -1127,7 +1139,7 @@ expired_reserve_cb (void *cls,    if ( (GNUNET_SYSERR == ret) ||         (GNUNET_NO == ret) )    { -    /* Closing fee higher than remaining balance, close +    /* Closing fee higher than or equal to remaining balance, close         without wire transfer. */      closing_fee = left;      GNUNET_assert (GNUNET_OK == @@ -1345,6 +1357,7 @@ static void  run_aggregation (void *cls)  {    static unsigned int swap; +  struct AggregationUnit au_active;    struct TALER_EXCHANGEDB_Session *session;    enum GNUNET_DB_QueryStatus qs;    const struct GNUNET_SCHEDULER_TaskContext *tc; @@ -1383,15 +1396,17 @@ run_aggregation (void *cls)      GNUNET_SCHEDULER_shutdown ();      return;    } -  au = GNUNET_new (struct AggregationUnit); -  au->session = session; +  memset (&au_active, +          0, +          sizeof (au_active)); +  au_active.session = session;    qs = db_plugin->get_ready_deposit (db_plugin->cls,                                       session,                                       &deposit_cb, -                                     au); +                                     &au_active);    if (0 >= qs)    { -    cleanup_au (); +    cleanup_au (&au_active);      db_plugin->rollback (db_plugin->cls,                           session);      if (GNUNET_DB_STATUS_HARD_ERROR == qs) @@ -1444,20 +1459,20 @@ run_aggregation (void *cls)    /* Now try to find other deposits to aggregate */    GNUNET_log (GNUNET_ERROR_TYPE_INFO,                "Found ready deposit for %s, aggregating\n", -              TALER_B2S (&au->merchant_pub)); +              TALER_B2S (&au_active.merchant_pub));    qs = db_plugin->iterate_matching_deposits (db_plugin->cls,                                               session, -                                             &au->h_wire, -                                             &au->merchant_pub, +                                             &au_active.h_wire, +                                             &au_active.merchant_pub,                                               &aggregate_cb, -                                             au, -                                             aggregation_limit); +                                             &au_active, +                                             TALER_EXCHANGEDB_MATCHING_DEPOSITS_LIMIT);    if ( (GNUNET_DB_STATUS_HARD_ERROR == qs) || -       (GNUNET_YES == au->failed) ) +       (GNUNET_YES == au_active.failed) )    {      GNUNET_log (GNUNET_ERROR_TYPE_ERROR,                  "Failed to execute deposit iteration!\n"); -    cleanup_au (); +    cleanup_au (&au_active);      db_plugin->rollback (db_plugin->cls,                           session);      global_ret = GNUNET_SYSERR; @@ -1471,6 +1486,7 @@ run_aggregation (void *cls)                  "Serialization issue, trying again later!\n");      db_plugin->rollback (db_plugin->cls,                           session); +    cleanup_au (&au_active);      GNUNET_assert (NULL == task);      task = GNUNET_SCHEDULER_add_now (&run_aggregation,                                       NULL); @@ -1481,19 +1497,19 @@ run_aggregation (void *cls)       wire transfer method; Check if after rounding down, we still have       an amount to transfer, and if not mark as 'tiny'. */    if ( (GNUNET_OK != -        TALER_amount_subtract (&au->final_amount, -                               &au->total_amount, -                               &au->wire_fee)) || +        TALER_amount_subtract (&au_active.final_amount, +                               &au_active.total_amount, +                               &au_active.wire_fee)) ||         (GNUNET_SYSERR == -        TALER_amount_round_down (&au->final_amount, +        TALER_amount_round_down (&au_active.final_amount,                                   ¤cy_round_unit)) || -       ( (0 == au->final_amount.value) && -         (0 == au->final_amount.fraction) ) ) +       ( (0 == au_active.final_amount.value) && +         (0 == au_active.final_amount.fraction) ) )    {      GNUNET_log (GNUNET_ERROR_TYPE_INFO,                  "Aggregate value too low for transfer (%d/%s)\n",                  qs, -                TALER_amount2s (&au->final_amount)); +                TALER_amount2s (&au_active.final_amount));      /* Rollback ongoing transaction, as we will not use the respective         WTID and thus need to remove the tracking data */      db_plugin->rollback (db_plugin->cls, @@ -1509,21 +1525,21 @@ run_aggregation (void *cls)        GNUNET_log (GNUNET_ERROR_TYPE_ERROR,                    "Failed to start database transaction!\n");        global_ret = GNUNET_SYSERR; -      cleanup_au (); +      cleanup_au (&au_active);        GNUNET_SCHEDULER_shutdown ();        return;      }      /* Mark transactions by row_id as minor */      qs = db_plugin->mark_deposit_tiny (db_plugin->cls,                                         session, -                                       au->row_id); +                                       au_active.row_id);      if (0 <= qs)      { -      for (unsigned int i = 0; i<au->rows_offset; i++) +      for (unsigned int i = 0; i<au_active.rows_offset; i++)        {          qs = db_plugin->mark_deposit_tiny (db_plugin->cls,                                             session, -                                           au->additional_rows[i]); +                                           au_active.additional_rows[i]);          if (0 > qs)            break;        } @@ -1534,7 +1550,7 @@ run_aggregation (void *cls)                    "Serialization issue, trying again later!\n");        db_plugin->rollback (db_plugin->cls,                             session); -      cleanup_au (); +      cleanup_au (&au_active);        /* start again */        GNUNET_assert (NULL == task);        task = GNUNET_SCHEDULER_add_now (&run_aggregation, @@ -1545,13 +1561,14 @@ run_aggregation (void *cls)      {        db_plugin->rollback (db_plugin->cls,                             session); -      cleanup_au (); +      cleanup_au (&au_active);        GNUNET_SCHEDULER_shutdown ();        return;      }      /* commit */      (void) commit_or_warn (session); -    cleanup_au (); +    cleanup_au (&au_active); +      /* start again */      GNUNET_assert (NULL == task);      task = GNUNET_SCHEDULER_add_now (&run_aggregation, @@ -1561,34 +1578,34 @@ run_aggregation (void *cls)    {      char *amount_s; -    amount_s = TALER_amount_to_string (&au->final_amount); +    amount_s = TALER_amount_to_string (&au_active.final_amount);      GNUNET_log (GNUNET_ERROR_TYPE_INFO,                  "Preparing wire transfer of %s to %s\n",                  amount_s, -                TALER_B2S (&au->merchant_pub)); +                TALER_B2S (&au_active.merchant_pub));      GNUNET_free (amount_s);    }    {      char *url; -    url = TALER_JSON_wire_to_payto (au->wire); +    url = TALER_JSON_wire_to_payto (au_active.wire);      TALER_BANK_prepare_transfer (url, -                                 &au->final_amount, +                                 &au_active.final_amount,                                   exchange_base_url, -                                 &au->wtid, +                                 &au_active.wtid,                                   &buf,                                   &buf_size);      GNUNET_free (url);    } -  GNUNET_free_non_null (au->additional_rows); -  au->additional_rows = NULL; +  GNUNET_free_non_null (au_active.additional_rows); +  au_active.additional_rows = NULL;    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,                "Storing %u bytes of wire prepare data\n",                (unsigned int) buf_size);    /* Commit our intention to execute the wire transfer! */    qs = db_plugin->wire_prepare_data_insert (db_plugin->cls,                                              session, -                                            au->wa->method, +                                            au_active.wa->method,                                              buf,                                              buf_size);    GNUNET_free (buf); @@ -1597,12 +1614,13 @@ run_aggregation (void *cls)    if (qs >= 0)      qs = db_plugin->store_wire_transfer_out (db_plugin->cls,                                               session, -                                             au->execution_time, -                                             &au->wtid, -                                             au->wire, -                                             au->wa->section_name, -                                             &au->final_amount); -  cleanup_au (); +                                             au_active.execution_time, +                                             &au_active.wtid, +                                             au_active.wire, +                                             au_active.wa->section_name, +                                             &au_active.final_amount); +  cleanup_au (&au_active); +    if (GNUNET_DB_STATUS_SOFT_ERROR == qs)    {      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, @@ -1912,10 +1930,9 @@ run (void *cls,    (void) args;    (void) cfgfile; -  cfg = GNUNET_CONFIGURATION_dup (c); +  cfg = c;    if (GNUNET_OK != parse_wirewatch_config ())    { -    GNUNET_CONFIGURATION_destroy (cfg);      cfg = NULL;      global_ret = 1;      return; @@ -1966,7 +1983,7 @@ main (int argc,        GNUNET_PROGRAM_run (argc, argv,                            "taler-exchange-aggregator",                            gettext_noop ( -                            "background process that aggregates and executes wire transfers to merchants"), +                            "background process that aggregates and executes wire transfers"),                            options,                            &run, NULL))    { | 
