--
-- This file is part of TALER
-- Copyright (C) 2014--2022 Taler Systems SA
--
-- TALER is free software; you can redistribute it and/or modify it under the
-- terms of the GNU General Public License as published by the Free Software
-- Foundation; either version 3, or (at your option) any later version.
--
-- TALER is distributed in the hope that it will be useful, but WITHOUT ANY
-- WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
-- A PARTICULAR PURPOSE.  See the GNU General Public License for more details.
--
-- You should have received a copy of the GNU General Public License along with
-- TALER; see the file COPYING.  If not, see <http://www.gnu.org/licenses/>
--

-- Everything in one big transaction
BEGIN;

-------------------- Tables ----------------------------

-- Generic table factory used by all create_table_* functions.
--
-- table_definition is a FORMAT() template with two placeholders: %I receives
-- the table name and %s receives the partitioning clause.
--   * shard_suffix IS NULL: the table is created under table_name and
--     main_table_partition_str is spliced in.
--   * shard_suffix given: the table is created as <table_name>_<shard_suffix>
--     and the partitioning clause is replaced by the empty string.
CREATE OR REPLACE FUNCTION create_partitioned_table(
   IN table_definition VARCHAR
  ,IN table_name VARCHAR
  ,IN main_table_partition_str VARCHAR -- Only used for the main table; shard tables are not partitioned
  ,IN shard_suffix VARCHAR DEFAULT NULL
)
RETURNS VOID
LANGUAGE plpgsql
AS $$
BEGIN

  IF shard_suffix IS NULL
  THEN
    -- Main table: keep the name, apply the partitioning clause.
    EXECUTE FORMAT(
      table_definition,
      table_name,
      main_table_partition_str
    );
  ELSE
    -- Shard table: suffixed name, no partitioning clause.
    EXECUTE FORMAT(
      table_definition,
      table_name || '_' || shard_suffix,
      ''
    );
  END IF;

END
$$;

----------------------- wire_targets ---------------------------

-- Creates the wire_targets table (hash-partitioned on wire_target_h_payto),
-- or its shard wire_targets_<shard_suffix> when a suffix is given.
CREATE OR REPLACE FUNCTION create_table_wire_targets(
  IN shard_suffix VARCHAR DEFAULT NULL
)
RETURNS VOID
LANGUAGE plpgsql
AS $$
DECLARE
  base_name CONSTANT VARCHAR := 'wire_targets';
  partition_clause CONSTANT VARCHAR := 'PARTITION BY HASH (wire_target_h_payto)';
BEGIN

  PERFORM create_partitioned_table(
    'CREATE TABLE IF NOT EXISTS %I'
      '(wire_target_serial_id BIGINT GENERATED BY DEFAULT AS IDENTITY' -- UNIQUE
      ',wire_target_h_payto BYTEA PRIMARY KEY CHECK (LENGTH(wire_target_h_payto)=32)'
      ',payto_uri VARCHAR NOT NULL'
      ',kyc_ok BOOLEAN NOT NULL DEFAULT (FALSE)'
      ',external_id VARCHAR'
    ') %s ;'
    ,base_name
    ,partition_clause
    ,shard_suffix
  );

END
$$;

-- We need a separate function for this, as we call create_table only once but need to add
-- those constraints to
-- each partition which gets created
--
-- Note on dynamic SQL in this file: every generated identifier is passed to
-- FORMAT() via %I (and every literal via %L).  This quotes names exactly the
-- same way create_partitioned_table() and create_hash_partition() do when they
-- create the tables, and keeps a '%' inside a suffix from being interpreted
-- as a format specifier.

-- Adds UNIQUE (wire_target_serial_id) to partition wire_targets_<partition_suffix>.
CREATE OR REPLACE FUNCTION add_constraints_to_wire_targets_partition(
  IN partition_suffix VARCHAR
)
RETURNS void
LANGUAGE plpgsql
AS $$
BEGIN
  EXECUTE FORMAT (
    'ALTER TABLE %I '
      'ADD CONSTRAINT %I '
        'UNIQUE (wire_target_serial_id)'
    ,'wire_targets_' || partition_suffix
    ,'wire_targets_' || partition_suffix || '_wire_target_serial_id_key'
  );
END
$$;

------------------------ reserves -------------------------------

-- Creates the reserves table (hash-partitioned on reserve_pub), or the shard
-- reserves_<shard_suffix>, together with its indices.
CREATE OR REPLACE FUNCTION create_table_reserves(
  IN shard_suffix VARCHAR DEFAULT NULL
)
RETURNS VOID
LANGUAGE plpgsql
AS $$
DECLARE
  table_name VARCHAR DEFAULT 'reserves';
BEGIN

  PERFORM create_partitioned_table(
    'CREATE TABLE IF NOT EXISTS %I'
      '(reserve_uuid BIGINT GENERATED BY DEFAULT AS IDENTITY'
      ',reserve_pub BYTEA PRIMARY KEY CHECK(LENGTH(reserve_pub)=32)'
      ',current_balance_val INT8 NOT NULL'
      ',current_balance_frac INT4 NOT NULL'
      ',expiration_date INT8 NOT NULL'
      ',gc_date INT8 NOT NULL'
    ') %s ;'
    ,table_name
    ,'PARTITION BY HASH (reserve_pub)'
    ,shard_suffix
  );

  -- From here on table_name is the effective (possibly suffixed) name;
  -- concat_ws() skips a NULL shard_suffix.
  table_name = concat_ws('_', table_name, shard_suffix);

  EXECUTE FORMAT (
    'CREATE INDEX IF NOT EXISTS %I '
    'ON %I '
    '(expiration_date'
    ',current_balance_val'
    ',current_balance_frac'
    ');'
    ,table_name || '_by_expiration_index'
    ,table_name
  );
  EXECUTE FORMAT (
    'COMMENT ON INDEX %I '
    'IS %L;'
    ,table_name || '_by_expiration_index'
    ,'used in get_expired_reserves'
  );
  EXECUTE FORMAT (
    'CREATE INDEX IF NOT EXISTS %I '
    'ON %I '
    '(reserve_uuid);'
    ,table_name || '_by_reserve_uuid_index'
    ,table_name
  );
  EXECUTE FORMAT (
    'CREATE INDEX IF NOT EXISTS %I '
    'ON %I '
    '(gc_date);'
    ,table_name || '_by_gc_date_index'
    ,table_name
  );
  EXECUTE FORMAT (
    'COMMENT ON INDEX %I '
    'IS %L;'
    ,table_name || '_by_gc_date_index'
    ,'for reserve garbage collection'
  );

END
$$;

----------------------- reserves_in ------------------------------

-- Creates the reserves_in table (hash-partitioned on reserve_pub), or the
-- shard reserves_in_<shard_suffix>, together with its indices.
CREATE OR REPLACE FUNCTION create_table_reserves_in(
  IN shard_suffix VARCHAR DEFAULT NULL
)
RETURNS VOID
LANGUAGE plpgsql
AS $$
DECLARE
  table_name VARCHAR default 'reserves_in';
BEGIN

  PERFORM create_partitioned_table(
    'CREATE TABLE IF NOT EXISTS %I'
      '(reserve_in_serial_id BIGINT GENERATED BY DEFAULT AS IDENTITY' -- UNIQUE
      ',reserve_pub BYTEA PRIMARY KEY' -- REFERENCES reserves (reserve_pub) ON DELETE CASCADE
      ',wire_reference INT8 NOT NULL'
      ',credit_val INT8 NOT NULL'
      ',credit_frac INT4 NOT NULL'
      ',wire_source_h_payto BYTEA CHECK (LENGTH(wire_source_h_payto)=32)'
      ',exchange_account_section TEXT NOT NULL'
      ',execution_date INT8 NOT NULL'
    ') %s ;'
    ,table_name
    ,'PARTITION BY HASH (reserve_pub)'
    ,shard_suffix
  );

  table_name = concat_ws('_', table_name, shard_suffix);

  EXECUTE FORMAT (
    'CREATE INDEX IF NOT EXISTS %I '
    'ON %I '
    '(reserve_in_serial_id);'
    ,table_name || '_by_reserve_in_serial_id_index'
    ,table_name
  );
  -- FIXME: where do we need this index? Can we do better?
  EXECUTE FORMAT (
    'CREATE INDEX IF NOT EXISTS %I '
    'ON %I '
    '(exchange_account_section '
    ',execution_date'
    ');'
    ,table_name || '_by_exch_accnt_section_execution_date_idx'
    ,table_name
  );
  -- FIXME: where do we need this index? Can we do better?
  EXECUTE FORMAT (
    'CREATE INDEX IF NOT EXISTS %I '
    'ON %I '
    '(exchange_account_section,'
    'reserve_in_serial_id DESC'
    ');'
    ,table_name || '_by_exch_accnt_reserve_in_serial_id_idx'
    ,table_name
  );

END
$$;

