diff options
| -rw-r--r-- | src/exchange/taler-exchange-aggregator.c | 83 | ||||
| -rw-r--r-- | src/exchangedb/drop0001.sql | 8 | ||||
| -rw-r--r-- | src/exchangedb/exchange-0001.sql | 266 | ||||
| -rw-r--r-- | src/exchangedb/plugin_exchangedb_postgres.c | 131 | ||||
| -rw-r--r-- | src/exchangedb/test_exchangedb.c | 4 | ||||
| -rw-r--r-- | src/include/taler_exchangedb_plugin.h | 8 | 
6 files changed, 324 insertions, 176 deletions
| diff --git a/src/exchange/taler-exchange-aggregator.c b/src/exchange/taler-exchange-aggregator.c index abab347f..c34d47f9 100644 --- a/src/exchange/taler-exchange-aggregator.c +++ b/src/exchange/taler-exchange-aggregator.c @@ -28,6 +28,18 @@  #include "taler_json_lib.h"  #include "taler_bank_service.h" +struct AdditionalDeposit +{ +  /** +   * Public key of the coin. +   */ +  struct TALER_CoinSpendPublicKeyP coin_pub; + +  /** +   * Row of the deposit. +   */ +  uint64_t row; +};  /**   * Information about one aggregation process to be executed.  There is @@ -43,6 +55,11 @@ struct AggregationUnit    struct TALER_MerchantPublicKeyP merchant_pub;    /** +   * Public key of the coin. +   */ +  struct TALER_CoinSpendPublicKeyP coin_pub; + +  /**     * Total amount to be transferred, before subtraction of @e fees.wire and rounding down.     */    struct TALER_Amount total_amount; @@ -97,7 +114,8 @@ struct AggregationUnit    /**     * Array of row_ids from the aggregation.     */ -  uint64_t additional_rows[TALER_EXCHANGEDB_MATCHING_DEPOSITS_LIMIT]; +  struct AdditionalDeposit +    additional_rows[TALER_EXCHANGEDB_MATCHING_DEPOSITS_LIMIT];    /**     * Offset specifying how many @e additional_rows are in use. @@ -383,7 +401,8 @@ deposit_cb (void *cls,    enum GNUNET_DB_QueryStatus qs;    au->merchant_pub = *merchant_pub; -  GNUNET_log (GNUNET_ERROR_TYPE_INFO, +  au->coin_pub = *coin_pub; +  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,                "Aggregator processing payment %s with amount %s\n",                TALER_B2S (coin_pub),                TALER_amount2s (amount_with_fee)); @@ -405,7 +424,7 @@ deposit_cb (void *cls,    {      struct TALER_Amount ntotal; -    GNUNET_log (GNUNET_ERROR_TYPE_INFO, +    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,                  "Non-refunded transaction, subtracting deposit fee %s\n",                  TALER_amount2s (deposit_fee));      if (0 > @@ -428,6 +447,9 @@ deposit_cb (void *cls,        au->total_amount = ntotal;      }    } +  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, +              "Amount after fee is %s\n", +              TALER_amount2s (&au->total_amount));    GNUNET_assert (NULL == au->payto_uri);    au->payto_uri = GNUNET_strdup (payto_uri); @@ -437,7 +459,7 @@ deposit_cb (void *cls,    GNUNET_CRYPTO_random_block (GNUNET_CRYPTO_QUALITY_NONCE,                                &au->wtid,                                sizeof (au->wtid)); -  GNUNET_log (GNUNET_ERROR_TYPE_INFO, +  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,                "Starting aggregation under H(WTID)=%s, starting amount %s at %llu\n",                TALER_B2S (&au->wtid),                TALER_amount2s (amount_with_fee), @@ -493,7 +515,7 @@ deposit_cb (void *cls,                "Aggregator marks deposit %llu as done\n",                (unsigned long long) row_id);    qs = db_plugin->mark_deposit_done (db_plugin->cls, -                                     merchant_pub, +                                     coin_pub,                                       row_id);    if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT != qs)    { @@ -528,6 +550,8 @@ aggregate_cb (void *cls,    struct TALER_Amount old;    enum GNUNET_DB_QueryStatus qs; +  if (row_id == au->row_id) +    return GNUNET_DB_STATUS_SUCCESS_ONE_RESULT;    if (au->rows_offset >= TALER_EXCHANGEDB_MATCHING_DEPOSITS_LIMIT)    {      /* Bug: we asked for at most #TALER_EXCHANGEDB_MATCHING_DEPOSITS_LIMIT results! */ @@ -605,18 +629,29 @@ aggregate_cb (void *cls,    }    /* "append" to our list of rows */ -  au->additional_rows[au->rows_offset++] = row_id; +  au->additional_rows[au->rows_offset].coin_pub = *coin_pub; +  au->additional_rows[au->rows_offset].row = row_id; +  au->rows_offset++;    /* insert into aggregation tracking table */ +  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, +              "Adding %llu to aggregate %s\n", +              (unsigned long long) row_id, +              TALER_B2S (&au->wtid));    qs = db_plugin->insert_aggregation_tracking (db_plugin->cls,                                                 &au->wtid,                                                 row_id);    if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT != qs)    { +    GNUNET_log (GNUNET_ERROR_TYPE_WARNING, +                "Failed to add %llu to aggregate %s: %d\n", +                (unsigned long long) row_id, +                TALER_B2S (&au->wtid), +                qs);      GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs);      return qs;    }    qs = db_plugin->mark_deposit_done (db_plugin->cls, -                                     &au->merchant_pub, +                                     coin_pub,                                       row_id);    if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT != qs)    { @@ -775,7 +810,7 @@ run_aggregation (void *cls)    }    /* Now try to find other deposits to aggregate */ -  GNUNET_log (GNUNET_ERROR_TYPE_INFO, +  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,                "Found ready deposit for %s, aggregating by target %llu\n",                TALER_B2S (&au_active.merchant_pub),                (unsigned long long) au_active.wire_target); @@ -808,13 +843,17 @@ run_aggregation (void *cls)                                       s);      return;    } -  GNUNET_log (GNUNET_ERROR_TYPE_INFO, -              "Found %d other deposits to combine into wire transfer.\n", -              qs); +  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, +              "Found %d other deposits to combine into wire transfer with fee %s.\n", +              qs, +              TALER_amount2s (&au_active.fees.wire));    /* Subtract wire transfer fee and round to the unit supported by the       wire transfer method; Check if after rounding down, we still have       an amount to transfer, and if not mark as 'tiny'. */ +  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, +              "Rounding aggregate of %s\n", +              TALER_amount2s (&au_active.total_amount));    if ( (0 >=          TALER_amount_subtract (&au_active.final_amount,                                 &au_active.total_amount, @@ -822,8 +861,7 @@ run_aggregation (void *cls)         (GNUNET_SYSERR ==          TALER_amount_round_down (&au_active.final_amount,                                   ¤cy_round_unit)) || -       ( (0 == au_active.final_amount.value) && -         (0 == au_active.final_amount.fraction) ) ) +       (TALER_amount_is_zero (&au_active.final_amount)) )    {      GNUNET_log (GNUNET_ERROR_TYPE_INFO,                  "Aggregate value too low for transfer (%d/%s)\n", @@ -848,23 +886,29 @@ run_aggregation (void *cls)        return;      }      /* Mark transactions by row_id as minor */ +    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, +                "Marking %s (%llu) as tiny\n", +                TALER_B2S (&au_active.coin_pub), +                (unsigned long long) au_active.row_id);      qs = db_plugin->mark_deposit_tiny (db_plugin->cls, -                                       &au_active.merchant_pub, +                                       &au_active.coin_pub,                                         au_active.row_id); -    if (0 <= qs) +    if (0 < qs)      {        for (unsigned int i = 0; i<au_active.rows_offset; i++)        {          qs = db_plugin->mark_deposit_tiny (db_plugin->cls, -                                           &au_active.merchant_pub, -                                           au_active.additional_rows[i]); -        if (0 > qs) +                                           &au_active.additional_rows[i]. +                                           coin_pub, +                                           au_active.additional_rows[i].row); +        if (0 >= qs)            break;        }      } +    GNUNET_break (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS != qs);      if (GNUNET_DB_STATUS_SOFT_ERROR == qs)      { -      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, +      GNUNET_log (GNUNET_ERROR_TYPE_INFO,                    "Serialization issue, trying again later!\n");        db_plugin->rollback (db_plugin->cls);        cleanup_au (&au_active); @@ -876,6 +920,7 @@ run_aggregation (void *cls)      }      if (GNUNET_DB_STATUS_HARD_ERROR == qs)      { +      GNUNET_break (0);        db_plugin->rollback (db_plugin->cls);        cleanup_au (&au_active);        global_ret = EXIT_FAILURE; diff --git a/src/exchangedb/drop0001.sql b/src/exchangedb/drop0001.sql index 60acc98a..225c817a 100644 --- a/src/exchangedb/drop0001.sql +++ b/src/exchangedb/drop0001.sql @@ -55,6 +55,8 @@ DROP TABLE IF EXISTS wire_targets CASCADE;  DROP FUNCTION IF EXISTS add_constraints_to_wire_targets_partition;  DROP TABLE IF EXISTS wire_fee CASCADE;  DROP TABLE IF EXISTS deposits CASCADE; +DROP TABLE IF EXISTS deposits_by_ready CASCADE; +DROP TABLE IF EXISTS deposits_for_matching CASCADE;  DROP FUNCTION IF EXISTS add_constraints_to_deposits_partition;  DROP TABLE IF EXISTS extension_details CASCADE;  DROP TABLE IF EXISTS refunds CASCADE; @@ -88,6 +90,7 @@ DROP TABLE IF EXISTS recoup_by_reserve CASCADE;  DROP TABLE IF EXISTS partners CASCADE;  DROP TABLE IF EXISTS account_merges CASCADE;  DROP TABLE IF EXISTS purse_merges CASCADE; +DROP TABLE IF EXISTS purse_deposits CASCADE;  DROP TABLE IF EXISTS contracts CASCADE;  DROP TABLE IF EXISTS history_requests CASCADE;  DROP TABLE IF EXISTS close_requests CASCADE; @@ -103,8 +106,9 @@ DROP FUNCTION IF EXISTS exchange_do_withdraw;  DROP FUNCTION IF EXISTS exchange_do_withdraw_limit_check;  DROP FUNCTION IF EXISTS recoup_insert_trigger;  DROP FUNCTION IF EXISTS recoup_delete_trigger; -DROP FUNCTION IF EXISTS deposits_by_coin_insert_trigger; -DROP FUNCTION IF EXISTS deposits_by_coin_delete_trigger; +DROP FUNCTION IF EXISTS deposits_insert_trigger; +DROP FUNCTION IF EXISTS deposits_update_trigger; +DROP FUNCTION IF EXISTS deposits_delete_trigger;  DROP FUNCTION IF EXISTS reserves_out_by_reserve_insert_trigger;  DROP FUNCTION IF EXISTS reserves_out_by_reserve_delete_trigger;  DROP FUNCTION IF EXISTS exchange_do_deposit; diff --git a/src/exchangedb/exchange-0001.sql b/src/exchangedb/exchange-0001.sql index e723a367..568779f9 100644 --- a/src/exchangedb/exchange-0001.sql +++ b/src/exchangedb/exchange-0001.sql @@ -610,7 +610,7 @@ CREATE TABLE IF NOT EXISTS deposits    (deposit_serial_id BIGINT GENERATED BY DEFAULT AS IDENTITY -- PRIMARY KEY    ,shard INT8 NOT NULL    ,coin_pub BYTEA NOT NULL CHECK (LENGTH(coin_pub)=32) -- REFERENCES known_coins (coin_pub) ON DELETE CASCADE -  ,known_coin_id BIGINT NOT NULL -- REFERENCES known_coins (known_coin_id) ON DELETE CASCADE +  ,known_coin_id BIGINT NOT NULL -- REFERENCES known_coins (known_coin_id) ON DELETE CASCADE --- FIXME: column needed???    ,amount_with_fee_val INT8 NOT NULL    ,amount_with_fee_frac INT4 NOT NULL    ,wallet_timestamp INT8 NOT NULL @@ -626,22 +626,11 @@ CREATE TABLE IF NOT EXISTS deposits    ,done BOOLEAN NOT NULL DEFAULT FALSE    ,extension_blocked BOOLEAN NOT NULL DEFAULT FALSE    ,extension_details_serial_id INT8 REFERENCES extension_details (extension_details_serial_id) ON DELETE CASCADE -  ,UNIQUE (shard, coin_pub, merchant_pub, h_contract_terms) +  ,UNIQUE (coin_pub, merchant_pub, h_contract_terms)    ) -  PARTITION BY HASH (shard); -- FIXME: why not BY RANGE? RANGE would seem better for 'deposits_get_ready'! +  PARTITION BY HASH (coin_pub);  -- FIXME: --- new idea: partition deposits by coin_pub (remove deposits_by_coin) --- define 'ready' == ! (tiny || done || blocked) --- add new deposits_by_ready (on shard + wire_deadline), select by shard, then ready + deadline ---         -- use triggers to ONLY include 'ready' deposits (delete on update)! ---         -- use multi-level partitions: Hash(shard) + Range(wire_deadline/sec) --- add new deposits_by_match (on shard + refund_deadline) ---         -- use triggers to ONLY include 'ready' deposits (delete on update)! ---         -- use multi-level partitions: Hash(shard) + Range(refund_deadline/sec) --- => first we select per-merchant shard, basically stay on the same system as other ops for the same merchant --- => second we select by deadline, use enough values so that _usually_ the aggregator ---    and the 'insert' process _can_ work on different shards! --- => the latter could be achieved by dynamically (!) creating/deleting partitions: +-- TODO: dynamically (!) creating/deleting partitions:  --    create new partitions 'as needed', drop old ones once the aggregator has made  --    them empty; as 'new' deposits will always have deadlines in the future, this  --    would basically guarantee no conflict between aggregator and exchange service! @@ -683,31 +672,15 @@ COMMENT ON COLUMN deposits.extension_details_serial_id  COMMENT ON COLUMN deposits.tiny    IS 'Set to TRUE if we decided that the amount is too small to ever trigger a wire transfer by itself (requires real aggregation)'; +-- FIXME: we sometimes go ONLY by 'deposit_serial_id', +--        check if queries could be improved by adding shard or adding another index without shard here, or inverting the order of the index here!  CREATE INDEX IF NOT EXISTS deposits_deposit_by_serial_id_index    ON deposits    (shard,deposit_serial_id); -CREATE INDEX IF NOT EXISTS deposits_for_get_ready_index -  ON deposits -  (shard ASC -  ,done -  ,extension_blocked -  ,tiny -  ,wire_deadline ASC -  ); -COMMENT ON INDEX deposits_for_get_ready_index -  IS 'for deposits_get_ready'; -CREATE INDEX IF NOT EXISTS deposits_for_iterate_matching_index +CREATE INDEX IF NOT EXISTS deposits_by_coin_pub_index    ON deposits -  (shard -  ,merchant_pub -  ,wire_target_h_payto -  ,done -  ,extension_blocked -  ,refund_deadline ASC -  ); -COMMENT ON INDEX deposits_for_iterate_matching_index -  IS 'for deposits_iterate_matching'; +  (coin_pub);  CREATE TABLE IF NOT EXISTS deposits_default @@ -732,66 +705,198 @@ $$;  SELECT add_constraints_to_deposits_partition('default'); +CREATE TABLE IF NOT EXISTS deposits_by_ready +  (wire_deadline INT8 NOT NULL +  ,shard INT8 NOT NULL +  ,coin_pub BYTEA NOT NULL CHECK (LENGTH(coin_pub)=32) +  ,deposit_serial_id INT8 +  ) +  PARTITION BY RANGE (wire_deadline); +COMMENT ON TABLE deposits_by_ready +  IS 'Enables fast lookups for deposits_get_ready, auto-populated via TRIGGER below'; + +CREATE INDEX IF NOT EXISTS deposits_by_ready_main_index +  ON deposits_by_ready +  (wire_deadline ASC, shard ASC, coin_pub); + +CREATE TABLE IF NOT EXISTS deposits_by_ready_default +  PARTITION OF deposits_by_ready +  DEFAULT; -CREATE TABLE IF NOT EXISTS deposits_by_coin -  (deposit_serial_id BIGINT + +CREATE TABLE IF NOT EXISTS deposits_for_matching +  (refund_deadline INT8 NOT NULL    ,shard INT8 NOT NULL    ,coin_pub BYTEA NOT NULL CHECK (LENGTH(coin_pub)=32) +  ,deposit_serial_id INT8    ) -  PARTITION BY HASH (coin_pub); -COMMENT ON TABLE deposits_by_coin -  IS 'Enables fast lookups of deposit by coin_pub, auto-populated via TRIGGER below'; +  PARTITION BY RANGE (refund_deadline); +COMMENT ON TABLE deposits_for_matching +  IS 'Enables fast lookups for deposits_iterate_matching, auto-populated via TRIGGER below'; -CREATE INDEX IF NOT EXISTS deposits_by_coin_main_index -  ON deposits_by_coin -  (coin_pub); +CREATE INDEX IF NOT EXISTS deposits_for_matching_main_index +  ON deposits_for_matching +  (refund_deadline ASC, shard, coin_pub); -CREATE TABLE IF NOT EXISTS deposits_by_coin_default -  PARTITION OF deposits_by_coin -  FOR VALUES WITH (MODULUS 1, REMAINDER 0); +CREATE TABLE IF NOT EXISTS deposits_for_matching_default +  PARTITION OF deposits_for_matching +  DEFAULT; +   -CREATE OR REPLACE FUNCTION deposits_by_coin_insert_trigger() +CREATE OR REPLACE FUNCTION deposits_insert_trigger()    RETURNS trigger    LANGUAGE plpgsql    AS $$ +DECLARE +  is_ready BOOLEAN; +DECLARE +  is_tready BOOLEAN; -- is ready, but may be tiny  BEGIN -  INSERT INTO deposits_by_coin -    (deposit_serial_id -    ,shard -    ,coin_pub) -  VALUES -    (NEW.deposit_serial_id -    ,NEW.shard -    ,NEW.coin_pub); +  is_ready  = NOT (NEW.done OR NEW.tiny OR NEW.extension_blocked); +  is_tready = NOT (NEW.done OR NEW.extension_blocked); + +  IF (is_ready) +  THEN +    INSERT INTO deposits_by_ready +      (wire_deadline +      ,shard +      ,coin_pub +      ,deposit_serial_id) +    VALUES +      (NEW.wire_deadline +      ,NEW.shard +      ,NEW.coin_pub +      ,NEW.deposit_serial_id); +  END IF; +  IF (is_tready) +  THEN +    INSERT INTO deposits_for_matching +      (refund_deadline +      ,shard +      ,coin_pub +      ,deposit_serial_id) +    VALUES +      (NEW.refund_deadline +      ,NEW.shard +      ,NEW.coin_pub +      ,NEW.deposit_serial_id); +  END IF;    RETURN NEW;  END $$;   -COMMENT ON FUNCTION deposits_by_coin_insert_trigger() -  IS 'Replicate deposit inserts into deposits_by_coin table.'; +COMMENT ON FUNCTION deposits_insert_trigger() +  IS 'Replicate deposit inserts into materialized indices.';  CREATE TRIGGER deposits_on_insert    AFTER INSERT     ON deposits -   FOR EACH ROW EXECUTE FUNCTION deposits_by_coin_insert_trigger(); +   FOR EACH ROW EXECUTE FUNCTION deposits_insert_trigger(); -CREATE OR REPLACE FUNCTION deposits_by_coin_delete_trigger() +CREATE OR REPLACE FUNCTION deposits_update_trigger()    RETURNS trigger    LANGUAGE plpgsql    AS $$ +DECLARE +  was_ready BOOLEAN; +DECLARE +  is_ready BOOLEAN; +DECLARE +  was_tready BOOLEAN; -- was ready, but may be tiny +DECLARE +  is_tready BOOLEAN; -- is ready, but may be tiny  BEGIN -  DELETE FROM deposits_by_coin -   WHERE coin_pub = OLD.coin_pub -     AND shard = OLD.shard -     AND deposit_serial_id = OLD.deposit_serial_id; -  RETURN OLD; +  was_ready = NOT (OLD.done OR OLD.tiny OR OLD.extension_blocked); +  is_ready  = NOT (NEW.done OR NEW.tiny OR NEW.extension_blocked); +  was_tready = NOT (OLD.done OR OLD.extension_blocked); +  is_tready  = NOT (NEW.done OR NEW.extension_blocked); +  IF (was_ready AND NOT is_ready) +  THEN +    DELETE FROM deposits_by_ready +     WHERE wire_deadline = OLD.wire_deadline +       AND shard = OLD.shard +       AND coin_pub = OLD.coin_pub +       AND deposit_serial_id = OLD.deposit_serial_id; +  END IF; +  IF (was_tready AND NOT is_tready) +  THEN +    DELETE FROM deposits_for_matching +     WHERE refund_deadline = OLD.refund_deadline +       AND shard = OLD.shard +       AND coin_pub = OLD.coin_pub +       AND deposit_serial_id = OLD.deposit_serial_id; +  END IF; +  IF (is_ready AND NOT was_ready) +  THEN +    INSERT INTO deposits_by_ready +      (wire_deadline +      ,shard +      ,coin_pub +      ,deposit_serial_id) +    VALUES +      (NEW.wire_deadline +      ,NEW.shard +      ,NEW.coin_pub +      ,NEW.deposit_serial_id); +  END IF; +  IF (is_tready AND NOT was_tready) +  THEN +    INSERT INTO deposits_for_matching +      (refund_deadline +      ,shard +      ,coin_pub +      ,deposit_serial_id) +    VALUES +      (NEW.refund_deadline +      ,NEW.shard +      ,NEW.coin_pub +      ,NEW.deposit_serial_id); +  END IF; +  RETURN NEW; +END $$; +COMMENT ON FUNCTION deposits_update_trigger() +  IS 'Replicate deposits changes into materialized indices.'; + +CREATE TRIGGER deposits_on_update +  AFTER UPDATE +    ON deposits +   FOR EACH ROW EXECUTE FUNCTION deposits_update_trigger(); + +CREATE OR REPLACE FUNCTION deposits_delete_trigger() +  RETURNS trigger +  LANGUAGE plpgsql +  AS $$ +DECLARE +  was_ready BOOLEAN; +DECLARE +  was_tready BOOLEAN; -- is ready, but may be tiny +BEGIN +  was_ready  = NOT (OLD.done OR OLD.tiny OR OLD.extension_blocked); +  was_tready = NOT (OLD.done OR OLD.extension_blocked); + +  IF (was_ready) +  THEN +    DELETE FROM deposits_by_ready +     WHERE wire_deadline = OLD.wire_deadline +       AND shard = OLD.shard +       AND coin_pub = OLD.coin_pub +       AND deposit_serial_id = OLD.deposit_serial_id; +  END IF; +  IF (was_tready) +  THEN +    DELETE FROM deposits_for_matching +     WHERE refund_deadline = OLD.refund_deadline +       AND shard = OLD.shard +       AND coin_pub = OLD.coin_pub +       AND deposit_serial_id = OLD.deposit_serial_id; +  END IF; +  RETURN NEW;  END $$;   -COMMENT ON FUNCTION deposits_by_coin_delete_trigger() -  IS 'Replicate deposits deletions into deposits_by_coin table.'; +COMMENT ON FUNCTION deposits_delete_trigger() +  IS 'Replicate deposit deletions into materialized indices.';  CREATE TRIGGER deposits_on_delete    AFTER DELETE -    ON deposits -   FOR EACH ROW EXECUTE FUNCTION deposits_by_coin_delete_trigger(); - +   ON deposits +   FOR EACH ROW EXECUTE FUNCTION deposits_delete_trigger();  CREATE TABLE IF NOT EXISTS refunds @@ -2011,7 +2116,7 @@ DECLARE  BEGIN  -- Shards: INSERT extension_details (by extension_details_serial_id)  --         INSERT wire_targets (by h_payto), on CONFLICT DO NOTHING; ---         INSERT deposits (by shard + merchant_pub + h_payto), ON CONFLICT DO NOTHING; +--         INSERT deposits (by coin_pub, shard), ON CONFLICT DO NOTHING;  --         UPDATE known_coins (by coin_pub)  IF NOT NULL in_extension_details @@ -2356,27 +2461,26 @@ DECLARE  DECLARE    deposit_frac INT8; -- amount that was originally deposited  BEGIN --- Shards: SELECT deposits (by shard, coin_pub, h_contract_terms, merchant_pub) +-- Shards: SELECT deposits (coin_pub, shard, h_contract_terms, merchant_pub)  --         INSERT refunds (by deposit_serial_id, rtransaction_id) ON CONFLICT DO NOTHING  --         SELECT refunds (by deposit_serial_id)  --         UPDATE known_coins (by coin_pub)  SELECT -   dep.deposit_serial_id -  ,dep.amount_with_fee_val -  ,dep.amount_with_fee_frac -  ,dep.done +   deposit_serial_id +  ,amount_with_fee_val +  ,amount_with_fee_frac +  ,done  INTO     dsi    ,deposit_val    ,deposit_frac    ,out_gone -FROM deposits_by_coin dbc -  JOIN deposits dep USING (shard,deposit_serial_id) - WHERE dbc.coin_pub=in_coin_pub -  AND dep.shard=in_deposit_shard -  AND dep.merchant_pub=in_merchant_pub -  AND dep.h_contract_terms=in_h_contract_terms; +FROM deposits + WHERE coin_pub=in_coin_pub +  AND shard=in_deposit_shard +  AND merchant_pub=in_merchant_pub +  AND h_contract_terms=in_h_contract_terms;  IF NOT FOUND  THEN diff --git a/src/exchangedb/plugin_exchangedb_postgres.c b/src/exchangedb/plugin_exchangedb_postgres.c index 120f475d..3cde9773 100644 --- a/src/exchangedb/plugin_exchangedb_postgres.c +++ b/src/exchangedb/plugin_exchangedb_postgres.c @@ -990,12 +990,11 @@ prepare_statements (struct PostgresClosure *pg)        ",rtransaction_id "        ",amount_with_fee_val "        ",amount_with_fee_frac " -      ") SELECT dbc.deposit_serial_id, $3, $5, $6, $7" -      "    FROM deposits_by_coin dbc" -      "    JOIN deposits dep USING (shard,deposit_serial_id)" -      "   WHERE dbc.coin_pub=$1" -      "     AND dep.h_contract_terms=$4" -      "     AND dep.merchant_pub=$2", +      ") SELECT deposit_serial_id, $3, $5, $6, $7" +      "    FROM deposits" /* FIXME: check if adding additional AND on the 'shard' would help (possibly after reviewing indices on deposits!) */ +      "   WHERE coin_pub=$1" +      "     AND h_contract_terms=$4" +      "     AND merchant_pub=$2",        7),      /* Query the 'refunds' by coin public key */      GNUNET_PQ_make_prepare ( @@ -1010,12 +1009,11 @@ prepare_statements (struct PostgresClosure *pg)        ",denom.fee_refund_val "        ",denom.fee_refund_frac "        ",ref.refund_serial_id" -      " FROM deposits_by_coin dbc" +      " FROM deposits dep"        " JOIN refunds ref USING (deposit_serial_id)" -      " JOIN deposits dep ON (dbc.shard = dep.shard AND dbc.deposit_serial_id = dep.deposit_serial_id)" -      " JOIN known_coins kc ON (dbc.coin_pub = kc.coin_pub)" +      " JOIN known_coins kc ON (dep.coin_pub = kc.coin_pub)"        " JOIN denominations denom USING (denominations_serial)" -      " WHERE dbc.coin_pub=$1;", +      " WHERE dep.coin_pub=$1;",        1),      /* Query the 'refunds' by coin public key, merchant_pub and contract hash */      GNUNET_PQ_make_prepare ( @@ -1023,10 +1021,9 @@ prepare_statements (struct PostgresClosure *pg)        "SELECT"        " ref.amount_with_fee_val"        ",ref.amount_with_fee_frac" -      " FROM deposits_by_coin dbc" +      " FROM deposits dep"        " JOIN refunds ref USING (shard,deposit_serial_id)" -      " JOIN deposits dep ON (dbc.shard = dep.shard AND dbc.deposit_serial_id = dep.deposit_serial_id)" -      " WHERE dbc.coin_pub=$1" +      " WHERE dep.coin_pub=$1"        "   AND dep.merchant_pub=$2"        "   AND dep.h_contract_terms=$3;",        3), @@ -1053,6 +1050,7 @@ prepare_statements (struct PostgresClosure *pg)      /* Lock deposit table; NOTE: we may want to eventually shard the         deposit table to avoid this lock being the main point of         contention limiting transaction performance. */ +    // FIXME: check if this query is even still used!      GNUNET_PQ_make_prepare (        "lock_deposit",        "LOCK TABLE deposits;", @@ -1098,12 +1096,11 @@ prepare_statements (struct PostgresClosure *pg)        ",dep.h_contract_terms"        ",dep.wire_salt"        ",wt.payto_uri AS receiver_wire_account" -      " FROM deposits_by_coin dbc" -      " JOIN deposits dep USING (shard,deposit_serial_id)" -      " JOIN known_coins kc ON (kc.coin_pub = dbc.coin_pub)" +      " FROM deposits dep" +      " JOIN known_coins kc ON (kc.coin_pub = dep.coin_pub)"        " JOIN denominations USING (denominations_serial)"        " JOIN wire_targets wt USING (wire_target_h_payto)" -      " WHERE dbc.coin_pub=$1" +      " WHERE dep.coin_pub=$1"        "   AND dep.merchant_pub=$3"        "   AND dep.h_contract_terms=$2;",        3), @@ -1150,12 +1147,11 @@ prepare_statements (struct PostgresClosure *pg)        ",denom.fee_deposit_val"        ",denom.fee_deposit_frac"        ",dep.wire_deadline" -      " FROM deposits_by_coin dbc" -      "    JOIN deposits dep USING (shard,deposit_serial_id)" +      " FROM deposits dep"        "    JOIN wire_targets wt USING (wire_target_h_payto)" -      "    JOIN known_coins kc ON (kc.coin_pub = dbc.coin_pub)" +      "    JOIN known_coins kc ON (kc.coin_pub = dep.coin_pub)"        "    JOIN denominations denom USING (denominations_serial)" -      " WHERE dbc.coin_pub=$1" +      " WHERE dep.coin_pub=$1"        "   AND dep.merchant_pub=$3"        "   AND dep.h_contract_terms=$2;",        3), @@ -1163,7 +1159,7 @@ prepare_statements (struct PostgresClosure *pg)      GNUNET_PQ_make_prepare (        "deposits_get_ready",        "SELECT" -      " deposit_serial_id" +      " dep.deposit_serial_id"        ",amount_with_fee_val"        ",amount_with_fee_frac"        ",denom.fee_deposit_val" @@ -1173,47 +1169,46 @@ prepare_statements (struct PostgresClosure *pg)        ",wire_target_serial_id"        ",merchant_pub"        ",kc.coin_pub" -      " FROM deposits" +      " FROM deposits_by_ready dbr" +      "  JOIN deposits dep" +      "    ON (dbr.coin_pub = dep.coin_pub AND dbr.deposit_serial_id = dep.deposit_serial_id)"        "  JOIN wire_targets "        "    USING (wire_target_h_payto)"        "  JOIN known_coins kc" -      "    USING (coin_pub)" +      "    ON (kc.coin_pub = dep.coin_pub)"        "  JOIN denominations denom"        "    USING (denominations_serial)" -      " WHERE " -      "       shard >= $2" -      "   AND shard <= $3" -      "   AND done=FALSE" -      "   AND extension_blocked=FALSE" -      "   AND tiny=FALSE" -      "   AND wire_deadline<=$1" +      " WHERE dbr.wire_deadline<=$1" +      "   AND dbr.shard >= $2" +      "   AND dbr.shard <= $3"        "   AND (kyc_ok OR $4)"        " ORDER BY " -      "   shard ASC" -      "  ,wire_deadline ASC" +      "   dbr.wire_deadline ASC" +      "  ,dbr.shard ASC"        " LIMIT 1;",        4),      /* Used in #postgres_iterate_matching_deposits() */      GNUNET_PQ_make_prepare (        "deposits_iterate_matching",        "SELECT" -      " deposit_serial_id" -      ",amount_with_fee_val" -      ",amount_with_fee_frac" +      " dep.deposit_serial_id" +      ",dep.amount_with_fee_val" +      ",dep.amount_with_fee_frac"        ",denom.fee_deposit_val"        ",denom.fee_deposit_frac" -      ",h_contract_terms" -      ",kc.coin_pub" -      " FROM deposits" -      "    JOIN known_coins kc USING (coin_pub)" -      "    JOIN denominations denom USING (denominations_serial)" -      " WHERE shard=$4" -      "  AND merchant_pub=$1" -      "  AND wire_target_h_payto=$2" -      "  AND done=FALSE" -      "  AND extension_blocked=FALSE" -      "  AND refund_deadline<$3" -      " ORDER BY refund_deadline ASC" +      ",dep.h_contract_terms" +      ",dfm.coin_pub" +      " FROM deposits_for_matching dfm" +      "    JOIN deposits dep " +      "      ON (dep.coin_pub = dfm.coin_pub and dep.deposit_serial_id = dfm.deposit_serial_id)" +      "    JOIN known_coins kc" +      "      ON (dep.coin_pub = kc.coin_pub)" +      "    JOIN denominations denom" +      "      USING (denominations_serial)" +      " WHERE dfm.refund_deadline<$3" +      "  AND dfm.shard=$4" +      "  AND dep.merchant_pub=$1" +      "  AND dep.wire_target_h_payto=$2"        " LIMIT "        TALER_QUOTE (          TALER_EXCHANGEDB_MATCHING_DEPOSITS_LIMIT) ";", @@ -1223,16 +1218,16 @@ prepare_statements (struct PostgresClosure *pg)        "mark_deposit_tiny",        "UPDATE deposits"        " SET tiny=TRUE" -      " WHERE shard=$2" -      "   AND deposit_serial_id=$1", +      " WHERE coin_pub=$1" +      "   AND deposit_serial_id=$2",        2),      /* Used in #postgres_mark_deposit_done() */      GNUNET_PQ_make_prepare (        "mark_deposit_done",        "UPDATE deposits"        " SET done=TRUE" -      " WHERE shard=$2" -      "   AND deposit_serial_id=$1;", +      " WHERE coin_pub=$1" +      "   AND deposit_serial_id=$2;",        2),      /* Used in #postgres_get_coin_transactions() to obtain information         about how a coin has been spend with /deposit requests. */ @@ -1255,16 +1250,14 @@ prepare_statements (struct PostgresClosure *pg)        ",dep.coin_sig"        ",dep.deposit_serial_id"        ",dep.done" -      " FROM deposits_by_coin dbc" -      "    JOIN deposits dep" -      "      USING (shard,deposit_serial_id)" +      " FROM deposits dep"        "    JOIN wire_targets wt"        "      USING (wire_target_h_payto)"        "    JOIN known_coins kc" -      "      ON (kc.coin_pub = dbc.coin_pub)" +      "      ON (kc.coin_pub = dep.coin_pub)"        "    JOIN denominations denoms"        "      USING (denominations_serial)" -      " WHERE dbc.coin_pub=$1;", +      " WHERE dep.coin_pub=$1;",        1),      /* Used in #postgres_get_link_data(). */ @@ -1329,20 +1322,18 @@ prepare_statements (struct PostgresClosure *pg)        ",wt.payto_uri"        ",denom.fee_deposit_val"        ",denom.fee_deposit_frac" -      " FROM deposits_by_coin dbc" -      "    JOIN deposits dep" -      "      USING (shard,deposit_serial_id)" +      " FROM deposits dep"        "    JOIN wire_targets wt"        "      USING (wire_target_h_payto)"        "    JOIN aggregation_tracking"        "      USING (deposit_serial_id)"        "    JOIN known_coins kc" -      "      ON (kc.coin_pub = dbc.coin_pub)" +      "      ON (kc.coin_pub = dep.coin_pub)"        "    JOIN denominations denom"        "      USING (denominations_serial)"        "    JOIN wire_out"        "      USING (wtid_raw)" -      " WHERE dbc.coin_pub=$1" +      " WHERE dep.coin_pub=$1"        "   AND dep.merchant_pub=$3"        "   AND dep.h_contract_terms=$2",        3), @@ -5898,14 +5889,13 @@ postgres_have_deposit2 (   */  static enum GNUNET_DB_QueryStatus  postgres_mark_deposit_tiny (void *cls, -                            const struct TALER_MerchantPublicKeyP *merchant_pub, +                            const struct TALER_CoinSpendPublicKeyP *coin_pub,                              uint64_t rowid)  {    struct PostgresClosure *pg = cls; -  uint64_t deposit_shard = compute_shard (merchant_pub);    struct GNUNET_PQ_QueryParam params[] = { +    GNUNET_PQ_query_param_auto_from_type (coin_pub),      GNUNET_PQ_query_param_uint64 (&rowid), -    GNUNET_PQ_query_param_uint64 (&deposit_shard),      GNUNET_PQ_query_param_end    }; @@ -5927,14 +5917,13 @@ postgres_mark_deposit_tiny (void *cls,   */  static enum GNUNET_DB_QueryStatus  postgres_mark_deposit_done (void *cls, -                            const struct TALER_MerchantPublicKeyP *merchant_pub, +                            const struct TALER_CoinSpendPublicKeyP *coin_pub,                              uint64_t rowid)  {    struct PostgresClosure *pg = cls; -  uint64_t deposit_shard = compute_shard (merchant_pub);    struct GNUNET_PQ_QueryParam params[] = { +    GNUNET_PQ_query_param_auto_from_type (coin_pub),      GNUNET_PQ_query_param_uint64 (&rowid), -    GNUNET_PQ_query_param_uint64 (&deposit_shard),      GNUNET_PQ_query_param_end    }; @@ -6431,6 +6420,12 @@ postgres_insert_deposit (void *cls,      GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs);      return qs;    } +  if (GNUNET_TIME_timestamp_cmp (deposit->wire_deadline, +                                 <, +                                 deposit->refund_deadline)) +  { +    GNUNET_break (0); +  }    {      uint64_t shard = compute_shard (&deposit->merchant_pub);      struct GNUNET_PQ_QueryParam params[] = { diff --git a/src/exchangedb/test_exchangedb.c b/src/exchangedb/test_exchangedb.c index 012cac64..79b09e0e 100644 --- a/src/exchangedb/test_exchangedb.c +++ b/src/exchangedb/test_exchangedb.c @@ -2284,7 +2284,7 @@ run (void *cls)                           "test-2"));    FAILIF (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT !=            plugin->mark_deposit_tiny (plugin->cls, -                                     &deposit.merchant_pub, +                                     &deposit.coin.coin_pub,                                       deposit_rowid));    FAILIF (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS !=            plugin->get_ready_deposit (plugin->cls, @@ -2306,7 +2306,7 @@ run (void *cls)                           "test-3"));    FAILIF (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT !=            plugin->mark_deposit_done (plugin->cls, -                                     &deposit.merchant_pub, +                                     &deposit.coin.coin_pub,                                       deposit_rowid));    FAILIF (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS !=            plugin->commit (plugin->cls)); diff --git a/src/include/taler_exchangedb_plugin.h b/src/include/taler_exchangedb_plugin.h index cee50954..2a462aba 100644 --- a/src/include/taler_exchangedb_plugin.h +++ b/src/include/taler_exchangedb_plugin.h @@ -3060,13 +3060,13 @@ struct TALER_EXCHANGEDB_Plugin     * returned by @e iterate_ready_deposits()     *     * @param cls the @e cls of this struct with the plugin-specific state -   * @param merchant_pub identifies the beneficiary of the deposit +   * @param coin_pub identifies the coin of the deposit     * @param deposit_rowid identifies the deposit row to modify     * @return query result status     */    enum GNUNET_DB_QueryStatus    (*mark_deposit_tiny)(void *cls, -                       const struct TALER_MerchantPublicKeyP *merchant_pub, +                       const struct TALER_CoinSpendPublicKeyP *coin_pub,                         uint64_t rowid); @@ -3076,13 +3076,13 @@ struct TALER_EXCHANGEDB_Plugin     * @e iterate_ready_deposits() or @e iterate_matching_deposits().     *     * @param cls the @e cls of this struct with the plugin-specific state -   * @param merchant_pub identifies the beneficiary of the deposit +   * @param coin_pub identifies the coin of the deposit     * @param deposit_rowid identifies the deposit row to modify     * @return query result status     */    enum GNUNET_DB_QueryStatus    (*mark_deposit_done)(void *cls, -                       const struct TALER_MerchantPublicKeyP *merchant_pub, +                       const struct TALER_CoinSpendPublicKeyP *coin_pub,                         uint64_t rowid); | 
