diff options
| author | Christian Grothoff <grothoff@gnunet.org> | 2022-03-27 10:32:28 +0200 | 
|---|---|---|
| committer | Christian Grothoff <grothoff@gnunet.org> | 2022-03-27 10:32:28 +0200 | 
| commit | d0a69da8954fd72f361795c2e007bad3fe5accd1 (patch) | |
| tree | 821cc25e6f614ecfcd97d77bbc75ee24293887a7 /src | |
| parent | 646c9ad0611e5320a460206dec4fdfd516dc8f64 (diff) | |
towards removing tiny bit
Diffstat (limited to 'src')
| -rw-r--r-- | src/exchangedb/drop0001.sql | 2 | ||||
| -rw-r--r-- | src/exchangedb/exchange-0001.sql | 69 | ||||
| -rw-r--r-- | src/exchangedb/plugin_exchangedb_postgres.c | 360 | ||||
| -rw-r--r-- | src/exchangedb/test_exchangedb.c | 166 | ||||
| -rw-r--r-- | src/include/taler_exchangedb_plugin.h | 92 | 
5 files changed, 582 insertions, 107 deletions
| diff --git a/src/exchangedb/drop0001.sql b/src/exchangedb/drop0001.sql index 225c817a..3f43a569 100644 --- a/src/exchangedb/drop0001.sql +++ b/src/exchangedb/drop0001.sql @@ -82,9 +82,9 @@ DROP TABLE IF EXISTS denominations CASCADE;  DROP TABLE IF EXISTS cs_nonce_locks CASCADE;  DROP FUNCTION IF EXISTS add_constraints_to_cs_nonce_locks_partition; -DROP TABLE IF EXISTS deposits_by_coin CASCADE;  DROP TABLE IF EXISTS global_fee CASCADE;  DROP TABLE IF EXISTS recoup_by_reserve CASCADE; +DROP TABLE IF EXISTS aggregation_transient CASCADE;  DROP TABLE IF EXISTS partners CASCADE; diff --git a/src/exchangedb/exchange-0001.sql b/src/exchangedb/exchange-0001.sql index b2fb52ac..e6902ed1 100644 --- a/src/exchangedb/exchange-0001.sql +++ b/src/exchangedb/exchange-0001.sql @@ -772,7 +772,7 @@ CREATE TABLE IF NOT EXISTS deposits_by_ready_default  CREATE TABLE IF NOT EXISTS deposits_for_matching    (refund_deadline INT8 NOT NULL -  ,shard INT8 NOT NULL +  ,merchant_pub BYTEA NOT NULL CHECK (LENGTH(merchant_pub)=32)    ,coin_pub BYTEA NOT NULL CHECK (LENGTH(coin_pub)=32) REFERENCES known_coins (coin_pub) ON DELETE CASCADE    ,deposit_serial_id INT8    ) @@ -782,7 +782,7 @@ COMMENT ON TABLE deposits_for_matching  CREATE INDEX IF NOT EXISTS deposits_for_matching_main_index    ON deposits_for_matching -  (refund_deadline ASC, shard, coin_pub); +  (refund_deadline ASC, merchant_pub, coin_pub);  CREATE TABLE IF NOT EXISTS deposits_for_matching_default    PARTITION OF deposits_for_matching @@ -818,12 +818,12 @@ BEGIN    THEN      INSERT INTO deposits_for_matching        (refund_deadline -      ,shard +      ,merchant_pub        ,coin_pub        ,deposit_serial_id)      VALUES        (NEW.refund_deadline -      ,NEW.shard +      ,NEW.merchant_pub        ,NEW.coin_pub        ,NEW.deposit_serial_id);    END IF; @@ -866,7 +866,7 @@ BEGIN    THEN      DELETE FROM deposits_for_matching       WHERE refund_deadline = OLD.refund_deadline -       AND shard = OLD.shard +       AND merchant_pub = OLD.merchant_pub         AND coin_pub = OLD.coin_pub         AND deposit_serial_id = OLD.deposit_serial_id;    END IF; @@ -887,12 +887,12 @@ BEGIN    THEN      INSERT INTO deposits_for_matching        (refund_deadline -      ,shard +      ,merchant_pub        ,coin_pub        ,deposit_serial_id)      VALUES        (NEW.refund_deadline -      ,NEW.shard +      ,NEW.merchant_pub        ,NEW.coin_pub        ,NEW.deposit_serial_id);    END IF; @@ -930,7 +930,7 @@ BEGIN    THEN      DELETE FROM deposits_for_matching       WHERE refund_deadline = OLD.refund_deadline -       AND shard = OLD.shard +       AND merchant_pub = OLD.merchant_pub         AND coin_pub = OLD.coin_pub         AND deposit_serial_id = OLD.deposit_serial_id;    END IF; @@ -1040,21 +1040,64 @@ $$;  SELECT add_constraints_to_wire_out_partition('default'); +CREATE OR REPLACE FUNCTION wire_out_delete_trigger() +  RETURNS trigger +  LANGUAGE plpgsql +  AS $$ +BEGIN +  DELETE FROM aggregation_tracking +   WHERE wtid_raw = OLD.wtid_raw; +  RETURN OLD; +END $$; +COMMENT ON FUNCTION wire_out_delete_trigger() +  IS 'Replicate reserve_out deletions into aggregation_tracking. This replaces an earlier use of an ON DELETE CASCADE that required a DEFERRABLE constraint and conflicted with nice partitioning.'; + +CREATE TRIGGER wire_out_on_delete +  AFTER DELETE +    ON wire_out +   FOR EACH ROW EXECUTE FUNCTION wire_out_delete_trigger(); + + + +-- ------------------------------ aggregation_transient ---------------------------------------- + +-- Note: this table is not yet used; it is designed +-- to allow us to get rid of the 'tiny BOOL' and +-- the associated need to look at tiny +-- deposits repeatedly. +CREATE TABLE IF NOT EXISTS aggregation_transient +  (amount_val INT8 NOT NULL +  ,amount_frac INT4 NOT NULL +  ,wire_target_h_payto BYTEA CHECK (LENGTH(wire_target_h_payto)=32) +  ,exchange_account_section TEXT NOT NULL +  ,wtid_raw BYTEA NOT NULL CHECK (LENGTH(wtid_raw)=32) +  ) +  PARTITION BY HASH (wire_target_h_payto); +COMMENT ON TABLE aggregation_transient +  IS 'aggregations currently happening (lacking wire_out, usually because the amount is too low); this table is not replicated'; +COMMENT ON COLUMN aggregation_transient.amount_val +  IS 'Sum of all of the aggregated deposits (without deposit fees)'; +COMMENT ON COLUMN aggregation_transient.wtid_raw +  IS 'identifier of the wire transfer'; + +CREATE TABLE IF NOT EXISTS aggregation_transient_default +  PARTITION OF aggregation_transient +  FOR VALUES WITH (MODULUS 1, REMAINDER 0); + +  -- ------------------------------ aggregation_tracking ---------------------------------------- --- FIXME-URGENT: add colum coin_pub to select by coin_pub + deposit_serial_id for more efficient deposit lookup!? --- Or which direction(s) is this table used? Is the partitioning sane??  CREATE TABLE IF NOT EXISTS aggregation_tracking    (aggregation_serial_id BIGINT GENERATED BY DEFAULT AS IDENTITY -- UNIQUE    ,deposit_serial_id INT8 PRIMARY KEY -- REFERENCES deposits (deposit_serial_id) ON DELETE CASCADE -  ,wtid_raw BYTEA NOT NULL CONSTRAINT wire_out_ref REFERENCES wire_out(wtid_raw) ON DELETE CASCADE DEFERRABLE +  ,wtid_raw BYTEA NOT NULL CHECK (LENGTH(wtid_raw)=32)    )    PARTITION BY HASH (deposit_serial_id);  COMMENT ON TABLE aggregation_tracking    IS 'mapping from wire transfer identifiers (WTID) to deposits (and back)';  COMMENT ON COLUMN aggregation_tracking.wtid_raw -  IS 'We first create entries in the aggregation_tracking table and then finally the wire_out entry once we know the total amount. Hence the constraint must be deferrable and we cannot use a wireout_uuid here, because we do not have it when these rows are created. Changing the logic to first INSERT a dummy row into wire_out and then UPDATEing that row in the same transaction would theoretically reduce per-deposit storage costs by 5 percent (24/~460 bytes).'; +  IS 'identifier of the wire transfer';  CREATE TABLE IF NOT EXISTS aggregation_tracking_default    PARTITION OF aggregation_tracking @@ -1070,7 +1113,7 @@ BEGIN    EXECUTE FORMAT (      'ALTER TABLE aggregation_tracking_' || partition_suffix || ' '        'ADD CONSTRAINT aggregation_tracking_' || partition_suffix || '_aggregation_serial_id_key ' -        'UNIQUE (aggregation_serial_id) ' +        'UNIQUE (aggregation_serial_id);'    );  END  $$; diff --git a/src/exchangedb/plugin_exchangedb_postgres.c b/src/exchangedb/plugin_exchangedb_postgres.c index c7bdae39..1709f17e 100644 --- a/src/exchangedb/plugin_exchangedb_postgres.c +++ b/src/exchangedb/plugin_exchangedb_postgres.c @@ -1188,7 +1188,7 @@ prepare_statements (struct PostgresClosure *pg)        "  ,dbr.shard ASC"        " LIMIT 1;",        4), -    /* Used in #postgres_iterate_matching_deposits() */ +    /* FIXME: deprecated; Used in #postgres_iterate_matching_deposits() */      GNUNET_PQ_make_prepare (        "deposits_iterate_matching",        "SELECT" @@ -1207,14 +1207,115 @@ prepare_statements (struct PostgresClosure *pg)        "    JOIN denominations denom"        "      USING (denominations_serial)"        " WHERE dfm.refund_deadline<$3" -      "  AND dfm.shard=$4" +      "  AND dfm.merchant_pub=$1"        "  AND dep.merchant_pub=$1"        "  AND dep.wire_target_h_payto=$2"        " LIMIT "        TALER_QUOTE (          TALER_EXCHANGEDB_MATCHING_DEPOSITS_LIMIT) ";", +      3), + +    /* Used in #postgres_aggregate() */ +    GNUNET_PQ_make_prepare ( +      "aggregate", +      "WITH rdy AS (" /* find deposits ready */ +      "  SELECT" +      "    coin_pub" +      "    FROM deposits_for_matching" +      "    WHERE refund_deadline<$1" +      "      AND merchant_pub=$2" +      "    ORDER BY refund_deadline ASC" /* ordering is not critical */ +      "    LIMIT " +      TALER_QUOTE (TALER_EXCHANGEDB_MATCHING_DEPOSITS_LIMIT) +      " )" +      " ,dep AS (" /* restrict to our merchant and account */ +      "  UPDATE deposits" +      "     SET done=TRUE" +      "   WHERE coin_pub IN (SELECT coin_pub FROM rdy)" +      "     AND merchant_pub=$2" +      "     AND wire_target_h_payto=$3" +      "   RETURNING" +      "     deposit_serial_id" +      "    ,coin_pub" +      "    ,amount_with_fee_val AS amount_val" +      "    ,amount_with_fee_frac AS amount_frac)" +      " ,ref AS (" /* find applicable refunds */ +      "  SELECT" +      "    amount_with_fee_val AS refund_val" +      "   ,amount_with_fee_frac AS refund_frac" +      "   ,coin_pub" +      "    FROM refunds" +      "   WHERE coin_pub IN (SELECT coin_pub FROM dep)" +      "     AND deposit_serial_id IN (SELECT deposit_serial_id FROM dep))" +      " ,fees AS (" /* find deposit fees for non-refunded deposits */ +      "  SELECT" +      "    denom.fee_deposit_val AS fee_val" +      "   ,denom.fee_deposit_frac AS fee_frac" +      "    FROM known_coins kc" +      "    JOIN denominations denom" +      "      USING (denominations_serial)" +      "    WHERE coin_pub IN (SELECT coin_pub FROM dep)" +      "      AND coin_pub NOT IN (SELECT coin_pub FROM ref))" +      " ,dummy AS (" /* add deposits to aggregation_tracking */ +      "    INSERT INTO aggregation_tracking" +      "    (deposit_serial_id" +      "    ,wtid_raw)" +      "    SELECT deposit_serial_id,$4" +      "      FROM dep)" +      "SELECT" /* calculate totals (deposits, refunds and fees) */ +      "  CAST(COALESCE(SUM(dep.amount_val),0) AS INT8) AS sum_deposit_value" +      " ,COALESCE(SUM(dep.amount_frac),0) AS sum_deposit_fraction" +      " ,CAST(COALESCE(SUM(ref.refund_val),0) AS INT8) AS sum_refund_value" +      " ,COALESCE(SUM(ref.refund_frac),0) AS sum_refund_fraction" +      " ,CAST(COALESCE(SUM(fees.fee_val),0) AS INT8) AS sum_fee_value" +      " ,COALESCE(SUM(fees.fee_frac),0) AS sum_fee_fraction" +      " FROM dep " +      "   FULL OUTER JOIN ref ON (FALSE)" +      "   FULL OUTER JOIN fees ON (FALSE);", +      4), + + +    /* Used in #postgres_create_aggregation_transient() */ +    GNUNET_PQ_make_prepare ( +      "create_aggregation_transient", +      "INSERT INTO aggregation_transient" +      " (amount_val" +      " ,amount_frac" +      " ,wire_target_h_payto" +      " ,exchange_account_section" +      " ,wtid_raw)" +      " VALUES ($1, $2, $3, $4, $5);", +      5), +    /* Used in #postgres_select_aggregation_transient() */ +    GNUNET_PQ_make_prepare ( +      "select_aggregation_transient", +      "SELECT" +      "  amount_val" +      " ,amount_frac" +      " ,wtid_raw" +      " FROM aggregation_transient" +      " WHERE wire_target_h_payto=$1" +      "   AND exchange_account_section=$2;", +      2), +    /* Used in #postgres_update_aggregation_transient() */ +    GNUNET_PQ_make_prepare ( +      "update_aggregation_transient", +      "UPDATE aggregation_transient" +      " SET amount_val=$1" +      "    ,amount_frac=$2" +      " WHERE wire_target_h_payto=$3" +      "   AND wtid_raw=$4",        4), -    /* Used in #postgres_mark_deposit_tiny() */ +    /* Used in #postgres_delete_aggregation_transient() */ +    GNUNET_PQ_make_prepare ( +      "delete_aggregation_transient", +      "DELETE FROM aggregation_transient" +      " WHERE wire_target_h_payto=$1" +      "   AND wtid_raw=$2", +      2), + + +    /* FIXME-deprecated: Used in #postgres_mark_deposit_tiny() */      GNUNET_PQ_make_prepare (        "mark_deposit_tiny",        "UPDATE deposits" @@ -1222,7 +1323,7 @@ prepare_statements (struct PostgresClosure *pg)        " WHERE coin_pub=$1"        "   AND deposit_serial_id=$2",        2), -    /* Used in #postgres_mark_deposit_done() */ +    /* FIXME-deprecated: Used in #postgres_mark_deposit_done() */      GNUNET_PQ_make_prepare (        "mark_deposit_done",        "UPDATE deposits" @@ -1230,6 +1331,7 @@ prepare_statements (struct PostgresClosure *pg)        " WHERE coin_pub=$1"        "   AND deposit_serial_id=$2;",        2), +      /* Used in #postgres_get_coin_transactions() to obtain information         about how a coin has been spend with /deposit requests. */      GNUNET_PQ_make_prepare ( @@ -2835,8 +2937,8 @@ prepare_statements (struct PostgresClosure *pg)      GNUNET_PQ_make_prepare (        "insert_into_table_refunds",        "INSERT INTO refunds" -      "(coin_pub" -      ",refund_serial_id" +      "(refund_serial_id" +      ",coin_pub"        ",merchant_sig"        ",rtransaction_id"        ",amount_with_fee_val" @@ -5869,6 +5971,240 @@ postgres_have_deposit2 (  /** + * Aggregate all matching deposits for @a h_payto and + * @a merchant_pub, returning the total amounts. + * + * @param cls the @e cls of this struct with the plugin-specific state + * @param h_payto destination of the wire transfer + * @param merchant_pub public key of the merchant + * @param wtid wire transfer ID to set for the aggregate + * @param[out] total set to the sum of the total deposits minus applicable deposit fees and refunds + * @return transaction status + */ +static enum GNUNET_DB_QueryStatus +postgres_aggregate ( +  void *cls, +  const struct TALER_PaytoHashP *h_payto, +  const struct TALER_MerchantPublicKeyP *merchant_pub, +  const struct TALER_WireTransferIdentifierRawP *wtid, +  struct TALER_Amount *total) +{ +  struct PostgresClosure *pg = cls; +  struct GNUNET_TIME_Absolute now = {0}; +  struct GNUNET_PQ_QueryParam params[] = { +    GNUNET_PQ_query_param_absolute_time (&now), +    GNUNET_PQ_query_param_auto_from_type (merchant_pub), +    GNUNET_PQ_query_param_auto_from_type (h_payto), +    GNUNET_PQ_query_param_auto_from_type (wtid), +    GNUNET_PQ_query_param_end +  }; +  uint64_t sum_deposit_value; +  uint64_t sum_deposit_frac; +  uint64_t sum_refund_value; +  uint64_t sum_refund_frac; +  uint64_t sum_fee_value; +  uint64_t sum_fee_frac; +  struct GNUNET_PQ_ResultSpec rs[] = { +    GNUNET_PQ_result_spec_uint64 ("sum_deposit_value", +                                  &sum_deposit_value), +    GNUNET_PQ_result_spec_uint64 ("sum_deposit_fraction", +                                  &sum_deposit_frac), +    GNUNET_PQ_result_spec_uint64 ("sum_refund_value", +                                  &sum_refund_value), +    GNUNET_PQ_result_spec_uint64 ("sum_refund_fraction", +                                  &sum_refund_frac), +    GNUNET_PQ_result_spec_uint64 ("sum_fee_value", +                                  &sum_fee_value), +    GNUNET_PQ_result_spec_uint64 ("sum_fee_fraction", +                                  &sum_fee_frac), +    GNUNET_PQ_result_spec_end +  }; +  enum GNUNET_DB_QueryStatus qs; +  struct TALER_Amount sum_deposit; +  struct TALER_Amount sum_refund; +  struct TALER_Amount sum_fee; +  struct TALER_Amount delta; + +  now = GNUNET_TIME_absolute_round_down (GNUNET_TIME_absolute_get (), +                                         pg->aggregator_shift); +  qs = GNUNET_PQ_eval_prepared_singleton_select (pg->conn, +                                                 "aggregate", +                                                 params, +                                                 rs); +  if (qs < 0) +  { +    GNUNET_assert (GNUNET_DB_STATUS_SOFT_ERROR == qs); +    return qs; +  } +  if (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS == qs) +  { +    GNUNET_assert (GNUNET_OK == +                   TALER_amount_set_zero (pg->currency, +                                          total)); +    return qs; +  } +  GNUNET_assert (GNUNET_OK == +                 TALER_amount_set_zero (pg->currency, +                                        &sum_deposit)); +  GNUNET_assert (GNUNET_OK == +                 TALER_amount_set_zero (pg->currency, +                                        &sum_refund)); +  GNUNET_assert (GNUNET_OK == +                 TALER_amount_set_zero (pg->currency, +                                        &sum_fee)); +  sum_deposit.value    = sum_deposit_frac / TALER_AMOUNT_FRAC_BASE +                         + sum_deposit_value; +  sum_deposit.fraction = sum_deposit_frac % TALER_AMOUNT_FRAC_BASE; +  sum_refund.value     = sum_refund_frac  / TALER_AMOUNT_FRAC_BASE +                         + sum_refund_value; +  sum_refund.fraction  = sum_refund_frac  % TALER_AMOUNT_FRAC_BASE; +  sum_fee.value        = sum_fee_frac     / TALER_AMOUNT_FRAC_BASE +                         + sum_fee_value; +  sum_fee.fraction     = sum_fee_frac     % TALER_AMOUNT_FRAC_BASE; \ +  GNUNET_assert (0 <= +                 TALER_amount_subtract (&delta, +                                        &sum_deposit, +                                        &sum_refund)); +  GNUNET_assert (0 <= +                 TALER_amount_subtract (total, +                                        &delta, +                                        &sum_fee)); +  return qs; +} + + +/** + * Create a new entry in the transient aggregation table. + * + * @param cls the @e cls of this struct with the plugin-specific state + * @param h_payto destination of the wire transfer + * @param exchange_account_section exchange account to use + * @param wtid the raw wire transfer identifier to be used + * @param total amount to be wired in the future + * @return transaction status + */ +static enum GNUNET_DB_QueryStatus +postgres_create_aggregation_transient ( +  void *cls, +  const struct TALER_PaytoHashP *h_payto, +  const char *exchange_account_section, +  const struct TALER_WireTransferIdentifierRawP *wtid, +  const struct TALER_Amount *total) +{ +  struct PostgresClosure *pg = cls; +  struct GNUNET_PQ_QueryParam params[] = { +    TALER_PQ_query_param_amount (total), +    GNUNET_PQ_query_param_auto_from_type (h_payto), +    GNUNET_PQ_query_param_string (exchange_account_section), +    GNUNET_PQ_query_param_auto_from_type (wtid), +    GNUNET_PQ_query_param_end +  }; + +  return GNUNET_PQ_eval_prepared_non_select (pg->conn, +                                             "create_aggregation_transient", +                                             params); +} + + +/** + * Find existing entry in the transient aggregation table. + * + * @param cls the @e cls of this struct with the plugin-specific state + * @param h_payto destination of the wire transfer + * @param exchange_account_section exchange account to use + * @param[out] wtid set to the raw wire transfer identifier to be used + * @param[out] total existing amount to be wired in the future + * @return transaction status + */ +static enum GNUNET_DB_QueryStatus +postgres_select_aggregation_transient ( +  void *cls, +  const struct TALER_PaytoHashP *h_payto, +  const char *exchange_account_section, +  struct TALER_WireTransferIdentifierRawP *wtid, +  struct TALER_Amount *total) +{ +  struct PostgresClosure *pg = cls; +  struct GNUNET_PQ_QueryParam params[] = { +    GNUNET_PQ_query_param_auto_from_type (h_payto), +    GNUNET_PQ_query_param_string (exchange_account_section), +    GNUNET_PQ_query_param_end +  }; +  struct GNUNET_PQ_ResultSpec rs[] = { +    TALER_PQ_RESULT_SPEC_AMOUNT ("amount", +                                 total), +    GNUNET_PQ_result_spec_auto_from_type ("wtid_raw", +                                          wtid), +    GNUNET_PQ_result_spec_end +  }; + +  return GNUNET_PQ_eval_prepared_singleton_select (pg->conn, +                                                   "select_aggregation_transient", +                                                   params, +                                                   rs); +} + + +/** + * Update existing entry in the transient aggregation table. + * @a h_payto is only needed for query performance. + * + * @param cls the @e cls of this struct with the plugin-specific state + * @param h_payto destination of the wire transfer + * @param wtid the raw wire transfer identifier to update + * @param total new total amount to be wired in the future + * @return transaction status + */ +static enum GNUNET_DB_QueryStatus +postgres_update_aggregation_transient ( +  void *cls, +  const struct TALER_PaytoHashP *h_payto, +  const struct TALER_WireTransferIdentifierRawP *wtid, +  const struct TALER_Amount *total) +{ +  struct PostgresClosure *pg = cls; +  struct GNUNET_PQ_QueryParam params[] = { +    TALER_PQ_query_param_amount (total), +    GNUNET_PQ_query_param_auto_from_type (h_payto), +    GNUNET_PQ_query_param_auto_from_type (wtid), +    GNUNET_PQ_query_param_end +  }; + +  return GNUNET_PQ_eval_prepared_non_select (pg->conn, +                                             "update_aggregation_transient", +                                             params); +} + + +/** + * Delete existing entry in the transient aggregation table. + * @a h_payto is only needed for query performance. + * + * @param cls the @e cls of this struct with the plugin-specific state + * @param h_payto destination of the wire transfer + * @param wtid the raw wire transfer identifier to update + * @return transaction status + */ +static enum GNUNET_DB_QueryStatus +postgres_delete_aggregation_transient ( +  void *cls, +  const struct TALER_PaytoHashP *h_payto, +  const struct TALER_WireTransferIdentifierRawP *wtid) +{ +  struct PostgresClosure *pg = cls; +  struct GNUNET_PQ_QueryParam params[] = { +    GNUNET_PQ_query_param_auto_from_type (h_payto), +    GNUNET_PQ_query_param_auto_from_type (wtid), +    GNUNET_PQ_query_param_end +  }; + +  return GNUNET_PQ_eval_prepared_non_select (pg->conn, +                                             "delete_aggregation_transient", +                                             params); +} + + +/**   * Mark a deposit as tiny, thereby declaring that it cannot be   * executed by itself and should no longer be returned by   * @e iterate_ready_deposits() @@ -6147,12 +6483,10 @@ postgres_iterate_matching_deposits (  {    struct PostgresClosure *pg = cls;    struct GNUNET_TIME_Absolute now = {0}; -  uint64_t shard = compute_shard (merchant_pub);    struct GNUNET_PQ_QueryParam params[] = {      GNUNET_PQ_query_param_auto_from_type (merchant_pub),      GNUNET_PQ_query_param_auto_from_type (h_payto),      GNUNET_PQ_query_param_absolute_time (&now), -    GNUNET_PQ_query_param_uint64 (&shard),      GNUNET_PQ_query_param_end    };    struct MatchingDepositContext mdc = { @@ -9299,7 +9633,6 @@ refunds_serial_helper_cb (void *cls,    struct RefundsSerialContext *rsc = cls;    struct PostgresClosure *pg = rsc->pg; -  fprintf (stderr, "Got %u results\n", num_results);    for (unsigned int i = 0; i<num_results; i++)    {      struct TALER_EXCHANGEDB_Refund refund; @@ -13081,6 +13414,15 @@ libtaler_plugin_exchangedb_postgres_init (void *cls)    plugin->get_known_coin = &postgres_get_known_coin;    plugin->get_coin_denomination = &postgres_get_coin_denomination;    plugin->have_deposit2 = &postgres_have_deposit2; +  plugin->aggregate = &postgres_aggregate; +  plugin->create_aggregation_transient +    = &postgres_create_aggregation_transient; +  plugin->select_aggregation_transient +    = &postgres_select_aggregation_transient; +  plugin->update_aggregation_transient +    = &postgres_update_aggregation_transient; +  plugin->delete_aggregation_transient +    = &postgres_delete_aggregation_transient;    plugin->mark_deposit_tiny = &postgres_mark_deposit_tiny;    plugin->mark_deposit_done = &postgres_mark_deposit_done;    plugin->get_ready_deposit = &postgres_get_ready_deposit; diff --git a/src/exchangedb/test_exchangedb.c b/src/exchangedb/test_exchangedb.c index 79b09e0e..9e2e8a48 100644 --- a/src/exchangedb/test_exchangedb.c +++ b/src/exchangedb/test_exchangedb.c @@ -675,48 +675,6 @@ deposit_cb (void *cls,  /** - * Function called with details about deposits that - * have been made.  Called in the test on the - * deposit given in @a cls. - * - * @param cls closure a `struct TALER_EXCHANGEDB_Deposit *` - * @param rowid unique ID for the deposit in our DB, used for marking - *              it as 'tiny' or 'done' - * @param coin_pub public key of the coin - * @param amount_with_fee amount that was deposited including fee - * @param deposit_fee amount the exchange gets to keep as transaction fees - * @param h_contract_terms hash of the proposal data known to merchant and customer - * @return transaction status code, #GNUNET_DB_STATUS_SUCCESS_ONE_RESULT to continue to iterate - */ -static enum GNUNET_DB_QueryStatus -matching_deposit_cb (void *cls, -                     uint64_t rowid, -                     const struct TALER_CoinSpendPublicKeyP *coin_pub, -                     const struct TALER_Amount *amount_with_fee, -                     const struct TALER_Amount *deposit_fee, -                     const struct TALER_PrivateContractHashP *h_contract_terms) -{ -  struct TALER_EXCHANGEDB_Deposit *deposit = cls; - -  deposit_rowid = rowid; -  if ( (0 != TALER_amount_cmp (amount_with_fee, -                               &deposit->amount_with_fee)) || -       (0 != TALER_amount_cmp (deposit_fee, -                               &deposit->deposit_fee)) || -       (0 != GNUNET_memcmp (h_contract_terms, -                            &deposit->h_contract_terms)) || -       (0 != GNUNET_memcmp (coin_pub, -                            &deposit->coin.coin_pub)) ) -  { -    GNUNET_break (0); -    return GNUNET_DB_STATUS_HARD_ERROR; -  } - -  return GNUNET_DB_STATUS_SUCCESS_ONE_RESULT; -} - - -/**   * Callback for #select_deposits_above_serial_id ()   *   * @param cls closure @@ -1055,7 +1013,7 @@ test_wire_out (const struct TALER_EXCHANGEDB_Deposit *deposit)                      &h_payto);    auditor_row_cnt = 0;    memset (&wire_out_wtid, -          42, +          41,            sizeof (wire_out_wtid));    wire_out_date = GNUNET_TIME_timestamp_get ();    GNUNET_assert (GNUNET_OK == @@ -1109,14 +1067,6 @@ test_wire_out (const struct TALER_EXCHANGEDB_Deposit *deposit)                                                  &coin_fee2,                                                  &kyc));    } -  /* insert WT data */ -  FAILIF (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT != -          plugin->insert_aggregation_tracking (plugin->cls, -                                               &wire_out_wtid, -                                               deposit_rowid)); - -  /* Now let's fix the transient constraint violation by -     putting in the WTID into the wire_out table */    {      struct TALER_ReservePublicKeyP rpub;      struct TALER_EXCHANGEDB_KycStatus kyc; @@ -2270,44 +2220,92 @@ run (void *cls)                                       &deposit_cb,                                       &deposit));    FAILIF (8 == result); -  FAILIF (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT != -          plugin->iterate_matching_deposits (plugin->cls, -                                             &wire_target_h_payto, -                                             &deposit.merchant_pub, -                                             &matching_deposit_cb, -                                             &deposit, -                                             2)); +  { +    struct TALER_Amount total; +    struct TALER_WireTransferIdentifierRawP wtid; + +    memset (&wtid, +            41, +            sizeof (wtid)); +    FAILIF (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT != +            plugin->aggregate (plugin->cls, +                               &wire_target_h_payto, +                               &deposit.merchant_pub, +                               &wtid, +                               &total)); +  }    FAILIF (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS !=            plugin->commit (plugin->cls));    FAILIF (GNUNET_OK !=            plugin->start (plugin->cls, -                         "test-2")); -  FAILIF (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT != -          plugin->mark_deposit_tiny (plugin->cls, -                                     &deposit.coin.coin_pub, -                                     deposit_rowid)); -  FAILIF (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS != -          plugin->get_ready_deposit (plugin->cls, -                                     0, -                                     INT32_MAX, -                                     true, -                                     &deposit_cb, -                                     &deposit)); -  plugin->rollback (plugin->cls); -  FAILIF (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT != -          plugin->get_ready_deposit (plugin->cls, -                                     0, -                                     INT32_MAX, -                                     true, -                                     &deposit_cb, -                                     &deposit)); -  FAILIF (GNUNET_OK != -          plugin->start (plugin->cls,                           "test-3")); -  FAILIF (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT != -          plugin->mark_deposit_done (plugin->cls, -                                     &deposit.coin.coin_pub, -                                     deposit_rowid)); +  { +    struct TALER_WireTransferIdentifierRawP wtid; +    struct TALER_Amount total; +    struct TALER_WireTransferIdentifierRawP wtid2; +    struct TALER_Amount total2; + +    memset (&wtid, +            42, +            sizeof (wtid)); +    GNUNET_assert (GNUNET_OK == +                   TALER_string_to_amount (CURRENCY ":42", +                                           &total)); +    FAILIF (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS != +            plugin->select_aggregation_transient (plugin->cls, +                                                  &wire_target_h_payto, +                                                  "x-bank", +                                                  &wtid2, +                                                  &total2)); +    FAILIF (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT != +            plugin->create_aggregation_transient (plugin->cls, +                                                  &wire_target_h_payto, +                                                  "x-bank", +                                                  &wtid, +                                                  &total)); +    FAILIF (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT != +            plugin->select_aggregation_transient (plugin->cls, +                                                  &wire_target_h_payto, +                                                  "x-bank", +                                                  &wtid2, +                                                  &total2)); +    FAILIF (0 != +            GNUNET_memcmp (&wtid2, +                           &wtid)); +    FAILIF (0 != +            TALER_amount_cmp (&total2, +                              &total)); +    GNUNET_assert (GNUNET_OK == +                   TALER_string_to_amount (CURRENCY ":43", +                                           &total)); +    FAILIF (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT != +            plugin->update_aggregation_transient (plugin->cls, +                                                  &wire_target_h_payto, +                                                  &wtid, +                                                  &total)); +    FAILIF (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT != +            plugin->select_aggregation_transient (plugin->cls, +                                                  &wire_target_h_payto, +                                                  "x-bank", +                                                  &wtid2, +                                                  &total2)); +    FAILIF (0 != +            GNUNET_memcmp (&wtid2, +                           &wtid)); +    FAILIF (0 != +            TALER_amount_cmp (&total2, +                              &total)); +    FAILIF (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT != +            plugin->delete_aggregation_transient (plugin->cls, +                                                  &wire_target_h_payto, +                                                  &wtid)); +    FAILIF (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS != +            plugin->select_aggregation_transient (plugin->cls, +                                                  &wire_target_h_payto, +                                                  "x-bank", +                                                  &wtid2, +                                                  &total2)); +  }    FAILIF (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS !=            plugin->commit (plugin->cls)); diff --git a/src/include/taler_exchangedb_plugin.h b/src/include/taler_exchangedb_plugin.h index b2ea240e..4ca6905e 100644 --- a/src/include/taler_exchangedb_plugin.h +++ b/src/include/taler_exchangedb_plugin.h @@ -3146,6 +3146,98 @@ struct TALER_EXCHANGEDB_Plugin    /** +   * Aggregate all matching deposits for @a h_payto and +   * @a merchant_pub, returning the total amounts. +   * +   * @param cls the @e cls of this struct with the plugin-specific state +   * @param h_payto destination of the wire transfer +   * @param merchant_pub public key of the merchant +   * @param wtid wire transfer ID to set for the aggregate +   * @param[out] total set to the sum of the total deposits minus applicable deposit fees and refunds +   * @return transaction status +   */ +  enum GNUNET_DB_QueryStatus +  (*aggregate)( +    void *cls, +    const struct TALER_PaytoHashP *h_payto, +    const struct TALER_MerchantPublicKeyP *merchant_pub, +    const struct TALER_WireTransferIdentifierRawP *wtid, +    struct TALER_Amount *total); + + +  /** +   * Create a new entry in the transient aggregation table. +   * +   * @param cls the @e cls of this struct with the plugin-specific state +   * @param h_payto destination of the wire transfer +   * @param exchange_account_section exchange account to use +   * @param wtid the raw wire transfer identifier to be used +   * @param total amount to be wired in the future +   * @return transaction status +   */ +  enum GNUNET_DB_QueryStatus +  (*create_aggregation_transient)( +    void *cls, +    const struct TALER_PaytoHashP *h_payto, +    const char *exchange_account_section, +    const struct TALER_WireTransferIdentifierRawP *wtid, +    const struct TALER_Amount *total); + + +  /** +   * Find existing entry in the transient aggregation table. +   * +   * @param cls the @e cls of this struct with the plugin-specific state +   * @param h_payto destination of the wire transfer +   * @param exchange_account_section exchange account to use +   * @param[out] wtid set to the raw wire transfer identifier to be used +   * @param[out] total existing amount to be wired in the future +   * @return transaction status +   */ +  enum GNUNET_DB_QueryStatus +  (*select_aggregation_transient)( +    void *cls, +    const struct TALER_PaytoHashP *h_payto, +    const char *exchange_account_section, +    struct TALER_WireTransferIdentifierRawP *wtid, +    struct TALER_Amount *total); + + +  /** +   * Update existing entry in the transient aggregation table. +   * @a h_payto is only needed for query performance. +   * +   * @param cls the @e cls of this struct with the plugin-specific state +   * @param h_payto destination of the wire transfer +   * @param wtid the raw wire transfer identifier to update +   * @param total new total amount to be wired in the future +   * @return transaction status +   */ +  enum GNUNET_DB_QueryStatus +  (*update_aggregation_transient)( +    void *cls, +    const struct TALER_PaytoHashP *h_payto, +    const struct TALER_WireTransferIdentifierRawP *wtid, +    const struct TALER_Amount *total); + + +  /** +   * Delete existing entry in the transient aggregation table. +   * @a h_payto is only needed for query performance. +   * +   * @param cls the @e cls of this struct with the plugin-specific state +   * @param h_payto destination of the wire transfer +   * @param wtid the raw wire transfer identifier to update +   * @return transaction status +   */ +  enum GNUNET_DB_QueryStatus +  (*delete_aggregation_transient)( +    void *cls, +    const struct TALER_PaytoHashP *h_payto, +    const struct TALER_WireTransferIdentifierRawP *wtid); + + +  /**     * Lookup melt commitment data under the given @a rc.     *     * @param cls the @e cls of this struct with the plugin-specific state | 
