diff options
Diffstat (limited to 'src/exchange/taler-exchange-aggregator.c')
| -rw-r--r-- | src/exchange/taler-exchange-aggregator.c | 545 | 
1 files changed, 119 insertions, 426 deletions
| diff --git a/src/exchange/taler-exchange-aggregator.c b/src/exchange/taler-exchange-aggregator.c index c34d47f9..04cf426d 100644 --- a/src/exchange/taler-exchange-aggregator.c +++ b/src/exchange/taler-exchange-aggregator.c @@ -28,18 +28,6 @@  #include "taler_json_lib.h"  #include "taler_bank_service.h" -struct AdditionalDeposit -{ -  /** -   * Public key of the coin. -   */ -  struct TALER_CoinSpendPublicKeyP coin_pub; - -  /** -   * Row of the deposit. -   */ -  uint64_t row; -};  /**   * Information about one aggregation process to be executed.  There is @@ -55,11 +43,6 @@ struct AggregationUnit    struct TALER_MerchantPublicKeyP merchant_pub;    /** -   * Public key of the coin. -   */ -  struct TALER_CoinSpendPublicKeyP coin_pub; - -  /**     * Total amount to be transferred, before subtraction of @e fees.wire and rounding down.     */    struct TALER_Amount total_amount; @@ -80,11 +63,6 @@ struct AggregationUnit    struct TALER_WireTransferIdentifierRawP wtid;    /** -   * Row ID of the transaction that started it all. -   */ -  uint64_t row_id; - -  /**     * The current time (which triggered the aggregation and     * defines the wire fee).     */ @@ -101,32 +79,11 @@ struct AggregationUnit    struct TALER_PaytoHashP h_payto;    /** -   * Serial number of the wire target. -   */ -  uint64_t wire_target; - -  /**     * Exchange wire account to be used for the preparation and     * eventual execution of the aggregate wire transfer.     */    const struct TALER_EXCHANGEDB_AccountInfo *wa; -  /** -   * Array of row_ids from the aggregation. -   */ -  struct AdditionalDeposit -    additional_rows[TALER_EXCHANGEDB_MATCHING_DEPOSITS_LIMIT]; - -  /** -   * Offset specifying how many @e additional_rows are in use. -   */ -  unsigned int rows_offset; - -  /** -   * Set to true if we encountered a refund during #refund_by_coin_cb. -   * Used to wave the deposit fee. -   */ -  bool have_refund;  }; @@ -341,331 +298,6 @@ parse_wirewatch_config (void)  /** - * Callback invoked with information about refunds applicable - * to a particular coin.  Subtract refunded amount(s) from - * the aggregation unit's total amount. - * - * @param cls closure with a `struct AggregationUnit *` - * @param amount_with_fee what was the refunded amount with the fee - * @return #GNUNET_OK to continue to iterate, #GNUNET_SYSERR to stop - */ -static enum GNUNET_GenericReturnValue -refund_by_coin_cb (void *cls, -                   const struct TALER_Amount *amount_with_fee) -{ -  struct AggregationUnit *aux = cls; - -  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, -              "Aggregator subtracts applicable refund of amount %s\n", -              TALER_amount2s (amount_with_fee)); -  aux->have_refund = true; -  if (0 > -      TALER_amount_subtract (&aux->total_amount, -                             &aux->total_amount, -                             amount_with_fee)) -  { -    GNUNET_break (0); -    return GNUNET_SYSERR; -  } -  return GNUNET_OK; -} - - -/** - * Function called with details about deposits that have been made, - * with the goal of executing the corresponding wire transaction. - * - * @param cls a `struct AggregationUnit` - * @param row_id identifies database entry - * @param merchant_pub public key of the merchant - * @param coin_pub public key of the coin - * @param amount_with_fee amount that was deposited including fee - * @param deposit_fee amount the exchange gets to keep as transaction fees - * @param h_contract_terms hash of the proposal data known to merchant and customer - * @param wire_target target account for the wire transfer - * @param payto_uri URI of the target account - * @return transaction status code,  #GNUNET_DB_STATUS_SUCCESS_ONE_RESULT to continue to iterate - */ -static enum GNUNET_DB_QueryStatus -deposit_cb (void *cls, -            uint64_t row_id, -            const struct TALER_MerchantPublicKeyP *merchant_pub, -            const struct TALER_CoinSpendPublicKeyP *coin_pub, -            const struct TALER_Amount *amount_with_fee, -            const struct TALER_Amount *deposit_fee, -            const struct TALER_PrivateContractHashP *h_contract_terms, -            uint64_t wire_target, -            const char *payto_uri) -{ -  struct AggregationUnit *au = cls; -  enum GNUNET_DB_QueryStatus qs; - -  au->merchant_pub = *merchant_pub; -  au->coin_pub = *coin_pub; -  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, -              "Aggregator processing payment %s with amount %s\n", -              TALER_B2S (coin_pub), -              TALER_amount2s (amount_with_fee)); -  au->row_id = row_id; -  au->total_amount = *amount_with_fee; -  au->have_refund = false; -  qs = db_plugin->select_refunds_by_coin (db_plugin->cls, -                                          coin_pub, -                                          &au->merchant_pub, -                                          h_contract_terms, -                                          &refund_by_coin_cb, -                                          au); -  if (0 > qs) -  { -    GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs); -    return qs; -  } -  if (! au->have_refund) -  { -    struct TALER_Amount ntotal; - -    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, -                "Non-refunded transaction, subtracting deposit fee %s\n", -                TALER_amount2s (deposit_fee)); -    if (0 > -        TALER_amount_subtract (&ntotal, -                               amount_with_fee, -                               deposit_fee)) -    { -      /* This should never happen, issue a warning, but continue processing -         with an amount of zero, least we hang here for good. */ -      GNUNET_log (GNUNET_ERROR_TYPE_ERROR, -                  "Fatally malformed record at row %llu over %s (deposit fee exceeds deposited value)\n", -                  (unsigned long long) row_id, -                  TALER_amount2s (amount_with_fee)); -      GNUNET_assert (GNUNET_OK == -                     TALER_amount_set_zero (au->total_amount.currency, -                                            &au->total_amount)); -    } -    else -    { -      au->total_amount = ntotal; -    } -  } -  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, -              "Amount after fee is %s\n", -              TALER_amount2s (&au->total_amount)); - -  GNUNET_assert (NULL == au->payto_uri); -  au->payto_uri = GNUNET_strdup (payto_uri); -  TALER_payto_hash (payto_uri, -                    &au->h_payto); -  au->wire_target = wire_target; -  GNUNET_CRYPTO_random_block (GNUNET_CRYPTO_QUALITY_NONCE, -                              &au->wtid, -                              sizeof (au->wtid)); -  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, -              "Starting aggregation under H(WTID)=%s, starting amount %s at %llu\n", -              TALER_B2S (&au->wtid), -              TALER_amount2s (amount_with_fee), -              (unsigned long long) row_id); -  au->wa = TALER_EXCHANGEDB_find_account_by_payto_uri (payto_uri); -  if (NULL == au->wa) -  { -    GNUNET_log (GNUNET_ERROR_TYPE_ERROR, -                "No exchange account configured for `%s', please fix your setup to continue!\n", -                payto_uri); -    return GNUNET_DB_STATUS_HARD_ERROR; -  } - -  /* make sure we have current fees */ -  au->execution_time = GNUNET_TIME_timestamp_get (); -  { -    struct GNUNET_TIME_Timestamp start_date; -    struct GNUNET_TIME_Timestamp end_date; -    struct TALER_MasterSignatureP master_sig; -    enum GNUNET_DB_QueryStatus qs; - -    qs = db_plugin->get_wire_fee (db_plugin->cls, -                                  au->wa->method, -                                  au->execution_time, -                                  &start_date, -                                  &end_date, -                                  &au->fees, -                                  &master_sig); -    if (0 >= qs) -    { -      GNUNET_log (GNUNET_ERROR_TYPE_ERROR, -                  "Could not get wire fees for %s at %s. Aborting run.\n", -                  au->wa->method, -                  GNUNET_TIME_timestamp2s (au->execution_time)); -      return GNUNET_DB_STATUS_HARD_ERROR; -    } -  } - -  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, -              "Aggregator starts aggregation for deposit %llu to %s with wire fee %s\n", -              (unsigned long long) row_id, -              TALER_B2S (&au->wtid), -              TALER_amount2s (&au->fees.wire)); -  qs = db_plugin->insert_aggregation_tracking (db_plugin->cls, -                                               &au->wtid, -                                               row_id); -  if (qs <= 0) -  { -    GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs); -    return qs; -  } -  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, -              "Aggregator marks deposit %llu as done\n", -              (unsigned long long) row_id); -  qs = db_plugin->mark_deposit_done (db_plugin->cls, -                                     coin_pub, -                                     row_id); -  if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT != qs) -  { -    GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs); -    return qs; -  } -  return qs; -} - - -/** - * Function called with details about another deposit we - * can aggregate into an existing aggregation unit. - * - * @param cls a `struct AggregationUnit` - * @param row_id identifies database entry - * @param coin_pub public key of the coin - * @param amount_with_fee amount that was deposited including fee - * @param deposit_fee amount the exchange gets to keep as transaction fees - * @param h_contract_terms hash of the proposal data known to merchant and customer - * @return transaction status code - */ -static enum GNUNET_DB_QueryStatus -aggregate_cb (void *cls, -              uint64_t row_id, -              const struct TALER_CoinSpendPublicKeyP *coin_pub, -              const struct TALER_Amount *amount_with_fee, -              const struct TALER_Amount *deposit_fee, -              const struct TALER_PrivateContractHashP *h_contract_terms) -{ -  struct AggregationUnit *au = cls; -  struct TALER_Amount old; -  enum GNUNET_DB_QueryStatus qs; - -  if (row_id == au->row_id) -    return GNUNET_DB_STATUS_SUCCESS_ONE_RESULT; -  if (au->rows_offset >= TALER_EXCHANGEDB_MATCHING_DEPOSITS_LIMIT) -  { -    /* Bug: we asked for at most #TALER_EXCHANGEDB_MATCHING_DEPOSITS_LIMIT results! */ -    GNUNET_break (0); -    /* Skip this one, but keep going with the overall transaction */ -    return GNUNET_DB_STATUS_SUCCESS_ONE_RESULT; -  } - -  /* add to total */ -  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, -              "Adding transaction amount %s from row %llu to aggregation\n", -              TALER_amount2s (amount_with_fee), -              (unsigned long long) row_id); -  /* save the existing total aggregate in 'old', for later */ -  old = au->total_amount; -  /* we begin with the total contribution of the current coin */ -  au->total_amount = *amount_with_fee; -  /* compute contribution of this coin (after fees) */ -  au->have_refund = false; -  qs = db_plugin->select_refunds_by_coin (db_plugin->cls, -                                          coin_pub, -                                          &au->merchant_pub, -                                          h_contract_terms, -                                          &refund_by_coin_cb, -                                          au); -  if (0 > qs) -  { -    GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs); -    return qs; -  } -  if (! au->have_refund) -  { -    struct TALER_Amount tmp; - -    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, -                "Subtracting deposit fee %s for non-refunded coin\n", -                TALER_amount2s (deposit_fee)); -    if (0 > -        TALER_amount_subtract (&tmp, -                               &au->total_amount, -                               deposit_fee)) -    { -      GNUNET_log (GNUNET_ERROR_TYPE_ERROR, -                  "Fatally malformed record at %llu over amount %s (deposit fee exceeds deposited value)\n", -                  (unsigned long long) row_id, -                  TALER_amount2s (&au->total_amount)); -      GNUNET_assert (GNUNET_OK == -                     TALER_amount_set_zero (old.currency, -                                            &au->total_amount)); -    } -    else -    { -      au->total_amount = tmp; -    } -  } - -  /* now add the au->total_amount with the (remaining) contribution of -     the current coin to the 'old' value with the current aggregate value */ -  { -    struct TALER_Amount tmp; - -    if (0 > -        TALER_amount_add (&tmp, -                          &au->total_amount, -                          &old)) -    { -      GNUNET_log (GNUNET_ERROR_TYPE_ERROR, -                  "Overflow or currency incompatibility during aggregation at %llu\n", -                  (unsigned long long) row_id); -      /* Skip this one, but keep going! */ -      au->total_amount = old; -      return GNUNET_DB_STATUS_SUCCESS_ONE_RESULT; -    } -    au->total_amount = tmp; -  } - -  /* "append" to our list of rows */ -  au->additional_rows[au->rows_offset].coin_pub = *coin_pub; -  au->additional_rows[au->rows_offset].row = row_id; -  au->rows_offset++; -  /* insert into aggregation tracking table */ -  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, -              "Adding %llu to aggregate %s\n", -              (unsigned long long) row_id, -              TALER_B2S (&au->wtid)); -  qs = db_plugin->insert_aggregation_tracking (db_plugin->cls, -                                               &au->wtid, -                                               row_id); -  if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT != qs) -  { -    GNUNET_log (GNUNET_ERROR_TYPE_WARNING, -                "Failed to add %llu to aggregate %s: %d\n", -                (unsigned long long) row_id, -                TALER_B2S (&au->wtid), -                qs); -    GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs); -    return qs; -  } -  qs = db_plugin->mark_deposit_done (db_plugin->cls, -                                     coin_pub, -                                     row_id); -  if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT != qs) -  { -    GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs); -    return qs; -  } -  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, -              "Aggregator marked deposit %llu as DONE\n", -              (unsigned long long) row_id); -  return GNUNET_DB_STATUS_SUCCESS_ONE_RESULT; -} - - -/**   * Perform a database commit. If it fails, print a warning.   *   * @return status of commit @@ -727,10 +359,17 @@ run_aggregation (void *cls)    struct Shard *s = cls;    struct AggregationUnit au_active;    enum GNUNET_DB_QueryStatus qs; +  struct TALER_Amount trans; +  bool have_transient;    task = NULL;    GNUNET_log (GNUNET_ERROR_TYPE_INFO,                "Checking for ready deposits to aggregate\n"); +  /* make sure we have current fees */ +  memset (&au_active, +          0, +          sizeof (au_active)); +  au_active.execution_time = GNUNET_TIME_timestamp_get ();    if (GNUNET_OK !=        db_plugin->start_deferred_wire_out (db_plugin->cls))    { @@ -741,16 +380,13 @@ run_aggregation (void *cls)      release_shard (s);      return;    } -  memset (&au_active, -          0, -          sizeof (au_active));    qs = db_plugin->get_ready_deposit (      db_plugin->cls,      s->shard_start,      s->shard_end,      kyc_off ? true : false, -    &deposit_cb, -    &au_active); +    &au_active.merchant_pub, +    &au_active.payto_uri);    switch (qs)    {    case GNUNET_DB_STATUS_HARD_ERROR: @@ -808,22 +444,98 @@ run_aggregation (void *cls)      /* continued below */      break;    } +  au_active.wa = TALER_EXCHANGEDB_find_account_by_payto_uri ( +    au_active.payto_uri); +  if (NULL == au_active.wa) +  { +    GNUNET_log (GNUNET_ERROR_TYPE_ERROR, +                "No exchange account configured for `%s', please fix your setup to continue!\n", +                au_active.payto_uri); +    global_ret = EXIT_FAILURE; +    GNUNET_SCHEDULER_shutdown (); +    release_shard (s); +    return; +  } + +  { +    struct GNUNET_TIME_Timestamp start_date; +    struct GNUNET_TIME_Timestamp end_date; +    struct TALER_MasterSignatureP master_sig; + +    qs = db_plugin->get_wire_fee (db_plugin->cls, +                                  au_active.wa->method, +                                  au_active.execution_time, +                                  &start_date, +                                  &end_date, +                                  &au_active.fees, +                                  &master_sig); +    if (0 >= qs) +    { +      GNUNET_log (GNUNET_ERROR_TYPE_ERROR, +                  "Could not get wire fees for %s at %s. Aborting run.\n", +                  au_active.wa->method, +                  GNUNET_TIME_timestamp2s (au_active.execution_time)); +      global_ret = EXIT_FAILURE; +      GNUNET_SCHEDULER_shutdown (); +      release_shard (s); +      return; +    } +  } +    /* Now try to find other deposits to aggregate */    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, -              "Found ready deposit for %s, aggregating by target %llu\n", +              "Found ready deposit for %s, aggregating by target %s\n",                TALER_B2S (&au_active.merchant_pub), -              (unsigned long long) au_active.wire_target); -  qs = db_plugin->iterate_matching_deposits (db_plugin->cls, -                                             &au_active.h_payto, -                                             &au_active.merchant_pub, -                                             &aggregate_cb, -                                             &au_active, -                                             TALER_EXCHANGEDB_MATCHING_DEPOSITS_LIMIT); +              au_active.payto_uri); +  TALER_payto_hash (au_active.payto_uri, +                    &au_active.h_payto); + +  qs = db_plugin->select_aggregation_transient (db_plugin->cls, +                                                &au_active.h_payto, +                                                au_active.wa->section_name, +                                                &au_active.wtid, +                                                &trans); +  switch (qs) +  { +  case GNUNET_DB_STATUS_HARD_ERROR: +    GNUNET_log (GNUNET_ERROR_TYPE_ERROR, +                "Failed to lookup transient aggregates!\n"); +    cleanup_au (&au_active); +    db_plugin->rollback (db_plugin->cls); +    global_ret = EXIT_FAILURE; +    GNUNET_SCHEDULER_shutdown (); +    release_shard (s); +    return; +  case GNUNET_DB_STATUS_SOFT_ERROR: +    /* serializiability issue, try again */ +    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, +                "Serialization issue, trying again later!\n"); +    db_plugin->rollback (db_plugin->cls); +    cleanup_au (&au_active); +    GNUNET_assert (NULL == task); +    task = GNUNET_SCHEDULER_add_now (&run_aggregation, +                                     s); +    return; +  case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: +    GNUNET_CRYPTO_random_block (GNUNET_CRYPTO_QUALITY_NONCE, +                                &au_active.wtid, +                                sizeof (au_active.wtid)); +    have_transient = false; +    break; +  case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT: +    have_transient = true; +    break; +  } +  qs = db_plugin->aggregate (db_plugin->cls, +                             &au_active.h_payto, +                             &au_active.merchant_pub, +                             &au_active.wtid, +                             &au_active.total_amount);    if (GNUNET_DB_STATUS_HARD_ERROR == qs)    {      GNUNET_log (GNUNET_ERROR_TYPE_ERROR, -                "Failed to execute deposit iteration!\n"); +                "Failed to execute aggregation!\n");      cleanup_au (&au_active);      db_plugin->rollback (db_plugin->cls);      global_ret = EXIT_FAILURE; @@ -844,13 +556,17 @@ run_aggregation (void *cls)      return;    }    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, -              "Found %d other deposits to combine into wire transfer with fee %s.\n", -              qs, -              TALER_amount2s (&au_active.fees.wire)); +              "Aggregation total is %s.\n", +              TALER_amount2s (&au_active.total_amount));    /* Subtract wire transfer fee and round to the unit supported by the       wire transfer method; Check if after rounding down, we still have       an amount to transfer, and if not mark as 'tiny'. */ +  if (have_transient) +    GNUNET_assert (0 <= +                   TALER_amount_add (&au_active.total_amount, +                                     &au_active.total_amount, +                                     &trans));    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,                "Rounding aggregate of %s\n",                TALER_amount2s (&au_active.total_amount)); @@ -867,45 +583,17 @@ run_aggregation (void *cls)                  "Aggregate value too low for transfer (%d/%s)\n",                  qs,                  TALER_amount2s (&au_active.final_amount)); -    /* Rollback ongoing transaction, as we will not use the respective -       WTID and thus need to remove the tracking data */ -    db_plugin->rollback (db_plugin->cls); - -    /* There were results, just the value was too low.  Start another -       transaction to mark all* of the selected deposits as minor! */ -    if (GNUNET_OK != -        db_plugin->start (db_plugin->cls, -                          "aggregator mark tiny transactions")) -    { -      GNUNET_log (GNUNET_ERROR_TYPE_ERROR, -                  "Failed to start database transaction!\n"); -      global_ret = EXIT_FAILURE; -      cleanup_au (&au_active); -      GNUNET_SCHEDULER_shutdown (); -      release_shard (s); -      return; -    } -    /* Mark transactions by row_id as minor */ -    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, -                "Marking %s (%llu) as tiny\n", -                TALER_B2S (&au_active.coin_pub), -                (unsigned long long) au_active.row_id); -    qs = db_plugin->mark_deposit_tiny (db_plugin->cls, -                                       &au_active.coin_pub, -                                       au_active.row_id); -    if (0 < qs) -    { -      for (unsigned int i = 0; i<au_active.rows_offset; i++) -      { -        qs = db_plugin->mark_deposit_tiny (db_plugin->cls, -                                           &au_active.additional_rows[i]. -                                           coin_pub, -                                           au_active.additional_rows[i].row); -        if (0 >= qs) -          break; -      } -    } -    GNUNET_break (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS != qs); +    if (have_transient) +      qs = db_plugin->update_aggregation_transient (db_plugin->cls, +                                                    &au_active.h_payto, +                                                    &au_active.wtid, +                                                    &au_active.total_amount); +    else +      qs = db_plugin->create_aggregation_transient (db_plugin->cls, +                                                    &au_active.h_payto, +                                                    au_active.wa->section_name, +                                                    &au_active.wtid, +                                                    &au_active.total_amount);      if (GNUNET_DB_STATUS_SOFT_ERROR == qs)      {        GNUNET_log (GNUNET_ERROR_TYPE_INFO, @@ -962,8 +650,7 @@ run_aggregation (void *cls)                                                buf_size);      GNUNET_free (buf);    } -  /* Commit the WTID data to 'wire_out' to finally satisfy aggregation -     table constraints */ +  /* Commit the WTID data to 'wire_out'  */    if (qs >= 0)      qs = db_plugin->store_wire_transfer_out (db_plugin->cls,                                               au_active.execution_time, @@ -971,6 +658,12 @@ run_aggregation (void *cls)                                               &au_active.h_payto,                                               au_active.wa->section_name,                                               &au_active.final_amount); + +  if ( (qs >= 0) && +       have_transient) +    qs = db_plugin->delete_aggregation_transient (db_plugin->cls, +                                                  &au_active.h_payto, +                                                  &au_active.wtid);    cleanup_au (&au_active);    if (GNUNET_DB_STATUS_SOFT_ERROR == qs) | 