-- Adds UNIQUE (reserve_in_serial_id) to partition reserves_in_<partition_suffix>.
CREATE OR REPLACE FUNCTION add_constraints_to_reserves_in_partition(
  IN partition_suffix VARCHAR
)
RETURNS void
LANGUAGE plpgsql
AS $$
BEGIN
  EXECUTE FORMAT (
    'ALTER TABLE %I '
      'ADD CONSTRAINT %I '
        'UNIQUE (reserve_in_serial_id)'
    ,'reserves_in_' || partition_suffix
    ,'reserves_in_' || partition_suffix || '_reserve_in_serial_id_key'
  );
END
$$;

--------------------------- reserves_close -------------------------------

-- Creates the reserves_close table (hash-partitioned on reserve_pub), or the
-- shard reserves_close_<shard_suffix>, together with its indices.
CREATE OR REPLACE FUNCTION create_table_reserves_close(
  IN shard_suffix VARCHAR DEFAULT NULL
)
RETURNS VOID
LANGUAGE plpgsql
AS $$
DECLARE
  table_name VARCHAR default 'reserves_close';
BEGIN

  PERFORM create_partitioned_table(
    'CREATE TABLE IF NOT EXISTS %I'
      '(close_uuid BIGINT GENERATED BY DEFAULT AS IDENTITY' -- UNIQUE / PRIMARY KEY
      ',reserve_pub BYTEA NOT NULL' -- REFERENCES reserves (reserve_pub) ON DELETE CASCADE
      ',execution_date INT8 NOT NULL'
      ',wtid BYTEA NOT NULL CHECK (LENGTH(wtid)=32)'
      ',wire_target_h_payto BYTEA CHECK (LENGTH(wire_target_h_payto)=32)'
      ',amount_val INT8 NOT NULL'
      ',amount_frac INT4 NOT NULL'
      ',closing_fee_val INT8 NOT NULL'
      ',closing_fee_frac INT4 NOT NULL'
    ') %s ;'
    ,table_name
    ,'PARTITION BY HASH (reserve_pub)'
    ,shard_suffix
  );

  table_name = concat_ws('_', table_name, shard_suffix);

  EXECUTE FORMAT (
    'CREATE INDEX IF NOT EXISTS %I '
    'ON %I '
    '(close_uuid);'
    ,table_name || '_by_close_uuid_index'
    ,table_name
  );
  EXECUTE FORMAT (
    'CREATE INDEX IF NOT EXISTS %I '
    'ON %I '
    '(reserve_pub);'
    ,table_name || '_by_reserve_pub_index'
    ,table_name
  );

END
$$;

-- Adds PRIMARY KEY (close_uuid) to partition reserves_close_<partition_suffix>.
CREATE OR REPLACE FUNCTION add_constraints_to_reserves_close_partition(
  IN partition_suffix VARCHAR
)
RETURNS void
LANGUAGE plpgsql
AS $$
BEGIN
  EXECUTE FORMAT (
    'ALTER TABLE %I '
      'ADD CONSTRAINT %I '
        'PRIMARY KEY (close_uuid)'
    ,'reserves_close_' || partition_suffix
    ,'reserves_close_' || partition_suffix || '_close_uuid_pkey'
  );
END
$$;

---------------------------- reserves_out -------------------------------

-- Creates the reserves_out table (hash-partitioned on h_blind_ev), or the
-- shard reserves_out_<shard_suffix>, together with its indices.
CREATE OR REPLACE FUNCTION create_table_reserves_out(
  IN shard_suffix VARCHAR DEFAULT NULL
)
RETURNS VOID
LANGUAGE plpgsql
AS $$
DECLARE
  table_name VARCHAR default 'reserves_out';
BEGIN

  PERFORM create_partitioned_table(
    'CREATE TABLE IF NOT EXISTS %I'
      '(reserve_out_serial_id BIGINT GENERATED BY DEFAULT AS IDENTITY' -- UNIQUE
      ',h_blind_ev BYTEA CHECK (LENGTH(h_blind_ev)=64) UNIQUE'
      ',denominations_serial INT8 NOT NULL' -- REFERENCES denominations (denominations_serial)
      ',denom_sig BYTEA NOT NULL'
      ',reserve_uuid INT8 NOT NULL' -- REFERENCES reserves (reserve_uuid) ON DELETE CASCADE
      ',reserve_sig BYTEA NOT NULL CHECK (LENGTH(reserve_sig)=64)'
      ',execution_date INT8 NOT NULL'
      ',amount_with_fee_val INT8 NOT NULL'
      ',amount_with_fee_frac INT4 NOT NULL'
    ') %s ;'
    ,table_name -- same value as the former hard-coded 'reserves_out'
    ,'PARTITION BY HASH (h_blind_ev)'
    ,shard_suffix
  );

  table_name = concat_ws('_', table_name, shard_suffix);

  EXECUTE FORMAT (
    'CREATE INDEX IF NOT EXISTS %I '
    'ON %I '
    '(reserve_out_serial_id);'
    ,table_name || '_by_reserve_out_serial_id_index'
    ,table_name
  );
  -- FIXME: change query to use reserves_out_by_reserve instead and materialize execution_date there as well???
  EXECUTE FORMAT (
    'CREATE INDEX IF NOT EXISTS %I '
    'ON %I '
    '(reserve_uuid, execution_date);'
    ,table_name || '_by_reserve_uuid_and_execution_date_index'
    ,table_name
  );
  EXECUTE FORMAT (
    'COMMENT ON INDEX %I '
    'IS %L;'
    ,table_name || '_by_reserve_uuid_and_execution_date_index'
    ,'for get_reserves_out and exchange_do_withdraw_limit_check'
  );

END
$$;

-- Adds UNIQUE (reserve_out_serial_id) to partition reserves_out_<partition_suffix>.
CREATE OR REPLACE FUNCTION add_constraints_to_reserves_out_partition(
  IN partition_suffix VARCHAR
)
RETURNS void
LANGUAGE plpgsql
AS $$
BEGIN
  EXECUTE FORMAT (
    'ALTER TABLE %I '
      'ADD CONSTRAINT %I '
        'UNIQUE (reserve_out_serial_id)'
    ,'reserves_out_' || partition_suffix
    ,'reserves_out_' || partition_suffix || '_reserve_out_serial_id_key'
  );
END
$$;

-- Creates the reserves_out_by_reserve table (hash-partitioned on
-- reserve_uuid), or the shard reserves_out_by_reserve_<shard_suffix>, and its
-- index on reserve_uuid.
CREATE OR REPLACE FUNCTION create_table_reserves_out_by_reserve(
  IN shard_suffix VARCHAR DEFAULT NULL
)
RETURNS VOID
LANGUAGE plpgsql
AS $$
DECLARE
  table_name VARCHAR DEFAULT 'reserves_out_by_reserve';
BEGIN

  PERFORM create_partitioned_table(
    'CREATE TABLE IF NOT EXISTS %I'
      '(reserve_uuid INT8 NOT NULL' -- REFERENCES reserves (reserve_uuid) ON DELETE CASCADE
      ',h_blind_ev BYTEA CHECK (LENGTH(h_blind_ev)=64)'
    ') %s '
    ,table_name
    ,'PARTITION BY HASH (reserve_uuid)'
    ,shard_suffix
  );

  table_name = concat_ws('_', table_name, shard_suffix);

  EXECUTE FORMAT (
    'CREATE INDEX IF NOT EXISTS %I '
    'ON %I '
    '(reserve_uuid);'
    ,table_name || '_main_index'
    ,table_name
  );

END
$$;

---------------------------- known_coins -------------------------------

-- Creates the known_coins table (hash-partitioned on coin_pub), or the shard
-- known_coins_<shard_suffix>.  No extra indices are created here.
CREATE OR REPLACE FUNCTION create_table_known_coins(
  IN shard_suffix VARCHAR DEFAULT NULL
)
RETURNS VOID
LANGUAGE plpgsql
AS $$
DECLARE
  table_name VARCHAR default 'known_coins';
BEGIN

  PERFORM create_partitioned_table(
    'CREATE TABLE IF NOT EXISTS %I'
      '(known_coin_id BIGINT GENERATED BY DEFAULT AS IDENTITY' -- UNIQUE
      ',denominations_serial INT8 NOT NULL' -- REFERENCES denominations (denominations_serial) ON DELETE CASCADE
      ',coin_pub BYTEA NOT NULL PRIMARY KEY CHECK (LENGTH(coin_pub)=32)'
      ',age_commitment_hash BYTEA CHECK (LENGTH(age_commitment_hash)=32)'
      ',denom_sig BYTEA NOT NULL'
      ',remaining_val INT8 NOT NULL'
      ',remaining_frac INT4 NOT NULL'
    ') %s ;'
    ,table_name
    ,'PARTITION BY HASH (coin_pub)' -- FIXME: or include denominations_serial? or multi-level partitioning?
    ,shard_suffix
  );

END
$$;

