include partitioning logic in dbinit

This commit is contained in:
Marco Boss 2022-03-02 10:50:51 +01:00
parent 4978b1e966
commit 2abe9bf6d7
No known key found for this signature in database
GPG Key ID: 89A3EC33C625C3DF
8 changed files with 688 additions and 227 deletions

View File

@ -44,6 +44,11 @@ static int clear_shards;
*/
static int gc_db;
/**
* -P option: setup a partitioned database
*/
static uint32_t num_partitions;
/**
* Main function that will be run.
@ -90,6 +95,24 @@ run (void *cls,
global_ret = EXIT_NOPERMISSION;
return;
}
if (1 <
num_partitions)
{
if (GNUNET_OK != plugin->setup_partitions (plugin->cls, &num_partitions))
{
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
"Could not setup partitions. Dropping default ones again\n");
}
if (GNUNET_OK != plugin->drop_tables (plugin->cls))
{
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
"Could not drop tables after failed partitioning, please delete the DB manually\n");
}
TALER_EXCHANGEDB_plugin_unload (plugin);
plugin = NULL;
global_ret = EXIT_NOTINSTALLED;
return;
}
if (gc_db || clear_shards)
{
if (GNUNET_OK !=
@ -150,6 +173,11 @@ main (int argc,
"shardunlock",
"unlock all revolving shard locks (use after system crash or shard size change while services are not running)",
&clear_shards),
GNUNET_GETOPT_option_uint ('P',
"partition",
"NUMBER",
"Setup a partitioned database where each table which can be partitioned holds NUMBER partitions on a single DB node (NOTE: this is different from sharding)",
&num_partitions),
GNUNET_GETOPT_OPTION_END
};
enum GNUNET_GenericReturnValue ret;

View File

@ -19,6 +19,7 @@ sql_DATA = \
benchmark-0001.sql \
exchange-0000.sql \
exchange-0001.sql \
partition-0001.sql \
drop0001.sql
EXTRA_DIST = \

View File

@ -22,6 +22,7 @@ BEGIN;
-- Unlike the other SQL files, it SHOULD be updated to reflect the
-- latest requirements for dropping tables.
-- Unregister patch (exchange-0001.sql)
SELECT _v.unregister_patch('exchange-0001');
@ -36,25 +37,40 @@ DROP TABLE IF EXISTS signkey_revocations CASCADE;
DROP TABLE IF EXISTS work_shards CASCADE;
DROP TABLE IF EXISTS prewire CASCADE;
DROP TABLE IF EXISTS recoup CASCADE;
DROP FUNCTION IF EXISTS add_constraints_to_recoup_partition;
DROP TABLE IF EXISTS recoup_refresh CASCADE;
DROP FUNCTION IF EXISTS add_constraints_to_recoup_refresh_partition;
DROP TABLE IF EXISTS aggregation_tracking CASCADE;
DROP FUNCTION IF EXISTS add_constraints_to_aggregation_tracking_partition;
DROP TABLE IF EXISTS wire_out CASCADE;
DROP FUNCTION IF EXISTS add_constraints_to_wire_out_partition;
DROP TABLE IF EXISTS wire_targets CASCADE;
DROP FUNCTION IF EXISTS add_constraints_to_wire_targets_partition;
DROP TABLE IF EXISTS wire_fee CASCADE;
DROP TABLE IF EXISTS deposits CASCADE;
DROP FUNCTION IF EXISTS add_constraints_to_deposits_partition;
DROP TABLE IF EXISTS extension_details CASCADE;
DROP TABLE IF EXISTS refunds CASCADE;
DROP FUNCTION IF EXISTS add_constraints_to_refunds_partition;
DROP TABLE IF EXISTS refresh_commitments CASCADE;
DROP FUNCTION IF EXISTS add_constraints_to_refresh_commitments_partition;
DROP TABLE IF EXISTS refresh_revealed_coins CASCADE;
DROP FUNCTION IF EXISTS add_constraints_to_refresh_revealed_coins_partition;
DROP TABLE IF EXISTS refresh_transfer_keys CASCADE;
DROP FUNCTION IF EXISTS add_constraints_to_refresh_transfer_keys_partition;
DROP TABLE IF EXISTS known_coins CASCADE;
DROP FUNCTION IF EXISTS add_constraints_to_known_coins_partition;
DROP TABLE IF EXISTS reserves_close CASCADE;
DROP FUNCTION IF EXISTS add_constraints_to_reserves_close_partition;
DROP TABLE IF EXISTS reserves_out CASCADE;
DROP FUNCTION IF EXISTS add_constraints_to_reserves_out_partition;
DROP TABLE IF EXISTS reserves_in CASCADE;
DROP FUNCTION IF EXISTS add_constraints_to_reserves_in_partition;
DROP TABLE IF EXISTS reserves CASCADE;
DROP TABLE IF EXISTS denomination_revocations CASCADE;
DROP TABLE IF EXISTS denominations CASCADE;
DROP TABLE IF EXISTS cs_nonce_locks CASCADE;
DROP FUNCTION IF EXISTS add_constraints_to_cs_nonce_locks_partition;
DROP FUNCTION IF EXISTS exchange_do_withdraw(bigint,int,bytea,bytea,bytea,bytea,bytea,bigint,bigint) ;
@ -72,6 +88,19 @@ DROP FUNCTION IF EXISTS exchange_do_recoup_to_reserve;
-- FIXME: drop other stored functions!
-- Unregister patch (partition-0001.sql)
-- SELECT _v.unregister_patch('partition-0001');
-- Drops for partition-0001.sql
DROP FUNCTION IF EXISTS create_table_partition;
DROP FUNCTION IF EXISTS create_partitions;
DROP FUNCTION IF EXISTS detach_default_partitions;
DROP FUNCTION IF EXISTS drop_default_partitions;
-- And we're out of here...
COMMIT;

View File

@ -84,17 +84,34 @@ COMMENT ON COLUMN wire_targets.kyc_ok
IS 'true if the KYC check was passed successfully';
COMMENT ON COLUMN wire_targets.external_id
IS 'Name of the user that was used for OAuth 2.0-based legitimization';
CREATE TABLE IF NOT EXISTS wire_targets_default
PARTITION OF wire_targets
FOR VALUES WITH (MODULUS 1, REMAINDER 0);
CREATE OR REPLACE FUNCTION add_constraints_to_wire_targets_partition(
IN partition_suffix VARCHAR
)
RETURNS void
LANGUAGE plpgsql
AS $$
BEGIN
EXECUTE FORMAT (
'ALTER TABLE wire_targets_' || partition_suffix || ' '
'ADD CONSTRAINT wire_targets_' || partition_suffix || '_wire_target_serial_id_key '
'UNIQUE (wire_target_serial_id)'
);
END
$$;
SELECT add_constraints_to_wire_targets_partition('default');
-- FIXME partition by serial_id rather than h_payto,
-- it is used more in join conditions - crucial for sharding to select this.
-- Author: (Boss Marco)
CREATE INDEX IF NOT EXISTS wire_targets_serial_id_index
ON wire_targets
(wire_target_serial_id
);
(wire_target_serial_id);
CREATE TABLE IF NOT EXISTS reserves
(reserve_uuid BIGINT GENERATED BY DEFAULT AS IDENTITY
@ -156,10 +173,28 @@ COMMENT ON COLUMN reserves_in.reserve_pub
IS 'Public key of the reserve. Private key signifies ownership of the remaining balance.';
COMMENT ON COLUMN reserves_in.credit_val
IS 'Amount that was transferred into the reserve';
CREATE TABLE IF NOT EXISTS reserves_in_default
PARTITION OF reserves_in
FOR VALUES WITH (MODULUS 1, REMAINDER 0);
CREATE OR REPLACE FUNCTION add_constraints_to_reserves_in_partition(
IN partition_suffix VARCHAR
)
RETURNS void
LANGUAGE plpgsql
AS $$
BEGIN
EXECUTE FORMAT (
'ALTER TABLE reserves_in_' || partition_suffix || ' '
'ADD CONSTRAINT reserves_in_' || partition_suffix || '_reserve_in_serial_id_key '
'UNIQUE (reserve_in_serial_id)'
);
END
$$;
SELECT add_constraints_to_reserves_in_partition('default');
CREATE INDEX IF NOT EXISTS reserves_in_by_reserve_in_serial_id_index
ON reserves_in
(reserve_in_serial_id);
@ -190,10 +225,28 @@ COMMENT ON TABLE reserves_close
IS 'wire transfers executed by the reserve to close reserves';
COMMENT ON COLUMN reserves_close.wire_target_serial_id
IS 'Identifies the credited bank account (and KYC status). Note that closing does not depend on KYC.';
CREATE TABLE IF NOT EXISTS reserves_close_default
PARTITION OF reserves_close
FOR VALUES WITH (MODULUS 1, REMAINDER 0);
CREATE OR REPLACE FUNCTION add_constraints_to_reserves_close_partition(
IN partition_suffix VARCHAR
)
RETURNS void
LANGUAGE plpgsql
AS $$
BEGIN
EXECUTE FORMAT (
'ALTER TABLE reserves_close_' || partition_suffix || ' '
'ADD CONSTRAINT reserves_close_' || partition_suffix || '_close_uuid_pkey '
'PRIMARY KEY (close_uuid)'
);
END
$$;
SELECT add_constraints_to_reserves_close_partition('default');
CREATE INDEX IF NOT EXISTS reserves_close_by_close_uuid_index
ON reserves_close
(close_uuid);
@ -220,9 +273,27 @@ COMMENT ON COLUMN reserves_out.h_blind_ev
IS 'Hash of the blinded coin, used as primary key here so that broken clients that use a non-random coin or blinding factor fail to withdraw (otherwise they would fail on deposit when the coin is not unique there).';
COMMENT ON COLUMN reserves_out.denominations_serial
IS 'We do not CASCADE ON DELETE here, we may keep the denomination data alive';
CREATE TABLE IF NOT EXISTS reserves_out_default
PARTITION OF reserves_out
FOR VALUES WITH (MODULUS 1, REMAINDER 0);
CREATE OR REPLACE FUNCTION add_constraints_to_reserves_out_partition(
IN partition_suffix VARCHAR
)
RETURNS void
LANGUAGE plpgsql
AS $$
BEGIN
EXECUTE FORMAT (
'ALTER TABLE reserves_out_' || partition_suffix || ' '
'ADD CONSTRAINT reserves_out_' || partition_suffix || '_reserve_out_serial_id_key '
'UNIQUE (reserve_out_serial_id)'
);
END
$$;
SELECT add_constraints_to_reserves_out_partition('default');
CREATE INDEX IF NOT EXISTS reserves_out_by_reserve_out_serial_id_index
ON reserves_out
@ -337,10 +408,28 @@ COMMENT ON COLUMN known_coins.age_commitment_hash
IS 'Optional hash of the age commitment for age restrictions as per DD 24 (active if denom_type has the respective bit set)';
COMMENT ON COLUMN known_coins.denom_sig
IS 'This is the signature of the exchange that affirms that the coin is a valid coin. The specific signature type depends on denom_type of the denomination.';
CREATE TABLE IF NOT EXISTS known_coins_default
PARTITION OF known_coins
FOR VALUES WITH (MODULUS 1, REMAINDER 0);
CREATE OR REPLACE FUNCTION add_constraints_to_known_coins_partition(
IN partition_suffix VARCHAR
)
RETURNS void
LANGUAGE plpgsql
AS $$
BEGIN
EXECUTE FORMAT (
'ALTER TABLE known_coins_' || partition_suffix || ' '
'ADD CONSTRAINT known_coins_' || partition_suffix || '_known_coin_id_key '
'UNIQUE (known_coin_id)'
);
END
$$;
SELECT add_constraints_to_known_coins_partition('default');
CREATE INDEX IF NOT EXISTS known_coins_by_known_coin_id_index
ON known_coins
(known_coin_id);
@ -367,10 +456,28 @@ COMMENT ON COLUMN refresh_commitments.old_coin_pub
IS 'Coin being melted in the refresh process.';
COMMENT ON COLUMN refresh_commitments.h_age_commitment
IS 'The (optional) age commitment that was involved in the minting process of the coin, may be NULL.';
CREATE TABLE IF NOT EXISTS refresh_commitments_default
PARTITION OF refresh_commitments
FOR VALUES WITH (MODULUS 1, REMAINDER 0);
CREATE OR REPLACE FUNCTION add_constraints_to_refresh_commitments_partition(
IN partition_suffix VARCHAR
)
RETURNS void
LANGUAGE plpgsql
AS $$
BEGIN
EXECUTE FORMAT (
'ALTER TABLE refresh_commitments_' || partition_suffix || ' '
'ADD CONSTRAINT refresh_commitments_' || partition_suffix || '_melt_serial_id_key '
'UNIQUE (melt_serial_id)'
);
END
$$;
SELECT add_constraints_to_refresh_commitments_partition('default');
CREATE INDEX IF NOT EXISTS refresh_commitments_by_melt_serial_id_index
ON refresh_commitments
(melt_serial_id);
@ -408,12 +515,33 @@ COMMENT ON COLUMN refresh_revealed_coins.h_coin_ev
IS 'hash of the envelope of the new coin to be signed (for lookups)';
COMMENT ON COLUMN refresh_revealed_coins.ev_sig
IS 'exchange signature over the envelope';
CREATE TABLE IF NOT EXISTS refresh_revealed_coins_default
PARTITION OF refresh_revealed_coins
FOR VALUES WITH (MODULUS 1, REMAINDER 0);
-- We do require this primary key on each shard!
ALTER TABLE refresh_revealed_coins_default
ADD PRIMARY KEY (melt_serial_id, freshcoin_index);
CREATE OR REPLACE FUNCTION add_constraints_to_refresh_revealed_coins_partition(
IN partition_suffix VARCHAR
)
RETURNS void
LANGUAGE plpgsql
AS $$
BEGIN
EXECUTE FORMAT (
'ALTER TABLE refresh_revealed_coins_' || partition_suffix || ' '
'ADD CONSTRAINT refresh_revealed_coins_' || partition_suffix || '_rrc_serial_key '
'UNIQUE (rrc_serial) '
',ADD CONSTRAINT refresh_revealed_coins_' || partition_suffix || '_coin_ev_key '
'UNIQUE (coin_ev) '
',ADD CONSTRAINT refresh_revealed_coins_' || partition_suffix || '_h_coin_ev_key '
'UNIQUE (h_coin_ev) '
',ADD PRIMARY KEY (melt_serial_id, freshcoin_index) '
);
END
$$;
SELECT add_constraints_to_refresh_revealed_coins_partition('default');
CREATE INDEX IF NOT EXISTS refresh_revealed_coins_by_rrc_serial_index
ON refresh_revealed_coins
@ -440,10 +568,28 @@ COMMENT ON COLUMN refresh_transfer_keys.transfer_pub
IS 'transfer public key for the gamma index';
COMMENT ON COLUMN refresh_transfer_keys.transfer_privs
IS 'array of TALER_CNC_KAPPA - 1 transfer private keys that have been revealed, with the gamma entry being skipped';
CREATE TABLE IF NOT EXISTS refresh_transfer_keys_default
PARTITION OF refresh_transfer_keys
FOR VALUES WITH (MODULUS 1, REMAINDER 0);
CREATE OR REPLACE FUNCTION add_constraints_to_refresh_transfer_keys_partition(
IN partition_suffix VARCHAR
)
RETURNS void
LANGUAGE plpgsql
AS $$
BEGIN
EXECUTE FORMAT (
'ALTER TABLE refresh_transfer_keys_' || partition_suffix || ' '
'ADD CONSTRAINT refresh_transfer_keys_' || partition_suffix || '_rtc_serial_key '
'UNIQUE (rtc_serial)'
);
END
$$;
SELECT add_constraints_to_refresh_transfer_keys_partition('default');
CREATE INDEX IF NOT EXISTS refresh_transfer_keys_by_rtc_serial_index
ON refresh_transfer_keys
(rtc_serial);
@ -480,10 +626,28 @@ CREATE TABLE IF NOT EXISTS deposits
,UNIQUE (shard, known_coin_id, merchant_pub, h_contract_terms)
)
PARTITION BY HASH (shard);
CREATE TABLE IF NOT EXISTS deposits_default
PARTITION OF deposits
FOR VALUES WITH (MODULUS 1, REMAINDER 0);
CREATE OR REPLACE FUNCTION add_constraints_to_deposits_partition(
IN partition_suffix VARCHAR
)
RETURNS void
LANGUAGE plpgsql
AS $$
BEGIN
EXECUTE FORMAT (
'ALTER TABLE deposits_' || partition_suffix || ' '
'ADD CONSTRAINT deposits_' || partition_suffix || '_deposit_serial_id_pkey '
'PRIMARY KEY (deposit_serial_id)'
);
END
$$;
SELECT add_constraints_to_deposits_partition('default');
COMMENT ON TABLE deposits
IS 'Deposits we have received and for which we need to make (aggregate) wire transfers (and manage refunds).';
COMMENT ON COLUMN deposits.shard
@ -546,11 +710,28 @@ COMMENT ON COLUMN refunds.deposit_serial_id
IS 'Identifies ONLY the merchant_pub, h_contract_terms and known_coin_id. Multiple deposits may match a refund, this only identifies one of them.';
COMMENT ON COLUMN refunds.rtransaction_id
IS 'used by the merchant to make refunds unique in case the same coin for the same deposit gets a subsequent (higher) refund';
CREATE TABLE IF NOT EXISTS refunds_default
PARTITION OF refunds
FOR VALUES WITH (MODULUS 1, REMAINDER 0);
ALTER TABLE refunds_default
ADD PRIMARY KEY (deposit_serial_id, rtransaction_id);
CREATE OR REPLACE FUNCTION add_constraints_to_refunds_partition(
IN partition_suffix VARCHAR
)
RETURNS void
LANGUAGE plpgsql
AS $$
BEGIN
EXECUTE FORMAT (
'ALTER TABLE refunds_' || partition_suffix || ' '
'ADD CONSTRAINT refunds_' || partition_suffix || '_refund_serial_id_key '
'UNIQUE (refund_serial_id) '
',ADD PRIMARY KEY (deposit_serial_id, rtransaction_id) '
);
END
$$;
SELECT add_constraints_to_refunds_partition('default');
CREATE INDEX IF NOT EXISTS refunds_by_refund_serial_id_index
ON refunds
@ -573,10 +754,28 @@ COMMENT ON COLUMN wire_out.exchange_account_section
IS 'identifies the configuration section with the debit account of this payment';
COMMENT ON COLUMN wire_out.wire_target_serial_id
IS 'Identifies the credited bank account and KYC status';
CREATE TABLE IF NOT EXISTS wire_out_default
PARTITION OF wire_out
FOR VALUES WITH (MODULUS 1, REMAINDER 0);
CREATE OR REPLACE FUNCTION add_constraints_to_wire_out_partition(
IN partition_suffix VARCHAR
)
RETURNS void
LANGUAGE plpgsql
AS $$
BEGIN
EXECUTE FORMAT (
'ALTER TABLE wire_out_' || partition_suffix || ' '
'ADD CONSTRAINT wire_out_' || partition_suffix || '_wireout_uuid_pkey '
'PRIMARY KEY (wireout_uuid)'
);
END
$$;
SELECT add_constraints_to_wire_out_partition('default');
CREATE INDEX IF NOT EXISTS wire_out_by_wireout_uuid_index
ON wire_out
(wireout_uuid);
@ -585,7 +784,6 @@ CREATE INDEX IF NOT EXISTS wire_out_by_wire_target_serial_id_index
(wire_target_serial_id);
CREATE TABLE IF NOT EXISTS aggregation_tracking
(aggregation_serial_id BIGINT GENERATED BY DEFAULT AS IDENTITY -- UNIQUE
,deposit_serial_id INT8 PRIMARY KEY -- REFERENCES deposits (deposit_serial_id) ON DELETE CASCADE
@ -596,10 +794,28 @@ COMMENT ON TABLE aggregation_tracking
IS 'mapping from wire transfer identifiers (WTID) to deposits (and back)';
COMMENT ON COLUMN aggregation_tracking.wtid_raw
IS 'We first create entries in the aggregation_tracking table and then finally the wire_out entry once we know the total amount. Hence the constraint must be deferrable and we cannot use a wireout_uuid here, because we do not have it when these rows are created. Changing the logic to first INSERT a dummy row into wire_out and then UPDATEing that row in the same transaction would theoretically reduce per-deposit storage costs by 5 percent (24/~460 bytes).';
CREATE TABLE IF NOT EXISTS aggregation_tracking_default
PARTITION OF aggregation_tracking
FOR VALUES WITH (MODULUS 1, REMAINDER 0);
CREATE OR REPLACE FUNCTION add_constraints_to_aggregation_tracking_partition(
IN partition_suffix VARCHAR
)
RETURNS VOID
LANGUAGE plpgsql
AS $$
BEGIN
EXECUTE FORMAT (
'ALTER TABLE aggregation_tracking_' || partition_suffix || ' '
'ADD CONSTRAINT aggregation_tracking_' || partition_suffix || '_aggregation_serial_id_key '
'UNIQUE (aggregation_serial_id) '
);
END
$$;
SELECT add_constraints_to_aggregation_tracking_partition('default');
CREATE INDEX IF NOT EXISTS aggregation_tracking_by_aggregation_serial_id_index
ON aggregation_tracking
(aggregation_serial_id);
@ -653,10 +869,28 @@ COMMENT ON COLUMN recoup.coin_sig
IS 'Signature by the coin affirming the recoup, of type TALER_SIGNATURE_WALLET_COIN_RECOUP';
COMMENT ON COLUMN recoup.coin_blind
IS 'Denomination blinding key used when creating the blinded coin from the planchet. Secret revealed during the recoup to provide the linkage between the coin and the withdraw operation.';
CREATE TABLE IF NOT EXISTS recoup_default
PARTITION OF recoup
FOR VALUES WITH (MODULUS 1, REMAINDER 0);
CREATE OR REPLACE FUNCTION add_constraints_to_recoup_partition(
IN partition_suffix VARCHAR
)
RETURNS VOID
LANGUAGE plpgsql
AS $$
BEGIN
EXECUTE FORMAT (
'ALTER TABLE recoup_' || partition_suffix || ' '
'ADD CONSTRAINT recoup_' || partition_suffix || '_recoup_uuid_key '
'UNIQUE (recoup_uuid) '
);
END
$$;
SELECT add_constraints_to_recoup_partition('default');
CREATE INDEX IF NOT EXISTS recoup_by_recoup_uuid_index
ON recoup
(recoup_uuid);
@ -687,10 +921,28 @@ COMMENT ON COLUMN recoup_refresh.rrc_serial
IS 'Link to the refresh operation. Also identifies the h_blind_ev of the recouped coin (as h_coin_ev).';
COMMENT ON COLUMN recoup_refresh.coin_blind
IS 'Denomination blinding key used when creating the blinded coin from the planchet. Secret revealed during the recoup to provide the linkage between the coin and the refresh operation.';
CREATE TABLE IF NOT EXISTS recoup_refresh_default
PARTITION OF recoup_refresh
FOR VALUES WITH (MODULUS 1, REMAINDER 0);
CREATE OR REPLACE FUNCTION add_constraints_to_recoup_refresh_partition(
IN partition_suffix VARCHAR
)
RETURNS VOID
LANGUAGE plpgsql
AS $$
BEGIN
EXECUTE FORMAT (
'ALTER TABLE recoup_refresh_' || partition_suffix || ' '
'ADD CONSTRAINT recoup_refresh_' || partition_suffix || '_recoup_refresh_uuid_key '
'UNIQUE (recoup_refresh_uuid) '
);
END
$$;
SELECT add_constraints_to_recoup_refresh_partition('default');
CREATE INDEX IF NOT EXISTS recoup_refresh_by_recoup_refresh_uuid_index
ON recoup_refresh
(recoup_refresh_uuid);
@ -770,10 +1022,27 @@ COMMENT ON COLUMN cs_nonce_locks.op_hash
IS 'hash (RC for refresh, blind coin hash for withdraw) the nonce may be used with';
COMMENT ON COLUMN cs_nonce_locks.max_denomination_serial
IS 'Maximum number of a CS denomination serial the nonce could be used with, for GC';
CREATE TABLE IF NOT EXISTS cs_nonce_locks_default
PARTITION OF cs_nonce_locks
FOR VALUES WITH (MODULUS 1, REMAINDER 0);
CREATE OR REPLACE FUNCTION add_constraints_to_cs_nonce_locks_partition(
IN partition_suffix VARCHAR
)
RETURNS VOID
LANGUAGE plpgsql
AS $$
BEGIN
EXECUTE FORMAT (
'ALTER TABLE cs_nonce_locks_' || partition_suffix || ' '
'ADD CONSTRAINT cs_nonce_locks_' || partition_suffix || '_cs_nonce_lock_serial_id_key '
'UNIQUE (cs_nonce_lock_serial_id)'
);
END
$$;
SELECT add_constraints_to_cs_nonce_locks_partition('default');
CREATE TABLE IF NOT EXISTS work_shards
(shard_serial_id BIGINT GENERATED BY DEFAULT AS IDENTITY UNIQUE
@ -839,224 +1108,6 @@ CREATE INDEX IF NOT EXISTS revolving_work_shards_by_job_name_active_last_attempt
);
-- Partitions
CREATE OR REPLACE FUNCTION create_table_partition(
source_table_name VARCHAR
,modulus INTEGER
,num_partitions INTEGER
)
RETURNS VOID
LANGUAGE plpgsql
AS $$
BEGIN
RAISE NOTICE 'Creating partition %_%', source_table_name, num_partitions;
EXECUTE FORMAT(
'CREATE TABLE IF NOT EXISTS %I '
'PARTITION OF %I '
'FOR VALUES WITH (MODULUS %s, REMAINDER %s)'
,source_table_name || '_' || num_partitions
,source_table_name
,modulus
,num_partitions-1
);
END
$$;
CREATE OR REPLACE FUNCTION detach_default_partitions()
RETURNS VOID
LANGUAGE plpgsql
AS $$
BEGIN
RAISE NOTICE 'Detaching all default table partitions';
ALTER TABLE IF EXISTS wire_targets
DETACH PARTITION wire_targets_default;
ALTER TABLE IF EXISTS reserves
DETACH PARTITION reserves_default;
ALTER TABLE IF EXISTS reserves_in
DETACH PARTITION reserves_in_default;
ALTER TABLE IF EXISTS reserves_close
DETACH PARTITION reserves_close_default;
ALTER TABLE IF EXISTS reserves_out
DETACH PARTITION reserves_out_default;
ALTER TABLE IF EXISTS known_coins
DETACH PARTITION known_coins_default;
ALTER TABLE IF EXISTS refresh_commitments
DETACH PARTITION refresh_commitments_default;
ALTER TABLE IF EXISTS refresh_revealed_coins
DETACH PARTITION refresh_revealed_coins_default;
ALTER TABLE IF EXISTS refresh_transfer_keys
DETACH PARTITION refresh_transfer_keys_default;
ALTER TABLE IF EXISTS deposits
DETACH PARTITION deposits_default;
ALTER TABLE IF EXISTS refunds
DETACH PARTITION refunds_default;
ALTER TABLE IF EXISTS wire_out
DETACH PARTITION wire_out_default;
ALTER TABLE IF EXISTS aggregation_tracking
DETACH PARTITION aggregation_tracking_default;
ALTER TABLE IF EXISTS recoup
DETACH PARTITION recoup_default;
ALTER TABLE IF EXISTS recoup_refresh
DETACH PARTITION recoup_refresh_default;
ALTER TABLE IF EXISTS prewire
DETACH PARTITION prewire_default;
ALTER TABLE IF EXISTS cs_nonce_locks
DETACH partition cs_nonce_locks_default;
END
$$;
COMMENT ON FUNCTION detach_default_partitions
IS 'We need to drop default and create new one before deleting the default partitions
otherwise constraints get lost too';
CREATE OR REPLACE FUNCTION drop_default_partitions()
RETURNS VOID
LANGUAGE plpgsql
AS $$
BEGIN
RAISE NOTICE 'Dropping default table partitions';
DROP TABLE IF EXISTS wire_targets_default;
DROP TABLE IF EXISTS reserves_default;
DROP TABLE IF EXISTS reserves_in_default;
DROP TABLE IF EXISTS reserves_close_default;
DROP TABLE IF EXISTS reserves_out_default;
DROP TABLE IF EXISTS known_coins_default;
DROP TABLE IF EXISTS refresh_commitments_default;
DROP TABLE IF EXISTS refresh_revealed_coins_default;
DROP TABLE IF EXISTS refresh_transfer_keys_default;
DROP TABLE IF EXISTS deposits_default;
DROP TABLE IF EXISTS refunds_default;
DROP TABLE IF EXISTS wire_out_default;
DROP TABLE IF EXISTS aggregation_tracking_default;
DROP TABLE IF EXISTS recoup_default;
DROP TABLE IF EXISTS recoup_refresh_default;
DROP TABLE IF EXISTS prewire_default;
DROP TABLE IF EXISTS cs_nonce_locks_default;
END
$$;
CREATE OR REPLACE FUNCTION create_partitions(
num_partitions INTEGER
)
RETURNS VOID
LANGUAGE plpgsql
AS $$
DECLARE
modulus INTEGER;
BEGIN
modulus := num_partitions;
PERFORM detach_default_partitions();
LOOP
PERFORM create_table_partition(
'wire_targets'
,modulus
,num_partitions
);
PERFORM create_table_partition(
'reserves'
,modulus
,num_partitions
);
PERFORM create_table_partition(
'reserves_in'
,modulus
,num_partitions
);
PERFORM create_table_partition(
'reserves_close'
,modulus
,num_partitions
);
PERFORM create_table_partition(
'reserves_out'
,modulus
,num_partitions
);
PERFORM create_table_partition(
'known_coins'
,modulus
,num_partitions
);
PERFORM create_table_partition(
'refresh_commitments'
,modulus
,num_partitions
);
PERFORM create_table_partition(
'refresh_revealed_coins'
,modulus
,num_partitions
);
PERFORM create_table_partition(
'refresh_transfer_keys'
,modulus
,num_partitions
);
PERFORM create_table_partition(
'deposits'
,modulus
,num_partitions
);
PERFORM create_table_partition(
'refunds'
,modulus
,num_partitions
);
PERFORM create_table_partition(
'wire_out'
,modulus
,num_partitions
);
PERFORM create_table_partition(
'aggregation_tracking'
,modulus
,num_partitions
);
PERFORM create_table_partition(
'recoup'
,modulus
,num_partitions
);
PERFORM create_table_partition(
'recoup_refresh'
,modulus
,num_partitions
);
PERFORM create_table_partition(
'prewire'
,modulus
,num_partitions
);
PERFORM create_table_partition(
'cs_nonce_locks'
,modulus
,num_partitions
);
num_partitions=num_partitions-1;
EXIT WHEN num_partitions=0;
END LOOP;
PERFORM drop_default_partitions();
END
$$;
-- Stored procedures

View File

@ -0,0 +1,290 @@
--
-- This file is part of TALER
-- Copyright (C) 2014--2022 Taler Systems SA
--
-- TALER is free software; you can redistribute it and/or modify it under the
-- terms of the GNU General Public License as published by the Free Software
-- Foundation; either version 3, or (at your option) any later version.
--
-- TALER is distributed in the hope that it will be useful, but WITHOUT ANY
-- WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
-- A PARTICULAR PURPOSE. See the GNU General Public License for more details.
--
-- You should have received a copy of the GNU General Public License along with
-- TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/>
--
-- Everything in one big transaction
BEGIN;
-- Check patch versioning is in place.
-- SELECT _v.register_patch('partition-0001', NULL, NULL);
CREATE OR REPLACE FUNCTION create_table_partition(
source_table_name VARCHAR
,modulus INTEGER
,partition_num INTEGER
)
RETURNS VOID
LANGUAGE plpgsql
AS $$
BEGIN
RAISE NOTICE 'Creating partition %_%', source_table_name, partition_num;
EXECUTE FORMAT(
'CREATE TABLE IF NOT EXISTS %I '
'PARTITION OF %I '
'FOR VALUES WITH (MODULUS %s, REMAINDER %s)'
,source_table_name || '_' || partition_num
,source_table_name
,modulus
,partition_num-1
);
END
$$;
CREATE OR REPLACE FUNCTION detach_default_partitions()
RETURNS VOID
LANGUAGE plpgsql
AS $$
BEGIN
RAISE NOTICE 'Detaching all default table partitions';
ALTER TABLE IF EXISTS wire_targets
DETACH PARTITION wire_targets_default;
ALTER TABLE IF EXISTS reserves
DETACH PARTITION reserves_default;
ALTER TABLE IF EXISTS reserves_in
DETACH PARTITION reserves_in_default;
ALTER TABLE IF EXISTS reserves_close
DETACH PARTITION reserves_close_default;
ALTER TABLE IF EXISTS reserves_out
DETACH PARTITION reserves_out_default;
ALTER TABLE IF EXISTS known_coins
DETACH PARTITION known_coins_default;
ALTER TABLE IF EXISTS refresh_commitments
DETACH PARTITION refresh_commitments_default;
ALTER TABLE IF EXISTS refresh_revealed_coins
DETACH PARTITION refresh_revealed_coins_default;
ALTER TABLE IF EXISTS refresh_transfer_keys
DETACH PARTITION refresh_transfer_keys_default;
ALTER TABLE IF EXISTS deposits
DETACH PARTITION deposits_default;
ALTER TABLE IF EXISTS refunds
DETACH PARTITION refunds_default;
ALTER TABLE IF EXISTS wire_out
DETACH PARTITION wire_out_default;
ALTER TABLE IF EXISTS aggregation_tracking
DETACH PARTITION aggregation_tracking_default;
ALTER TABLE IF EXISTS recoup
DETACH PARTITION recoup_default;
ALTER TABLE IF EXISTS recoup_refresh
DETACH PARTITION recoup_refresh_default;
ALTER TABLE IF EXISTS prewire
DETACH PARTITION prewire_default;
ALTER TABLE IF EXISTS cs_nonce_locks
DETACH partition cs_nonce_locks_default;
END
$$;
COMMENT ON FUNCTION detach_default_partitions
IS 'We need to drop default and create new one before deleting the default partitions
otherwise constraints get lost too. Might be needed in shardig too';
CREATE OR REPLACE FUNCTION drop_default_partitions()
RETURNS VOID
LANGUAGE plpgsql
AS $$
BEGIN
RAISE NOTICE 'Dropping default table partitions';
DROP TABLE IF EXISTS wire_targets_default;
DROP TABLE IF EXISTS reserves_default;
DROP TABLE IF EXISTS reserves_in_default;
DROP TABLE IF EXISTS reserves_close_default;
DROP TABLE IF EXISTS reserves_out_default;
DROP TABLE IF EXISTS known_coins_default;
DROP TABLE IF EXISTS refresh_commitments_default;
DROP TABLE IF EXISTS refresh_revealed_coins_default;
DROP TABLE IF EXISTS refresh_transfer_keys_default;
DROP TABLE IF EXISTS deposits_default;
DROP TABLE IF EXISTS refunds_default;
DROP TABLE IF EXISTS wire_out_default;
DROP TABLE IF EXISTS aggregation_tracking_default;
DROP TABLE IF EXISTS recoup_default;
DROP TABLE IF EXISTS recoup_refresh_default;
DROP TABLE IF EXISTS prewire_default;
DROP TABLE IF EXISTS cs_nonce_locks_default;
END
$$;
COMMENT ON FUNCTION drop_default_partitions
IS 'Drop all default partitions once other partitions are attached.
Might be needed in sharding too.';
CREATE OR REPLACE FUNCTION create_partitions(
num_partitions INTEGER
)
RETURNS VOID
LANGUAGE plpgsql
AS $$
DECLARE
modulus INTEGER;
BEGIN
modulus := num_partitions;
PERFORM detach_default_partitions();
LOOP
PERFORM create_table_partition(
'wire_targets'
,modulus
,num_partitions
);
PERFORM add_constraints_to_wire_targets_partition(num_partitions::varchar);
PERFORM create_table_partition(
'reserves'
,modulus
,num_partitions
);
PERFORM create_table_partition(
'reserves_in'
,modulus
,num_partitions
);
PERFORM add_constraints_to_reserves_in_partition(num_partitions::varchar);
PERFORM create_table_partition(
'reserves_close'
,modulus
,num_partitions
);
PERFORM add_constraints_to_reserves_close_partition(num_partitions::varchar);
PERFORM create_table_partition(
'reserves_out'
,modulus
,num_partitions
);
PERFORM add_constraints_to_reserves_out_partition(num_partitions::varchar);
PERFORM create_table_partition(
'known_coins'
,modulus
,num_partitions
);
PERFORM add_constraints_to_known_coins_partition(num_partitions::varchar);
PERFORM create_table_partition(
'refresh_commitments'
,modulus
,num_partitions
);
PERFORM add_constraints_to_refresh_commitments_partition(num_partitions::varchar);
PERFORM create_table_partition(
'refresh_revealed_coins'
,modulus
,num_partitions
);
PERFORM add_constraints_to_refresh_revealed_coins_partition(num_partitions::varchar);
PERFORM create_table_partition(
'refresh_transfer_keys'
,modulus
,num_partitions
);
PERFORM add_constraints_to_refresh_transfer_keys_partition(num_partitions::varchar);
PERFORM create_table_partition(
'deposits'
,modulus
,num_partitions
);
PERFORM add_constraints_to_deposits_partition(num_partitions::varchar);
PERFORM create_table_partition(
'refunds'
,modulus
,num_partitions
);
PERFORM add_constraints_to_refunds_partition(num_partitions::varchar);
PERFORM create_table_partition(
'wire_out'
,modulus
,num_partitions
);
PERFORM add_constraints_to_wire_out_partition(num_partitions::varchar);
PERFORM create_table_partition(
'aggregation_tracking'
,modulus
,num_partitions
);
PERFORM add_constraints_to_aggregation_tracking_partition(num_partitions::varchar);
PERFORM create_table_partition(
'recoup'
,modulus
,num_partitions
);
PERFORM add_constraints_to_recoup_partition(num_partitions::varchar);
PERFORM create_table_partition(
'recoup_refresh'
,modulus
,num_partitions
);
PERFORM add_constraints_to_recoup_refresh_partition(num_partitions::varchar);
PERFORM create_table_partition(
'prewire'
,modulus
,num_partitions
);
PERFORM create_table_partition(
'cs_nonce_locks'
,modulus
,num_partitions
);
PERFORM add_constraints_to_cs_nonce_locks_partition(num_partitions::varchar);
num_partitions=num_partitions-1;
EXIT WHEN num_partitions=0;
END LOOP;
PERFORM drop_default_partitions();
END
$$;
COMMIT;

View File

@ -198,6 +198,50 @@ postgres_create_tables (void *cls)
}
/**
* Setup partitions of already existing tables
*
* @param cls the `struct PostgresClosure` with the plugin-specific state
* @param num the number of partitions to create for each partitioned table
* @return #GNUNET_OK upon success; #GNUNET_SYSERR upon failure
*/
static enum GNUNET_GenericReturnValue
postgres_setup_partitions (void *cls,
const uint32_t *num)
{
struct PostgresClosure *pg = cls;
struct GNUNET_PQ_Context *conn;
enum GNUNET_GenericReturnValue ret;
struct GNUNET_PQ_QueryParam params[] = {
GNUNET_PQ_query_param_uint32 (num),
GNUNET_PQ_query_param_end
};
struct GNUNET_PQ_PreparedStatement ps[] = {
GNUNET_PQ_make_prepare ("setup_partitions",
"SELECT"
" create_partitions"
" ($1);",
1),
GNUNET_PQ_PREPARED_STATEMENT_END
};
conn = GNUNET_PQ_connect_with_cfg (pg->cfg,
"exchangedb-postgres",
"partition-",
NULL,
ps);
if (NULL == conn)
return GNUNET_SYSERR;
ret = GNUNET_OK;
if (0 > GNUNET_PQ_eval_prepared_non_select (conn,
"setup_partitions",
params))
ret = GNUNET_SYSERR;
GNUNET_PQ_disconnect (conn);
return ret;
}
/**
* Initialize prepared statements for @a pg.
*
@ -11717,6 +11761,7 @@ libtaler_plugin_exchangedb_postgres_init (void *cls)
plugin->cls = pg;
plugin->drop_tables = &postgres_drop_tables;
plugin->create_tables = &postgres_create_tables;
plugin->setup_partitions = &postgres_setup_partitions;
plugin->start = &postgres_start;
plugin->start_read_committed = &postgres_start_read_committed;
plugin->commit = &postgres_commit;

View File

@ -1333,6 +1333,7 @@ run (void *cls)
struct TALER_EXCHANGEDB_TransactionList *tlp;
const char *sndr = "payto://x-taler-bank/localhost:8080/1";
const char *rcvr = "payto://x-taler-bank/localhost:8080/2";
const uint32_t num_partitions = 10;
unsigned int matched;
unsigned int cnt;
enum GNUNET_DB_QueryStatus qs;
@ -1378,6 +1379,12 @@ run (void *cls)
result = 77;
goto cleanup;
}
if (GNUNET_OK !=
plugin->setup_partitions (plugin->cls, &num_partitions))
{
result = 77;
goto cleanup;
}
plugin->preflight (plugin->cls);
FAILIF (GNUNET_OK !=
plugin->start (plugin->cls,

View File

@ -2232,6 +2232,16 @@ struct TALER_EXCHANGEDB_Plugin
enum GNUNET_GenericReturnValue
(*create_tables)(void *cls);
/**
* Change already present tables of the database to more partitions
*
* @param cls the @e cls of this struct with the plugin-specific state
* @param num the number of partitions to create for each partitioned table
* @return #GNUNET_OK upon success; #GNUNET_SYSERR upon failure
*/
enum GNUNET_GenericReturnValue
(*setup_partitions)(void *cls,
const uint32_t *num);
/**
* Start a transaction.