diff --git a/src/exchange-tools/taler-exchange-dbinit.c b/src/exchange-tools/taler-exchange-dbinit.c index 9ec31afc1..cf35466ca 100644 --- a/src/exchange-tools/taler-exchange-dbinit.c +++ b/src/exchange-tools/taler-exchange-dbinit.c @@ -49,6 +49,15 @@ static int gc_db; */ static uint32_t num_partitions; +/** + * -F option: setup a sharded database, i.e. create foreign tables/server + */ +static uint32_t num_foreign_servers; + +/** + * -S option: setup a database on a shard server, creates tables with suffix shard_idx + */ +static uint32_t shard_idx; /** * Main function that will be run. @@ -85,6 +94,20 @@ run (void *cls, "Could not drop tables as requested. Either database was not yet initialized, or permission denied. Consult the logs. Will still try to create new tables.\n"); } } + if (1 < + shard_idx) + { + if (GNUNET_OK != plugin->create_shard_tables (plugin->cls, shard_idx)) + { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Could not create shard database\n"); + global_ret = EXIT_NOTINSTALLED; + } + /* We do not want to continue if we are on a shard */ + TALER_EXCHANGEDB_plugin_unload (plugin); + plugin = NULL; + return; + } if (GNUNET_OK != plugin->create_tables (plugin->cls)) { @@ -113,6 +136,25 @@ run (void *cls, return; } } + else if (1 < + num_foreign_servers) + { + if (GNUNET_OK != plugin->setup_foreign_servers (plugin->cls, + num_foreign_servers)) + { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Could not setup shards. Aborting\n"); + if (GNUNET_OK != plugin->drop_tables (plugin->cls)) + { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Could not drop tables after failed shard setup, please delete the DB manually\n"); + } + TALER_EXCHANGEDB_plugin_unload (plugin); + plugin = NULL; + global_ret = EXIT_NOTINSTALLED; + return; + } + } if (gc_db || clear_shards) { if (GNUNET_OK != @@ -178,6 +220,16 @@ main (int argc, "NUMBER", "Setup a partitioned database where each table which can be partitioned holds NUMBER partitions on a single DB node (NOTE: this is different from sharding)", &num_partitions), + GNUNET_GETOPT_option_uint ('F', + "foreign", + "NUMBER", + "Setup a sharded database whit N foreign servers (shards) / tables", + &num_foreign_servers), + GNUNET_GETOPT_option_uint ('S', + "shard", + "INDEX", + "Setup a shard server, creates tables with INDEX as suffix", + &shard_idx), GNUNET_GETOPT_OPTION_END }; enum GNUNET_GenericReturnValue ret; diff --git a/src/exchangedb/.gitignore b/src/exchangedb/.gitignore index aea9a74e4..b5d4ff3f7 100644 --- a/src/exchangedb/.gitignore +++ b/src/exchangedb/.gitignore @@ -5,3 +5,5 @@ test-exchangedb-postgres test-exchangedb-signkeys test-perf-taler-exchangedb bench-db-postgres +exchange-0001.sql +shard-0001.sql diff --git a/src/exchangedb/Makefile.am b/src/exchangedb/Makefile.am index 2eb1eb0ad..305de431c 100644 --- a/src/exchangedb/Makefile.am +++ b/src/exchangedb/Makefile.am @@ -19,9 +19,23 @@ sql_DATA = \ benchmark-0001.sql \ exchange-0000.sql \ exchange-0001.sql \ - partition-0001.sql \ + shard-0001.sql \ drop0001.sql +BUILT_SOURCES = \ + shard-0001.sql \ + exchange-0001.sql + +CLEANFILES = \ + shard-0001.sql \ + exchange-0001.sql + +exchange-0001.sql: common-0001.sql exchange-0001-part.sql + cat common-0001.sql exchange-0001-part.sql >$@ + +shard-0001.sql: common-0001.sql + cp common-0001.sql $@ + EXTRA_DIST = \ exchangedb.conf \ exchangedb-postgres.conf \ diff --git a/src/exchangedb/common-0001.sql b/src/exchangedb/common-0001.sql new file mode 100644 index 000000000..bf2a9925c --- /dev/null +++ b/src/exchangedb/common-0001.sql @@ -0,0 +1,1912 @@ +-- +-- This file is part of TALER +-- Copyright (C) 2014--2022 Taler Systems SA +-- +-- TALER is free software; you can redistribute it and/or modify it under the +-- terms of the GNU General Public License as published by the Free Software +-- Foundation; either version 3, or (at your option) any later version. +-- +-- TALER is distributed in the hope that it will be useful, but WITHOUT ANY +-- WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR +-- A PARTICULAR PURPOSE. See the GNU General Public License for more details. +-- +-- You should have received a copy of the GNU General Public License along with +-- TALER; see the file COPYING. If not, see +-- + +-- Everything in one big transaction +BEGIN; + +-- Check patch versioning is in place. +SELECT _v.register_patch('exchange-0001', NULL, NULL); + +-------------------- Tables ---------------------------- + +CREATE OR REPLACE FUNCTION create_partitioned_table( + IN table_definition VARCHAR + ,IN table_name VARCHAR + ,IN main_table_partition_str VARCHAR -- Used only when it is the main table - we do not partition shard tables + ,IN shard_suffix VARCHAR DEFAULT NULL +) +RETURNS VOID +LANGUAGE plpgsql +AS $$ +BEGIN + + IF shard_suffix IS NOT NULL THEN + table_name=table_name || '_' || shard_suffix; + main_table_partition_str = ''; + END IF; + + EXECUTE FORMAT( + table_definition, + table_name, + main_table_partition_str + ); + +END +$$; + +----------------------- wire_targets --------------------------- + +CREATE OR REPLACE FUNCTION create_table_wire_targets( + IN shard_suffix VARCHAR DEFAULT NULL +) +RETURNS VOID +LANGUAGE plpgsql +AS $$ +BEGIN + + PERFORM create_partitioned_table( + 'CREATE TABLE IF NOT EXISTS %I' + '(wire_target_serial_id BIGINT GENERATED BY DEFAULT AS IDENTITY' -- UNIQUE' + ',wire_target_h_payto BYTEA PRIMARY KEY CHECK (LENGTH(wire_target_h_payto)=32)' + ',payto_uri VARCHAR NOT NULL' + ',kyc_ok BOOLEAN NOT NULL DEFAULT (FALSE)' + ',external_id VARCHAR' + ') %s ;' + ,'wire_targets' + ,'PARTITION BY HASH (wire_target_h_payto)' + ,shard_suffix + ); + +END +$$; + +-- We need a seperate function for this, as we call create_table only once but need to add +-- those constraints to each partition which gets created +CREATE OR REPLACE FUNCTION add_constraints_to_wire_targets_partition( + IN partition_suffix VARCHAR +) +RETURNS void +LANGUAGE plpgsql +AS $$ +BEGIN + + EXECUTE FORMAT ( + 'ALTER TABLE wire_targets_' || partition_suffix || ' ' + 'ADD CONSTRAINT wire_targets_' || partition_suffix || '_wire_target_serial_id_key ' + 'UNIQUE (wire_target_serial_id)' + ); +END +$$; + +------------------------ reserves ------------------------------- + +CREATE OR REPLACE FUNCTION create_table_reserves( + IN shard_suffix VARCHAR DEFAULT NULL +) +RETURNS VOID +LANGUAGE plpgsql +AS $$ +DECLARE + table_name VARCHAR DEFAULT 'reserves'; +BEGIN + + PERFORM create_partitioned_table( + 'CREATE TABLE IF NOT EXISTS %I' + '(reserve_uuid BIGINT GENERATED BY DEFAULT AS IDENTITY' + ',reserve_pub BYTEA PRIMARY KEY CHECK(LENGTH(reserve_pub)=32)' + ',current_balance_val INT8 NOT NULL' + ',current_balance_frac INT4 NOT NULL' + ',expiration_date INT8 NOT NULL' + ',gc_date INT8 NOT NULL' + ') %s ;' + ,table_name + ,'PARTITION BY HASH (reserve_pub)' + ,shard_suffix + ); + + table_name = concat_ws('_', table_name, shard_suffix); + + EXECUTE FORMAT ( + 'CREATE INDEX IF NOT EXISTS ' || table_name || '_by_expiration_index ' + 'ON ' || table_name || ' ' + '(expiration_date' + ',current_balance_val' + ',current_balance_frac' + ');' + ); + EXECUTE FORMAT ( + 'COMMENT ON INDEX ' || table_name || '_by_expiration_index ' + 'IS ' || quote_literal('used in get_expired_reserves') || ';' + ); + EXECUTE FORMAT ( + 'CREATE INDEX IF NOT EXISTS ' || table_name || '_by_reserve_uuid_index ' + 'ON ' || table_name || ' ' + '(reserve_uuid);' + ); + EXECUTE FORMAT ( + 'CREATE INDEX IF NOT EXISTS ' || table_name || '_by_gc_date_index ' + 'ON ' || table_name || ' ' + '(gc_date);' + ); + EXECUTE FORMAT ( + 'COMMENT ON INDEX ' || table_name || '_by_gc_date_index ' + 'IS ' || quote_literal('for reserve garbage collection') || ';' + ); + +END +$$; + +----------------------- reserves_in ------------------------------ + +CREATE OR REPLACE FUNCTION create_table_reserves_in( + IN shard_suffix VARCHAR DEFAULT NULL +) +RETURNS VOID +LANGUAGE plpgsql +AS $$ +DECLARE + table_name VARCHAR default 'reserves_in'; +BEGIN + + PERFORM create_partitioned_table( + 'CREATE TABLE IF NOT EXISTS %I' + '(reserve_in_serial_id BIGINT GENERATED BY DEFAULT AS IDENTITY' -- UNIQUE' + ',reserve_pub BYTEA PRIMARY KEY' -- REFERENCES reserves (reserve_pub) ON DELETE CASCADE' + ',wire_reference INT8 NOT NULL' + ',credit_val INT8 NOT NULL' + ',credit_frac INT4 NOT NULL' + ',wire_source_h_payto BYTEA CHECK (LENGTH(wire_source_h_payto)=32)' + ',exchange_account_section TEXT NOT NULL' + ',execution_date INT8 NOT NULL' + ') %s ;' + ,table_name + ,'PARTITION BY HASH (reserve_pub)' + ,shard_suffix + ); + + table_name = concat_ws('_', table_name, shard_suffix); + + EXECUTE FORMAT ( + 'CREATE INDEX IF NOT EXISTS ' || table_name || '_by_reserve_in_serial_id_index ' + 'ON ' || table_name || ' ' + '(reserve_in_serial_id);' + ); + -- FIXME: where do we need this index? Can we do better? + EXECUTE FORMAT ( + 'CREATE INDEX IF NOT EXISTS ' || table_name || '_by_exch_accnt_section_execution_date_idx ' + 'ON ' || table_name || ' ' + '(exchange_account_section ' + ',execution_date' + ');' + ); + -- FIXME: where do we need this index? Can we do better? + EXECUTE FORMAT ( + 'CREATE INDEX IF NOT EXISTS ' || table_name || '_by_exch_accnt_reserve_in_serial_id_idx ' + 'ON ' || table_name || ' ' + '(exchange_account_section,' + 'reserve_in_serial_id DESC' + ');' + ); + +END +$$; + +CREATE OR REPLACE FUNCTION add_constraints_to_reserves_in_partition( + IN partition_suffix VARCHAR +) +RETURNS void +LANGUAGE plpgsql +AS $$ +BEGIN + EXECUTE FORMAT ( + 'ALTER TABLE reserves_in_' || partition_suffix || ' ' + 'ADD CONSTRAINT reserves_in_' || partition_suffix || '_reserve_in_serial_id_key ' + 'UNIQUE (reserve_in_serial_id)' + ); +END +$$; + +--------------------------- reserves_close ------------------------------- + +CREATE OR REPLACE FUNCTION create_table_reserves_close( + IN shard_suffix VARCHAR DEFAULT NULL +) +RETURNS VOID +LANGUAGE plpgsql +AS $$ +DECLARE + table_name VARCHAR default 'reserves_close'; +BEGIN + + PERFORM create_partitioned_table( + 'CREATE TABLE IF NOT EXISTS %I' + '(close_uuid BIGINT GENERATED BY DEFAULT AS IDENTITY' -- UNIQUE / PRIMARY KEY' + ',reserve_pub BYTEA NOT NULL' -- REFERENCES reserves (reserve_pub) ON DELETE CASCADE' + ',execution_date INT8 NOT NULL' + ',wtid BYTEA NOT NULL CHECK (LENGTH(wtid)=32)' + ',wire_target_h_payto BYTEA CHECK (LENGTH(wire_target_h_payto)=32)' + ',amount_val INT8 NOT NULL' + ',amount_frac INT4 NOT NULL' + ',closing_fee_val INT8 NOT NULL' + ',closing_fee_frac INT4 NOT NULL' + ') %s ;' + ,table_name + ,'PARTITION BY HASH (reserve_pub)' + ,shard_suffix + ); + + table_name = concat_ws('_', table_name, shard_suffix); + + EXECUTE FORMAT ( + 'CREATE INDEX IF NOT EXISTS ' || table_name || '_by_close_uuid_index ' + 'ON ' || table_name || ' ' + '(close_uuid);' + ); + EXECUTE FORMAT ( + 'CREATE INDEX IF NOT EXISTS ' || table_name || '_by_reserve_pub_index ' + 'ON ' || table_name || ' ' + '(reserve_pub);' + ); +END +$$; + +CREATE OR REPLACE FUNCTION add_constraints_to_reserves_close_partition( + IN partition_suffix VARCHAR +) +RETURNS void +LANGUAGE plpgsql +AS $$ +BEGIN + EXECUTE FORMAT ( + 'ALTER TABLE reserves_close_' || partition_suffix || ' ' + 'ADD CONSTRAINT reserves_close_' || partition_suffix || '_close_uuid_pkey ' + 'PRIMARY KEY (close_uuid)' + ); +END +$$; + +---------------------------- reserves_out ------------------------------- + +CREATE OR REPLACE FUNCTION create_table_reserves_out( + IN shard_suffix VARCHAR DEFAULT NULL +) +RETURNS VOID +LANGUAGE plpgsql +AS $$ +DECLARE + table_name VARCHAR default 'reserves_out'; +BEGIN + + PERFORM create_partitioned_table( + 'CREATE TABLE IF NOT EXISTS %I' + '(reserve_out_serial_id BIGINT GENERATED BY DEFAULT AS IDENTITY' -- UNIQUE' + ',h_blind_ev BYTEA CHECK (LENGTH(h_blind_ev)=64) UNIQUE' + ',denominations_serial INT8 NOT NULL' -- REFERENCES denominations (denominations_serial)' + ',denom_sig BYTEA NOT NULL' + ',reserve_uuid INT8 NOT NULL' -- REFERENCES reserves (reserve_uuid) ON DELETE CASCADE' + ',reserve_sig BYTEA NOT NULL CHECK (LENGTH(reserve_sig)=64)' + ',execution_date INT8 NOT NULL' + ',amount_with_fee_val INT8 NOT NULL' + ',amount_with_fee_frac INT4 NOT NULL' + ') %s ;' + ,'reserves_out' + ,'PARTITION BY HASH (h_blind_ev)' + ,shard_suffix + ); + + table_name = concat_ws('_', table_name, shard_suffix); + + EXECUTE FORMAT ( + 'CREATE INDEX IF NOT EXISTS ' || table_name || '_by_reserve_out_serial_id_index ' + 'ON ' || table_name || ' ' + '(reserve_out_serial_id);' + ); + -- FIXME: change query to use reserves_out_by_reserve instead and materialize execution_date there as well??? + EXECUTE FORMAT ( + 'CREATE INDEX IF NOT EXISTS ' || table_name || '_by_reserve_uuid_and_execution_date_index ' + 'ON ' || table_name || ' ' + '(reserve_uuid, execution_date);' + ); + EXECUTE FORMAT ( + 'COMMENT ON INDEX ' || table_name || '_by_reserve_uuid_and_execution_date_index ' + 'IS ' || quote_literal('for get_reserves_out and exchange_do_withdraw_limit_check') || ';' + ); + +END +$$; + + +CREATE OR REPLACE FUNCTION add_constraints_to_reserves_out_partition( + IN partition_suffix VARCHAR +) +RETURNS void +LANGUAGE plpgsql +AS $$ +BEGIN + EXECUTE FORMAT ( + 'ALTER TABLE reserves_out_' || partition_suffix || ' ' + 'ADD CONSTRAINT reserves_out_' || partition_suffix || '_reserve_out_serial_id_key ' + 'UNIQUE (reserve_out_serial_id)' + ); +END +$$; + +CREATE OR REPLACE FUNCTION create_table_reserves_out_by_reserve( + IN shard_suffix VARCHAR DEFAULT NULL +) +RETURNS VOID +LANGUAGE plpgsql +AS $$ +DECLARE + table_name VARCHAR DEFAULT 'reserves_out_by_reserve'; +BEGIN + + PERFORM create_partitioned_table( + 'CREATE TABLE IF NOT EXISTS %I' + '(reserve_uuid INT8 NOT NULL' -- REFERENCES reserves (reserve_uuid) ON DELETE CASCADE + ',h_blind_ev BYTEA CHECK (LENGTH(h_blind_ev)=64)' + ') %s ' + ,table_name + ,'PARTITION BY HASH (reserve_uuid)' + ,shard_suffix + ); + + table_name = concat_ws('_', table_name, shard_suffix); + + EXECUTE FORMAT ( + 'CREATE INDEX IF NOT EXISTS ' || table_name || '_main_index ' + 'ON ' || table_name || ' ' + '(reserve_uuid);' + ); + +END +$$; + +---------------------------- known_coins ------------------------------- + +CREATE OR REPLACE FUNCTION create_table_known_coins( + IN shard_suffix VARCHAR DEFAULT NULL +) +RETURNS VOID +LANGUAGE plpgsql +AS $$ +DECLARE + table_name VARCHAR default 'known_coins'; +BEGIN + + PERFORM create_partitioned_table( + 'CREATE TABLE IF NOT EXISTS %I' + '(known_coin_id BIGINT GENERATED BY DEFAULT AS IDENTITY' -- UNIQUE' + ',denominations_serial INT8 NOT NULL' -- REFERENCES denominations (denominations_serial) ON DELETE CASCADE' + ',coin_pub BYTEA NOT NULL PRIMARY KEY CHECK (LENGTH(coin_pub)=32)' + ',age_commitment_hash BYTEA CHECK (LENGTH(age_commitment_hash)=32)' + ',denom_sig BYTEA NOT NULL' + ',remaining_val INT8 NOT NULL' + ',remaining_frac INT4 NOT NULL' + ') %s ;' + ,table_name + ,'PARTITION BY HASH (coin_pub)' -- FIXME: or include denominations_serial? or multi-level partitioning?; + ,shard_suffix + ); + + table_name = concat_ws('_', table_name, shard_suffix); + +END +$$; + +CREATE OR REPLACE FUNCTION add_constraints_to_known_coins_partition( + IN partition_suffix VARCHAR +) +RETURNS void +LANGUAGE plpgsql +AS $$ +BEGIN + EXECUTE FORMAT ( + 'ALTER TABLE known_coins_' || partition_suffix || ' ' + 'ADD CONSTRAINT known_coins_' || partition_suffix || '_known_coin_id_key ' + 'UNIQUE (known_coin_id)' + ); +END +$$; + +---------------------------- refresh_commitments ------------------------------- + +CREATE OR REPLACE FUNCTION create_table_refresh_commitments( + IN shard_suffix VARCHAR DEFAULT NULL +) +RETURNS VOID +LANGUAGE plpgsql +AS $$ +DECLARE + table_name VARCHAR DEFAULT 'refresh_commitments'; +BEGIN + + PERFORM create_partitioned_table( + 'CREATE TABLE IF NOT EXISTS %I' + '(melt_serial_id BIGINT GENERATED BY DEFAULT AS IDENTITY' -- UNIQUE' + ',rc BYTEA PRIMARY KEY CHECK (LENGTH(rc)=64)' + ',old_coin_pub BYTEA NOT NULL' -- REFERENCES known_coins (coin_pub) ON DELETE CASCADE' + ',old_coin_sig BYTEA NOT NULL CHECK(LENGTH(old_coin_sig)=64)' + ',amount_with_fee_val INT8 NOT NULL' + ',amount_with_fee_frac INT4 NOT NULL' + ',noreveal_index INT4 NOT NULL' + ') %s ;' + ,table_name + ,'PARTITION BY HASH (rc)' + ,shard_suffix + ); + + table_name = concat_ws('_', table_name, shard_suffix); + + -- Note: index spans partitions, may need to be materialized. + EXECUTE FORMAT ( + 'CREATE INDEX IF NOT EXISTS ' || table_name || '_by_old_coin_pub_index ' + 'ON ' || table_name || ' ' + '(old_coin_pub);' + ); + +END +$$; + +CREATE OR REPLACE FUNCTION add_constraints_to_refresh_commitments_partition( + IN partition_suffix VARCHAR +) +RETURNS void +LANGUAGE plpgsql +AS $$ +BEGIN + EXECUTE FORMAT ( + 'ALTER TABLE refresh_commitments_' || partition_suffix || ' ' + 'ADD CONSTRAINT refresh_commitments_' || partition_suffix || '_melt_serial_id_key ' + 'UNIQUE (melt_serial_id)' + ); +END +$$; + +------------------------------ refresh_revealed_coins -------------------------------- + +CREATE OR REPLACE FUNCTION create_table_refresh_revealed_coins( + IN shard_suffix VARCHAR DEFAULT NULL +) +RETURNS VOID +LANGUAGE plpgsql +AS $$ +DECLARE + table_name VARCHAR DEFAULT 'refresh_revealed_coins'; +BEGIN + + PERFORM create_partitioned_table( + 'CREATE TABLE IF NOT EXISTS %I' + '(rrc_serial BIGINT GENERATED BY DEFAULT AS IDENTITY' -- UNIQUE' + ',melt_serial_id INT8 NOT NULL' -- REFERENCES refresh_commitments (melt_serial_id) ON DELETE CASCADE' + ',freshcoin_index INT4 NOT NULL' + ',link_sig BYTEA NOT NULL CHECK(LENGTH(link_sig)=64)' + ',denominations_serial INT8 NOT NULL' -- REFERENCES denominations (denominations_serial) ON DELETE CASCADE' + ',coin_ev BYTEA NOT NULL' -- UNIQUE' + ',h_coin_ev BYTEA NOT NULL CHECK(LENGTH(h_coin_ev)=64)' -- UNIQUE' + ',ev_sig BYTEA NOT NULL' + ',ewv BYTEA NOT NULL' + -- ,PRIMARY KEY (melt_serial_id, freshcoin_index) -- done per shard + ') %s ;' + ,table_name + ,'PARTITION BY HASH (melt_serial_id)' + ,shard_suffix + ); + + table_name = concat_ws('_', table_name, shard_suffix); + + EXECUTE FORMAT ( + 'CREATE INDEX IF NOT EXISTS ' || table_name || '_coins_by_melt_serial_id_index ' + 'ON ' || table_name || ' ' + '(melt_serial_id);' + ); + +END +$$; + +CREATE OR REPLACE FUNCTION add_constraints_to_refresh_revealed_coins_partition( + IN partition_suffix VARCHAR +) +RETURNS void +LANGUAGE plpgsql +AS $$ +BEGIN + EXECUTE FORMAT ( + 'ALTER TABLE refresh_revealed_coins_' || partition_suffix || ' ' + 'ADD CONSTRAINT refresh_revealed_coins_' || partition_suffix || '_rrc_serial_key ' + 'UNIQUE (rrc_serial) ' + ',ADD CONSTRAINT refresh_revealed_coins_' || partition_suffix || '_coin_ev_key ' + 'UNIQUE (coin_ev) ' + ',ADD CONSTRAINT refresh_revealed_coins_' || partition_suffix || '_h_coin_ev_key ' + 'UNIQUE (h_coin_ev) ' + ',ADD PRIMARY KEY (melt_serial_id, freshcoin_index) ' + ); +END +$$; + +----------------------------- refresh_transfer_keys ------------------------------ + +CREATE OR REPLACE FUNCTION create_table_refresh_transfer_keys( + IN shard_suffix VARCHAR DEFAULT NULL +) +RETURNS VOID +LANGUAGE plpgsql +AS $$ +DECLARE + table_name VARCHAR DEFAULT 'refresh_transfer_keys'; +BEGIN + + PERFORM create_partitioned_table( + 'CREATE TABLE IF NOT EXISTS %I' + '(rtc_serial BIGINT GENERATED BY DEFAULT AS IDENTITY' -- UNIQUE' + ',melt_serial_id INT8 PRIMARY KEY' -- REFERENCES refresh_commitments (melt_serial_id) ON DELETE CASCADE' + ',transfer_pub BYTEA NOT NULL CHECK(LENGTH(transfer_pub)=32)' + ',transfer_privs BYTEA NOT NULL' + ') %s ;' + ,table_name + ,'PARTITION BY HASH (melt_serial_id)' + ,shard_suffix + ); + +END +$$; + +CREATE OR REPLACE FUNCTION add_constraints_to_refresh_transfer_keys_partition( + IN partition_suffix VARCHAR +) +RETURNS void +LANGUAGE plpgsql +AS $$ +BEGIN + EXECUTE FORMAT ( + 'ALTER TABLE refresh_transfer_keys_' || partition_suffix || ' ' + 'ADD CONSTRAINT refresh_transfer_keys_' || partition_suffix || '_rtc_serial_key ' + 'UNIQUE (rtc_serial)' + ); +END +$$; + +---------------------------- deposits ------------------------------- + +CREATE OR REPLACE FUNCTION create_table_deposits( + IN shard_suffix VARCHAR DEFAULT NULL +) +RETURNS VOID +LANGUAGE plpgsql +AS $$ +DECLARE + table_name VARCHAR DEFAULT 'deposits'; +BEGIN + + PERFORM create_partitioned_table( + 'CREATE TABLE IF NOT EXISTS %I' + '(deposit_serial_id BIGINT GENERATED BY DEFAULT AS IDENTITY' -- PRIMARY KEY' + ',shard INT8 NOT NULL' + ',coin_pub BYTEA NOT NULL CHECK (LENGTH(coin_pub)=32)' -- REFERENCES known_coins (coin_pub) ON DELETE CASCADE + ',known_coin_id INT8 NOT NULL' -- REFERENCES known_coins (known_coin_id) ON DELETE CASCADE' --- FIXME: column needed??? + ',amount_with_fee_val INT8 NOT NULL' + ',amount_with_fee_frac INT4 NOT NULL' + ',wallet_timestamp INT8 NOT NULL' + ',exchange_timestamp INT8 NOT NULL' + ',refund_deadline INT8 NOT NULL' + ',wire_deadline INT8 NOT NULL' + ',merchant_pub BYTEA NOT NULL CHECK (LENGTH(merchant_pub)=32)' + ',h_contract_terms BYTEA NOT NULL CHECK (LENGTH(h_contract_terms)=64)' + ',coin_sig BYTEA NOT NULL CHECK (LENGTH(coin_sig)=64)' + ',wire_salt BYTEA NOT NULL CHECK (LENGTH(wire_salt)=16)' + ',wire_target_h_payto BYTEA CHECK (LENGTH(wire_target_h_payto)=32)' + ',done BOOLEAN NOT NULL DEFAULT FALSE' + ',extension_blocked BOOLEAN NOT NULL DEFAULT FALSE' + ',extension_details_serial_id INT8' -- REFERENCES extension_details (extension_details_serial_id) ON DELETE CASCADE' + ',UNIQUE (coin_pub, merchant_pub, h_contract_terms)' + ') %s ;' + ,table_name + ,'PARTITION BY HASH (coin_pub)' + ,shard_suffix + ); + + table_name = concat_ws('_', table_name, shard_suffix); + + EXECUTE FORMAT ( + 'CREATE INDEX IF NOT EXISTS ' || table_name || '_by_coin_pub_index ' + 'ON ' || table_name || ' ' + '(coin_pub);' + ); + +END +$$; + +CREATE OR REPLACE FUNCTION add_constraints_to_deposits_partition( + IN partition_suffix VARCHAR +) +RETURNS void +LANGUAGE plpgsql +AS $$ +BEGIN + EXECUTE FORMAT ( + 'ALTER TABLE deposits_' || partition_suffix || ' ' + 'ADD CONSTRAINT deposits_' || partition_suffix || '_deposit_serial_id_pkey ' + 'PRIMARY KEY (deposit_serial_id)' + ); +END +$$; + +CREATE OR REPLACE FUNCTION create_table_deposits_by_ready( + IN shard_suffix VARCHAR DEFAULT NULL +) +RETURNS VOID +LANGUAGE plpgsql +AS $$ +DECLARE + table_name VARCHAR DEFAULT 'deposits_by_ready'; +BEGIN + + PERFORM create_partitioned_table( + 'CREATE TABLE IF NOT EXISTS %I' + '(wire_deadline INT8 NOT NULL' + ',shard INT8 NOT NULL' + ',coin_pub BYTEA NOT NULL CHECK (LENGTH(coin_pub)=32)' + ',deposit_serial_id INT8' + ') %s ;' + ,table_name + ,'PARTITION BY RANGE (wire_deadline)' + ,shard_suffix + ); + + table_name = concat_ws('_', table_name, shard_suffix); + + EXECUTE FORMAT ( + 'CREATE INDEX IF NOT EXISTS ' || table_name || '_main_index ' + 'ON ' || table_name || ' ' + '(wire_deadline ASC, shard ASC, coin_pub);' + ); + +END +$$; + + +CREATE OR REPLACE FUNCTION create_table_deposits_for_matching( + IN shard_suffix VARCHAR DEFAULT NULL +) +RETURNS VOID +LANGUAGE plpgsql +AS $$ +DECLARE + table_name VARCHAR DEFAULT 'deposits_for_matching'; +BEGIN + + PERFORM create_partitioned_table( + 'CREATE TABLE IF NOT EXISTS %I' + '(refund_deadline INT8 NOT NULL' + ',merchant_pub BYTEA NOT NULL CHECK (LENGTH(merchant_pub)=32)' + ',coin_pub BYTEA NOT NULL CHECK (LENGTH(coin_pub)=32)' -- REFERENCES known_coins (coin_pub) ON DELETE CASCADE + ',deposit_serial_id INT8' + ') %s ;' + ,table_name + ,'PARTITION BY RANGE (refund_deadline)' + ,shard_suffix + ); + + table_name = concat_ws('_', table_name, shard_suffix); + + EXECUTE FORMAT ( + 'CREATE INDEX IF NOT EXISTS ' || table_name || '_main_index ' + 'ON ' || table_name || ' ' + '(refund_deadline ASC, merchant_pub, coin_pub);' + ); + +END +$$; + +----------------------------- refunds ------------------------------ + +CREATE OR REPLACE FUNCTION create_table_refunds( + IN shard_suffix VARCHAR DEFAULT NULL +) +RETURNS VOID +LANGUAGE plpgsql +AS $$ +DECLARE + table_name VARCHAR DEFAULT 'refunds'; +BEGIN + + PERFORM create_partitioned_table( + 'CREATE TABLE IF NOT EXISTS %I' + '(refund_serial_id BIGINT GENERATED BY DEFAULT AS IDENTITY' -- UNIQUE' + ',coin_pub BYTEA NOT NULL CHECK (LENGTH(coin_pub)=32)' -- REFERENCES known_coins (coin_pub) ON DELETE CASCADE + ',deposit_serial_id INT8 NOT NULL' -- REFERENCES deposits (deposit_serial_id) ON DELETE CASCADE' + ',merchant_sig BYTEA NOT NULL CHECK(LENGTH(merchant_sig)=64)' + ',rtransaction_id INT8 NOT NULL' + ',amount_with_fee_val INT8 NOT NULL' + ',amount_with_fee_frac INT4 NOT NULL' + -- ,PRIMARY KEY (deposit_serial_id, rtransaction_id) -- done per shard! + ') %s ;' + ,table_name + ,'PARTITION BY HASH (coin_pub)' + ,shard_suffix + ); + + table_name = concat_ws('_', table_name, shard_suffix); + + EXECUTE FORMAT ( + 'CREATE INDEX IF NOT EXISTS ' || table_name || '_by_coin_pub_index ' + 'ON ' || table_name || ' ' + '(coin_pub);' + ); + +END +$$; + +CREATE OR REPLACE FUNCTION add_constraints_to_refunds_partition( + IN partition_suffix VARCHAR +) +RETURNS void +LANGUAGE plpgsql +AS $$ +BEGIN + EXECUTE FORMAT ( + 'ALTER TABLE refunds_' || partition_suffix || ' ' + 'ADD CONSTRAINT refunds_' || partition_suffix || '_refund_serial_id_key ' + 'UNIQUE (refund_serial_id) ' + ',ADD PRIMARY KEY (deposit_serial_id, rtransaction_id) ' + ); +END +$$; + +---------------------------- wire_out ------------------------------- + +CREATE OR REPLACE FUNCTION create_table_wire_out( + IN shard_suffix VARCHAR DEFAULT NULL +) +RETURNS VOID +LANGUAGE plpgsql +AS $$ +DECLARE + table_name VARCHAR DEFAULT 'wire_out'; +BEGIN + + PERFORM create_partitioned_table( + 'CREATE TABLE IF NOT EXISTS %I' + '(wireout_uuid BIGINT GENERATED BY DEFAULT AS IDENTITY' -- PRIMARY KEY' + ',execution_date INT8 NOT NULL' + ',wtid_raw BYTEA UNIQUE NOT NULL CHECK (LENGTH(wtid_raw)=32)' + ',wire_target_h_payto BYTEA CHECK (LENGTH(wire_target_h_payto)=32)' + ',exchange_account_section TEXT NOT NULL' + ',amount_val INT8 NOT NULL' + ',amount_frac INT4 NOT NULL' + ') %s ;' + ,table_name + ,'PARTITION BY HASH (wtid_raw)' + ,shard_suffix + ); + + table_name = concat_ws('_', table_name, shard_suffix); + + EXECUTE FORMAT ( + 'CREATE INDEX IF NOT EXISTS ' || table_name || '_by_wire_target_h_payto_index ' + 'ON ' || table_name || ' ' + '(wire_target_h_payto);' + ); + + +END +$$; + +CREATE OR REPLACE FUNCTION add_constraints_to_wire_out_partition( + IN partition_suffix VARCHAR +) +RETURNS void +LANGUAGE plpgsql +AS $$ +BEGIN + EXECUTE FORMAT ( + 'ALTER TABLE wire_out_' || partition_suffix || ' ' + 'ADD CONSTRAINT wire_out_' || partition_suffix || '_wireout_uuid_pkey ' + 'PRIMARY KEY (wireout_uuid)' + ); +END +$$; + +---------------------------- aggregation_transient ------------------------------ + +CREATE OR REPLACE FUNCTION create_table_aggregation_transient( + IN shard_suffix VARCHAR DEFAULT NULL +) +RETURNS VOID +LANGUAGE plpgsql +AS $$ +DECLARE + table_name VARCHAR DEFAULT 'aggregation_transient'; +BEGIN + + EXECUTE FORMAT ( + 'CREATE TABLE IF NOT EXISTS %I ' + '(amount_val INT8 NOT NULL' + ',amount_frac INT4 NOT NULL' + ',wire_target_h_payto BYTEA CHECK (LENGTH(wire_target_h_payto)=32)' + ',exchange_account_section TEXT NOT NULL' + ',wtid_raw BYTEA NOT NULL CHECK (LENGTH(wtid_raw)=32)' + ') %s ;' + ,table_name + ,'PARTITION BY HASH (wire_target_h_payto)' + ,shard_suffix + ); + +END +$$; + +---------------------------- aggregation_tracking ------------------------------- + +CREATE OR REPLACE FUNCTION create_table_aggregation_tracking( + IN shard_suffix VARCHAR DEFAULT NULL +) +RETURNS VOID +LANGUAGE plpgsql +AS $$ +DECLARE + table_name VARCHAR DEFAULT 'aggregation_tracking'; +BEGIN + + PERFORM create_partitioned_table( + 'CREATE TABLE IF NOT EXISTS %I' + '(aggregation_serial_id BIGINT GENERATED BY DEFAULT AS IDENTITY' -- UNIQUE' + ',deposit_serial_id INT8 PRIMARY KEY' -- REFERENCES deposits (deposit_serial_id) ON DELETE CASCADE' -- FIXME chnage to coint_pub + deposit_serial_id for more efficient depost -- or something else ??? + ',wtid_raw BYTEA NOT NULL' -- CONSTRAINT wire_out_ref REFERENCES wire_out(wtid_raw) ON DELETE CASCADE DEFERRABLE' + ') %s ;' + ,table_name + ,'PARTITION BY HASH (deposit_serial_id)' + ,shard_suffix + ); + + table_name = concat_ws('_', table_name, shard_suffix); + + EXECUTE FORMAT ( + 'CREATE INDEX IF NOT EXISTS ' || table_name || '_by_wtid_raw_index ' + 'ON ' || table_name || ' ' + '(wtid_raw);' + ); + EXECUTE FORMAT ( + 'COMMENT ON INDEX ' || table_name || '_by_wtid_raw_index ' + 'IS ' || quote_literal('for lookup_transactions') || ';' + ); + +END +$$; + +CREATE OR REPLACE FUNCTION add_constraints_to_aggregation_tracking_partition( + IN partition_suffix VARCHAR +) +RETURNS VOID +LANGUAGE plpgsql +AS $$ +BEGIN + EXECUTE FORMAT ( + 'ALTER TABLE aggregation_tracking_' || partition_suffix || ' ' + 'ADD CONSTRAINT aggregation_tracking_' || partition_suffix || '_aggregation_serial_id_key ' + 'UNIQUE (aggregation_serial_id) ' + ); +END +$$; + +----------------------------- recoup ------------------------------ + +CREATE OR REPLACE FUNCTION create_table_recoup( + IN shard_suffix VARCHAR DEFAULT NULL +) +RETURNS VOID +LANGUAGE plpgsql +AS $$ +DECLARE + table_name VARCHAR DEFAULT 'recoup'; +BEGIN + + PERFORM create_partitioned_table( + 'CREATE TABLE IF NOT EXISTS %I' + '(recoup_uuid BIGINT GENERATED BY DEFAULT AS IDENTITY' -- UNIQUE' + ',coin_pub BYTEA NOT NULL CHECK (LENGTH(coin_pub)=32)' -- REFERENCES known_coins (coin_pub) + ',coin_sig BYTEA NOT NULL CHECK(LENGTH(coin_sig)=64)' + ',coin_blind BYTEA NOT NULL CHECK(LENGTH(coin_blind)=32)' + ',amount_val INT8 NOT NULL' + ',amount_frac INT4 NOT NULL' + ',recoup_timestamp INT8 NOT NULL' + ',reserve_out_serial_id INT8 NOT NULL' -- REFERENCES reserves_out (reserve_out_serial_id) ON DELETE CASCADE' + ') %s ;' + ,table_name + ,'PARTITION BY HASH (coin_pub);' + ,shard_suffix + ); + + table_name = concat_ws('_', table_name, shard_suffix); + + EXECUTE FORMAT ( + 'CREATE INDEX IF NOT EXISTS ' || table_name || '_by_coin_pub_index ' + 'ON ' || table_name || ' ' + '(coin_pub);' + ); + +END +$$; + +CREATE OR REPLACE FUNCTION add_constraints_to_recoup_partition( + IN partition_suffix VARCHAR +) +RETURNS VOID +LANGUAGE plpgsql +AS $$ +BEGIN + EXECUTE FORMAT ( + 'ALTER TABLE recoup_' || partition_suffix || ' ' + 'ADD CONSTRAINT recoup_' || partition_suffix || '_recoup_uuid_key ' + 'UNIQUE (recoup_uuid) ' + ); +END +$$; + +CREATE OR REPLACE FUNCTION create_table_recoup_by_reserve( + IN shard_suffix VARCHAR DEFAULT NULL +) +RETURNS VOID +LANGUAGE plpgsql +AS $$ +DECLARE + table_name VARCHAR DEFAULT 'recoup_by_reserve'; +BEGIN + + PERFORM create_partitioned_table( + 'CREATE TABLE IF NOT EXISTS %I' + '(reserve_out_serial_id INT8 NOT NULL' -- REFERENCES reserves (reserve_out_serial_id) ON DELETE CASCADE + ',coin_pub BYTEA CHECK (LENGTH(coin_pub)=32)' -- REFERENCES known_coins (coin_pub) + ') %s ;' + ,table_name + ,'PARTITION BY HASH (reserve_out_serial_id)' + ,shard_suffix + ); + + table_name = concat_ws('_', table_name, shard_suffix); + + EXECUTE FORMAT ( + 'CREATE INDEX IF NOT EXISTS ' || table_name || '_main_index ' + 'ON ' || table_name || ' ' + '(reserve_out_serial_id);' + ); + +END +$$; + +---------------------------- recoup_refresh ------------------------------ + +CREATE OR REPLACE FUNCTION create_table_recoup_refresh( + IN shard_suffix VARCHAR DEFAULT NULL +) +RETURNS VOID +LANGUAGE plpgsql +AS $$ +DECLARE + table_name VARCHAR DEFAULT 'recoup_refresh'; +BEGIN + + PERFORM create_partitioned_table( + 'CREATE TABLE IF NOT EXISTS %I' + '(recoup_refresh_uuid BIGINT GENERATED BY DEFAULT AS IDENTITY' -- UNIQUE' + ',coin_pub BYTEA NOT NULL CHECK (LENGTH(coin_pub)=32)' -- REFERENCES known_coins (coin_pub) + ',known_coin_id BIGINT NOT NULL' -- REFERENCES known_coins (known_coin_id) ON DELETE CASCADE + ',coin_sig BYTEA NOT NULL CHECK(LENGTH(coin_sig)=64)' + ',coin_blind BYTEA NOT NULL CHECK(LENGTH(coin_blind)=32)' + ',amount_val INT8 NOT NULL' + ',amount_frac INT4 NOT NULL' + ',recoup_timestamp INT8 NOT NULL' + ',rrc_serial INT8 NOT NULL' -- REFERENCES refresh_revealed_coins (rrc_serial) ON DELETE CASCADE -- UNIQUE' + ') %s ;' + ,table_name + ,'PARTITION BY HASH (coin_pub)' + ,shard_suffix + ); + + table_name = concat_ws('_', table_name, shard_suffix); + + -- FIXME: any query using this index will be slow. Materialize index or change query? + -- Also: which query uses this index? + EXECUTE FORMAT ( + 'CREATE INDEX IF NOT EXISTS ' || table_name || '_by_rrc_serial_index ' + 'ON ' || table_name || ' ' + '(rrc_serial);' + ); + EXECUTE FORMAT ( + 'CREATE INDEX IF NOT EXISTS ' || table_name || '_by_coin_pub_index ' + 'ON ' || table_name || ' ' + '(coin_pub);' + ); + +END +$$; + +CREATE OR REPLACE FUNCTION add_constraints_to_recoup_refresh_partition( + IN partition_suffix VARCHAR +) +RETURNS VOID +LANGUAGE plpgsql +AS $$ +BEGIN + EXECUTE FORMAT ( + 'ALTER TABLE recoup_refresh_' || partition_suffix || ' ' + 'ADD CONSTRAINT recoup_refresh_' || partition_suffix || '_recoup_refresh_uuid_key ' + 'UNIQUE (recoup_refresh_uuid) ' + ); +END +$$; + +----------------------------- prewire ------------------------------ + +CREATE OR REPLACE FUNCTION create_table_prewire( + IN shard_suffix VARCHAR DEFAULT NULL +) +RETURNS VOID +LANGUAGE plpgsql +AS $$ +DECLARE + table_name VARCHAR DEFAULT 'prewire'; +BEGIN + + PERFORM create_partitioned_table( + 'CREATE TABLE IF NOT EXISTS %I' + '(prewire_uuid BIGINT GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY' + ',wire_method TEXT NOT NULL' + ',finished BOOLEAN NOT NULL DEFAULT false' + ',failed BOOLEAN NOT NULL DEFAULT false' + ',buf BYTEA NOT NULL' + ') %s ;' + ,table_name + ,'PARTITION BY HASH (prewire_uuid)' + ,shard_suffix + ); + + table_name = concat_ws('_', table_name, shard_suffix); + + EXECUTE FORMAT ( + 'CREATE INDEX IF NOT EXISTS ' || table_name || '_by_finished_index ' + 'ON ' || table_name || ' ' + '(finished);' + ); + EXECUTE FORMAT ( + 'COMMENT ON INDEX ' || table_name || '_by_finished_index ' + 'IS ' || quote_literal('for gc_prewire') || ';' + ); + -- FIXME: find a way to combine these two indices? + EXECUTE FORMAT ( + 'CREATE INDEX IF NOT EXISTS ' || table_name || '_by_failed_finished_index ' + 'ON ' || table_name || ' ' + '(failed,finished);' + ); + EXECUTE FORMAT ( + 'COMMENT ON INDEX ' || table_name || '_by_failed_finished_index ' + 'IS ' || quote_literal('for wire_prepare_data_get') || ';' + ); + +END +$$; + +----------------------------- cs_nonce_locks ------------------------------ + +CREATE OR REPLACE FUNCTION create_table_cs_nonce_locks( + shard_suffix VARCHAR DEFAULT NULL +) +RETURNS VOID +LANGUAGE plpgsql +AS $$ +BEGIN + + PERFORM create_partitioned_table( + 'CREATE TABLE IF NOT EXISTS %I' + '(cs_nonce_lock_serial_id BIGINT GENERATED BY DEFAULT AS IDENTITY' -- UNIQUE' + ',nonce BYTEA PRIMARY KEY CHECK (LENGTH(nonce)=32)' + ',op_hash BYTEA NOT NULL CHECK (LENGTH(op_hash)=64)' + ',max_denomination_serial INT8 NOT NULL' + ') %s ;' + ,'cs_nonce_locks' + ,'PARTITION BY HASH (nonce)' + ,shard_suffix + ); + +END +$$; + +CREATE OR REPLACE FUNCTION add_constraints_to_cs_nonce_locks_partition( + IN partition_suffix VARCHAR +) +RETURNS VOID +LANGUAGE plpgsql +AS $$ +BEGIN + EXECUTE FORMAT ( + 'ALTER TABLE cs_nonce_locks_' || partition_suffix || ' ' + 'ADD CONSTRAINT cs_nonce_locks_' || partition_suffix || '_cs_nonce_lock_serial_id_key ' + 'UNIQUE (cs_nonce_lock_serial_id)' + ); +END +$$; + +------------------------- Partitions ------------------------------ + +CREATE OR REPLACE FUNCTION create_table_partition( + source_table_name VARCHAR + ,modulus INTEGER + ,partition_num INTEGER + ) + RETURNS VOID + LANGUAGE plpgsql +AS $$ +BEGIN + + RAISE NOTICE 'Creating partition %_%', source_table_name, partition_num; + + EXECUTE FORMAT( + 'CREATE TABLE IF NOT EXISTS %I ' + 'PARTITION OF %I ' + 'FOR VALUES WITH (MODULUS %s, REMAINDER %s)' + ,source_table_name || '_' || partition_num + ,source_table_name + ,modulus + ,partition_num-1 + ); + +END +$$; + +CREATE OR REPLACE FUNCTION detach_default_partitions() + RETURNS VOID + LANGUAGE plpgsql +AS $$ +BEGIN + + RAISE NOTICE 'Detaching all default table partitions'; + + ALTER TABLE IF EXISTS wire_targets + DETACH PARTITION wire_targets_default; + + ALTER TABLE IF EXISTS reserves + DETACH PARTITION reserves_default; + + ALTER TABLE IF EXISTS reserves_in + DETACH PARTITION reserves_in_default; + + ALTER TABLE IF EXISTS reserves_close + DETACH PARTITION reserves_close_default; + + ALTER TABLE IF EXISTS reserves_out + DETACH PARTITION reserves_out_default; + + ALTER TABLE IF EXISTS reserves_out_by_reserve + DETACH PARTITION reserves_out_by_reserve_default; + + ALTER TABLE IF EXISTS known_coins + DETACH PARTITION known_coins_default; + + ALTER TABLE IF EXISTS refresh_commitments + DETACH PARTITION refresh_commitments_default; + + ALTER TABLE IF EXISTS refresh_revealed_coins + DETACH PARTITION refresh_revealed_coins_default; + + ALTER TABLE IF EXISTS refresh_transfer_keys + DETACH PARTITION refresh_transfer_keys_default; + + ALTER TABLE IF EXISTS deposits + DETACH PARTITION deposits_default; + +--- TODO range partitioning +-- ALTER TABLE IF EXISTS deposits_by_ready +-- DETACH PARTITION deposits_by_ready_default; +-- +-- ALTER TABLE IF EXISTS deposits_for_matching +-- DETACH PARTITION deposits_default_for_matching_default; + + ALTER TABLE IF EXISTS refunds + DETACH PARTITION refunds_default; + + ALTER TABLE IF EXISTS wire_out + DETACH PARTITION wire_out_default; + + ALTER TABLE IF EXISTS aggregation_transient + DETACH PARTITION aggregation_transient_default; + + ALTER TABLE IF EXISTS aggregation_tracking + DETACH PARTITION aggregation_tracking_default; + + ALTER TABLE IF EXISTS recoup + DETACH PARTITION recoup_default; + + ALTER TABLE IF EXISTS recoup_by_reserve + DETACH PARTITION recoup_by_reserve_default; + + ALTER TABLE IF EXISTS recoup_refresh + DETACH PARTITION recoup_refresh_default; + + ALTER TABLE IF EXISTS prewire + DETACH PARTITION prewire_default; + + ALTER TABLE IF EXISTS cs_nonce_locks + DETACH partition cs_nonce_locks_default; + +END +$$; + +COMMENT ON FUNCTION detach_default_partitions + IS 'We need to drop default and create new one before deleting the default partitions + otherwise constraints get lost too. Might be needed in shardig too'; + + +CREATE OR REPLACE FUNCTION drop_default_partitions() + RETURNS VOID + LANGUAGE plpgsql +AS $$ +BEGIN + + RAISE NOTICE 'Dropping default table partitions'; + + DROP TABLE IF EXISTS wire_targets_default; + DROP TABLE IF EXISTS reserves_default; + DROP TABLE IF EXISTS reserves_in_default; + DROP TABLE IF EXISTS reserves_close_default; + DROP TABLE IF EXISTS reserves_out_default; + DROP TABLE IF EXISTS reserves_out_by_reserve_default; + DROP TABLE IF EXISTS known_coins_default; + DROP TABLE IF EXISTS refresh_commitments_default; + DROP TABLE IF EXISTS refresh_revealed_coins_default; + DROP TABLE IF EXISTS refresh_transfer_keys_default; + DROP TABLE IF EXISTS deposits_default; +--DROP TABLE IF EXISTS deposits_by_ready_default; +--DROP TABLE IF EXISTS deposits_for_matching_default; + DROP TABLE IF EXISTS refunds_default; + DROP TABLE IF EXISTS wire_out_default; + DROP TABLE IF EXISTS aggregation_transient_default; + DROP TABLE IF EXISTS aggregation_tracking_default; + DROP TABLE IF EXISTS recoup_default; + DROP TABLE IF EXISTS recoup_by_reserve_default; + DROP TABLE IF EXISTS recoup_refresh_default; + DROP TABLE IF EXISTS prewire_default; + DROP TABLE IF EXISTS cs_nonce_locks_default; + +END +$$; + +COMMENT ON FUNCTION drop_default_partitions + IS 'Drop all default partitions once other partitions are attached. + Might be needed in sharding too.'; + +CREATE OR REPLACE FUNCTION create_partitions( + num_partitions INTEGER +) +RETURNS VOID +LANGUAGE plpgsql +AS $$ +DECLARE + modulus INTEGER; +BEGIN + + modulus := num_partitions; + + PERFORM detach_default_partitions(); + + LOOP + + PERFORM create_table_partition( + 'wire_targets' + ,modulus + ,num_partitions + ); + PERFORM add_constraints_to_wire_targets_partition(num_partitions::varchar); + + PERFORM create_table_partition( + 'reserves' + ,modulus + ,num_partitions + ); + + PERFORM create_table_partition( + 'reserves_in' + ,modulus + ,num_partitions + ); + PERFORM add_constraints_to_reserves_in_partition(num_partitions::varchar); + + PERFORM create_table_partition( + 'reserves_close' + ,modulus + ,num_partitions + ); + PERFORM add_constraints_to_reserves_close_partition(num_partitions::varchar); + + PERFORM create_table_partition( + 'reserves_out' + ,modulus + ,num_partitions + ); + PERFORM add_constraints_to_reserves_out_partition(num_partitions::varchar); + + PERFORM create_table_partition( + 'reserves_out_by_reserve' + ,modulus + ,num_partitions + ); + + PERFORM create_table_partition( + 'known_coins' + ,modulus + ,num_partitions + ); + PERFORM add_constraints_to_known_coins_partition(num_partitions::varchar); + + PERFORM create_table_partition( + 'refresh_commitments' + ,modulus + ,num_partitions + ); + PERFORM add_constraints_to_refresh_commitments_partition(num_partitions::varchar); + + PERFORM create_table_partition( + 'refresh_revealed_coins' + ,modulus + ,num_partitions + ); + PERFORM add_constraints_to_refresh_revealed_coins_partition(num_partitions::varchar); + + PERFORM create_table_partition( + 'refresh_transfer_keys' + ,modulus + ,num_partitions + ); + PERFORM add_constraints_to_refresh_transfer_keys_partition(num_partitions::varchar); + + PERFORM create_table_partition( + 'deposits' + ,modulus + ,num_partitions + ); + PERFORM add_constraints_to_deposits_partition(num_partitions::varchar); + +-- TODO: dynamically (!) creating/deleting deposits partitions: +-- create new partitions 'as needed', drop old ones once the aggregator has made +-- them empty; as 'new' deposits will always have deadlines in the future, this +-- would basically guarantee no conflict between aggregator and exchange service! +-- SEE also: https://www.cybertec-postgresql.com/en/automatic-partition-creation-in-postgresql/ +-- (article is slightly wrong, as this works:) +--CREATE TABLE tab ( +-- id bigint GENERATED ALWAYS AS IDENTITY, +-- ts timestamp NOT NULL, +-- data text +-- PARTITION BY LIST ((ts::date)); +-- CREATE TABLE tab_def PARTITION OF tab DEFAULT; +-- BEGIN +-- CREATE TABLE tab_part2 (LIKE tab); +-- insert into tab_part2 (id,ts, data) values (5,'2022-03-21', 'foo'); +-- alter table tab attach partition tab_part2 for values in ('2022-03-21'); +-- commit; +-- Naturally, to ensure this is actually 100% conflict-free, we'd +-- need to create tables at the granularity of the wire/refund deadlines; +-- that is right now configurable via AGGREGATOR_SHIFT option. + +-- FIXME: range partitioning +-- PERFORM create_table_partition( +-- 'deposits_by_ready' +-- ,modulus +-- ,num_partitions +-- ); +-- +-- PERFORM create_table_partition( +-- 'deposits_for_matching' +-- ,modulus +-- ,num_partitions +-- ); + + PERFORM create_table_partition( + 'refunds' + ,modulus + ,num_partitions + ); + PERFORM add_constraints_to_refunds_partition(num_partitions::varchar); + + PERFORM create_table_partition( + 'wire_out' + ,modulus + ,num_partitions + ); + PERFORM add_constraints_to_wire_out_partition(num_partitions::varchar); + + PERFORM create_table_partition( + 'aggregation_transient' + ,modulus + ,num_partitions + ); + + PERFORM create_table_partition( + 'aggregation_tracking' + ,modulus + ,num_partitions + ); + PERFORM add_constraints_to_aggregation_tracking_partition(num_partitions::varchar); + + PERFORM create_table_partition( + 'recoup' + ,modulus + ,num_partitions + ); + PERFORM add_constraints_to_recoup_partition(num_partitions::varchar); + + PERFORM create_table_partition( + 'recoup_by_reserve' + ,modulus + ,num_partitions + ); + + PERFORM create_table_partition( + 'recoup_refresh' + ,modulus + ,num_partitions + ); + PERFORM add_constraints_to_recoup_refresh_partition(num_partitions::varchar); + + PERFORM create_table_partition( + 'prewire' + ,modulus + ,num_partitions + ); + + PERFORM create_table_partition( + 'cs_nonce_locks' + ,modulus + ,num_partitions + ); + PERFORM add_constraints_to_cs_nonce_locks_partition(num_partitions::varchar); + + num_partitions=num_partitions-1; + EXIT WHEN num_partitions=0; + + END LOOP; + + PERFORM drop_default_partitions(); + +END +$$; + +--------------------- Sharding --------------------------- + +---------------------- Shards ---------------------------- + +CREATE OR REPLACE FUNCTION setup_shard( + shard_suffix VARCHAR +) +RETURNS VOID +LANGUAGE plpgsql +AS $$ +BEGIN + + PERFORM create_table_wire_targets(shard_suffix); + PERFORM add_constraints_to_wire_targets_partition(shard_suffix); + + PERFORM create_table_reserves(shard_suffix); + + PERFORM create_table_reserves_in(shard_suffix); + PERFORM add_constraints_to_reserves_in_partition(shard_suffix); + + PERFORM create_table_reserves_close(shard_suffix); + + PERFORM create_table_reserves_out(shard_suffix); + + PERFORM create_table_reserves_out_by_reserve(shard_suffix); + + PERFORM create_table_known_coins(shard_suffix); + PERFORM add_constraints_to_known_coins_partition(shard_suffix); + + PERFORM create_table_refresh_commitments(shard_suffix); + PERFORM add_constraints_to_refresh_commitments_partition(shard_suffix); + + PERFORM create_table_refresh_revealed_coins(shard_suffix); + PERFORM add_constraints_to_refresh_revealed_coins_partition(shard_suffix); + + PERFORM create_table_refresh_transfer_keys(shard_suffix); + PERFORM add_constraints_to_refresh_transfer_keys_partition(shard_suffix); + + PERFORM create_table_deposits(shard_suffix); + PERFORM add_constraints_to_deposits_partition(shard_suffix); + + PERFORM create_table_deposits_by_ready(shard_suffix); + + PERFORM create_table_deposits_for_matching(shard_suffix); + + PERFORM create_table_refunds(shard_suffix); + PERFORM add_constraints_to_refunds_partition(shard_suffix); + + PERFORM create_table_wire_out(shard_suffix); + PERFORM add_constraints_to_wire_out_partition(shard_suffix); + + PERFORM create_table_aggregation_transient(shard_suffix); + + PERFORM create_table_aggregation_tracking(shard_suffix); + PERFORM add_constraints_to_aggregation_tracking_partition(shard_suffix); + + PERFORM create_table_recoup(shard_suffix); + PERFORM add_constraints_to_recoup_partition(shard_suffix); + + PERFORM create_table_recoup_by_reserve(shard_suffix); + + PERFORM create_table_recoup_refresh(shard_suffix); + PERFORM add_constraints_to_recoup_refresh_partition(shard_suffix); + + PERFORM create_table_prewire(shard_suffix); + + PERFORM create_table_cs_nonce_locks(shard_suffix); + PERFORM add_constraints_to_cs_nonce_locks_partition(shard_suffix); + +END +$$; + +------------------------------ Master ---------------------------------- + +CREATE OR REPLACE FUNCTION create_foreign_table( + source_table_name VARCHAR + ,modulus INTEGER + ,shard_suffix VARCHAR + ,current_shard_num INTEGER + ,local_user VARCHAR DEFAULT 'taler-exchange-httpd' + ) + RETURNS VOID + LANGUAGE plpgsql +AS $$ +BEGIN + + RAISE NOTICE 'Creating %_% on %', source_table_name, shard_suffix, shard_suffix; + + EXECUTE FORMAT( + 'CREATE FOREIGN TABLE IF NOT EXISTS %I ' + 'PARTITION OF %I ' + 'FOR VALUES WITH (MODULUS %s, REMAINDER %s) ' + 'SERVER %I' + ,source_table_name || '_' || shard_suffix + ,source_table_name + ,modulus + ,current_shard_num-1 + ,shard_suffix + ); + + EXECUTE FORMAT( + 'ALTER FOREIGN TABLE %I OWNER TO %L' + ,source_table_name || '_' || shard_suffix + ,local_user + ); + +END +$$; + +CREATE OR REPLACE FUNCTION prepare_sharding() +RETURNS VOID +LANGUAGE plpgsql +AS $$ +BEGIN + + CREATE EXTENSION IF NOT EXISTS postgres_fdw; + + PERFORM detach_default_partitions(); + + ALTER TABLE IF EXISTS wire_targets + DROP CONSTRAINT IF EXISTS wire_targets_pkey CASCADE + ; + + ALTER TABLE IF EXISTS reserves + DROP CONSTRAINT IF EXISTS reserves_pkey CASCADE + ; + + ALTER TABLE IF EXISTS reserves_in + DROP CONSTRAINT IF EXISTS reserves_in_pkey CASCADE + ; + + ALTER TABLE IF EXISTS reserves_close + DROP CONSTRAINT IF EXISTS reserves_close_pkey CASCADE + ; + + ALTER TABLE IF EXISTS reserves_out + DROP CONSTRAINT IF EXISTS reserves_out_pkey CASCADE + ,DROP CONSTRAINT IF EXISTS reserves_out_denominations_serial_fkey + ,DROP CONSTRAINT IF EXISTS reserves_out_h_blind_ev_key + ; + + ALTER TABLE IF EXISTS known_coins + DROP CONSTRAINT IF EXISTS known_coins_pkey CASCADE + ,DROP CONSTRAINT IF EXISTS known_coins_denominations_serial_fkey + ; + + ALTER TABLE IF EXISTS refresh_commitments + DROP CONSTRAINT IF EXISTS refresh_commitments_pkey CASCADE + ,DROP CONSTRAINT IF EXISTS refresh_old_coin_pub_fkey + ; + + ALTER TABLE IF EXISTS refresh_revealed_coins + DROP CONSTRAINT IF EXISTS refresh_revealed_coins_pkey CASCADE + ,DROP CONSTRAINT IF EXISTS refresh_revealed_coins_denominations_serial_fkey + ; + + ALTER TABLE IF EXISTS refresh_transfer_keys + DROP CONSTRAINT IF EXISTS refresh_transfer_keys_pkey CASCADE + ; + + ALTER TABLE IF EXISTS deposits + DROP CONSTRAINT IF EXISTS deposits_pkey CASCADE + ,DROP CONSTRAINT IF EXISTS deposits_extension_details_serial_id_fkey + ,DROP CONSTRAINT IF EXISTS deposits_shard_known_coin_id_merchant_pub_h_contract_terms_key CASCADE + ; + + ALTER TABLE IF EXISTS refunds + DROP CONSTRAINT IF EXISTS refunds_pkey CASCADE + ; + + ALTER TABLE IF EXISTS wire_out + DROP CONSTRAINT IF EXISTS wire_out_pkey CASCADE + ,DROP CONSTRAINT IF EXISTS wire_out_wtid_raw_key CASCADE + ; + + ALTER TABLE IF EXISTS aggregation_tracking + DROP CONSTRAINT IF EXISTS aggregation_tracking_pkey CASCADE + ,DROP CONSTRAINT IF EXISTS aggregation_tracking_wtid_raw_fkey + ; + + ALTER TABLE IF EXISTS recoup + DROP CONSTRAINT IF EXISTS recoup_pkey CASCADE + ; + + ALTER TABLE IF EXISTS recoup_refresh + DROP CONSTRAINT IF EXISTS recoup_refresh_pkey CASCADE + ; + + ALTER TABLE IF EXISTS prewire + DROP CONSTRAINT IF EXISTS prewire_pkey CASCADE + ; + + ALTER TABLE IF EXISTS cs_nonce_locks + DROP CONSTRAINT IF EXISTS cs_nonce_locks_pkey CASCADE + ; + +END +$$; + + +CREATE OR REPLACE FUNCTION create_shard_server( + shard_suffix VARCHAR + ,total_num_shards INTEGER + ,current_shard_num INTEGER + ,remote_host VARCHAR + ,remote_user VARCHAR + ,remote_user_password VARCHAR + ,remote_db_name VARCHAR DEFAULT 'taler-exchange' + ,remote_port INTEGER DEFAULT '5432' + ,local_user VARCHAR DEFAULT 'taler-exchange-httpd' +) +RETURNS VOID +LANGUAGE plpgsql +AS $$ +BEGIN + + RAISE NOTICE 'Creating server %s', remote_host; + + EXECUTE FORMAT( + 'CREATE SERVER IF NOT EXISTS %I ' + 'FOREIGN DATA WRAPPER postgres_fdw ' + 'OPTIONS (dbname %L, host %L, port %L)' + ,shard_suffix + ,remote_db_name + ,remote_host + ,remote_port + ); + + EXECUTE FORMAT( + 'CREATE USER MAPPING IF NOT EXISTS ' + 'FOR %s SERVER %I ' + 'OPTIONS (user %L, password %L)' + ,local_user + ,shard_suffix + ,remote_user + ,remote_user_password + ); + + EXECUTE FORMAT( + 'GRANT ALL PRIVILEGES ' + 'ON FOREIGN SERVER %I ' + 'TO %L;' + ,shard_suffix + ,local_user + ); + + PERFORM create_foreign_table( + 'wire_targets' + ,total_num_shards + ,shard_suffix + ,current_shard_num + ,local_user + ); + PERFORM create_foreign_table( + 'reserves' + ,total_num_shards + ,shard_suffix + ,current_shard_num + ,local_user + ); + PERFORM create_foreign_table( + 'reserves_in' + ,total_num_shards + ,shard_suffix + ,current_shard_num + ,local_user + ); + PERFORM create_foreign_table( + 'reserves_out' + ,total_num_shards + ,shard_suffix + ,current_shard_num + ,local_user + ); + PERFORM create_foreign_table( + 'reserves_close' + ,total_num_shards + ,shard_suffix + ,current_shard_num + ,local_user + ); + PERFORM create_foreign_table( + 'known_coins' + ,total_num_shards + ,shard_suffix + ,current_shard_num + ,local_user + ); + PERFORM create_foreign_table( + 'refresh_commitments' + ,total_num_shards + ,shard_suffix + ,current_shard_num + ,local_user + ); + PERFORM create_foreign_table( + 'refresh_revealed_coins' + ,total_num_shards + ,shard_suffix + ,current_shard_num + ,local_user + ); + PERFORM create_foreign_table( + 'refresh_transfer_keys' + ,total_num_shards + ,shard_suffix + ,current_shard_num + ,local_user + ); + PERFORM create_foreign_table( + 'deposits' + ,total_num_shards + ,shard_suffix + ,current_shard_num + ,local_user + ); +-- PERFORM create_foreign_table( +-- 'deposits_by_ready' +-- ,total_num_shards +-- ,shard_suffix +-- ,current_shard_num +-- ,local_user +-- ); +-- PERFORM create_foreign_table( +-- 'deposits_for_matching' +-- ,total_num_shards +-- ,shard_suffix +-- ,current_shard_num +-- ,local_user +-- ); + PERFORM create_foreign_table( + 'refunds' + ,total_num_shards + ,shard_suffix + ,current_shard_num + ,local_user + ); + PERFORM create_foreign_table( + 'wire_out' + ,total_num_shards + ,shard_suffix + ,current_shard_num + ,local_user + ); + PERFORM create_foreign_table( + 'aggregation_tracking' + ,total_num_shards + ,shard_suffix + ,current_shard_num + ,local_user + ); + PERFORM create_foreign_table( + 'recoup' + ,total_num_shards + ,shard_suffix + ,current_shard_num + ,local_user + ); + PERFORM create_foreign_table( + 'recoup_by_reserve' + ,total_num_shards + ,shard_suffix + ,current_shard_num + ,local_user + ); + PERFORM create_foreign_table( + 'reserves_out_by_reserve' + ,total_num_shards + ,shard_suffix + ,current_shard_num + ,local_user + ); + PERFORM create_foreign_table( + 'recoup_refresh' + ,total_num_shards + ,shard_suffix + ,current_shard_num + ,local_user + ); + PERFORM create_foreign_table( + 'prewire' + ,total_num_shards + ,shard_suffix + ,current_shard_num + ,local_user + ); + PERFORM create_foreign_table( + 'cs_nonce_locks' + ,total_num_shards + ,shard_suffix + ,current_shard_num + ,local_user + ); + +END +$$; + +COMMENT ON FUNCTION create_shard_server + IS 'Create a shard server on the master + node with all foreign tables and user mappings'; + +CREATE OR REPLACE FUNCTION create_foreign_servers( + amount INTEGER + ,domain VARCHAR DEFAULT 'perf.taler' +) + RETURNS VOID + LANGUAGE plpgsql +AS $$ +BEGIN + + PERFORM prepare_sharding(); + + FOR i IN 1..amount LOOP + PERFORM create_shard_server( + i + ,amount + ,i + ,'shard-' || i::varchar || '.' || domain + ,'taler' + ,'taler' + ,'taler-exchange' + ,'5432' + ,'taler-exchange-httpd' + ); + END LOOP; +END +$$; + +COMMIT; \ No newline at end of file diff --git a/src/exchangedb/drop0001.sql b/src/exchangedb/drop0001.sql index 3f43a5693..364578515 100644 --- a/src/exchangedb/drop0001.sql +++ b/src/exchangedb/drop0001.sql @@ -43,43 +43,65 @@ DROP TABLE IF EXISTS wire_accounts CASCADE; DROP TABLE IF EXISTS signkey_revocations CASCADE; DROP TABLE IF EXISTS work_shards CASCADE; DROP TABLE IF EXISTS prewire CASCADE; +DROP FUNCTION IF EXISTS create_table_prewire; DROP TABLE IF EXISTS recoup CASCADE; +DROP FUNCTION IF EXISTS create_table_recoup; DROP FUNCTION IF EXISTS add_constraints_to_recoup_partition; DROP TABLE IF EXISTS recoup_refresh CASCADE; +DROP FUNCTION IF EXISTS create_table_recoup_refresh; DROP FUNCTION IF EXISTS add_constraints_to_recoup_refresh_partition; +DROP TABLE IF EXISTS aggregation_transient CASCADE; +DROP FUNCTION IF EXISTS create_table_aggregation_transient; DROP TABLE IF EXISTS aggregation_tracking CASCADE; +DROP FUNCTION IF EXISTS create_table_aggregation_tracking; DROP FUNCTION IF EXISTS add_constraints_to_aggregation_tracking_partition; DROP TABLE IF EXISTS wire_out CASCADE; +DROP FUNCTION IF EXISTS create_table_wire_out; DROP FUNCTION IF EXISTS add_constraints_to_wire_out_partition; DROP TABLE IF EXISTS wire_targets CASCADE; +DROP FUNCTION IF EXISTS create_table_wire_targets; DROP FUNCTION IF EXISTS add_constraints_to_wire_targets_partition; DROP TABLE IF EXISTS wire_fee CASCADE; DROP TABLE IF EXISTS deposits CASCADE; +DROP FUNCTION IF EXISTS create_table_deposits; DROP TABLE IF EXISTS deposits_by_ready CASCADE; +DROP FUNCTION IF EXISTS create_table_deposits_by_ready; DROP TABLE IF EXISTS deposits_for_matching CASCADE; +DROP FUNCTION IF EXISTS create_table_deposits_for_matching; DROP FUNCTION IF EXISTS add_constraints_to_deposits_partition; DROP TABLE IF EXISTS extension_details CASCADE; DROP TABLE IF EXISTS refunds CASCADE; +DROP FUNCTION IF EXISTS create_table_refunds; DROP FUNCTION IF EXISTS add_constraints_to_refunds_partition; DROP TABLE IF EXISTS refresh_commitments CASCADE; +DROP FUNCTION IF EXISTS create_table_refresh_commitments; DROP FUNCTION IF EXISTS add_constraints_to_refresh_commitments_partition; DROP TABLE IF EXISTS refresh_revealed_coins CASCADE; +DROP FUNCTION IF EXISTS create_table_refresh_revealed_coins; DROP FUNCTION IF EXISTS add_constraints_to_refresh_revealed_coins_partition; DROP TABLE IF EXISTS refresh_transfer_keys CASCADE; +DROP FUNCTION IF EXISTS create_table_refresh_transfer_keys; DROP FUNCTION IF EXISTS add_constraints_to_refresh_transfer_keys_partition; DROP TABLE IF EXISTS known_coins CASCADE; +DROP FUNCTION IF EXISTS create_table_known_coins; DROP FUNCTION IF EXISTS add_constraints_to_known_coins_partition; DROP TABLE IF EXISTS reserves_close CASCADE; +DROP FUNCTION IF EXISTS create_table_reserves_close; DROP FUNCTION IF EXISTS add_constraints_to_reserves_close_partition; DROP TABLE IF EXISTS reserves_out CASCADE; +DROP FUNCTION IF EXISTS create_table_reserves_out; DROP TABLE IF EXISTS reserves_out_by_reserve CASCADE; +DROP FUNCTION IF EXISTS create_table_reserves_out_by_reserve; DROP FUNCTION IF EXISTS add_constraints_to_reserves_out_partition; DROP TABLE IF EXISTS reserves_in CASCADE; +DROP FUNCTION IF EXISTS create_table_reserves_in; DROP FUNCTION IF EXISTS add_constraints_to_reserves_in_partition; DROP TABLE IF EXISTS reserves CASCADE; +DROP FUNCTION IF EXISTS create_table_reserves; DROP TABLE IF EXISTS denomination_revocations CASCADE; DROP TABLE IF EXISTS denominations CASCADE; DROP TABLE IF EXISTS cs_nonce_locks CASCADE; +DROP FUNCTION IF EXISTS create_table_cs_nonce_locks; DROP FUNCTION IF EXISTS add_constraints_to_cs_nonce_locks_partition; DROP TABLE IF EXISTS global_fee CASCADE; @@ -122,14 +144,14 @@ DROP FUNCTION IF EXISTS exchange_do_account_merge; DROP FUNCTION IF EXISTS exchange_do_history_request; DROP FUNCTION IF EXISTS exchange_do_close_request; --- Unregister patch (partition-0001.sql) --- SELECT _v.unregister_patch('partition-0001'); --- Drops for partition-0001.sql DROP FUNCTION IF EXISTS create_table_partition; DROP FUNCTION IF EXISTS create_partitions; DROP FUNCTION IF EXISTS detach_default_partitions; DROP FUNCTION IF EXISTS drop_default_partitions; +DROP FUNCTION IF EXISTS master_prepare_sharding; +DROP FUNCTION IF EXISTS create_foreign_table; +DROP FUNCTION IF EXISTS create_shard_server; -- And we're out of here... diff --git a/src/exchangedb/exchange-0001.sql b/src/exchangedb/exchange-0001-part.sql similarity index 81% rename from src/exchangedb/exchange-0001.sql rename to src/exchangedb/exchange-0001-part.sql index 2955663ac..359fe9f81 100644 --- a/src/exchangedb/exchange-0001.sql +++ b/src/exchangedb/exchange-0001-part.sql @@ -17,10 +17,6 @@ -- Everything in one big transaction BEGIN; --- Check patch versioning is in place. -SELECT _v.register_patch('exchange-0001', NULL, NULL); - - -- ------------------------------ denominations ---------------------------------------- CREATE TABLE IF NOT EXISTS denominations @@ -72,14 +68,8 @@ COMMENT ON TABLE denomination_revocations -- ------------------------------ wire_targets ---------------------------------------- -CREATE TABLE IF NOT EXISTS wire_targets - (wire_target_serial_id BIGINT GENERATED BY DEFAULT AS IDENTITY -- UNIQUE - ,wire_target_h_payto BYTEA PRIMARY KEY CHECK (LENGTH(wire_target_h_payto)=32) - ,payto_uri VARCHAR NOT NULL - ,kyc_ok BOOLEAN NOT NULL DEFAULT (FALSE) - ,external_id VARCHAR - ) - PARTITION BY HASH (wire_target_h_payto); +SELECT create_table_wire_targets(); + COMMENT ON TABLE wire_targets IS 'All senders and recipients of money via the exchange'; COMMENT ON COLUMN wire_targets.payto_uri @@ -95,39 +85,12 @@ CREATE TABLE IF NOT EXISTS wire_targets_default PARTITION OF wire_targets FOR VALUES WITH (MODULUS 1, REMAINDER 0); -CREATE OR REPLACE FUNCTION add_constraints_to_wire_targets_partition( - IN partition_suffix VARCHAR -) -RETURNS void -LANGUAGE plpgsql -AS $$ -BEGIN - EXECUTE FORMAT ( - 'ALTER TABLE wire_targets_' || partition_suffix || ' ' - 'ADD CONSTRAINT wire_targets_' || partition_suffix || '_wire_target_serial_id_key ' - 'UNIQUE (wire_target_serial_id)' - ); -END -$$; - SELECT add_constraints_to_wire_targets_partition('default'); -CREATE INDEX IF NOT EXISTS wire_targets_serial_id_index - ON wire_targets - (wire_target_serial_id); - - -- ------------------------------ reserves ---------------------------------------- -CREATE TABLE IF NOT EXISTS reserves - (reserve_uuid BIGINT GENERATED BY DEFAULT AS IDENTITY - ,reserve_pub BYTEA PRIMARY KEY CHECK(LENGTH(reserve_pub)=32) - ,current_balance_val INT8 NOT NULL - ,current_balance_frac INT4 NOT NULL - ,expiration_date INT8 NOT NULL - ,gc_date INT8 NOT NULL - ) - PARTITION BY HASH (reserve_pub); +SELECT create_table_reserves(); + COMMENT ON TABLE reserves IS 'Summarizes the balance of a reserve. Updated when new funds are added or withdrawn.'; COMMENT ON COLUMN reserves.reserve_pub @@ -138,40 +101,15 @@ COMMENT ON COLUMN reserves.expiration_date IS 'Used to trigger closing of reserves that have not been drained after some time'; COMMENT ON COLUMN reserves.gc_date IS 'Used to forget all information about a reserve during garbage collection'; + CREATE TABLE IF NOT EXISTS reserves_default PARTITION OF reserves FOR VALUES WITH (MODULUS 1, REMAINDER 0); -CREATE INDEX IF NOT EXISTS reserves_by_expiration_index - ON reserves - (expiration_date - ,current_balance_val - ,current_balance_frac - ); -COMMENT ON INDEX reserves_by_expiration_index - IS 'used in get_expired_reserves'; -CREATE INDEX IF NOT EXISTS reserves_by_reserve_uuid_index - ON reserves - (reserve_uuid); -CREATE INDEX IF NOT EXISTS reserves_by_gc_date_index - ON reserves - (gc_date); -COMMENT ON INDEX reserves_by_gc_date_index - IS 'for reserve garbage collection'; - -- ------------------------------ reserves_in ---------------------------------------- -CREATE TABLE IF NOT EXISTS reserves_in - (reserve_in_serial_id BIGINT GENERATED BY DEFAULT AS IDENTITY -- UNIQUE - ,reserve_pub BYTEA PRIMARY KEY REFERENCES reserves (reserve_pub) ON DELETE CASCADE - ,wire_reference INT8 NOT NULL - ,credit_val INT8 NOT NULL - ,credit_frac INT4 NOT NULL - ,wire_source_h_payto BYTEA CHECK (LENGTH(wire_source_h_payto)=32) - ,exchange_account_section TEXT NOT NULL - ,execution_date INT8 NOT NULL - ) - PARTITION BY HASH (reserve_pub); +SELECT create_table_reserves_in(); + COMMENT ON TABLE reserves_in IS 'list of transfers of funds into the reserves, one per incoming wire transfer'; COMMENT ON COLUMN reserves_in.wire_source_h_payto @@ -185,101 +123,28 @@ CREATE TABLE IF NOT EXISTS reserves_in_default PARTITION OF reserves_in FOR VALUES WITH (MODULUS 1, REMAINDER 0); -CREATE OR REPLACE FUNCTION add_constraints_to_reserves_in_partition( - IN partition_suffix VARCHAR -) -RETURNS void -LANGUAGE plpgsql -AS $$ -BEGIN - EXECUTE FORMAT ( - 'ALTER TABLE reserves_in_' || partition_suffix || ' ' - 'ADD CONSTRAINT reserves_in_' || partition_suffix || '_reserve_in_serial_id_key ' - 'UNIQUE (reserve_in_serial_id)' - ); -END -$$; - SELECT add_constraints_to_reserves_in_partition('default'); -CREATE INDEX IF NOT EXISTS reserves_in_by_reserve_in_serial_id_index - ON reserves_in - (reserve_in_serial_id); --- FIXME: where do we need this index? Can we do better? -CREATE INDEX IF NOT EXISTS reserves_in_by_exchange_account_section_execution_date_index - ON reserves_in - (exchange_account_section - ,execution_date - ); --- FIXME: where do we need this index? Can we do better? -CREATE INDEX IF NOT EXISTS reserves_in_by_exchange_account_reserve_in_serial_id_index - ON reserves_in - (exchange_account_section - ,reserve_in_serial_id DESC - ); - - -- ------------------------------ reserves_close ---------------------------------------- -CREATE TABLE IF NOT EXISTS reserves_close - (close_uuid BIGINT GENERATED BY DEFAULT AS IDENTITY -- UNIQUE / PRIMARY KEY - ,reserve_pub BYTEA NOT NULL REFERENCES reserves (reserve_pub) ON DELETE CASCADE - ,execution_date INT8 NOT NULL - ,wtid BYTEA NOT NULL CHECK (LENGTH(wtid)=32) - ,wire_target_h_payto BYTEA CHECK (LENGTH(wire_target_h_payto)=32) - ,amount_val INT8 NOT NULL - ,amount_frac INT4 NOT NULL - ,closing_fee_val INT8 NOT NULL - ,closing_fee_frac INT4 NOT NULL) - PARTITION BY HASH (reserve_pub); +SELECT create_table_reserves_close(); + COMMENT ON TABLE reserves_close IS 'wire transfers executed by the reserve to close reserves'; COMMENT ON COLUMN reserves_close.wire_target_h_payto IS 'Identifies the credited bank account (and KYC status). Note that closing does not depend on KYC.'; -CREATE INDEX IF NOT EXISTS reserves_close_by_close_uuid_index - ON reserves_close - (close_uuid); -CREATE INDEX IF NOT EXISTS reserves_close_by_reserve_pub_index - ON reserves_close - (reserve_pub); - CREATE TABLE IF NOT EXISTS reserves_close_default PARTITION OF reserves_close FOR VALUES WITH (MODULUS 1, REMAINDER 0); -CREATE OR REPLACE FUNCTION add_constraints_to_reserves_close_partition( - IN partition_suffix VARCHAR -) -RETURNS void -LANGUAGE plpgsql -AS $$ -BEGIN - EXECUTE FORMAT ( - 'ALTER TABLE reserves_close_' || partition_suffix || ' ' - 'ADD CONSTRAINT reserves_close_' || partition_suffix || '_close_uuid_pkey ' - 'PRIMARY KEY (close_uuid)' - ); -END -$$; - SELECT add_constraints_to_reserves_close_partition('default'); -- ------------------------------ reserves_out ---------------------------------------- -CREATE TABLE IF NOT EXISTS reserves_out - (reserve_out_serial_id BIGINT GENERATED BY DEFAULT AS IDENTITY -- UNIQUE - ,h_blind_ev BYTEA CHECK (LENGTH(h_blind_ev)=64) UNIQUE - ,denominations_serial INT8 NOT NULL REFERENCES denominations (denominations_serial) - ,denom_sig BYTEA NOT NULL - ,reserve_uuid INT8 NOT NULL -- REFERENCES reserves (reserve_uuid) ON DELETE CASCADE - ,reserve_sig BYTEA NOT NULL CHECK (LENGTH(reserve_sig)=64) - ,execution_date INT8 NOT NULL - ,amount_with_fee_val INT8 NOT NULL - ,amount_with_fee_frac INT4 NOT NULL - ) - PARTITION BY HASH (h_blind_ev); +SELECT create_table_reserves_out(); + COMMENT ON TABLE reserves_out IS 'Withdraw operations performed on reserves.'; COMMENT ON COLUMN reserves_out.h_blind_ev @@ -287,50 +152,18 @@ COMMENT ON COLUMN reserves_out.h_blind_ev COMMENT ON COLUMN reserves_out.denominations_serial IS 'We do not CASCADE ON DELETE here, we may keep the denomination data alive'; -CREATE INDEX IF NOT EXISTS reserves_out_by_reserve_out_serial_id_index - ON reserves_out - (reserve_out_serial_id); --- FIXME: change query to use reserves_out_by_reserve instead and materialize execution_date there as well??? -CREATE INDEX IF NOT EXISTS reserves_out_by_reserve_uuid_and_execution_date_index - ON reserves_out - (reserve_uuid, execution_date); -COMMENT ON INDEX reserves_out_by_reserve_uuid_and_execution_date_index - IS 'for get_reserves_out and exchange_do_withdraw_limit_check'; - CREATE TABLE IF NOT EXISTS reserves_out_default PARTITION OF reserves_out FOR VALUES WITH (MODULUS 1, REMAINDER 0); -CREATE OR REPLACE FUNCTION add_constraints_to_reserves_out_partition( - IN partition_suffix VARCHAR -) -RETURNS void -LANGUAGE plpgsql -AS $$ -BEGIN - EXECUTE FORMAT ( - 'ALTER TABLE reserves_out_' || partition_suffix || ' ' - 'ADD CONSTRAINT reserves_out_' || partition_suffix || '_reserve_out_serial_id_key ' - 'UNIQUE (reserve_out_serial_id)' - ); -END -$$; - SELECT add_constraints_to_reserves_out_partition('default'); -CREATE TABLE IF NOT EXISTS reserves_out_by_reserve - (reserve_uuid INT8 NOT NULL -- REFERENCES reserves (reserve_uuid) ON DELETE CASCADE - ,h_blind_ev BYTEA CHECK (LENGTH(h_blind_ev)=64) - ) - PARTITION BY HASH (reserve_uuid); +SELECT create_table_reserves_out_by_reserve(); + COMMENT ON TABLE reserves_out_by_reserve IS 'Information in this table is strictly redundant with that of reserves_out, but saved by a different primary key for fast lookups by reserve public key/uuid.'; -CREATE INDEX IF NOT EXISTS reserves_out_by_reserve_main_index - ON reserves_out_by_reserve - (reserve_uuid); - CREATE TABLE IF NOT EXISTS reserves_out_by_reserve_default PARTITION OF reserves_out_by_reserve FOR VALUES WITH (MODULUS 1, REMAINDER 0); @@ -467,16 +300,8 @@ COMMENT ON COLUMN extensions.config -- ------------------------------ known_coins ---------------------------------------- -CREATE TABLE IF NOT EXISTS known_coins - (known_coin_id BIGINT GENERATED BY DEFAULT AS IDENTITY -- UNIQUE - ,denominations_serial INT8 NOT NULL REFERENCES denominations (denominations_serial) ON DELETE CASCADE - ,coin_pub BYTEA NOT NULL PRIMARY KEY CHECK (LENGTH(coin_pub)=32) - ,age_commitment_hash BYTEA CHECK (LENGTH(age_commitment_hash)=32) - ,denom_sig BYTEA NOT NULL - ,remaining_val INT8 NOT NULL - ,remaining_frac INT4 NOT NULL - ) - PARTITION BY HASH (coin_pub); +SELECT create_table_known_coins(); + COMMENT ON TABLE known_coins IS 'information about coins and their signatures, so we do not have to store the signatures more than once if a coin is involved in multiple operations'; COMMENT ON COLUMN known_coins.denominations_serial @@ -494,36 +319,13 @@ CREATE TABLE IF NOT EXISTS known_coins_default PARTITION OF known_coins FOR VALUES WITH (MODULUS 1, REMAINDER 0); -CREATE OR REPLACE FUNCTION add_constraints_to_known_coins_partition( - IN partition_suffix VARCHAR -) -RETURNS void -LANGUAGE plpgsql -AS $$ -BEGIN - EXECUTE FORMAT ( - 'ALTER TABLE known_coins_' || partition_suffix || ' ' - 'ADD CONSTRAINT known_coins_' || partition_suffix || 'k_nown_coin_id_key ' - 'UNIQUE (known_coin_id)' - ); -END -$$; - SELECT add_constraints_to_known_coins_partition('default'); -- ------------------------------ refresh_commitments ---------------------------------------- -CREATE TABLE IF NOT EXISTS refresh_commitments - (melt_serial_id BIGINT GENERATED BY DEFAULT AS IDENTITY -- UNIQUE - ,rc BYTEA PRIMARY KEY CHECK (LENGTH(rc)=64) - ,old_coin_pub BYTEA NOT NULL REFERENCES known_coins (coin_pub) ON DELETE CASCADE - ,old_coin_sig BYTEA NOT NULL CHECK(LENGTH(old_coin_sig)=64) - ,amount_with_fee_val INT8 NOT NULL - ,amount_with_fee_frac INT4 NOT NULL - ,noreveal_index INT4 NOT NULL - ) - PARTITION BY HASH (rc); +SELECT create_table_refresh_commitments(); + COMMENT ON TABLE refresh_commitments IS 'Commitments made when melting coins and the gamma value chosen by the exchange.'; COMMENT ON COLUMN refresh_commitments.noreveal_index @@ -533,48 +335,17 @@ COMMENT ON COLUMN refresh_commitments.rc COMMENT ON COLUMN refresh_commitments.old_coin_pub IS 'Coin being melted in the refresh process.'; --- Note: index spans partitions, may need to be materialized. -CREATE INDEX IF NOT EXISTS refresh_commitments_by_old_coin_pub_index - ON refresh_commitments - (old_coin_pub); - CREATE TABLE IF NOT EXISTS refresh_commitments_default PARTITION OF refresh_commitments FOR VALUES WITH (MODULUS 1, REMAINDER 0); -CREATE OR REPLACE FUNCTION add_constraints_to_refresh_commitments_partition( - IN partition_suffix VARCHAR -) -RETURNS void -LANGUAGE plpgsql -AS $$ -BEGIN - EXECUTE FORMAT ( - 'ALTER TABLE refresh_commitments_' || partition_suffix || ' ' - 'ADD CONSTRAINT refresh_commitments_' || partition_suffix || '_melt_serial_id_key ' - 'UNIQUE (melt_serial_id)' - ); -END -$$; - SELECT add_constraints_to_refresh_commitments_partition('default'); -- ------------------------------ refresh_revealed_coins ---------------------------------------- -CREATE TABLE IF NOT EXISTS refresh_revealed_coins - (rrc_serial BIGINT GENERATED BY DEFAULT AS IDENTITY -- UNIQUE - ,melt_serial_id INT8 NOT NULL -- REFERENCES refresh_commitments (melt_serial_id) ON DELETE CASCADE - ,freshcoin_index INT4 NOT NULL - ,link_sig BYTEA NOT NULL CHECK(LENGTH(link_sig)=64) - ,denominations_serial INT8 NOT NULL REFERENCES denominations (denominations_serial) ON DELETE CASCADE - ,coin_ev BYTEA NOT NULL -- UNIQUE - ,h_coin_ev BYTEA NOT NULL CHECK(LENGTH(h_coin_ev)=64) -- UNIQUE - ,ev_sig BYTEA NOT NULL - ,ewv BYTEA NOT NULL - -- ,PRIMARY KEY (melt_serial_id, freshcoin_index) -- done per shard - ) - PARTITION BY HASH (melt_serial_id); +SELECT create_table_refresh_revealed_coins(); + COMMENT ON TABLE refresh_revealed_coins IS 'Revelations about the new coins that are to be created during a melting session.'; COMMENT ON COLUMN refresh_revealed_coins.rrc_serial @@ -592,46 +363,17 @@ COMMENT ON COLUMN refresh_revealed_coins.h_coin_ev COMMENT ON COLUMN refresh_revealed_coins.ev_sig IS 'exchange signature over the envelope'; -CREATE INDEX IF NOT EXISTS refresh_revealed_coins_by_melt_serial_id_index - ON refresh_revealed_coins - (melt_serial_id); - CREATE TABLE IF NOT EXISTS refresh_revealed_coins_default PARTITION OF refresh_revealed_coins FOR VALUES WITH (MODULUS 1, REMAINDER 0); -CREATE OR REPLACE FUNCTION add_constraints_to_refresh_revealed_coins_partition( - IN partition_suffix VARCHAR -) -RETURNS void -LANGUAGE plpgsql -AS $$ -BEGIN - EXECUTE FORMAT ( - 'ALTER TABLE refresh_revealed_coins_' || partition_suffix || ' ' - 'ADD CONSTRAINT refresh_revealed_coins_' || partition_suffix || '_rrc_serial_key ' - 'UNIQUE (rrc_serial) ' - ',ADD CONSTRAINT refresh_revealed_coins_' || partition_suffix || '_coin_ev_key ' - 'UNIQUE (coin_ev) ' - ',ADD CONSTRAINT refresh_revealed_coins_' || partition_suffix || '_h_coin_ev_key ' - 'UNIQUE (h_coin_ev) ' - ',ADD PRIMARY KEY (melt_serial_id, freshcoin_index) ' - ); -END -$$; - SELECT add_constraints_to_refresh_revealed_coins_partition('default'); -- ------------------------------ refresh_transfer_keys ---------------------------------------- -CREATE TABLE IF NOT EXISTS refresh_transfer_keys - (rtc_serial BIGINT GENERATED BY DEFAULT AS IDENTITY -- UNIQUE - ,melt_serial_id INT8 PRIMARY KEY -- REFERENCES refresh_commitments (melt_serial_id) ON DELETE CASCADE - ,transfer_pub BYTEA NOT NULL CHECK(LENGTH(transfer_pub)=32) - ,transfer_privs BYTEA NOT NULL - ) - PARTITION BY HASH (melt_serial_id); +SELECT create_table_refresh_transfer_keys(); + COMMENT ON TABLE refresh_transfer_keys IS 'Transfer keys of a refresh operation (the data revealed to the exchange).'; COMMENT ON COLUMN refresh_transfer_keys.rtc_serial @@ -647,21 +389,6 @@ CREATE TABLE IF NOT EXISTS refresh_transfer_keys_default PARTITION OF refresh_transfer_keys FOR VALUES WITH (MODULUS 1, REMAINDER 0); -CREATE OR REPLACE FUNCTION add_constraints_to_refresh_transfer_keys_partition( - IN partition_suffix VARCHAR -) -RETURNS void -LANGUAGE plpgsql -AS $$ -BEGIN - EXECUTE FORMAT ( - 'ALTER TABLE refresh_transfer_keys_' || partition_suffix || ' ' - 'ADD CONSTRAINT refresh_transfer_keys_' || partition_suffix || '_rtc_serial_key ' - 'UNIQUE (rtc_serial)' - ); -END -$$; - SELECT add_constraints_to_refresh_transfer_keys_partition('default'); @@ -683,28 +410,8 @@ CREATE TABLE IF NOT EXISTS extension_details_default -- ------------------------------ deposits ---------------------------------------- -CREATE TABLE IF NOT EXISTS deposits - (deposit_serial_id BIGINT GENERATED BY DEFAULT AS IDENTITY -- PRIMARY KEY - ,shard INT8 NOT NULL - ,coin_pub BYTEA NOT NULL CHECK (LENGTH(coin_pub)=32) REFERENCES known_coins (coin_pub) ON DELETE CASCADE - ,known_coin_id BIGINT NOT NULL -- REFERENCES known_coins (known_coin_id) ON DELETE CASCADE - ,amount_with_fee_val INT8 NOT NULL - ,amount_with_fee_frac INT4 NOT NULL - ,wallet_timestamp INT8 NOT NULL - ,exchange_timestamp INT8 NOT NULL - ,refund_deadline INT8 NOT NULL - ,wire_deadline INT8 NOT NULL - ,merchant_pub BYTEA NOT NULL CHECK (LENGTH(merchant_pub)=32) - ,h_contract_terms BYTEA NOT NULL CHECK (LENGTH(h_contract_terms)=64) - ,coin_sig BYTEA NOT NULL CHECK (LENGTH(coin_sig)=64) - ,wire_salt BYTEA NOT NULL CHECK (LENGTH(wire_salt)=16) - ,wire_target_h_payto BYTEA CHECK (LENGTH(wire_target_h_payto)=32) - ,done BOOLEAN NOT NULL DEFAULT FALSE - ,extension_blocked BOOLEAN NOT NULL DEFAULT FALSE - ,extension_details_serial_id INT8 REFERENCES extension_details (extension_details_serial_id) ON DELETE CASCADE - ,UNIQUE (coin_pub, merchant_pub, h_contract_terms) - ) - PARTITION BY HASH (coin_pub); +SELECT create_table_deposits(); + COMMENT ON TABLE deposits IS 'Deposits we have received and for which we need to make (aggregate) wire transfers (and manage refunds).'; COMMENT ON COLUMN deposits.shard @@ -722,65 +429,28 @@ COMMENT ON COLUMN deposits.extension_blocked COMMENT ON COLUMN deposits.extension_details_serial_id IS 'References extensions table, NULL if extensions are not used'; -CREATE INDEX IF NOT EXISTS deposits_by_coin_pub_index - ON deposits - (coin_pub); - CREATE TABLE IF NOT EXISTS deposits_default PARTITION OF deposits FOR VALUES WITH (MODULUS 1, REMAINDER 0); -CREATE OR REPLACE FUNCTION add_constraints_to_deposits_partition( - IN partition_suffix VARCHAR -) -RETURNS void -LANGUAGE plpgsql -AS $$ -BEGIN - EXECUTE FORMAT ( - 'ALTER TABLE deposits_' || partition_suffix || ' ' - 'ADD CONSTRAINT deposits_' || partition_suffix || '_deposit_serial_id_pkey ' - 'PRIMARY KEY (deposit_serial_id)' - ); -END -$$; - SELECT add_constraints_to_deposits_partition('default'); -CREATE TABLE IF NOT EXISTS deposits_by_ready - (wire_deadline INT8 NOT NULL - ,shard INT8 NOT NULL - ,coin_pub BYTEA NOT NULL CHECK (LENGTH(coin_pub)=32) REFERENCES known_coins (coin_pub) ON DELETE CASCADE - ,deposit_serial_id INT8 - ) - PARTITION BY RANGE (wire_deadline); +SELECT create_table_deposits_by_ready(); + COMMENT ON TABLE deposits_by_ready IS 'Enables fast lookups for deposits_get_ready, auto-populated via TRIGGER below'; -CREATE INDEX IF NOT EXISTS deposits_by_ready_main_index - ON deposits_by_ready - (wire_deadline ASC, shard ASC, coin_pub); - CREATE TABLE IF NOT EXISTS deposits_by_ready_default PARTITION OF deposits_by_ready DEFAULT; -CREATE TABLE IF NOT EXISTS deposits_for_matching - (refund_deadline INT8 NOT NULL - ,merchant_pub BYTEA NOT NULL CHECK (LENGTH(merchant_pub)=32) - ,coin_pub BYTEA NOT NULL CHECK (LENGTH(coin_pub)=32) REFERENCES known_coins (coin_pub) ON DELETE CASCADE - ,deposit_serial_id INT8 - ) - PARTITION BY RANGE (refund_deadline); +SELECT create_table_deposits_for_matching(); + COMMENT ON TABLE deposits_for_matching IS 'Enables fast lookups for deposits_iterate_matching, auto-populated via TRIGGER below'; -CREATE INDEX IF NOT EXISTS deposits_for_matching_main_index - ON deposits_for_matching - (refund_deadline ASC, merchant_pub, coin_pub); - CREATE TABLE IF NOT EXISTS deposits_for_matching_default PARTITION OF deposits_for_matching DEFAULT; @@ -920,17 +590,8 @@ CREATE TRIGGER deposits_on_delete -- ------------------------------ refunds ---------------------------------------- -CREATE TABLE IF NOT EXISTS refunds - (refund_serial_id BIGINT GENERATED BY DEFAULT AS IDENTITY -- UNIQUE - ,coin_pub BYTEA NOT NULL CHECK (LENGTH(coin_pub)=32) REFERENCES known_coins (coin_pub) ON DELETE CASCADE - ,deposit_serial_id INT8 NOT NULL -- REFERENCES deposits (deposit_serial_id) ON DELETE CASCADE - ,merchant_sig BYTEA NOT NULL CHECK(LENGTH(merchant_sig)=64) - ,rtransaction_id INT8 NOT NULL - ,amount_with_fee_val INT8 NOT NULL - ,amount_with_fee_frac INT4 NOT NULL - -- ,PRIMARY KEY (deposit_serial_id, rtransaction_id) -- done per shard! - ) - PARTITION BY HASH (coin_pub); +SELECT create_table_refunds(); + COMMENT ON TABLE refunds IS 'Data on coins that were refunded. Technically, refunds always apply against specific deposit operations involving a coin. The combination of coin_pub, merchant_pub, h_contract_terms and rtransaction_id MUST be unique, and we usually select by coin_pub so that one goes first.'; COMMENT ON COLUMN refunds.deposit_serial_id @@ -938,46 +599,17 @@ COMMENT ON COLUMN refunds.deposit_serial_id COMMENT ON COLUMN refunds.rtransaction_id IS 'used by the merchant to make refunds unique in case the same coin for the same deposit gets a subsequent (higher) refund'; -CREATE INDEX IF NOT EXISTS refunds_by_coin_pub_index - ON refunds - (coin_pub); - CREATE TABLE IF NOT EXISTS refunds_default PARTITION OF refunds FOR VALUES WITH (MODULUS 1, REMAINDER 0); -CREATE OR REPLACE FUNCTION add_constraints_to_refunds_partition( - IN partition_suffix VARCHAR -) -RETURNS void -LANGUAGE plpgsql -AS $$ -BEGIN - EXECUTE FORMAT ( - 'ALTER TABLE refunds_' || partition_suffix || ' ' - 'ADD CONSTRAINT refunds_' || partition_suffix || '_refund_serial_id_key ' - 'UNIQUE (refund_serial_id) ' - ',ADD PRIMARY KEY (deposit_serial_id, rtransaction_id) ' - ); -END -$$; - SELECT add_constraints_to_refunds_partition('default'); - -- ------------------------------ wire_out ---------------------------------------- -CREATE TABLE IF NOT EXISTS wire_out - (wireout_uuid BIGINT GENERATED BY DEFAULT AS IDENTITY -- PRIMARY KEY - ,execution_date INT8 NOT NULL - ,wtid_raw BYTEA UNIQUE NOT NULL CHECK (LENGTH(wtid_raw)=32) - ,wire_target_h_payto BYTEA CHECK (LENGTH(wire_target_h_payto)=32) - ,exchange_account_section TEXT NOT NULL - ,amount_val INT8 NOT NULL - ,amount_frac INT4 NOT NULL - ) - PARTITION BY HASH (wtid_raw); +SELECT create_table_wire_out(); + COMMENT ON TABLE wire_out IS 'wire transfers the exchange has executed'; COMMENT ON COLUMN wire_out.exchange_account_section @@ -985,32 +617,10 @@ COMMENT ON COLUMN wire_out.exchange_account_section COMMENT ON COLUMN wire_out.wire_target_h_payto IS 'Identifies the credited bank account and KYC status'; -CREATE INDEX IF NOT EXISTS wire_out_by_wireout_uuid_index - ON wire_out - (wireout_uuid); -CREATE INDEX IF NOT EXISTS wire_out_by_wire_target_h_payto_index - ON wire_out - (wire_target_h_payto); - CREATE TABLE IF NOT EXISTS wire_out_default PARTITION OF wire_out FOR VALUES WITH (MODULUS 1, REMAINDER 0); -CREATE OR REPLACE FUNCTION add_constraints_to_wire_out_partition( - IN partition_suffix VARCHAR -) -RETURNS void -LANGUAGE plpgsql -AS $$ -BEGIN - EXECUTE FORMAT ( - 'ALTER TABLE wire_out_' || partition_suffix || ' ' - 'ADD CONSTRAINT wire_out_' || partition_suffix || '_wireout_uuid_pkey ' - 'PRIMARY KEY (wireout_uuid)' - ); -END -$$; - SELECT add_constraints_to_wire_out_partition('default'); CREATE OR REPLACE FUNCTION wire_out_delete_trigger() @@ -1034,14 +644,8 @@ CREATE TRIGGER wire_out_on_delete -- ------------------------------ aggregation_transient ---------------------------------------- -CREATE TABLE IF NOT EXISTS aggregation_transient - (amount_val INT8 NOT NULL - ,amount_frac INT4 NOT NULL - ,wire_target_h_payto BYTEA CHECK (LENGTH(wire_target_h_payto)=32) - ,exchange_account_section TEXT NOT NULL - ,wtid_raw BYTEA NOT NULL CHECK (LENGTH(wtid_raw)=32) - ) - PARTITION BY HASH (wire_target_h_payto); +SELECT create_table_aggregation_transient(); + COMMENT ON TABLE aggregation_transient IS 'aggregations currently happening (lacking wire_out, usually because the amount is too low); this table is not replicated'; COMMENT ON COLUMN aggregation_transient.amount_val @@ -1054,15 +658,10 @@ CREATE TABLE IF NOT EXISTS aggregation_transient_default FOR VALUES WITH (MODULUS 1, REMAINDER 0); - -- ------------------------------ aggregation_tracking ---------------------------------------- -CREATE TABLE IF NOT EXISTS aggregation_tracking - (aggregation_serial_id BIGINT GENERATED BY DEFAULT AS IDENTITY -- UNIQUE - ,deposit_serial_id INT8 PRIMARY KEY -- REFERENCES deposits (deposit_serial_id) ON DELETE CASCADE - ,wtid_raw BYTEA NOT NULL CHECK (LENGTH(wtid_raw)=32) - ) - PARTITION BY HASH (deposit_serial_id); +SELECT create_table_aggregation_tracking(); + COMMENT ON TABLE aggregation_tracking IS 'mapping from wire transfer identifiers (WTID) to deposits (and back)'; COMMENT ON COLUMN aggregation_tracking.wtid_raw @@ -1072,29 +671,8 @@ CREATE TABLE IF NOT EXISTS aggregation_tracking_default PARTITION OF aggregation_tracking FOR VALUES WITH (MODULUS 1, REMAINDER 0); -CREATE OR REPLACE FUNCTION add_constraints_to_aggregation_tracking_partition( - IN partition_suffix VARCHAR -) -RETURNS VOID -LANGUAGE plpgsql -AS $$ -BEGIN - EXECUTE FORMAT ( - 'ALTER TABLE aggregation_tracking_' || partition_suffix || ' ' - 'ADD CONSTRAINT aggregation_tracking_' || partition_suffix || '_aggregation_serial_id_key ' - 'UNIQUE (aggregation_serial_id);' - ); -END -$$; - SELECT add_constraints_to_aggregation_tracking_partition('default'); -CREATE INDEX IF NOT EXISTS aggregation_tracking_by_wtid_raw_index - ON aggregation_tracking - (wtid_raw); -COMMENT ON INDEX aggregation_tracking_by_wtid_raw_index - IS 'for lookup_transactions'; - -- ------------------------------ wire_fee ---------------------------------------- @@ -1155,17 +733,8 @@ CREATE INDEX IF NOT EXISTS global_fee_by_end_date_index -- ------------------------------ recoup ---------------------------------------- -CREATE TABLE IF NOT EXISTS recoup - (recoup_uuid BIGINT GENERATED BY DEFAULT AS IDENTITY -- UNIQUE - ,coin_pub BYTEA NOT NULL CHECK (LENGTH(coin_pub)=32) REFERENCES known_coins (coin_pub) - ,coin_sig BYTEA NOT NULL CHECK(LENGTH(coin_sig)=64) - ,coin_blind BYTEA NOT NULL CHECK(LENGTH(coin_blind)=32) - ,amount_val INT8 NOT NULL - ,amount_frac INT4 NOT NULL - ,recoup_timestamp INT8 NOT NULL - ,reserve_out_serial_id INT8 NOT NULL -- REFERENCES reserves_out (reserve_out_serial_id) ON DELETE CASCADE - ) - PARTITION BY HASH (coin_pub); +SELECT create_table_recoup(); + COMMENT ON TABLE recoup IS 'Information about recoups that were executed between a coin and a reserve. In this type of recoup, the amount is credited back to the reserve from which the coin originated.'; COMMENT ON COLUMN recoup.coin_pub @@ -1177,43 +746,18 @@ COMMENT ON COLUMN recoup.coin_sig COMMENT ON COLUMN recoup.coin_blind IS 'Denomination blinding key used when creating the blinded coin from the planchet. Secret revealed during the recoup to provide the linkage between the coin and the withdraw operation.'; -CREATE INDEX IF NOT EXISTS recoup_by_coin_pub_index - ON recoup - (coin_pub); - CREATE TABLE IF NOT EXISTS recoup_default PARTITION OF recoup FOR VALUES WITH (MODULUS 1, REMAINDER 0); -CREATE OR REPLACE FUNCTION add_constraints_to_recoup_partition( - IN partition_suffix VARCHAR -) -RETURNS VOID -LANGUAGE plpgsql -AS $$ -BEGIN - EXECUTE FORMAT ( - 'ALTER TABLE recoup_' || partition_suffix || ' ' - 'ADD CONSTRAINT recoup_' || partition_suffix || '_recoup_uuid_key ' - 'UNIQUE (recoup_uuid) ' - ); -END -$$; - SELECT add_constraints_to_recoup_partition('default'); -CREATE TABLE IF NOT EXISTS recoup_by_reserve - (reserve_out_serial_id INT8 NOT NULL -- REFERENCES reserves (reserve_out_serial_id) ON DELETE CASCADE - ,coin_pub BYTEA CHECK (LENGTH(coin_pub)=32) REFERENCES known_coins (coin_pub) - ) - PARTITION BY HASH (reserve_out_serial_id); + +SELECT create_table_recoup_by_reserve(); + COMMENT ON TABLE recoup_by_reserve IS 'Information in this table is strictly redundant with that of recoup, but saved by a different primary key for fast lookups by reserve_out_serial_id.'; -CREATE INDEX IF NOT EXISTS recoup_by_reserve_main_index - ON recoup_by_reserve - (reserve_out_serial_id); - CREATE TABLE IF NOT EXISTS recoup_by_reserve_default PARTITION OF recoup_by_reserve FOR VALUES WITH (MODULUS 1, REMAINDER 0); @@ -1260,18 +804,8 @@ CREATE TRIGGER recoup_on_delete -- ------------------------------ recoup_refresh ---------------------------------------- -CREATE TABLE IF NOT EXISTS recoup_refresh - (recoup_refresh_uuid BIGINT GENERATED BY DEFAULT AS IDENTITY -- UNIQUE - ,coin_pub BYTEA NOT NULL CHECK (LENGTH(coin_pub)=32) REFERENCES known_coins (coin_pub) - ,known_coin_id BIGINT NOT NULL -- REFERENCES known_coins (known_coin_id) - ,coin_sig BYTEA NOT NULL CHECK(LENGTH(coin_sig)=64) - ,coin_blind BYTEA NOT NULL CHECK(LENGTH(coin_blind)=32) - ,amount_val INT8 NOT NULL - ,amount_frac INT4 NOT NULL - ,recoup_timestamp INT8 NOT NULL - ,rrc_serial INT8 NOT NULL -- REFERENCES refresh_revealed_coins (rrc_serial) ON DELETE CASCADE -- UNIQUE - ) - PARTITION BY HASH (coin_pub); +SELECT create_table_recoup_refresh(); + COMMENT ON TABLE recoup_refresh IS 'Table of coins that originated from a refresh operation and that were recouped. Links the (fresh) coin to the melted operation (and thus the old coin). A recoup on a refreshed coin credits the old coin and debits the fresh coin.'; COMMENT ON COLUMN recoup_refresh.coin_pub @@ -1283,47 +817,17 @@ COMMENT ON COLUMN recoup_refresh.rrc_serial COMMENT ON COLUMN recoup_refresh.coin_blind IS 'Denomination blinding key used when creating the blinded coin from the planchet. Secret revealed during the recoup to provide the linkage between the coin and the refresh operation.'; -CREATE INDEX IF NOT EXISTS recoup_refresh_by_coin_pub_index - ON recoup_refresh - (coin_pub); --- FIXME: any query using this index will be slow. Materialize index or change query? --- Also: which query uses this index? -CREATE INDEX IF NOT EXISTS recoup_refresh_by_rrc_serial_index - ON recoup_refresh - (rrc_serial); - CREATE TABLE IF NOT EXISTS recoup_refresh_default PARTITION OF recoup_refresh FOR VALUES WITH (MODULUS 1, REMAINDER 0); -CREATE OR REPLACE FUNCTION add_constraints_to_recoup_refresh_partition( - IN partition_suffix VARCHAR -) -RETURNS VOID -LANGUAGE plpgsql -AS $$ -BEGIN - EXECUTE FORMAT ( - 'ALTER TABLE recoup_refresh_' || partition_suffix || ' ' - 'ADD CONSTRAINT recoup_refresh_' || partition_suffix || '_recoup_refresh_uuid_key ' - 'UNIQUE (recoup_refresh_uuid) ' - ); -END -$$; - SELECT add_constraints_to_recoup_refresh_partition('default'); -- ------------------------------ prewire ---------------------------------------- -CREATE TABLE IF NOT EXISTS prewire - (prewire_uuid BIGINT GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY - ,wire_method TEXT NOT NULL - ,finished BOOLEAN NOT NULL DEFAULT false - ,failed BOOLEAN NOT NULL DEFAULT false - ,buf BYTEA NOT NULL - ) - PARTITION BY HASH (prewire_uuid); +SELECT create_table_prewire(); + COMMENT ON TABLE prewire IS 'pre-commit data for wire transfers we are about to execute'; COMMENT ON COLUMN prewire.failed @@ -1333,18 +837,6 @@ COMMENT ON COLUMN prewire.finished COMMENT ON COLUMN prewire.buf IS 'serialized data to send to the bank to execute the wire transfer'; -CREATE INDEX IF NOT EXISTS prewire_by_finished_index - ON prewire - (finished); -COMMENT ON INDEX prewire_by_finished_index - IS 'for gc_prewire'; --- FIXME: find a way to combine these two indices? -CREATE INDEX IF NOT EXISTS prewire_by_failed_finished_index - ON prewire - (failed,finished); -COMMENT ON INDEX prewire_by_failed_finished_index - IS 'for wire_prepare_data_get'; - CREATE TABLE IF NOT EXISTS prewire_default PARTITION OF prewire FOR VALUES WITH (MODULUS 1, REMAINDER 0); @@ -1374,13 +866,8 @@ COMMENT ON COLUMN wire_accounts.last_change -- ------------------------------ cs_nonce_locks ---------------------------------------- -CREATE TABLE IF NOT EXISTS cs_nonce_locks - (cs_nonce_lock_serial_id BIGINT GENERATED BY DEFAULT AS IDENTITY -- UNIQUE - ,nonce BYTEA PRIMARY KEY CHECK (LENGTH(nonce)=32) - ,op_hash BYTEA NOT NULL CHECK (LENGTH(op_hash)=64) - ,max_denomination_serial INT8 NOT NULL - ) - PARTITION BY HASH (nonce); +SELECT create_table_cs_nonce_locks(); + COMMENT ON TABLE cs_nonce_locks IS 'ensures a Clause Schnorr client nonce is locked for use with an operation identified by a hash'; COMMENT ON COLUMN cs_nonce_locks.nonce @@ -1394,21 +881,6 @@ CREATE TABLE IF NOT EXISTS cs_nonce_locks_default PARTITION OF cs_nonce_locks FOR VALUES WITH (MODULUS 1, REMAINDER 0); -CREATE OR REPLACE FUNCTION add_constraints_to_cs_nonce_locks_partition( - IN partition_suffix VARCHAR -) -RETURNS VOID -LANGUAGE plpgsql -AS $$ -BEGIN - EXECUTE FORMAT ( - 'ALTER TABLE cs_nonce_locks_' || partition_suffix || ' ' - 'ADD CONSTRAINT cs_nonce_locks_' || partition_suffix || '_cs_nonce_lock_serial_id_key ' - 'UNIQUE (cs_nonce_lock_serial_id)' - ); -END -$$; - SELECT add_constraints_to_cs_nonce_locks_partition('default'); diff --git a/src/exchangedb/partition-0001.sql b/src/exchangedb/partition-0001.sql deleted file mode 100644 index ba3267988..000000000 --- a/src/exchangedb/partition-0001.sql +++ /dev/null @@ -1,312 +0,0 @@ --- --- This file is part of TALER --- Copyright (C) 2014--2022 Taler Systems SA --- --- TALER is free software; you can redistribute it and/or modify it under the --- terms of the GNU General Public License as published by the Free Software --- Foundation; either version 3, or (at your option) any later version. --- --- TALER is distributed in the hope that it will be useful, but WITHOUT ANY --- WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR --- A PARTICULAR PURPOSE. See the GNU General Public License for more details. --- --- You should have received a copy of the GNU General Public License along with --- TALER; see the file COPYING. If not, see --- - --- Everything in one big transaction -BEGIN; - --- Check patch versioning is in place. --- SELECT _v.register_patch('partition-0001', NULL, NULL); - -CREATE OR REPLACE FUNCTION create_table_partition( - source_table_name VARCHAR - ,modulus INTEGER - ,partition_num INTEGER - ) - RETURNS VOID - LANGUAGE plpgsql -AS $$ -BEGIN - - RAISE NOTICE 'Creating partition %_%', source_table_name, partition_num; - - EXECUTE FORMAT( - 'CREATE TABLE IF NOT EXISTS %I ' - 'PARTITION OF %I ' - 'FOR VALUES WITH (MODULUS %s, REMAINDER %s)' - ,source_table_name || '_' || partition_num - ,source_table_name - ,modulus - ,partition_num-1 - ); - -END -$$; - -CREATE OR REPLACE FUNCTION detach_default_partitions() - RETURNS VOID - LANGUAGE plpgsql -AS $$ -BEGIN - - RAISE NOTICE 'Detaching all default table partitions'; - - ALTER TABLE IF EXISTS wire_targets - DETACH PARTITION wire_targets_default; - - ALTER TABLE IF EXISTS reserves - DETACH PARTITION reserves_default; - - ALTER TABLE IF EXISTS reserves_in - DETACH PARTITION reserves_in_default; - - ALTER TABLE IF EXISTS reserves_close - DETACH PARTITION reserves_close_default; - - ALTER TABLE IF EXISTS reserves_out - DETACH PARTITION reserves_out_default; - - ALTER TABLE IF EXISTS known_coins - DETACH PARTITION known_coins_default; - - ALTER TABLE IF EXISTS refresh_commitments - DETACH PARTITION refresh_commitments_default; - - ALTER TABLE IF EXISTS refresh_revealed_coins - DETACH PARTITION refresh_revealed_coins_default; - - ALTER TABLE IF EXISTS refresh_transfer_keys - DETACH PARTITION refresh_transfer_keys_default; - - ALTER TABLE IF EXISTS deposits - DETACH PARTITION deposits_default; - - ALTER TABLE IF EXISTS refunds - DETACH PARTITION refunds_default; - - ALTER TABLE IF EXISTS wire_out - DETACH PARTITION wire_out_default; - - ALTER TABLE IF EXISTS aggregation_tracking - DETACH PARTITION aggregation_tracking_default; - - ALTER TABLE IF EXISTS recoup - DETACH PARTITION recoup_default; - - ALTER TABLE IF EXISTS recoup_refresh - DETACH PARTITION recoup_refresh_default; - - ALTER TABLE IF EXISTS prewire - DETACH PARTITION prewire_default; - - ALTER TABLE IF EXISTS cs_nonce_locks - DETACH partition cs_nonce_locks_default; - -END -$$; - -COMMENT ON FUNCTION detach_default_partitions - IS 'We need to drop default and create new one before deleting the default partitions - otherwise constraints get lost too. Might be needed in shardig too'; - - -CREATE OR REPLACE FUNCTION drop_default_partitions() - RETURNS VOID - LANGUAGE plpgsql -AS $$ -BEGIN - - RAISE NOTICE 'Dropping default table partitions'; - - DROP TABLE IF EXISTS wire_targets_default; - DROP TABLE IF EXISTS reserves_default; - DROP TABLE IF EXISTS reserves_in_default; - DROP TABLE IF EXISTS reserves_close_default; - DROP TABLE IF EXISTS reserves_out_default; - DROP TABLE IF EXISTS known_coins_default; - DROP TABLE IF EXISTS refresh_commitments_default; - DROP TABLE IF EXISTS refresh_revealed_coins_default; - DROP TABLE IF EXISTS refresh_transfer_keys_default; - DROP TABLE IF EXISTS deposits_default; - DROP TABLE IF EXISTS refunds_default; - DROP TABLE IF EXISTS wire_out_default; - DROP TABLE IF EXISTS aggregation_tracking_default; - DROP TABLE IF EXISTS recoup_default; - DROP TABLE IF EXISTS recoup_refresh_default; - DROP TABLE IF EXISTS prewire_default; - DROP TABLE IF EXISTS cs_nonce_locks_default; - -END -$$; - -COMMENT ON FUNCTION drop_default_partitions - IS 'Drop all default partitions once other partitions are attached. - Might be needed in sharding too.'; - -CREATE OR REPLACE FUNCTION create_partitions( - num_partitions INTEGER -) -RETURNS VOID -LANGUAGE plpgsql -AS $$ -DECLARE - modulus INTEGER; -BEGIN - - modulus := num_partitions; - - PERFORM detach_default_partitions(); - - LOOP - PERFORM create_table_partition( - 'wire_targets' - ,modulus - ,num_partitions - ); - PERFORM add_constraints_to_wire_targets_partition(num_partitions::varchar); - - PERFORM create_table_partition( - 'reserves' - ,modulus - ,num_partitions - ); - - PERFORM create_table_partition( - 'reserves_in' - ,modulus - ,num_partitions - ); - PERFORM add_constraints_to_reserves_in_partition(num_partitions::varchar); - - PERFORM create_table_partition( - 'reserves_close' - ,modulus - ,num_partitions - ); - PERFORM add_constraints_to_reserves_close_partition(num_partitions::varchar); - - PERFORM create_table_partition( - 'reserves_out' - ,modulus - ,num_partitions - ); - PERFORM add_constraints_to_reserves_out_partition(num_partitions::varchar); - - PERFORM create_table_partition( - 'known_coins' - ,modulus - ,num_partitions - ); - PERFORM add_constraints_to_known_coins_partition(num_partitions::varchar); - - PERFORM create_table_partition( - 'refresh_commitments' - ,modulus - ,num_partitions - ); - PERFORM add_constraints_to_refresh_commitments_partition(num_partitions::varchar); - - PERFORM create_table_partition( - 'refresh_revealed_coins' - ,modulus - ,num_partitions - ); - PERFORM add_constraints_to_refresh_revealed_coins_partition(num_partitions::varchar); - - PERFORM create_table_partition( - 'refresh_transfer_keys' - ,modulus - ,num_partitions - ); - PERFORM add_constraints_to_refresh_transfer_keys_partition(num_partitions::varchar); - - PERFORM create_table_partition( - 'deposits' - ,modulus - ,num_partitions - ); - PERFORM add_constraints_to_deposits_partition(num_partitions::varchar); - --- TODO: dynamically (!) creating/deleting deposits partitions: --- create new partitions 'as needed', drop old ones once the aggregator has made --- them empty; as 'new' deposits will always have deadlines in the future, this --- would basically guarantee no conflict between aggregator and exchange service! --- SEE also: https://www.cybertec-postgresql.com/en/automatic-partition-creation-in-postgresql/ --- (article is slightly wrong, as this works:) ---CREATE TABLE tab ( --- id bigint GENERATED ALWAYS AS IDENTITY, --- ts timestamp NOT NULL, --- data text --- PARTITION BY LIST ((ts::date)); --- CREATE TABLE tab_def PARTITION OF tab DEFAULT; --- BEGIN --- CREATE TABLE tab_part2 (LIKE tab); --- insert into tab_part2 (id,ts, data) values (5,'2022-03-21', 'foo'); --- alter table tab attach partition tab_part2 for values in ('2022-03-21'); --- commit; --- Naturally, to ensure this is actually 100% conflict-free, we'd --- need to create tables at the granularity of the wire/refund deadlines; --- that is right now configurable via AGGREGATOR_SHIFT option. - - - PERFORM create_table_partition( - 'refunds' - ,modulus - ,num_partitions - ); - PERFORM add_constraints_to_refunds_partition(num_partitions::varchar); - - PERFORM create_table_partition( - 'wire_out' - ,modulus - ,num_partitions - ); - PERFORM add_constraints_to_wire_out_partition(num_partitions::varchar); - - PERFORM create_table_partition( - 'aggregation_tracking' - ,modulus - ,num_partitions - ); - PERFORM add_constraints_to_aggregation_tracking_partition(num_partitions::varchar); - - PERFORM create_table_partition( - 'recoup' - ,modulus - ,num_partitions - ); - PERFORM add_constraints_to_recoup_partition(num_partitions::varchar); - - PERFORM create_table_partition( - 'recoup_refresh' - ,modulus - ,num_partitions - ); - PERFORM add_constraints_to_recoup_refresh_partition(num_partitions::varchar); - - PERFORM create_table_partition( - 'prewire' - ,modulus - ,num_partitions - ); - - PERFORM create_table_partition( - 'cs_nonce_locks' - ,modulus - ,num_partitions - ); - PERFORM add_constraints_to_cs_nonce_locks_partition(num_partitions::varchar); - - num_partitions=num_partitions-1; - EXIT WHEN num_partitions=0; - - END LOOP; - - PERFORM drop_default_partitions(); - -END -$$; - -COMMIT; diff --git a/src/exchangedb/plugin_exchangedb_postgres.c b/src/exchangedb/plugin_exchangedb_postgres.c index 8f0f5e22c..1d14cb730 100644 --- a/src/exchangedb/plugin_exchangedb_postgres.c +++ b/src/exchangedb/plugin_exchangedb_postgres.c @@ -198,6 +198,49 @@ postgres_create_tables (void *cls) } +/** + * Create tables of a shard node with index idx + * + * @param cls the `struct PostgresClosure` with the plugin-specific state + * @param idx the shards index, will be appended as suffix to all tables + * @return #GNUNET_OK upon success; #GNUNET_SYSERR upon failure + */ +static enum GNUNET_GenericReturnValue +postgres_create_shard_tables (void *cls, + uint32_t idx) +{ + struct PostgresClosure *pg = cls; + struct GNUNET_PQ_Context *conn; + enum GNUNET_GenericReturnValue ret; + struct GNUNET_PQ_QueryParam params[] = { + GNUNET_PQ_query_param_uint32 (&idx), + GNUNET_PQ_query_param_end + }; + struct GNUNET_PQ_PreparedStatement ps[] = { + GNUNET_PQ_make_prepare ("create_shard_tables", + "SELECT" + " setup_shard" + " ($1);", + 1), + GNUNET_PQ_PREPARED_STATEMENT_END + }; + + conn = GNUNET_PQ_connect_with_cfg (pg->cfg, + "exchangedb-postgres", + "shard-", + NULL, + ps); + if (NULL == conn) + return GNUNET_SYSERR; + if (0 > GNUNET_PQ_eval_prepared_non_select (conn, + "create_shard_tables", + params)) + ret = GNUNET_SYSERR; + GNUNET_PQ_disconnect (conn); + return ret; +} + + /** * Setup partitions of already existing tables * @@ -207,7 +250,7 @@ postgres_create_tables (void *cls) */ static enum GNUNET_GenericReturnValue postgres_setup_partitions (void *cls, - const uint32_t num) + uint32_t num) { struct PostgresClosure *pg = cls; struct GNUNET_PQ_Context *conn; @@ -227,7 +270,7 @@ postgres_setup_partitions (void *cls, conn = GNUNET_PQ_connect_with_cfg (pg->cfg, "exchangedb-postgres", - "partition-", + NULL, NULL, ps); if (NULL == conn) @@ -242,6 +285,49 @@ postgres_setup_partitions (void *cls, } +/** + * Setup foreign servers (shards) for already existing tables + * + * @param cls the `struct PostgresClosure` with the plugin-specific state + * @param num the number of foreign servers (shards) to create for each partitioned table + * @return #GNUNET_OK upon success; #GNUNET_SYSERR upon failure + */ +static enum GNUNET_GenericReturnValue +postgres_setup_foreign_servers (void *cls, + uint32_t num) +{ + struct PostgresClosure *pg = cls; + struct GNUNET_PQ_Context *conn; + enum GNUNET_GenericReturnValue ret; + struct GNUNET_PQ_QueryParam params[] = { + GNUNET_PQ_query_param_uint32 (&num), + GNUNET_PQ_query_param_end + }; + struct GNUNET_PQ_PreparedStatement ps[] = { + GNUNET_PQ_make_prepare ("create_foreign_servers", + "SELECT" + " create_foreign_servers" + " ($1);", + 1), + GNUNET_PQ_PREPARED_STATEMENT_END + }; + + conn = GNUNET_PQ_connect_with_cfg (pg->cfg, + "exchangedb-postgres", + NULL, + NULL, + ps); + if (NULL == conn) + return GNUNET_SYSERR; + if (0 > GNUNET_PQ_eval_prepared_non_select (conn, + "create_foreign_servers", + params)) + ret = GNUNET_SYSERR; + GNUNET_PQ_disconnect (conn); + return ret; +} + + /** * Initialize prepared statements for @a pg. * @@ -13055,7 +13141,9 @@ libtaler_plugin_exchangedb_postgres_init (void *cls) plugin->cls = pg; plugin->drop_tables = &postgres_drop_tables; plugin->create_tables = &postgres_create_tables; + plugin->create_shard_tables = &postgres_create_shard_tables; plugin->setup_partitions = &postgres_setup_partitions; + plugin->setup_foreign_servers = &postgres_setup_foreign_servers; plugin->start = &postgres_start; plugin->start_read_committed = &postgres_start_read_committed; plugin->commit = &postgres_commit; diff --git a/src/include/taler_exchangedb_plugin.h b/src/include/taler_exchangedb_plugin.h index 488ffd519..7383913a3 100644 --- a/src/include/taler_exchangedb_plugin.h +++ b/src/include/taler_exchangedb_plugin.h @@ -2254,6 +2254,17 @@ struct TALER_EXCHANGEDB_Plugin enum GNUNET_GenericReturnValue (*create_tables)(void *cls); + /** + * Initialize the database of a shard node + * + * @param cls the @e cls of this struct with the plugin-specific state + * @param idx the current shard index, will be appended to tables as suffix + * @return #GNUNET_OK upon success; #GNUNET_SYSERR upon failure + */ + enum GNUNET_GenericReturnValue + (*create_shard_tables)(void *cls, + uint32_t idx); + /** * Change already present tables of the database to num partitions * Only has an effect if there are default partitions only @@ -2264,7 +2275,23 @@ struct TALER_EXCHANGEDB_Plugin */ enum GNUNET_GenericReturnValue (*setup_partitions)(void *cls, - const uint32_t num); + uint32_t num); + + /** + * Change already present tables of the database to num foreign tables on + * num foreign servers (shards). + * Only has an effect if there are default partitions only + * + * @param cls the @e cls of this struct with the plugin-specific state + * @param num the number of shard servers to create. The shard servers + * must follow the numbering of 1-N, have the same user as + * the master and have tables named _n where n is the same + * as the servers index of N. + * @return #GNUNET_OK upon success; #GNUNET_SYSERR upon failure + */ + enum GNUNET_GenericReturnValue + (*setup_foreign_servers)(void *cls, + uint32_t num); /** * Start a transaction.