-- Adds UNIQUE (known_coin_id) to partition known_coins_<partition_suffix>.
CREATE OR REPLACE FUNCTION add_constraints_to_known_coins_partition(
  IN partition_suffix VARCHAR
)
RETURNS void
LANGUAGE plpgsql
AS $$
BEGIN
  EXECUTE FORMAT (
    'ALTER TABLE %I '
      'ADD CONSTRAINT %I '
        'UNIQUE (known_coin_id)'
    ,'known_coins_' || partition_suffix
    ,'known_coins_' || partition_suffix || '_known_coin_id_key'
  );
END
$$;

---------------------------- refresh_commitments -------------------------------

-- Creates the refresh_commitments table (hash-partitioned on rc), or the
-- shard refresh_commitments_<shard_suffix>, and its index on old_coin_pub.
CREATE OR REPLACE FUNCTION create_table_refresh_commitments(
  IN shard_suffix VARCHAR DEFAULT NULL
)
RETURNS VOID
LANGUAGE plpgsql
AS $$
DECLARE
  table_name VARCHAR DEFAULT 'refresh_commitments';
BEGIN

  PERFORM create_partitioned_table(
    'CREATE TABLE IF NOT EXISTS %I'
      '(melt_serial_id BIGINT GENERATED BY DEFAULT AS IDENTITY' -- UNIQUE
      ',rc BYTEA PRIMARY KEY CHECK (LENGTH(rc)=64)'
      ',old_coin_pub BYTEA NOT NULL' -- REFERENCES known_coins (coin_pub) ON DELETE CASCADE
      ',old_coin_sig BYTEA NOT NULL CHECK(LENGTH(old_coin_sig)=64)'
      ',amount_with_fee_val INT8 NOT NULL'
      ',amount_with_fee_frac INT4 NOT NULL'
      ',noreveal_index INT4 NOT NULL'
    ') %s ;'
    ,table_name
    ,'PARTITION BY HASH (rc)'
    ,shard_suffix
  );

  table_name = concat_ws('_', table_name, shard_suffix);

  -- Note: index spans partitions, may need to be materialized.
  EXECUTE FORMAT (
    'CREATE INDEX IF NOT EXISTS %I '
    'ON %I '
    '(old_coin_pub);'
    ,table_name || '_by_old_coin_pub_index'
    ,table_name
  );

END
$$;

-- Adds UNIQUE (melt_serial_id) to partition refresh_commitments_<partition_suffix>.
CREATE OR REPLACE FUNCTION add_constraints_to_refresh_commitments_partition(
  IN partition_suffix VARCHAR
)
RETURNS void
LANGUAGE plpgsql
AS $$
BEGIN
  EXECUTE FORMAT (
    'ALTER TABLE %I '
      'ADD CONSTRAINT %I '
        'UNIQUE (melt_serial_id)'
    ,'refresh_commitments_' || partition_suffix
    ,'refresh_commitments_' || partition_suffix || '_melt_serial_id_key'
  );
END
$$;

------------------------------ refresh_revealed_coins --------------------------------

-- Creates the refresh_revealed_coins table (hash-partitioned on
-- melt_serial_id), or the shard refresh_revealed_coins_<shard_suffix>, and its
-- index on melt_serial_id.
CREATE OR REPLACE FUNCTION create_table_refresh_revealed_coins(
  IN shard_suffix VARCHAR DEFAULT NULL
)
RETURNS VOID
LANGUAGE plpgsql
AS $$
DECLARE
  table_name VARCHAR DEFAULT 'refresh_revealed_coins';
BEGIN

  PERFORM create_partitioned_table(
    'CREATE TABLE IF NOT EXISTS %I'
      '(rrc_serial BIGINT GENERATED BY DEFAULT AS IDENTITY' -- UNIQUE
      ',melt_serial_id INT8 NOT NULL' -- REFERENCES refresh_commitments (melt_serial_id) ON DELETE CASCADE
      ',freshcoin_index INT4 NOT NULL'
      ',link_sig BYTEA NOT NULL CHECK(LENGTH(link_sig)=64)'
      ',denominations_serial INT8 NOT NULL' -- REFERENCES denominations (denominations_serial) ON DELETE CASCADE
      ',coin_ev BYTEA NOT NULL' -- UNIQUE
      ',h_coin_ev BYTEA NOT NULL CHECK(LENGTH(h_coin_ev)=64)' -- UNIQUE
      ',ev_sig BYTEA NOT NULL'
      ',ewv BYTEA NOT NULL'
      -- ,PRIMARY KEY (melt_serial_id, freshcoin_index) -- done per shard
    ') %s ;'
    ,table_name
    ,'PARTITION BY HASH (melt_serial_id)'
    ,shard_suffix
  );

  table_name = concat_ws('_', table_name, shard_suffix);

  EXECUTE FORMAT (
    'CREATE INDEX IF NOT EXISTS %I '
    'ON %I '
    '(melt_serial_id);'
    ,table_name || '_coins_by_melt_serial_id_index'
    ,table_name
  );

END
$$;

-- Adds to partition refresh_revealed_coins_<partition_suffix>: UNIQUE
-- constraints on rrc_serial, coin_ev and h_coin_ev, plus the primary key
-- (melt_serial_id, freshcoin_index).
CREATE OR REPLACE FUNCTION add_constraints_to_refresh_revealed_coins_partition(
  IN partition_suffix VARCHAR
)
RETURNS void
LANGUAGE plpgsql
AS $$
BEGIN
  EXECUTE FORMAT (
    'ALTER TABLE %I '
      'ADD CONSTRAINT %I '
        'UNIQUE (rrc_serial) '
      ',ADD CONSTRAINT %I '
        'UNIQUE (coin_ev) '
      ',ADD CONSTRAINT %I '
        'UNIQUE (h_coin_ev) '
      ',ADD PRIMARY KEY (melt_serial_id, freshcoin_index) '
    ,'refresh_revealed_coins_' || partition_suffix
    ,'refresh_revealed_coins_' || partition_suffix || '_rrc_serial_key'
    ,'refresh_revealed_coins_' || partition_suffix || '_coin_ev_key'
    ,'refresh_revealed_coins_' || partition_suffix || '_h_coin_ev_key'
  );
END
$$;

----------------------------- refresh_transfer_keys ------------------------------

-- Creates the refresh_transfer_keys table (hash-partitioned on
-- melt_serial_id), or the shard refresh_transfer_keys_<shard_suffix>.
CREATE OR REPLACE FUNCTION create_table_refresh_transfer_keys(
  IN shard_suffix VARCHAR DEFAULT NULL
)
RETURNS VOID
LANGUAGE plpgsql
AS $$
DECLARE
  table_name VARCHAR DEFAULT 'refresh_transfer_keys';
BEGIN

  PERFORM create_partitioned_table(
    'CREATE TABLE IF NOT EXISTS %I'
      '(rtc_serial BIGINT GENERATED BY DEFAULT AS IDENTITY' -- UNIQUE
      ',melt_serial_id INT8 PRIMARY KEY' -- REFERENCES refresh_commitments (melt_serial_id) ON DELETE CASCADE
      ',transfer_pub BYTEA NOT NULL CHECK(LENGTH(transfer_pub)=32)'
      ',transfer_privs BYTEA NOT NULL'
    ') %s ;'
    ,table_name
    ,'PARTITION BY HASH (melt_serial_id)'
    ,shard_suffix
  );

END
$$;

-- Adds UNIQUE (rtc_serial) to partition refresh_transfer_keys_<partition_suffix>.
CREATE OR REPLACE FUNCTION add_constraints_to_refresh_transfer_keys_partition(
  IN partition_suffix VARCHAR
)
RETURNS void
LANGUAGE plpgsql
AS $$
BEGIN
  EXECUTE FORMAT (
    'ALTER TABLE %I '
      'ADD CONSTRAINT %I '
        'UNIQUE (rtc_serial)'
    ,'refresh_transfer_keys_' || partition_suffix
    ,'refresh_transfer_keys_' || partition_suffix || '_rtc_serial_key'
  );
END
$$;

---------------------------- deposits -------------------------------

-- Creates the deposits table (hash-partitioned on coin_pub), or the shard
-- deposits_<shard_suffix>, and its index on coin_pub.
CREATE OR REPLACE FUNCTION create_table_deposits(
  IN shard_suffix VARCHAR DEFAULT NULL
)
RETURNS VOID
LANGUAGE plpgsql
AS $$
DECLARE
  table_name VARCHAR DEFAULT 'deposits';
