diff options
Diffstat (limited to 'src/exchange/taler-exchange-aggregator.c')
| -rw-r--r-- | src/exchange/taler-exchange-aggregator.c | 448 | 
1 files changed, 199 insertions, 249 deletions
| diff --git a/src/exchange/taler-exchange-aggregator.c b/src/exchange/taler-exchange-aggregator.c index 71a3efd7..2704f591 100644 --- a/src/exchange/taler-exchange-aggregator.c +++ b/src/exchange/taler-exchange-aggregator.c @@ -1,6 +1,6 @@  /*    This file is part of TALER -  Copyright (C) 2016-2018 Taler Systems SA +  Copyright (C) 2016-2020 Taler Systems SA    TALER is free software; you can redistribute it and/or modify it under the    terms of the GNU Affero General Public License as published by the Free Software @@ -26,6 +26,7 @@  #include "taler_exchangedb_lib.h"  #include "taler_exchangedb_plugin.h"  #include "taler_json_lib.h" +#include "taler_bank_service.h"  #include "taler_wire_lib.h" @@ -45,9 +46,14 @@ struct WireAccount    struct WireAccount *prev;    /** -   * Handle to the plugin. +   * Account information.     */ -  struct TALER_WIRE_Plugin *wire_plugin; +  struct TALER_Account account; + +  /** +   * Authentication data. +   */ +  struct TALER_BANK_AuthenticationData auth;    /**     * Wire transfer fee structure. @@ -59,6 +65,11 @@ struct WireAccount     */    char *section_name; +  /** +   * Name of the wire method underlying the account. +   */ +  char *method; +  }; @@ -77,7 +88,7 @@ struct WirePrepareData    /**     * Wire execution handle.     */ -  struct TALER_WIRE_ExecuteHandle *eh; +  struct TALER_BANK_WireExecuteHandle *eh;    /**     * Wire plugin used for this preparation. @@ -187,10 +198,6 @@ struct AggregationUnit   */  struct CloseTransferContext  { -  /** -   * Handle for preparing the wire transfer. -   */ -  struct TALER_WIRE_PrepareHandle *ph;    /**     * Our database session. @@ -263,6 +270,16 @@ static struct WirePrepareData *wpd;  static struct AggregationUnit *au;  /** + * Handle to the context for interacting with the bank. + */ +static struct GNUNET_CURL_Context *ctx; + +/** + * Scheduler context for running the @e ctx. + */ +static struct GNUNET_CURL_RescheduleContext *rc; + +/**   * Value to return from main(). #GNUNET_OK on success, #GNUNET_SYSERR   * on serious errors.   */ @@ -339,7 +356,7 @@ update_fees (struct WireAccount *wa,      return GNUNET_DB_STATUS_SUCCESS_ONE_RESULT;    /* Let's try to load it from disk... */    wa->af = TALER_EXCHANGEDB_fees_read (cfg, -                                       wa->wire_plugin->method); +                                       wa->method);    advance_fees (wa,                  now);    for (struct TALER_EXCHANGEDB_AggregateFees *p = wa->af; @@ -348,7 +365,7 @@ update_fees (struct WireAccount *wa,    {      qs = db_plugin->insert_wire_fee (db_plugin->cls,                                       session, -                                     wa->wire_plugin->method, +                                     wa->method,                                       p->start_date,                                       p->end_date,                                       &p->wire_fee, @@ -365,7 +382,7 @@ update_fees (struct WireAccount *wa,      return GNUNET_DB_STATUS_SUCCESS_ONE_RESULT;    GNUNET_log (GNUNET_ERROR_TYPE_ERROR,                "Failed to find current wire transfer fees for `%s'\n", -              wa->wire_plugin->method); +              wa->method);    return GNUNET_DB_STATUS_SUCCESS_NO_RESULTS;  } @@ -381,7 +398,7 @@ find_account_by_method (const char *method)  {    for (struct WireAccount *wa = wa_head; NULL != wa; wa = wa->next)      if (0 == strcmp (method, -                     wa->wire_plugin->method)) +                     wa->method))        return wa;    return NULL;  } @@ -431,13 +448,40 @@ add_account_cb (void *cls,    if (GNUNET_YES != ai->debit_enabled)      return; /* not enabled for us, skip */    wa = GNUNET_new (struct WireAccount); -  wa->wire_plugin = TALER_WIRE_plugin_load (cfg, -                                            ai->plugin_name); -  if (NULL == wa->wire_plugin) +  if (GNUNET_OK != +      GNUNET_CONFIGURATION_get_value_string (cfg, +                                             ai->section_name, +                                             "METHOD", +                                             &wa->method))    { -    fprintf (stderr, -             "Failed to load wire plugin for `%s'\n", -             ai->plugin_name); +    GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR, +                               ai->section_name, +                               "METHOD"); +    GNUNET_free (wa); +    return; +  } +  if (GNUNET_OK != +      TALER_BANK_auth_parse_cfg (cfg, +                                 ai->section_name, +                                 &wa->auth)) +  { +    GNUNET_log (GNUNET_ERROR_TYPE_MESSAGE, +                "Failed to load account `%s'\n", +                ai->section_name); +    GNUNET_free (wa->method); +    GNUNET_free (wa); +    return; +  } +  if (GNUNET_OK != +      TALER_BANK_account_parse_cfg (cfg, +                                    ai->section_name, +                                    &wa->account)) +  { +    GNUNET_log (GNUNET_ERROR_TYPE_MESSAGE, +                "Failed to load account `%s'\n", +                ai->section_name); +    TALER_BANK_auth_free (&wa->auth); +    GNUNET_free (wa->method);      GNUNET_free (wa);      return;    } @@ -476,6 +520,16 @@ static void  shutdown_task (void *cls)  {    (void) cls; +  if (NULL != ctx) +  { +    GNUNET_CURL_fini (ctx); +    ctx = NULL; +  } +  if (NULL != rc) +  { +    GNUNET_CURL_gnunet_rc_destroy (rc); +    rc = NULL; +  }    GNUNET_log (GNUNET_ERROR_TYPE_INFO,                "Running shutdown\n");    if (NULL != task) @@ -487,9 +541,7 @@ shutdown_task (void *cls)    {      if (NULL != wpd->eh)      { -      wpd->wa->wire_plugin->execute_wire_transfer_cancel ( -        wpd->wa->wire_plugin->cls, -        wpd->eh); +      TALER_BANK_execute_wire_transfer_cancel (wpd->eh);        wpd->eh = NULL;      }      db_plugin->rollback (db_plugin->cls, @@ -499,23 +551,12 @@ shutdown_task (void *cls)    }    if (NULL != au)    { -    if (NULL != au->ph) -    { -      au->wa->wire_plugin->prepare_wire_transfer_cancel ( -        au->wa->wire_plugin->cls, -        au->ph); -      au->ph = NULL; -    }      db_plugin->rollback (db_plugin->cls,                           au->session);      cleanup_au ();    }    if (NULL != ctc)    { -    ctc->wa->wire_plugin->prepare_wire_transfer_cancel ( -      ctc->wa->wire_plugin->cls, -      ctc->ph); -    ctc->ph = NULL;      db_plugin->rollback (db_plugin->cls,                           ctc->session);      GNUNET_free (ctc->method); @@ -532,9 +573,11 @@ shutdown_task (void *cls)        GNUNET_CONTAINER_DLL_remove (wa_head,                                     wa_tail,                                     wa); -      TALER_WIRE_plugin_unload (wa->wire_plugin); +      TALER_WIRE_account_free (&wa->account); +      TALER_BANK_auth_free (&wa->auth);        TALER_EXCHANGEDB_fees_free (wa->af);        GNUNET_free (wa->section_name); +      GNUNET_free (wa->method);        GNUNET_free (wa);      }    } @@ -922,20 +965,6 @@ aggregate_cb (void *cls,  /** - * Function to be called with the prepared transfer data - * when running an aggregation on a merchant. - * - * @param cls closure with the `struct AggregationUnit` - * @param buf transaction data to persist, NULL on error - * @param buf_size number of bytes in @a buf, 0 on error - */ -static void -prepare_cb (void *cls, -            const char *buf, -            size_t buf_size); - - -/**   * Main work function that finds and triggers transfers for reserves   * closures.   * @@ -989,83 +1018,6 @@ commit_or_warn (struct TALER_EXCHANGEDB_Session *session)  /** - * Function to be called with the prepared transfer data - * when closing a reserve. - * - * @param cls closure with a `struct CloseTransferContext` - * @param buf transaction data to persist, NULL on error - * @param buf_size number of bytes in @a buf, 0 on error - */ -static void -prepare_close_cb (void *cls, -                  const char *buf, -                  size_t buf_size) -{ -  enum GNUNET_DB_QueryStatus qs; - -  GNUNET_assert (cls == ctc); - -  GNUNET_log (GNUNET_ERROR_TYPE_INFO, -              "Prepared for reserve closing\n"); -  ctc->ph = NULL; -  if (NULL == buf) -  { -    GNUNET_break (0); /* why? how to best recover? */ -    db_plugin->rollback (db_plugin->cls, -                         ctc->session); -    /* start again */ -    GNUNET_free (ctc->method); -    GNUNET_free (ctc); -    ctc = NULL; -    task = GNUNET_SCHEDULER_add_now (&run_aggregation, -                                     NULL); -    return; -  } - -  /* Commit our intention to execute the wire transfer! */ -  qs = db_plugin->wire_prepare_data_insert (db_plugin->cls, -                                            ctc->session, -                                            ctc->method, -                                            buf, -                                            buf_size); -  if (GNUNET_DB_STATUS_HARD_ERROR == qs) -  { -    GNUNET_break (0); -    db_plugin->rollback (db_plugin->cls, -                         ctc->session); -    global_ret = GNUNET_SYSERR; -    GNUNET_SCHEDULER_shutdown (); -    GNUNET_free (ctc->method); -    GNUNET_free (ctc); -    ctc = NULL; -    return; -  } -  if (GNUNET_DB_STATUS_SOFT_ERROR == qs) -  { -    db_plugin->rollback (db_plugin->cls, -                         ctc->session); -    /* start again */ -    task = GNUNET_SCHEDULER_add_now (&run_aggregation, -                                     NULL); -    GNUNET_free (ctc->method); -    GNUNET_free (ctc); -    ctc = NULL; -    return; -  } - -  /* finally commit */ -  (void) commit_or_warn (ctc->session); -  GNUNET_free (ctc->method); -  GNUNET_free (ctc); -  ctc = NULL; -  GNUNET_log (GNUNET_ERROR_TYPE_INFO, -              "Reserve closure committed, running transfer\n"); -  task = GNUNET_SCHEDULER_add_now (&run_transfers, -                                   NULL); -} - - -/**   * Closure for #expired_reserve_cb().   */  struct ExpiredReserveContext @@ -1113,6 +1065,8 @@ expired_reserve_cb (void *cls,    int ret;    enum GNUNET_DB_QueryStatus qs;    struct WireAccount *wa; +  void *buf; +  size_t buf_size;    /* NOTE: potential optimization: use custom SQL API to not       fetch this: */ @@ -1121,7 +1075,7 @@ expired_reserve_cb (void *cls,    now = GNUNET_TIME_absolute_get ();    (void) GNUNET_TIME_round_abs (&now); -  /* lookup wire plugin */ +  /* lookup account we should use */    wa = find_account_by_url (account_details);    if (NULL == wa)    { @@ -1161,6 +1115,18 @@ expired_reserve_cb (void *cls,                     TALER_amount_get_zero (left->currency,                                            &amount_without_fee));    } +  /* round down to enable transfer */ +  if (GNUNET_SYSERR == +      TALER_WIRE_amount_round (&amount_without_fee)) +  { +    GNUNET_break (0); +    global_ret = GNUNET_SYSERR; +    GNUNET_SCHEDULER_shutdown (); +    return GNUNET_DB_STATUS_HARD_ERROR; +  } +  if ( (0 == amount_without_fee.value) && +       (0 == amount_without_fee.fraction) ) +    ret = GNUNET_NO;    /* NOTE: sizeof (*reserve_pub) == sizeof (wtid) right now, but to       be future-compatible, we use the memset + min construction */ @@ -1171,61 +1137,23 @@ expired_reserve_cb (void *cls,            reserve_pub,            GNUNET_MIN (sizeof (wtid),                        sizeof (*reserve_pub))); - -  qs = db_plugin->insert_reserve_closed (db_plugin->cls, -                                         session, -                                         reserve_pub, -                                         now, -                                         account_details, -                                         &wtid, -                                         left, -                                         closing_fee); - +  if (GNUNET_SYSERR != ret) +    qs = db_plugin->insert_reserve_closed (db_plugin->cls, +                                           session, +                                           reserve_pub, +                                           now, +                                           account_details, +                                           &wtid, +                                           left, +                                           closing_fee); +  else +    ret = GNUNET_DB_STATUS_HARD_ERROR;    GNUNET_log (GNUNET_ERROR_TYPE_INFO,                "Closing reserve %s over %s (%d, %d)\n",                TALER_B2S (reserve_pub),                TALER_amount2s (left),                ret,                qs); -  if ( (GNUNET_OK == ret) && -       (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT == qs) ) -  { -    /* success, perform wire transfer */ -    if (GNUNET_SYSERR == -        wa->wire_plugin->amount_round (wa->wire_plugin->cls, -                                       &amount_without_fee)) -    { -      GNUNET_break (0); -      global_ret = GNUNET_SYSERR; -      GNUNET_SCHEDULER_shutdown (); -      return GNUNET_DB_STATUS_HARD_ERROR; -    } -    ctc = GNUNET_new (struct CloseTransferContext); -    ctc->wa = wa; -    ctc->session = session; -    ctc->method = TALER_WIRE_payto_get_method (account_details); -    ctc->ph -      = wa->wire_plugin->prepare_wire_transfer (wa->wire_plugin->cls, -                                                wa->section_name, -                                                account_details, -                                                &amount_without_fee, -                                                exchange_base_url, -                                                &wtid, -                                                &prepare_close_cb, -                                                ctc); -    if (NULL == ctc->ph) -    { -      GNUNET_break (0); -      global_ret = GNUNET_SYSERR; -      GNUNET_SCHEDULER_shutdown (); -      GNUNET_free (ctc->method); -      GNUNET_free (ctc); -      ctc = NULL; -      return GNUNET_DB_STATUS_HARD_ERROR; -    } -    erc->async_cont = GNUNET_YES; -    return GNUNET_DB_STATUS_SUCCESS_ONE_RESULT; -  }    /* Check for hard failure */    if ( (GNUNET_SYSERR == ret) ||         (GNUNET_DB_STATUS_HARD_ERROR == qs) ) @@ -1235,10 +1163,59 @@ expired_reserve_cb (void *cls,      GNUNET_SCHEDULER_shutdown ();      return GNUNET_DB_STATUS_HARD_ERROR;    } -  /* Reserve balance was almost zero OR soft error */ -  GNUNET_log (GNUNET_ERROR_TYPE_INFO, -              "Reserve was virtually empty, moving on\n"); -  return qs; +  if ( (GNUNET_OK != ret) || +       (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT != qs) ) +  { +    /* Reserve balance was almost zero OR soft error */ +    GNUNET_log (GNUNET_ERROR_TYPE_INFO, +                "Reserve was virtually empty, moving on\n"); +    (void) commit_or_warn (ctc->session); +    GNUNET_free (ctc->method); +    GNUNET_free (ctc); +    ctc = NULL; +    task = GNUNET_SCHEDULER_add_now (&run_transfers, +                                     NULL); +    return qs; +  } + +  /* success, perform wire transfer */ +  ctc = GNUNET_new (struct CloseTransferContext); +  ctc->wa = wa; +  ctc->session = session; +  ctc->method = TALER_WIRE_payto_get_method (account_details); +  TALER_BANK_prepare_wire_transfer (account_details, +                                    &amount_without_fee, +                                    exchange_base_url, +                                    &wtid, +                                    &buf, +                                    &buf_size); +  /* Commit our intention to execute the wire transfer! */ +  qs = db_plugin->wire_prepare_data_insert (db_plugin->cls, +                                            ctc->session, +                                            ctc->method, +                                            buf, +                                            buf_size); +  GNUNET_free (buf); +  if (GNUNET_DB_STATUS_HARD_ERROR == qs) +  { +    GNUNET_break (0); +    GNUNET_free (ctc->method); +    GNUNET_free (ctc); +    ctc = NULL; +    return GNUNET_DB_STATUS_HARD_ERROR; +  } +  if (GNUNET_DB_STATUS_SOFT_ERROR == qs) +  { +    /* start again */ +    GNUNET_free (ctc->method); +    GNUNET_free (ctc); +    ctc = NULL; +    return GNUNET_DB_STATUS_SUCCESS_NO_RESULTS; +  } +  erc->async_cont = GNUNET_YES; +  task = GNUNET_SCHEDULER_add_now (&run_transfers, +                                   NULL); +  return GNUNET_DB_STATUS_SUCCESS_ONE_RESULT;  } @@ -1344,6 +1321,8 @@ run_aggregation (void *cls)    struct TALER_EXCHANGEDB_Session *session;    enum GNUNET_DB_QueryStatus qs;    const struct GNUNET_SCHEDULER_TaskContext *tc; +  void *buf; +  size_t buf_size;    (void) cls;    task = NULL; @@ -1470,8 +1449,7 @@ run_aggregation (void *cls)                                 &au->total_amount,                                 &au->wire_fee)) ||         (GNUNET_SYSERR == -        au->wa->wire_plugin->amount_round (au->wa->wire_plugin->cls, -                                           &au->final_amount)) || +        TALER_WIRE_amount_round (&au->final_amount)) ||         ( (0 == au->final_amount.value) &&           (0 == au->final_amount.fraction) ) )    { @@ -1555,70 +1533,26 @@ run_aggregation (void *cls)      char *url;      url = TALER_JSON_wire_to_payto (au->wire); -    au->ph = au->wa->wire_plugin->prepare_wire_transfer ( -      au->wa->wire_plugin->cls, -      au->wa->section_name, -      url, -      &au->final_amount, -      exchange_base_url, -      &au->wtid, -      &prepare_cb, -      au); +    TALER_BANK_prepare_wire_transfer (url, +                                      &au->final_amount, +                                      exchange_base_url, +                                      &au->wtid, +                                      &buf, +                                      &buf_size);      GNUNET_free (url);    } -  if (NULL == au->ph) -  { -    /* something went very wrong, likely bad configuration, -       abort */ -    db_plugin->rollback (db_plugin->cls, -                         session); -    cleanup_au (); -    GNUNET_SCHEDULER_shutdown (); -    return; -  } -  /* otherwise we continue with #prepare_cb(), see below */ -} - - -/** - * Function to be called with the prepared transfer data. - * - * @param cls NULL - * @param buf transaction data to persist, NULL on error - * @param buf_size number of bytes in @a buf, 0 on error - */ -static void -prepare_cb (void *cls, -            const char *buf, -            size_t buf_size) -{ -  struct TALER_EXCHANGEDB_Session *session = au->session; -  enum GNUNET_DB_QueryStatus qs; - -  (void) cls;    GNUNET_free_non_null (au->additional_rows);    au->additional_rows = NULL; -  if (NULL == buf) -  { -    GNUNET_break (0); /* why? how to best recover? */ -    db_plugin->rollback (db_plugin->cls, -                         session); -    /* start again */ -    task = GNUNET_SCHEDULER_add_now (&run_aggregation, -                                     NULL); -    cleanup_au (); -    return; -  } -    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,                "Storing %u bytes of wire prepare data\n",                (unsigned int) buf_size);    /* Commit our intention to execute the wire transfer! */    qs = db_plugin->wire_prepare_data_insert (db_plugin->cls,                                              session, -                                            au->wa->wire_plugin->method, +                                            au->wa->method,                                              buf,                                              buf_size); +  GNUNET_free (buf);    /* Commit the WTID data to 'wire_out' to finally satisfy aggregation       table constraints */    if (qs >= 0) @@ -1691,29 +1625,30 @@ prepare_cb (void *cls,   * Function called with the result from the execute step.   *   * @param cls NULL - * @param success #GNUNET_OK on success, #GNUNET_SYSERR on failure - * @param serial_id unique ID of the wire transfer in the bank's records; UINT64_MAX on error - * @param emsg NULL on success, otherwise an error message + * @param http_status_code #MHD_HTTP_OK on success + * @param ec taler error code + * @param row_id unique ID of the wire transfer in the bank's records + * @param wire_timestamp when did the transfer happen   */  static void  wire_confirm_cb (void *cls, -                 int success, -                 const void *row_id, -                 size_t row_id_size, -                 const char *emsg) +                 unsigned int http_status_code, +                 enum TALER_ErrorCode ec, +                 uint64_t row_id, +                 struct GNUNET_TIME_Absolute wire_timestamp)  {    struct TALER_EXCHANGEDB_Session *session = wpd->session;    enum GNUNET_DB_QueryStatus qs;    (void) cls;    (void) row_id; -  (void) row_id_size;    wpd->eh = NULL; -  if (GNUNET_SYSERR == success) +  if (MHD_HTTP_OK != http_status_code)    {      GNUNET_log (GNUNET_ERROR_TYPE_ERROR, -                "Wire transaction failed: %s\n", -                emsg); +                "Wire transaction failed: %u/%d\n", +                http_status_code, +                ec);      db_plugin->rollback (db_plugin->cls,                           session);      global_ret = GNUNET_SYSERR; @@ -1792,6 +1727,8 @@ wire_prepare_cb (void *cls,                   const char *buf,                   size_t buf_size)  { +  struct WireAccount *wa; +    (void) cls;    wpd->row_id = rowid;    GNUNET_log (GNUNET_ERROR_TYPE_INFO, @@ -1811,12 +1748,15 @@ wire_prepare_cb (void *cls,      wpd = NULL;      return;    } -  wpd->eh = wpd->wa->wire_plugin->execute_wire_transfer ( -    wpd->wa->wire_plugin->cls, -    buf, -    buf_size, -    &wire_confirm_cb, -    NULL); +  wa = wpd->wa; +  wpd->eh = TALER_BANK_execute_wire_transfer (ctx, +                                              wa->account.details.x_taler_bank. +                                              account_base_url, +                                              &wa->auth, +                                              buf, +                                              buf_size, +                                              &wire_confirm_cb, +                                              NULL);    if (NULL == wpd->eh)    {      GNUNET_break (0); /* why? how to best recover? */ @@ -1927,6 +1867,7 @@ run (void *cls,    (void) cls;    (void) args;    (void) cfgfile; +    if (GNUNET_OK !=        GNUNET_CONFIGURATION_get_value_string (c,                                               "exchange", @@ -1947,6 +1888,15 @@ run (void *cls,      global_ret = 1;      return;    } +  ctx = GNUNET_CURL_init (&GNUNET_CURL_gnunet_scheduler_reschedule, +                          &rc); +  rc = GNUNET_CURL_gnunet_rc_create (ctx); +  if (NULL == ctx) +  { +    GNUNET_break (0); +    return; +  } +    task = GNUNET_SCHEDULER_add_now (&run_transfers,                                     NULL);    GNUNET_SCHEDULER_add_shutdown (&shutdown_task, | 