BEGIN

  PERFORM create_partitioned_table(
    'CREATE TABLE IF NOT EXISTS %I'
      '(deposit_serial_id BIGINT GENERATED BY DEFAULT AS IDENTITY' -- PRIMARY KEY
      ',shard INT8 NOT NULL'
      ',coin_pub BYTEA NOT NULL CHECK (LENGTH(coin_pub)=32)' -- REFERENCES known_coins (coin_pub) ON DELETE CASCADE
      ',known_coin_id INT8 NOT NULL' -- REFERENCES known_coins (known_coin_id) ON DELETE CASCADE --- FIXME: column needed???
      ',amount_with_fee_val INT8 NOT NULL'
      ',amount_with_fee_frac INT4 NOT NULL'
      ',wallet_timestamp INT8 NOT NULL'
      ',exchange_timestamp INT8 NOT NULL'
      ',refund_deadline INT8 NOT NULL'
      ',wire_deadline INT8 NOT NULL'
      ',merchant_pub BYTEA NOT NULL CHECK (LENGTH(merchant_pub)=32)'
      ',h_contract_terms BYTEA NOT NULL CHECK (LENGTH(h_contract_terms)=64)'
      ',coin_sig BYTEA NOT NULL CHECK (LENGTH(coin_sig)=64)'
      ',wire_salt BYTEA NOT NULL CHECK (LENGTH(wire_salt)=16)'
      ',wire_target_h_payto BYTEA CHECK (LENGTH(wire_target_h_payto)=32)'
      ',done BOOLEAN NOT NULL DEFAULT FALSE'
      ',extension_blocked BOOLEAN NOT NULL DEFAULT FALSE'
      ',extension_details_serial_id INT8' -- REFERENCES extension_details (extension_details_serial_id) ON DELETE CASCADE
    ') %s ;'
    ,table_name
    ,'PARTITION BY HASH (coin_pub)'
    ,shard_suffix
  );

  table_name = concat_ws('_', table_name, shard_suffix);

  EXECUTE FORMAT (
    'CREATE INDEX IF NOT EXISTS %I '
    'ON %I '
    '(coin_pub);'
    ,table_name || '_by_coin_pub_index'
    ,table_name
  );

END
$$;

-- Adds to partition deposits_<partition_suffix>: PRIMARY KEY
-- (deposit_serial_id) and UNIQUE (coin_pub, merchant_pub, h_contract_terms).
CREATE OR REPLACE FUNCTION add_constraints_to_deposits_partition(
  IN partition_suffix VARCHAR
)
RETURNS void
LANGUAGE plpgsql
AS $$
BEGIN
  EXECUTE FORMAT (
    'ALTER TABLE %I '
      'ADD CONSTRAINT %I '
        'PRIMARY KEY (deposit_serial_id) '
      ',ADD CONSTRAINT %I '
        'UNIQUE (coin_pub, merchant_pub, h_contract_terms)'
    ,'deposits_' || partition_suffix
    ,'deposits_' || partition_suffix || '_deposit_serial_id_pkey'
    ,'deposits_' || partition_suffix || '_coin_pub_merchant_pub_h_contract_terms_key'
  );
END
$$;

-- Creates the deposits_by_ready table (range-partitioned on wire_deadline),
-- or the shard deposits_by_ready_<shard_suffix>, and its main index.
CREATE OR REPLACE FUNCTION create_table_deposits_by_ready(
  IN shard_suffix VARCHAR DEFAULT NULL
)
RETURNS VOID
LANGUAGE plpgsql
AS $$
DECLARE
  table_name VARCHAR DEFAULT 'deposits_by_ready';
BEGIN

  PERFORM create_partitioned_table(
    'CREATE TABLE IF NOT EXISTS %I'
      '(wire_deadline INT8 NOT NULL'
      ',shard INT8 NOT NULL'
      ',coin_pub BYTEA NOT NULL CHECK (LENGTH(coin_pub)=32)'
      ',deposit_serial_id INT8'
    ') %s ;'
    ,table_name
    ,'PARTITION BY RANGE (wire_deadline)'
    ,shard_suffix
  );

  table_name = concat_ws('_', table_name, shard_suffix);

  EXECUTE FORMAT (
    'CREATE INDEX IF NOT EXISTS %I '
    'ON %I '
    '(wire_deadline ASC, shard ASC, coin_pub);'
    ,table_name || '_main_index'
    ,table_name
  );

END
$$;

-- Creates the deposits_for_matching table (range-partitioned on
-- refund_deadline), or the shard deposits_for_matching_<shard_suffix>, and
-- its main index.
CREATE OR REPLACE FUNCTION create_table_deposits_for_matching(
  IN shard_suffix VARCHAR DEFAULT NULL
)
RETURNS VOID
LANGUAGE plpgsql
AS $$
DECLARE
  table_name VARCHAR DEFAULT 'deposits_for_matching';
BEGIN

  PERFORM create_partitioned_table(
    'CREATE TABLE IF NOT EXISTS %I'
      '(refund_deadline INT8 NOT NULL'
      ',merchant_pub BYTEA NOT NULL CHECK (LENGTH(merchant_pub)=32)'
      ',coin_pub BYTEA NOT NULL CHECK (LENGTH(coin_pub)=32)' -- REFERENCES known_coins (coin_pub) ON DELETE CASCADE
      ',deposit_serial_id INT8'
    ') %s ;'
    ,table_name
    ,'PARTITION BY RANGE (refund_deadline)'
    ,shard_suffix
  );

  table_name = concat_ws('_', table_name, shard_suffix);

  EXECUTE FORMAT (
    'CREATE INDEX IF NOT EXISTS %I '
    'ON %I '
    '(refund_deadline ASC, merchant_pub, coin_pub);'
    ,table_name || '_main_index'
    ,table_name
  );

END
$$;

----------------------------- refunds ------------------------------

-- Creates the refunds table (hash-partitioned on coin_pub), or the shard
-- refunds_<shard_suffix>, and its index on coin_pub.
CREATE OR REPLACE FUNCTION create_table_refunds(
  IN shard_suffix VARCHAR DEFAULT NULL
)
RETURNS VOID
LANGUAGE plpgsql
AS $$
DECLARE
  table_name VARCHAR DEFAULT 'refunds';
BEGIN

  PERFORM create_partitioned_table(
    'CREATE TABLE IF NOT EXISTS %I'
      '(refund_serial_id BIGINT GENERATED BY DEFAULT AS IDENTITY' -- UNIQUE
      ',coin_pub BYTEA NOT NULL CHECK (LENGTH(coin_pub)=32)' -- REFERENCES known_coins (coin_pub) ON DELETE CASCADE
      ',deposit_serial_id INT8 NOT NULL' -- REFERENCES deposits (deposit_serial_id) ON DELETE CASCADE
      ',merchant_sig BYTEA NOT NULL CHECK(LENGTH(merchant_sig)=64)'
      ',rtransaction_id INT8 NOT NULL'
      ',amount_with_fee_val INT8 NOT NULL'
      ',amount_with_fee_frac INT4 NOT NULL'
      -- ,PRIMARY KEY (deposit_serial_id, rtransaction_id) -- done per shard!
    ') %s ;'
    ,table_name
    ,'PARTITION BY HASH (coin_pub)'
    ,shard_suffix
  );

  table_name = concat_ws('_', table_name, shard_suffix);

  EXECUTE FORMAT (
    'CREATE INDEX IF NOT EXISTS %I '
    'ON %I '
    '(coin_pub);'
    ,table_name || '_by_coin_pub_index'
    ,table_name
  );

END
$$;

-- Adds to partition refunds_<partition_suffix>: UNIQUE (refund_serial_id) and
-- the primary key (deposit_serial_id, rtransaction_id).
CREATE OR REPLACE FUNCTION add_constraints_to_refunds_partition(
  IN partition_suffix VARCHAR
)
RETURNS void
LANGUAGE plpgsql
AS $$
BEGIN
  EXECUTE FORMAT (
    'ALTER TABLE %I '
      'ADD CONSTRAINT %I '
        'UNIQUE (refund_serial_id) '
      ',ADD PRIMARY KEY (deposit_serial_id, rtransaction_id) '
    ,'refunds_' || partition_suffix
    ,'refunds_' || partition_suffix || '_refund_serial_id_key'
  );
END
$$;

---------------------------- wire_out -------------------------------

-- Creates the wire_out table (hash-partitioned on wtid_raw), or the shard
-- wire_out_<shard_suffix>, and its index on wire_target_h_payto.
CREATE OR REPLACE FUNCTION create_table_wire_out(
  IN shard_suffix VARCHAR DEFAULT NULL
)
RETURNS VOID
LANGUAGE plpgsql
AS $$
DECLARE
  table_name VARCHAR DEFAULT 'wire_out';
BEGIN

  PERFORM create_partitioned_table(
    'CREATE TABLE IF NOT EXISTS %I'
      '(wireout_uuid BIGINT GENERATED BY DEFAULT AS IDENTITY' -- PRIMARY KEY
      ',execution_date INT8 NOT NULL'
      ',wtid_raw BYTEA UNIQUE NOT NULL CHECK (LENGTH(wtid_raw)=32)'
      ',wire_target_h_payto BYTEA CHECK (LENGTH(wire_target_h_payto)=32)'
      ',exchange_account_section TEXT NOT NULL'
      ',amount_val INT8 NOT NULL'
      ',amount_frac INT4 NOT NULL'
    ') %s ;'
    ,table_name
    ,'PARTITION BY HASH (wtid_raw)'
    ,shard_suffix
  );

  table_name = concat_ws('_', table_name, shard_suffix);

  EXECUTE FORMAT (
    'CREATE INDEX IF NOT EXISTS %I '
    'ON %I '
    '(wire_target_h_payto);'
    ,table_name || '_by_wire_target_h_payto_index'
    ,table_name
  );

END
$$;

-- Adds PRIMARY KEY (wireout_uuid) to partition wire_out_<partition_suffix>.
CREATE OR REPLACE FUNCTION add_constraints_to_wire_out_partition(
  IN partition_suffix VARCHAR
)
RETURNS void
LANGUAGE plpgsql
AS $$
BEGIN
  EXECUTE FORMAT (
    'ALTER TABLE %I '
      'ADD CONSTRAINT %I '
        'PRIMARY KEY (wireout_uuid)'
    ,'wire_out_' || partition_suffix
    ,'wire_out_' || partition_suffix || '_wireout_uuid_pkey'
  );
END
$$;

---------------------------- aggregation_transient ------------------------------

-- Creates the aggregation_transient table (hash-partitioned on
-- wire_target_h_payto), or the shard aggregation_transient_<shard_suffix>.
CREATE OR REPLACE FUNCTION create_table_aggregation_transient(
  IN shard_suffix VARCHAR DEFAULT NULL
)
RETURNS VOID
LANGUAGE plpgsql
AS $$
DECLARE
  table_name VARCHAR DEFAULT 'aggregation_transient';
BEGIN

  PERFORM create_partitioned_table(
    'CREATE TABLE IF NOT EXISTS %I '
      '(amount_val INT8 NOT NULL'
      ',amount_frac INT4 NOT NULL'
      ',wire_target_h_payto BYTEA CHECK (LENGTH(wire_target_h_payto)=32)'
      ',exchange_account_section TEXT NOT NULL'
      ',wtid_raw BYTEA NOT NULL CHECK (LENGTH(wtid_raw)=32)'
    ') %s ;'
    ,table_name
    ,'PARTITION BY HASH (wire_target_h_payto)'
    ,shard_suffix
  );

END
$$;

---------------------------- aggregation_tracking -------------------------------

-- Creates the aggregation_tracking table (hash-partitioned on
-- deposit_serial_id), or the shard aggregation_tracking_<shard_suffix>, and
-- its index on wtid_raw.
CREATE OR REPLACE FUNCTION create_table_aggregation_tracking(
  IN shard_suffix VARCHAR DEFAULT NULL
)
RETURNS VOID
LANGUAGE plpgsql
AS $$
DECLARE
  table_name VARCHAR DEFAULT 'aggregation_tracking';
BEGIN

  PERFORM create_partitioned_table(
    'CREATE TABLE IF NOT EXISTS %I'
      '(aggregation_serial_id BIGINT GENERATED BY DEFAULT AS IDENTITY' -- UNIQUE
      ',deposit_serial_id INT8 PRIMARY KEY' -- REFERENCES deposits (deposit_serial_id) ON DELETE CASCADE
      -- FIXME change to coin_pub + deposit_serial_id for more efficient deposit
      -- or something else ???
      ',wtid_raw BYTEA NOT NULL' -- CONSTRAINT wire_out_ref REFERENCES wire_out(wtid_raw) ON DELETE CASCADE DEFERRABLE
    ') %s ;'
    ,table_name
    ,'PARTITION BY HASH (deposit_serial_id)'
    ,shard_suffix
  );

  table_name = concat_ws('_', table_name, shard_suffix);

  EXECUTE FORMAT (
    'CREATE INDEX IF NOT EXISTS %I '
    'ON %I '
    '(wtid_raw);'
    ,table_name || '_by_wtid_raw_index'
    ,table_name
  );
  EXECUTE FORMAT (
    'COMMENT ON INDEX %I '
    'IS %L;'
    ,table_name || '_by_wtid_raw_index'
    ,'for lookup_transactions'
  );

END
$$;

-- Adds UNIQUE (aggregation_serial_id) to partition
-- aggregation_tracking_<partition_suffix>.
CREATE OR REPLACE FUNCTION add_constraints_to_aggregation_tracking_partition(
  IN partition_suffix VARCHAR
)
RETURNS VOID
LANGUAGE plpgsql
AS $$
BEGIN
  EXECUTE FORMAT (
    'ALTER TABLE %I '
      'ADD CONSTRAINT %I '
        'UNIQUE (aggregation_serial_id) '
    ,'aggregation_tracking_' || partition_suffix
    ,'aggregation_tracking_' || partition_suffix || '_aggregation_serial_id_key'
  );
END
$$;

----------------------------- recoup ------------------------------

-- Creates the recoup table (hash-partitioned on coin_pub), or the shard
-- recoup_<shard_suffix>, and its index on coin_pub.
CREATE OR REPLACE FUNCTION create_table_recoup(
  IN shard_suffix VARCHAR DEFAULT NULL
)
RETURNS VOID
LANGUAGE plpgsql
AS $$
DECLARE
  table_name VARCHAR DEFAULT 'recoup';
BEGIN

  PERFORM create_partitioned_table(
    'CREATE TABLE IF NOT EXISTS %I'
      '(recoup_uuid BIGINT GENERATED BY DEFAULT AS IDENTITY' -- UNIQUE
      ',coin_pub BYTEA NOT NULL CHECK (LENGTH(coin_pub)=32)' -- REFERENCES known_coins (coin_pub)
      ',coin_sig BYTEA NOT NULL CHECK(LENGTH(coin_sig)=64)'
      ',coin_blind BYTEA NOT NULL CHECK(LENGTH(coin_blind)=32)'
      ',amount_val INT8 NOT NULL'
      ',amount_frac INT4 NOT NULL'
      ',recoup_timestamp INT8 NOT NULL'
      ',reserve_out_serial_id INT8 NOT NULL' -- REFERENCES reserves_out (reserve_out_serial_id) ON DELETE CASCADE
    ') %s ;'
    ,table_name
    -- No trailing ';' here: the template already terminates the statement.
    ,'PARTITION BY HASH (coin_pub)'
    ,shard_suffix
  );

  table_name = concat_ws('_', table_name, shard_suffix);

  EXECUTE FORMAT (
    'CREATE INDEX IF NOT EXISTS %I '
    'ON %I '
    '(coin_pub);'
    ,table_name || '_by_coin_pub_index'
    ,table_name
  );

END
$$;

-- Adds UNIQUE (recoup_uuid) to partition recoup_<partition_suffix>.
CREATE OR REPLACE FUNCTION add_constraints_to_recoup_partition(
  IN partition_suffix VARCHAR
)
RETURNS VOID
LANGUAGE plpgsql
AS $$
BEGIN
  EXECUTE FORMAT (
    'ALTER TABLE %I '
      'ADD CONSTRAINT %I '
        'UNIQUE (recoup_uuid) '
    ,'recoup_' || partition_suffix
    ,'recoup_' || partition_suffix || '_recoup_uuid_key'
  );
END
$$;

-- Creates the recoup_by_reserve table (hash-partitioned on
-- reserve_out_serial_id), or the shard recoup_by_reserve_<shard_suffix>, and
-- its index on reserve_out_serial_id.
CREATE OR REPLACE FUNCTION create_table_recoup_by_reserve(
  IN shard_suffix VARCHAR DEFAULT NULL
)
RETURNS VOID
LANGUAGE plpgsql
AS $$
DECLARE
  table_name VARCHAR DEFAULT 'recoup_by_reserve';
BEGIN

  PERFORM create_partitioned_table(
    'CREATE TABLE IF NOT EXISTS %I'
      '(reserve_out_serial_id INT8 NOT NULL' -- REFERENCES reserves (reserve_out_serial_id) ON DELETE CASCADE
      ',coin_pub BYTEA CHECK (LENGTH(coin_pub)=32)' -- REFERENCES known_coins (coin_pub)
    ') %s ;'
    ,table_name
    ,'PARTITION BY HASH (reserve_out_serial_id)'
    ,shard_suffix
  );

  table_name = concat_ws('_', table_name, shard_suffix);

  EXECUTE FORMAT (
    'CREATE INDEX IF NOT EXISTS %I '
    'ON %I '
    '(reserve_out_serial_id);'
    ,table_name || '_main_index'
    ,table_name
  );

END
$$;

---------------------------- recoup_refresh ------------------------------

-- Creates the recoup_refresh table (hash-partitioned on coin_pub), or the
-- shard recoup_refresh_<shard_suffix>, together with its indices.
CREATE OR REPLACE FUNCTION create_table_recoup_refresh(
  IN shard_suffix VARCHAR DEFAULT NULL
)
RETURNS VOID
LANGUAGE plpgsql
AS $$
DECLARE
  table_name VARCHAR DEFAULT 'recoup_refresh';
BEGIN

  PERFORM create_partitioned_table(
    'CREATE TABLE IF NOT EXISTS %I'
      '(recoup_refresh_uuid BIGINT GENERATED BY DEFAULT AS IDENTITY' -- UNIQUE
      ',coin_pub BYTEA NOT NULL CHECK (LENGTH(coin_pub)=32)' -- REFERENCES known_coins (coin_pub)
      ',known_coin_id BIGINT NOT NULL' -- REFERENCES known_coins (known_coin_id) ON DELETE CASCADE
      ',coin_sig BYTEA NOT NULL CHECK(LENGTH(coin_sig)=64)'
      ',coin_blind BYTEA NOT NULL CHECK(LENGTH(coin_blind)=32)'
      ',amount_val INT8 NOT NULL'
      ',amount_frac INT4 NOT NULL'
      ',recoup_timestamp INT8 NOT NULL'
      ',rrc_serial INT8 NOT NULL' -- REFERENCES refresh_revealed_coins (rrc_serial) ON DELETE CASCADE -- UNIQUE
    ') %s ;'
    ,table_name
    ,'PARTITION BY HASH (coin_pub)'
    ,shard_suffix
  );

  table_name = concat_ws('_', table_name, shard_suffix);

  -- FIXME: any query using this index will be slow. Materialize index or change query?
  -- Also: which query uses this index?
  EXECUTE FORMAT (
    'CREATE INDEX IF NOT EXISTS %I '
    'ON %I '
    '(rrc_serial);'
    ,table_name || '_by_rrc_serial_index'
    ,table_name
  );
  EXECUTE FORMAT (
    'CREATE INDEX IF NOT EXISTS %I '
    'ON %I '
    '(coin_pub);'
    ,table_name || '_by_coin_pub_index'
    ,table_name
  );

END
$$;

-- Adds UNIQUE (recoup_refresh_uuid) to partition recoup_refresh_<partition_suffix>.
CREATE OR REPLACE FUNCTION add_constraints_to_recoup_refresh_partition(
  IN partition_suffix VARCHAR
)
RETURNS VOID
LANGUAGE plpgsql
AS $$
BEGIN
  EXECUTE FORMAT (
    'ALTER TABLE %I '
      'ADD CONSTRAINT %I '
        'UNIQUE (recoup_refresh_uuid) '
    ,'recoup_refresh_' || partition_suffix
    ,'recoup_refresh_' || partition_suffix || '_recoup_refresh_uuid_key'
  );
END
$$;

----------------------------- prewire ------------------------------

-- Creates the prewire table (hash-partitioned on prewire_uuid), or the shard
-- prewire_<shard_suffix>, together with its indices.
CREATE OR REPLACE FUNCTION create_table_prewire(
  IN shard_suffix VARCHAR DEFAULT NULL
)
RETURNS VOID
LANGUAGE plpgsql
AS $$
DECLARE
  table_name VARCHAR DEFAULT 'prewire';
BEGIN

  PERFORM create_partitioned_table(
    'CREATE TABLE IF NOT EXISTS %I'
      '(prewire_uuid BIGINT GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY'
      ',wire_method TEXT NOT NULL'
      ',finished BOOLEAN NOT NULL DEFAULT false'
      ',failed BOOLEAN NOT NULL DEFAULT false'
      ',buf BYTEA NOT NULL'
    ') %s ;'
    ,table_name
    ,'PARTITION BY HASH (prewire_uuid)'
    ,shard_suffix
  );

  table_name = concat_ws('_', table_name, shard_suffix);

  EXECUTE FORMAT (
    'CREATE INDEX IF NOT EXISTS %I '
    'ON %I '
    '(finished);'
    ,table_name || '_by_finished_index'
    ,table_name
  );
  EXECUTE FORMAT (
    'COMMENT ON INDEX %I '
    'IS %L;'
    ,table_name || '_by_finished_index'
    ,'for gc_prewire'
  );
  -- FIXME: find a way to combine these two indices?
  EXECUTE FORMAT (
    'CREATE INDEX IF NOT EXISTS %I '
    'ON %I '
    '(failed,finished);'
    ,table_name || '_by_failed_finished_index'
    ,table_name
  );
  EXECUTE FORMAT (
    'COMMENT ON INDEX %I '
    'IS %L;'
    ,table_name || '_by_failed_finished_index'
    ,'for wire_prepare_data_get'
  );

END
$$;

----------------------------- cs_nonce_locks ------------------------------

-- Creates the cs_nonce_locks table (hash-partitioned on nonce), or the shard
-- cs_nonce_locks_<shard_suffix>.
CREATE OR REPLACE FUNCTION create_table_cs_nonce_locks(
  shard_suffix VARCHAR DEFAULT NULL
)
RETURNS VOID
LANGUAGE plpgsql
AS $$
BEGIN

  PERFORM create_partitioned_table(
    'CREATE TABLE IF NOT EXISTS %I'
      '(cs_nonce_lock_serial_id BIGINT GENERATED BY DEFAULT AS IDENTITY' -- UNIQUE
      ',nonce BYTEA PRIMARY KEY CHECK (LENGTH(nonce)=32)'
      ',op_hash BYTEA NOT NULL CHECK (LENGTH(op_hash)=64)'
      ',max_denomination_serial INT8 NOT NULL'
    ') %s ;'
    ,'cs_nonce_locks'
    ,'PARTITION BY HASH (nonce)'
    ,shard_suffix
  );

END
$$;

-- Adds UNIQUE (cs_nonce_lock_serial_id) to partition
-- cs_nonce_locks_<partition_suffix>.
CREATE OR REPLACE FUNCTION add_constraints_to_cs_nonce_locks_partition(
  IN partition_suffix VARCHAR
)
RETURNS VOID
LANGUAGE plpgsql
AS $$
BEGIN
  EXECUTE FORMAT (
    'ALTER TABLE %I '
      'ADD CONSTRAINT %I '
        'UNIQUE (cs_nonce_lock_serial_id)'
    ,'cs_nonce_locks_' || partition_suffix
    ,'cs_nonce_locks_' || partition_suffix || '_cs_nonce_lock_serial_id_key'
  );
END
$$;

------------------------- Partitions ------------------------------

-- Creates hash partition <source_table_name>_<partition_num> of
-- source_table_name.  partition_num is 1-based; the partition receives the
-- rows whose hash has remainder (partition_num - 1) modulo modulus.
CREATE OR REPLACE FUNCTION create_hash_partition(
    source_table_name VARCHAR
    ,modulus INTEGER
    ,partition_num INTEGER
  )
  RETURNS VOID
  LANGUAGE plpgsql
AS $$
BEGIN

  RAISE NOTICE 'Creating partition %_%', source_table_name, partition_num;

  EXECUTE FORMAT(
    'CREATE TABLE IF NOT EXISTS %I '
      'PARTITION OF %I '
      'FOR VALUES WITH (MODULUS %s, REMAINDER %s)'
    ,source_table_name || '_' || partition_num
    ,source_table_name
    ,modulus
    ,partition_num-1
  );

END
$$;

-- Placeholder for range partition creation: currently only raises a
-- 'TODO' notice and creates nothing.
CREATE OR REPLACE FUNCTION create_range_partition(
  source_table_name VARCHAR
  ,partition_num INTEGER
)
  RETURNS void
  LANGUAGE plpgsql
AS $$
BEGIN
  RAISE NOTICE 'TODO';
END
$$;

-- Detaches the <table>_default partition from each hash-partitioned table
-- listed below.  IF EXISTS only guards the parent table: each statement
-- still requires the corresponding default partition to be attached.
CREATE OR REPLACE FUNCTION detach_default_partitions()
  RETURNS VOID
  LANGUAGE plpgsql
AS $$
BEGIN

  RAISE NOTICE 'Detaching all default table partitions';

  ALTER TABLE IF EXISTS wire_targets
    DETACH PARTITION wire_targets_default;

  ALTER TABLE IF EXISTS reserves
    DETACH PARTITION reserves_default;

  ALTER TABLE IF EXISTS reserves_in
    DETACH PARTITION reserves_in_default;

  ALTER TABLE IF EXISTS reserves_close
    DETACH PARTITION reserves_close_default;

  ALTER TABLE IF EXISTS reserves_out
    DETACH PARTITION reserves_out_default;

  ALTER TABLE IF EXISTS reserves_out_by_reserve
    DETACH PARTITION reserves_out_by_reserve_default;

  ALTER TABLE IF EXISTS known_coins
    DETACH PARTITION known_coins_default;

  ALTER TABLE IF EXISTS refresh_commitments
    DETACH PARTITION refresh_commitments_default;

  ALTER TABLE IF EXISTS refresh_revealed_coins
    DETACH PARTITION refresh_revealed_coins_default;

  ALTER TABLE IF EXISTS refresh_transfer_keys
    DETACH PARTITION refresh_transfer_keys_default;

  ALTER TABLE IF EXISTS deposits
    DETACH PARTITION deposits_default;

--- TODO range partitioning
--  ALTER TABLE IF EXISTS deposits_by_ready
--    DETACH PARTITION deposits_by_ready_default;
--
--  ALTER TABLE IF EXISTS deposits_for_matching
--    DETACH PARTITION deposits_for_matching_default;

  ALTER TABLE IF EXISTS refunds
    DETACH PARTITION refunds_default;

  ALTER TABLE IF EXISTS wire_out
    DETACH PARTITION wire_out_default;

  ALTER TABLE IF EXISTS aggregation_transient
    DETACH PARTITION aggregation_transient_default;

  ALTER TABLE IF EXISTS aggregation_tracking
    DETACH PARTITION aggregation_tracking_default;

  ALTER TABLE IF EXISTS recoup
    DETACH PARTITION recoup_default;

  ALTER TABLE IF EXISTS recoup_by_reserve
    DETACH PARTITION recoup_by_reserve_default;

  ALTER TABLE IF EXISTS recoup_refresh
    DETACH PARTITION recoup_refresh_default;

  ALTER TABLE IF EXISTS prewire
    DETACH PARTITION prewire_default;

  ALTER TABLE IF EXISTS cs_nonce_locks
    DETACH PARTITION cs_nonce_locks_default;

END
$$;

COMMENT ON FUNCTION detach_default_partitions
  IS 'We need to drop default and create new one before deleting the default partitions
      otherwise constraints get lost too. Might be needed in sharding too';

-- Drops the <table>_default tables of all hash-partitioned tables listed
-- below; tables that do not exist are skipped (IF EXISTS).
CREATE OR REPLACE FUNCTION drop_default_partitions()
  RETURNS VOID
  LANGUAGE plpgsql
AS $$
BEGIN

  RAISE NOTICE 'Dropping default table partitions';

  DROP TABLE IF EXISTS wire_targets_default;
  DROP TABLE IF EXISTS reserves_default;
  DROP TABLE IF EXISTS reserves_in_default;
  DROP TABLE IF EXISTS reserves_close_default;
  DROP TABLE IF EXISTS reserves_out_default;
  DROP TABLE IF EXISTS reserves_out_by_reserve_default;
  DROP TABLE IF EXISTS known_coins_default;
  DROP TABLE IF EXISTS refresh_commitments_default;
  DROP TABLE IF EXISTS refresh_revealed_coins_default;
  DROP TABLE IF EXISTS refresh_transfer_keys_default;
  DROP TABLE IF EXISTS deposits_default;
--DROP TABLE IF EXISTS deposits_by_ready_default;
--DROP TABLE IF EXISTS deposits_for_matching_default;
  DROP TABLE IF EXISTS refunds_default;
  DROP TABLE IF EXISTS wire_out_default;
  DROP TABLE IF EXISTS aggregation_transient_default;
  DROP TABLE IF EXISTS aggregation_tracking_default;
  DROP TABLE IF EXISTS recoup_default;
  DROP TABLE IF EXISTS recoup_by_reserve_default;
  DROP TABLE IF EXISTS recoup_refresh_default;
  DROP TABLE IF EXISTS prewire_default;
  DROP TABLE IF EXISTS cs_nonce_locks_default;

END
$$;

COMMENT ON FUNCTION drop_default_partitions
  IS 'Drop all default partitions once other partitions are attached.
Might be needed in sharding too.'; CREATE OR REPLACE FUNCTION create_partitions( num_partitions INTEGER ) RETURNS VOID LANGUAGE plpgsql AS $$ DECLARE modulus INTEGER; BEGIN modulus := num_partitions; PERFORM detach_default_partitions(); LOOP PERFORM create_hash_partition( 'wire_targets' ,modulus ,num_partitions ); PERFORM add_constraints_to_wire_targets_partition(num_partitions::varchar); PERFORM create_hash_partition( 'reserves' ,modulus ,num_partitions ); PERFORM create_hash_partition( 'reserves_in' ,modulus ,num_partitions ); PERFORM add_constraints_to_reserves_in_partition(num_partitions::varchar); PERFORM create_hash_partition( 'reserves_close' ,modulus ,num_partitions ); PERFORM add_constraints_to_reserves_close_partition(num_partitions::varchar); PERFORM create_hash_partition( 'reserves_out' ,modulus ,num_partitions ); PERFORM add_constraints_to_reserves_out_partition(num_partitions::varchar); PERFORM create_hash_partition( 'reserves_out_by_reserve' ,modulus ,num_partitions ); PERFORM create_hash_partition( 'known_coins' ,modulus ,num_partitions ); PERFORM add_constraints_to_known_coins_partition(num_partitions::varchar); PERFORM create_hash_partition( 'refresh_commitments' ,modulus ,num_partitions ); PERFORM add_constraints_to_refresh_commitments_partition(num_partitions::varchar); PERFORM create_hash_partition( 'refresh_revealed_coins' ,modulus ,num_partitions ); PERFORM add_constraints_to_refresh_revealed_coins_partition(num_partitions::varchar); PERFORM create_hash_partition( 'refresh_transfer_keys' ,modulus ,num_partitions ); PERFORM add_constraints_to_refresh_transfer_keys_partition(num_partitions::varchar); PERFORM create_hash_partition( 'deposits' ,modulus ,num_partitions ); PERFORM add_constraints_to_deposits_partition(num_partitions::varchar); -- TODO: dynamically (!) 
creating/deleting deposits partitions: -- create new partitions 'as needed', drop old ones once the aggregator has made -- them empty; as 'new' deposits will always have deadlines in the future, this -- would basically guarantee no conflict between aggregator and exchange service! -- SEE also: https://www.cybertec-postgresql.com/en/automatic-partition-creation-in-postgresql/ -- (article is slightly wrong, as this works:) --CREATE TABLE tab ( -- id bigint GENERATED ALWAYS AS IDENTITY, -- ts timestamp NOT NULL, -- data text -- PARTITION BY LIST ((ts::date)); -- CREATE TABLE tab_def PARTITION OF tab DEFAULT; -- BEGIN -- CREATE TABLE tab_part2 (LIKE tab); -- insert into tab_part2 (id,ts, data) values (5,'2022-03-21', 'foo'); -- alter table tab attach partition tab_part2 for values in ('2022-03-21'); -- commit; -- Naturally, to ensure this is actually 100% conflict-free, we'd -- need to create tables at the granularity of the wire/refund deadlines; -- that is right now configurable via AGGREGATOR_SHIFT option. 
-- FIXME: range partitioning -- PERFORM create_range_partition( -- 'deposits_by_ready' -- ,modulus -- ,num_partitions -- ); -- -- PERFORM create_range_partition( -- 'deposits_for_matching' -- ,modulus -- ,num_partitions -- ); PERFORM create_hash_partition( 'refunds' ,modulus ,num_partitions ); PERFORM add_constraints_to_refunds_partition(num_partitions::varchar); PERFORM create_hash_partition( 'wire_out' ,modulus ,num_partitions ); PERFORM add_constraints_to_wire_out_partition(num_partitions::varchar); PERFORM create_hash_partition( 'aggregation_transient' ,modulus ,num_partitions ); PERFORM create_hash_partition( 'aggregation_tracking' ,modulus ,num_partitions ); PERFORM add_constraints_to_aggregation_tracking_partition(num_partitions::varchar); PERFORM create_hash_partition( 'recoup' ,modulus ,num_partitions ); PERFORM add_constraints_to_recoup_partition(num_partitions::varchar); PERFORM create_hash_partition( 'recoup_by_reserve' ,modulus ,num_partitions ); PERFORM create_hash_partition( 'recoup_refresh' ,modulus ,num_partitions ); PERFORM add_constraints_to_recoup_refresh_partition(num_partitions::varchar); PERFORM create_hash_partition( 'prewire' ,modulus ,num_partitions ); PERFORM create_hash_partition( 'cs_nonce_locks' ,modulus ,num_partitions ); PERFORM add_constraints_to_cs_nonce_locks_partition(num_partitions::varchar); num_partitions=num_partitions-1; EXIT WHEN num_partitions=0; END LOOP; PERFORM drop_default_partitions(); END $$; --------------------- Sharding --------------------------- CREATE OR REPLACE FUNCTION create_foreign_hash_partition( source_table_name VARCHAR ,modulus INTEGER ,shard_suffix VARCHAR ,current_shard_num INTEGER ,local_user VARCHAR DEFAULT 'taler-exchange-httpd' ) RETURNS VOID LANGUAGE plpgsql AS $$ BEGIN RAISE NOTICE 'Creating %_% on %', source_table_name, shard_suffix, shard_suffix; EXECUTE FORMAT( 'CREATE FOREIGN TABLE IF NOT EXISTS %I ' 'PARTITION OF %I ' 'FOR VALUES WITH (MODULUS %s, REMAINDER %s) ' 'SERVER %I' 
,source_table_name || '_' || shard_suffix ,source_table_name ,modulus ,current_shard_num-1 ,shard_suffix ); EXECUTE FORMAT( 'ALTER FOREIGN TABLE %I OWNER TO %I' ,source_table_name || '_' || shard_suffix ,local_user ); END $$; CREATE OR REPLACE FUNCTION create_foreign_range_partition( source_table_name VARCHAR ,partition_num INTEGER ) RETURNS VOID LANGUAGE plpgsql AS $$ BEGIN RAISE NOTICE 'TODO'; END $$; CREATE OR REPLACE FUNCTION prepare_sharding() RETURNS VOID LANGUAGE plpgsql AS $$ BEGIN CREATE EXTENSION IF NOT EXISTS postgres_fdw; PERFORM detach_default_partitions(); ALTER TABLE IF EXISTS wire_targets DROP CONSTRAINT IF EXISTS wire_targets_pkey CASCADE ; ALTER TABLE IF EXISTS reserves DROP CONSTRAINT IF EXISTS reserves_pkey CASCADE ; ALTER TABLE IF EXISTS reserves_in DROP CONSTRAINT IF EXISTS reserves_in_pkey CASCADE ; ALTER TABLE IF EXISTS reserves_close DROP CONSTRAINT IF EXISTS reserves_close_pkey CASCADE ; ALTER TABLE IF EXISTS reserves_out DROP CONSTRAINT IF EXISTS reserves_out_pkey CASCADE ,DROP CONSTRAINT IF EXISTS reserves_out_denominations_serial_fkey ,DROP CONSTRAINT IF EXISTS reserves_out_h_blind_ev_key ; ALTER TABLE IF EXISTS known_coins DROP CONSTRAINT IF EXISTS known_coins_pkey CASCADE ,DROP CONSTRAINT IF EXISTS known_coins_denominations_serial_fkey ; ALTER TABLE IF EXISTS refresh_commitments DROP CONSTRAINT IF EXISTS refresh_commitments_pkey CASCADE ,DROP CONSTRAINT IF EXISTS refresh_old_coin_pub_fkey ; ALTER TABLE IF EXISTS refresh_revealed_coins DROP CONSTRAINT IF EXISTS refresh_revealed_coins_pkey CASCADE ,DROP CONSTRAINT IF EXISTS refresh_revealed_coins_denominations_serial_fkey ; ALTER TABLE IF EXISTS refresh_transfer_keys DROP CONSTRAINT IF EXISTS refresh_transfer_keys_pkey CASCADE ; ALTER TABLE IF EXISTS deposits DROP CONSTRAINT IF EXISTS deposits_pkey CASCADE ,DROP CONSTRAINT IF EXISTS deposits_extension_details_serial_id_fkey ,DROP CONSTRAINT IF EXISTS deposits_coin_pub_merchant_pub_h_contract_terms_key CASCADE ; ALTER TABLE IF EXISTS 
refunds DROP CONSTRAINT IF EXISTS refunds_pkey CASCADE ; ALTER TABLE IF EXISTS wire_out DROP CONSTRAINT IF EXISTS wire_out_pkey CASCADE ,DROP CONSTRAINT IF EXISTS wire_out_wtid_raw_key CASCADE ; ALTER TABLE IF EXISTS aggregation_tracking DROP CONSTRAINT IF EXISTS aggregation_tracking_pkey CASCADE ,DROP CONSTRAINT IF EXISTS aggregation_tracking_wtid_raw_fkey ; ALTER TABLE IF EXISTS recoup DROP CONSTRAINT IF EXISTS recoup_pkey CASCADE ; ALTER TABLE IF EXISTS recoup_refresh DROP CONSTRAINT IF EXISTS recoup_refresh_pkey CASCADE ; ALTER TABLE IF EXISTS prewire DROP CONSTRAINT IF EXISTS prewire_pkey CASCADE ; ALTER TABLE IF EXISTS cs_nonce_locks DROP CONSTRAINT IF EXISTS cs_nonce_locks_pkey CASCADE ; END $$; CREATE OR REPLACE FUNCTION create_shard_server( shard_suffix VARCHAR ,total_num_shards INTEGER ,current_shard_num INTEGER ,remote_host VARCHAR ,remote_user VARCHAR ,remote_user_password VARCHAR ,remote_db_name VARCHAR DEFAULT 'taler-exchange' ,remote_port INTEGER DEFAULT '5432' ,local_user VARCHAR DEFAULT 'taler-exchange-httpd' ) RETURNS VOID LANGUAGE plpgsql AS $$ BEGIN RAISE NOTICE 'Creating server %', remote_host; EXECUTE FORMAT( 'CREATE SERVER IF NOT EXISTS %I ' 'FOREIGN DATA WRAPPER postgres_fdw ' 'OPTIONS (dbname %L, host %L, port %L)' ,shard_suffix ,remote_db_name ,remote_host ,remote_port ); EXECUTE FORMAT( 'CREATE USER MAPPING IF NOT EXISTS ' 'FOR %I SERVER %I ' 'OPTIONS (user %L, password %L)' ,local_user ,shard_suffix ,remote_user ,remote_user_password ); EXECUTE FORMAT( 'GRANT ALL PRIVILEGES ' 'ON FOREIGN SERVER %I ' 'TO %I;' ,shard_suffix ,local_user ); PERFORM create_foreign_hash_partition( 'wire_targets' ,total_num_shards ,shard_suffix ,current_shard_num ,local_user ); PERFORM create_foreign_hash_partition( 'reserves' ,total_num_shards ,shard_suffix ,current_shard_num ,local_user ); PERFORM create_foreign_hash_partition( 'reserves_in' ,total_num_shards ,shard_suffix ,current_shard_num ,local_user ); PERFORM create_foreign_hash_partition( 'reserves_out' 
,total_num_shards ,shard_suffix ,current_shard_num ,local_user ); PERFORM create_foreign_hash_partition( 'reserves_out_by_reserve' ,total_num_shards ,shard_suffix ,current_shard_num ,local_user ); PERFORM create_foreign_hash_partition( 'reserves_close' ,total_num_shards ,shard_suffix ,current_shard_num ,local_user ); PERFORM create_foreign_hash_partition( 'known_coins' ,total_num_shards ,shard_suffix ,current_shard_num ,local_user ); PERFORM create_foreign_hash_partition( 'refresh_commitments' ,total_num_shards ,shard_suffix ,current_shard_num ,local_user ); PERFORM create_foreign_hash_partition( 'refresh_revealed_coins' ,total_num_shards ,shard_suffix ,current_shard_num ,local_user ); PERFORM create_foreign_hash_partition( 'refresh_transfer_keys' ,total_num_shards ,shard_suffix ,current_shard_num ,local_user ); PERFORM create_foreign_hash_partition( 'deposits' ,total_num_shards ,shard_suffix ,current_shard_num ,local_user ); -- PERFORM create_foreign_range_partition( -- 'deposits_by_ready' -- ,total_num_shards -- ,shard_suffix -- ,current_shard_num -- ,local_user -- ); -- PERFORM create_foreign_range_partition( -- 'deposits_for_matching' -- ,total_num_shards -- ,shard_suffix -- ,current_shard_num -- ,local_user -- ); PERFORM create_foreign_hash_partition( 'refunds' ,total_num_shards ,shard_suffix ,current_shard_num ,local_user ); PERFORM create_foreign_hash_partition( 'wire_out' ,total_num_shards ,shard_suffix ,current_shard_num ,local_user ); PERFORM create_foreign_hash_partition( 'aggregation_transient' ,total_num_shards ,shard_suffix ,current_shard_num ,local_user ); PERFORM create_foreign_hash_partition( 'aggregation_tracking' ,total_num_shards ,shard_suffix ,current_shard_num ,local_user ); PERFORM create_foreign_hash_partition( 'recoup' ,total_num_shards ,shard_suffix ,current_shard_num ,local_user ); PERFORM create_foreign_hash_partition( 'recoup_by_reserve' ,total_num_shards ,shard_suffix ,current_shard_num ,local_user ); PERFORM 
create_foreign_hash_partition( 'recoup_refresh' ,total_num_shards ,shard_suffix ,current_shard_num ,local_user ); PERFORM create_foreign_hash_partition( 'prewire' ,total_num_shards ,shard_suffix ,current_shard_num ,local_user ); PERFORM create_foreign_hash_partition( 'cs_nonce_locks' ,total_num_shards ,shard_suffix ,current_shard_num ,local_user ); END $$; COMMENT ON FUNCTION create_shard_server IS 'Create a shard server on the master node with all foreign tables and user mappings'; CREATE OR REPLACE FUNCTION create_foreign_servers( amount INTEGER ,domain VARCHAR DEFAULT 'perf.taler' ) RETURNS VOID LANGUAGE plpgsql AS $$ BEGIN PERFORM prepare_sharding(); FOR i IN 1..amount LOOP PERFORM create_shard_server( i::varchar ,amount ,i ,'shard-' || i::varchar || '.' || domain ,'taler' ,'taler' ,'taler-exchange' ,'5432' ,'taler-exchange-httpd' ); END LOOP; END $$; COMMIT;