rework deposits sharding, towards making aggregator faster (not necessarily done)
This commit is contained in:
parent
c782dfe2aa
commit
b856d56d95
@ -28,6 +28,18 @@
|
||||
#include "taler_json_lib.h"
|
||||
#include "taler_bank_service.h"
|
||||
|
||||
struct AdditionalDeposit
|
||||
{
|
||||
/**
|
||||
* Public key of the coin.
|
||||
*/
|
||||
struct TALER_CoinSpendPublicKeyP coin_pub;
|
||||
|
||||
/**
|
||||
* Row of the deposit.
|
||||
*/
|
||||
uint64_t row;
|
||||
};
|
||||
|
||||
/**
|
||||
* Information about one aggregation process to be executed. There is
|
||||
@ -42,6 +54,11 @@ struct AggregationUnit
|
||||
*/
|
||||
struct TALER_MerchantPublicKeyP merchant_pub;
|
||||
|
||||
/**
|
||||
* Public key of the coin.
|
||||
*/
|
||||
struct TALER_CoinSpendPublicKeyP coin_pub;
|
||||
|
||||
/**
|
||||
* Total amount to be transferred, before subtraction of @e fees.wire and rounding down.
|
||||
*/
|
||||
@ -97,7 +114,8 @@ struct AggregationUnit
|
||||
/**
|
||||
* Array of row_ids from the aggregation.
|
||||
*/
|
||||
uint64_t additional_rows[TALER_EXCHANGEDB_MATCHING_DEPOSITS_LIMIT];
|
||||
struct AdditionalDeposit
|
||||
additional_rows[TALER_EXCHANGEDB_MATCHING_DEPOSITS_LIMIT];
|
||||
|
||||
/**
|
||||
* Offset specifying how many @e additional_rows are in use.
|
||||
@ -383,7 +401,8 @@ deposit_cb (void *cls,
|
||||
enum GNUNET_DB_QueryStatus qs;
|
||||
|
||||
au->merchant_pub = *merchant_pub;
|
||||
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
|
||||
au->coin_pub = *coin_pub;
|
||||
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
|
||||
"Aggregator processing payment %s with amount %s\n",
|
||||
TALER_B2S (coin_pub),
|
||||
TALER_amount2s (amount_with_fee));
|
||||
@ -405,7 +424,7 @@ deposit_cb (void *cls,
|
||||
{
|
||||
struct TALER_Amount ntotal;
|
||||
|
||||
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
|
||||
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
|
||||
"Non-refunded transaction, subtracting deposit fee %s\n",
|
||||
TALER_amount2s (deposit_fee));
|
||||
if (0 >
|
||||
@ -428,6 +447,9 @@ deposit_cb (void *cls,
|
||||
au->total_amount = ntotal;
|
||||
}
|
||||
}
|
||||
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
|
||||
"Amount after fee is %s\n",
|
||||
TALER_amount2s (&au->total_amount));
|
||||
|
||||
GNUNET_assert (NULL == au->payto_uri);
|
||||
au->payto_uri = GNUNET_strdup (payto_uri);
|
||||
@ -437,7 +459,7 @@ deposit_cb (void *cls,
|
||||
GNUNET_CRYPTO_random_block (GNUNET_CRYPTO_QUALITY_NONCE,
|
||||
&au->wtid,
|
||||
sizeof (au->wtid));
|
||||
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
|
||||
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
|
||||
"Starting aggregation under H(WTID)=%s, starting amount %s at %llu\n",
|
||||
TALER_B2S (&au->wtid),
|
||||
TALER_amount2s (amount_with_fee),
|
||||
@ -493,7 +515,7 @@ deposit_cb (void *cls,
|
||||
"Aggregator marks deposit %llu as done\n",
|
||||
(unsigned long long) row_id);
|
||||
qs = db_plugin->mark_deposit_done (db_plugin->cls,
|
||||
merchant_pub,
|
||||
coin_pub,
|
||||
row_id);
|
||||
if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT != qs)
|
||||
{
|
||||
@ -528,6 +550,8 @@ aggregate_cb (void *cls,
|
||||
struct TALER_Amount old;
|
||||
enum GNUNET_DB_QueryStatus qs;
|
||||
|
||||
if (row_id == au->row_id)
|
||||
return GNUNET_DB_STATUS_SUCCESS_ONE_RESULT;
|
||||
if (au->rows_offset >= TALER_EXCHANGEDB_MATCHING_DEPOSITS_LIMIT)
|
||||
{
|
||||
/* Bug: we asked for at most #TALER_EXCHANGEDB_MATCHING_DEPOSITS_LIMIT results! */
|
||||
@ -605,18 +629,29 @@ aggregate_cb (void *cls,
|
||||
}
|
||||
|
||||
/* "append" to our list of rows */
|
||||
au->additional_rows[au->rows_offset++] = row_id;
|
||||
au->additional_rows[au->rows_offset].coin_pub = *coin_pub;
|
||||
au->additional_rows[au->rows_offset].row = row_id;
|
||||
au->rows_offset++;
|
||||
/* insert into aggregation tracking table */
|
||||
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
|
||||
"Adding %llu to aggregate %s\n",
|
||||
(unsigned long long) row_id,
|
||||
TALER_B2S (&au->wtid));
|
||||
qs = db_plugin->insert_aggregation_tracking (db_plugin->cls,
|
||||
&au->wtid,
|
||||
row_id);
|
||||
if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT != qs)
|
||||
{
|
||||
GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
|
||||
"Failed to add %llu to aggregate %s: %d\n",
|
||||
(unsigned long long) row_id,
|
||||
TALER_B2S (&au->wtid),
|
||||
qs);
|
||||
GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs);
|
||||
return qs;
|
||||
}
|
||||
qs = db_plugin->mark_deposit_done (db_plugin->cls,
|
||||
&au->merchant_pub,
|
||||
coin_pub,
|
||||
row_id);
|
||||
if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT != qs)
|
||||
{
|
||||
@ -775,7 +810,7 @@ run_aggregation (void *cls)
|
||||
}
|
||||
|
||||
/* Now try to find other deposits to aggregate */
|
||||
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
|
||||
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
|
||||
"Found ready deposit for %s, aggregating by target %llu\n",
|
||||
TALER_B2S (&au_active.merchant_pub),
|
||||
(unsigned long long) au_active.wire_target);
|
||||
@ -808,13 +843,17 @@ run_aggregation (void *cls)
|
||||
s);
|
||||
return;
|
||||
}
|
||||
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
|
||||
"Found %d other deposits to combine into wire transfer.\n",
|
||||
qs);
|
||||
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
|
||||
"Found %d other deposits to combine into wire transfer with fee %s.\n",
|
||||
qs,
|
||||
TALER_amount2s (&au_active.fees.wire));
|
||||
|
||||
/* Subtract wire transfer fee and round to the unit supported by the
|
||||
wire transfer method; Check if after rounding down, we still have
|
||||
an amount to transfer, and if not mark as 'tiny'. */
|
||||
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
|
||||
"Rounding aggregate of %s\n",
|
||||
TALER_amount2s (&au_active.total_amount));
|
||||
if ( (0 >=
|
||||
TALER_amount_subtract (&au_active.final_amount,
|
||||
&au_active.total_amount,
|
||||
@ -822,8 +861,7 @@ run_aggregation (void *cls)
|
||||
(GNUNET_SYSERR ==
|
||||
TALER_amount_round_down (&au_active.final_amount,
|
||||
¤cy_round_unit)) ||
|
||||
( (0 == au_active.final_amount.value) &&
|
||||
(0 == au_active.final_amount.fraction) ) )
|
||||
(TALER_amount_is_zero (&au_active.final_amount)) )
|
||||
{
|
||||
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
|
||||
"Aggregate value too low for transfer (%d/%s)\n",
|
||||
@ -848,23 +886,29 @@ run_aggregation (void *cls)
|
||||
return;
|
||||
}
|
||||
/* Mark transactions by row_id as minor */
|
||||
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
|
||||
"Marking %s (%llu) as tiny\n",
|
||||
TALER_B2S (&au_active.coin_pub),
|
||||
(unsigned long long) au_active.row_id);
|
||||
qs = db_plugin->mark_deposit_tiny (db_plugin->cls,
|
||||
&au_active.merchant_pub,
|
||||
&au_active.coin_pub,
|
||||
au_active.row_id);
|
||||
if (0 <= qs)
|
||||
if (0 < qs)
|
||||
{
|
||||
for (unsigned int i = 0; i<au_active.rows_offset; i++)
|
||||
{
|
||||
qs = db_plugin->mark_deposit_tiny (db_plugin->cls,
|
||||
&au_active.merchant_pub,
|
||||
au_active.additional_rows[i]);
|
||||
if (0 > qs)
|
||||
&au_active.additional_rows[i].
|
||||
coin_pub,
|
||||
au_active.additional_rows[i].row);
|
||||
if (0 >= qs)
|
||||
break;
|
||||
}
|
||||
}
|
||||
GNUNET_break (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS != qs);
|
||||
if (GNUNET_DB_STATUS_SOFT_ERROR == qs)
|
||||
{
|
||||
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
|
||||
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
|
||||
"Serialization issue, trying again later!\n");
|
||||
db_plugin->rollback (db_plugin->cls);
|
||||
cleanup_au (&au_active);
|
||||
@ -876,6 +920,7 @@ run_aggregation (void *cls)
|
||||
}
|
||||
if (GNUNET_DB_STATUS_HARD_ERROR == qs)
|
||||
{
|
||||
GNUNET_break (0);
|
||||
db_plugin->rollback (db_plugin->cls);
|
||||
cleanup_au (&au_active);
|
||||
global_ret = EXIT_FAILURE;
|
||||
|
@ -55,6 +55,8 @@ DROP TABLE IF EXISTS wire_targets CASCADE;
|
||||
DROP FUNCTION IF EXISTS add_constraints_to_wire_targets_partition;
|
||||
DROP TABLE IF EXISTS wire_fee CASCADE;
|
||||
DROP TABLE IF EXISTS deposits CASCADE;
|
||||
DROP TABLE IF EXISTS deposits_by_ready CASCADE;
|
||||
DROP TABLE IF EXISTS deposits_for_matching CASCADE;
|
||||
DROP FUNCTION IF EXISTS add_constraints_to_deposits_partition;
|
||||
DROP TABLE IF EXISTS extension_details CASCADE;
|
||||
DROP TABLE IF EXISTS refunds CASCADE;
|
||||
@ -88,6 +90,7 @@ DROP TABLE IF EXISTS recoup_by_reserve CASCADE;
|
||||
DROP TABLE IF EXISTS partners CASCADE;
|
||||
DROP TABLE IF EXISTS account_merges CASCADE;
|
||||
DROP TABLE IF EXISTS purse_merges CASCADE;
|
||||
DROP TABLE IF EXISTS purse_deposits CASCADE;
|
||||
DROP TABLE IF EXISTS contracts CASCADE;
|
||||
DROP TABLE IF EXISTS history_requests CASCADE;
|
||||
DROP TABLE IF EXISTS close_requests CASCADE;
|
||||
@ -103,8 +106,9 @@ DROP FUNCTION IF EXISTS exchange_do_withdraw;
|
||||
DROP FUNCTION IF EXISTS exchange_do_withdraw_limit_check;
|
||||
DROP FUNCTION IF EXISTS recoup_insert_trigger;
|
||||
DROP FUNCTION IF EXISTS recoup_delete_trigger;
|
||||
DROP FUNCTION IF EXISTS deposits_by_coin_insert_trigger;
|
||||
DROP FUNCTION IF EXISTS deposits_by_coin_delete_trigger;
|
||||
DROP FUNCTION IF EXISTS deposits_insert_trigger;
|
||||
DROP FUNCTION IF EXISTS deposits_update_trigger;
|
||||
DROP FUNCTION IF EXISTS deposits_delete_trigger;
|
||||
DROP FUNCTION IF EXISTS reserves_out_by_reserve_insert_trigger;
|
||||
DROP FUNCTION IF EXISTS reserves_out_by_reserve_delete_trigger;
|
||||
DROP FUNCTION IF EXISTS exchange_do_deposit;
|
||||
|
@ -610,7 +610,7 @@ CREATE TABLE IF NOT EXISTS deposits
|
||||
(deposit_serial_id BIGINT GENERATED BY DEFAULT AS IDENTITY -- PRIMARY KEY
|
||||
,shard INT8 NOT NULL
|
||||
,coin_pub BYTEA NOT NULL CHECK (LENGTH(coin_pub)=32) -- REFERENCES known_coins (coin_pub) ON DELETE CASCADE
|
||||
,known_coin_id BIGINT NOT NULL -- REFERENCES known_coins (known_coin_id) ON DELETE CASCADE
|
||||
,known_coin_id BIGINT NOT NULL -- REFERENCES known_coins (known_coin_id) ON DELETE CASCADE --- FIXME: column needed???
|
||||
,amount_with_fee_val INT8 NOT NULL
|
||||
,amount_with_fee_frac INT4 NOT NULL
|
||||
,wallet_timestamp INT8 NOT NULL
|
||||
@ -626,22 +626,11 @@ CREATE TABLE IF NOT EXISTS deposits
|
||||
,done BOOLEAN NOT NULL DEFAULT FALSE
|
||||
,extension_blocked BOOLEAN NOT NULL DEFAULT FALSE
|
||||
,extension_details_serial_id INT8 REFERENCES extension_details (extension_details_serial_id) ON DELETE CASCADE
|
||||
,UNIQUE (shard, coin_pub, merchant_pub, h_contract_terms)
|
||||
,UNIQUE (coin_pub, merchant_pub, h_contract_terms)
|
||||
)
|
||||
PARTITION BY HASH (shard); -- FIXME: why not BY RANGE? RANGE would seem better for 'deposits_get_ready'!
|
||||
PARTITION BY HASH (coin_pub);
|
||||
-- FIXME:
|
||||
-- new idea: partition deposits by coin_pub (remove deposits_by_coin)
|
||||
-- define 'ready' == ! (tiny || done || blocked)
|
||||
-- add new deposits_by_ready (on shard + wire_deadline), select by shard, then ready + deadline
|
||||
-- -- use triggers to ONLY include 'ready' deposits (delete on update)!
|
||||
-- -- use multi-level partitions: Hash(shard) + Range(wire_deadline/sec)
|
||||
-- add new deposits_by_match (on shard + refund_deadline)
|
||||
-- -- use triggers to ONLY include 'ready' deposits (delete on update)!
|
||||
-- -- use multi-level partitions: Hash(shard) + Range(refund_deadline/sec)
|
||||
-- => first we select per-merchant shard, basically stay on the same system as other ops for the same merchant
|
||||
-- => second we select by deadline, use enough values so that _usually_ the aggregator
|
||||
-- and the 'insert' process _can_ work on different shards!
|
||||
-- => the latter could be achieved by dynamically (!) creating/deleting partitions:
|
||||
-- TODO: dynamically (!) creating/deleting partitions:
|
||||
-- create new partitions 'as needed', drop old ones once the aggregator has made
|
||||
-- them empty; as 'new' deposits will always have deadlines in the future, this
|
||||
-- would basically guarantee no conflict between aggregator and exchange service!
|
||||
@ -683,31 +672,15 @@ COMMENT ON COLUMN deposits.extension_details_serial_id
|
||||
COMMENT ON COLUMN deposits.tiny
|
||||
IS 'Set to TRUE if we decided that the amount is too small to ever trigger a wire transfer by itself (requires real aggregation)';
|
||||
|
||||
-- FIXME: we sometimes go ONLY by 'deposit_serial_id',
|
||||
-- check if queries could be improved by adding shard or adding another index without shard here, or inverting the order of the index here!
|
||||
CREATE INDEX IF NOT EXISTS deposits_deposit_by_serial_id_index
|
||||
ON deposits
|
||||
(shard,deposit_serial_id);
|
||||
CREATE INDEX IF NOT EXISTS deposits_for_get_ready_index
|
||||
ON deposits
|
||||
(shard ASC
|
||||
,done
|
||||
,extension_blocked
|
||||
,tiny
|
||||
,wire_deadline ASC
|
||||
);
|
||||
COMMENT ON INDEX deposits_for_get_ready_index
|
||||
IS 'for deposits_get_ready';
|
||||
|
||||
CREATE INDEX IF NOT EXISTS deposits_for_iterate_matching_index
|
||||
CREATE INDEX IF NOT EXISTS deposits_by_coin_pub_index
|
||||
ON deposits
|
||||
(shard
|
||||
,merchant_pub
|
||||
,wire_target_h_payto
|
||||
,done
|
||||
,extension_blocked
|
||||
,refund_deadline ASC
|
||||
);
|
||||
COMMENT ON INDEX deposits_for_iterate_matching_index
|
||||
IS 'for deposits_iterate_matching';
|
||||
(coin_pub);
|
||||
|
||||
|
||||
CREATE TABLE IF NOT EXISTS deposits_default
|
||||
@ -732,66 +705,198 @@ $$;
|
||||
SELECT add_constraints_to_deposits_partition('default');
|
||||
|
||||
|
||||
|
||||
CREATE TABLE IF NOT EXISTS deposits_by_coin
|
||||
(deposit_serial_id BIGINT
|
||||
CREATE TABLE IF NOT EXISTS deposits_by_ready
|
||||
(wire_deadline INT8 NOT NULL
|
||||
,shard INT8 NOT NULL
|
||||
,coin_pub BYTEA NOT NULL CHECK (LENGTH(coin_pub)=32)
|
||||
,deposit_serial_id INT8
|
||||
)
|
||||
PARTITION BY HASH (coin_pub);
|
||||
COMMENT ON TABLE deposits_by_coin
|
||||
IS 'Enables fast lookups of deposit by coin_pub, auto-populated via TRIGGER below';
|
||||
PARTITION BY RANGE (wire_deadline);
|
||||
COMMENT ON TABLE deposits_by_ready
|
||||
IS 'Enables fast lookups for deposits_get_ready, auto-populated via TRIGGER below';
|
||||
|
||||
CREATE INDEX IF NOT EXISTS deposits_by_coin_main_index
|
||||
ON deposits_by_coin
|
||||
(coin_pub);
|
||||
CREATE INDEX IF NOT EXISTS deposits_by_ready_main_index
|
||||
ON deposits_by_ready
|
||||
(wire_deadline ASC, shard ASC, coin_pub);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS deposits_by_coin_default
|
||||
PARTITION OF deposits_by_coin
|
||||
FOR VALUES WITH (MODULUS 1, REMAINDER 0);
|
||||
CREATE TABLE IF NOT EXISTS deposits_by_ready_default
|
||||
PARTITION OF deposits_by_ready
|
||||
DEFAULT;
|
||||
|
||||
CREATE OR REPLACE FUNCTION deposits_by_coin_insert_trigger()
|
||||
|
||||
CREATE TABLE IF NOT EXISTS deposits_for_matching
|
||||
(refund_deadline INT8 NOT NULL
|
||||
,shard INT8 NOT NULL
|
||||
,coin_pub BYTEA NOT NULL CHECK (LENGTH(coin_pub)=32)
|
||||
,deposit_serial_id INT8
|
||||
)
|
||||
PARTITION BY RANGE (refund_deadline);
|
||||
COMMENT ON TABLE deposits_for_matching
|
||||
IS 'Enables fast lookups for deposits_iterate_matching, auto-populated via TRIGGER below';
|
||||
|
||||
CREATE INDEX IF NOT EXISTS deposits_for_matching_main_index
|
||||
ON deposits_for_matching
|
||||
(refund_deadline ASC, shard, coin_pub);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS deposits_for_matching_default
|
||||
PARTITION OF deposits_for_matching
|
||||
DEFAULT;
|
||||
|
||||
|
||||
CREATE OR REPLACE FUNCTION deposits_insert_trigger()
|
||||
RETURNS trigger
|
||||
LANGUAGE plpgsql
|
||||
AS $$
|
||||
DECLARE
|
||||
is_ready BOOLEAN;
|
||||
DECLARE
|
||||
is_tready BOOLEAN; -- is ready, but may be tiny
|
||||
BEGIN
|
||||
INSERT INTO deposits_by_coin
|
||||
(deposit_serial_id
|
||||
,shard
|
||||
,coin_pub)
|
||||
VALUES
|
||||
(NEW.deposit_serial_id
|
||||
,NEW.shard
|
||||
,NEW.coin_pub);
|
||||
is_ready = NOT (NEW.done OR NEW.tiny OR NEW.extension_blocked);
|
||||
is_tready = NOT (NEW.done OR NEW.extension_blocked);
|
||||
|
||||
IF (is_ready)
|
||||
THEN
|
||||
INSERT INTO deposits_by_ready
|
||||
(wire_deadline
|
||||
,shard
|
||||
,coin_pub
|
||||
,deposit_serial_id)
|
||||
VALUES
|
||||
(NEW.wire_deadline
|
||||
,NEW.shard
|
||||
,NEW.coin_pub
|
||||
,NEW.deposit_serial_id);
|
||||
END IF;
|
||||
IF (is_tready)
|
||||
THEN
|
||||
INSERT INTO deposits_for_matching
|
||||
(refund_deadline
|
||||
,shard
|
||||
,coin_pub
|
||||
,deposit_serial_id)
|
||||
VALUES
|
||||
(NEW.refund_deadline
|
||||
,NEW.shard
|
||||
,NEW.coin_pub
|
||||
,NEW.deposit_serial_id);
|
||||
END IF;
|
||||
RETURN NEW;
|
||||
END $$;
|
||||
COMMENT ON FUNCTION deposits_by_coin_insert_trigger()
|
||||
IS 'Replicate deposit inserts into deposits_by_coin table.';
|
||||
COMMENT ON FUNCTION deposits_insert_trigger()
|
||||
IS 'Replicate deposit inserts into materialized indices.';
|
||||
|
||||
CREATE TRIGGER deposits_on_insert
|
||||
AFTER INSERT
|
||||
ON deposits
|
||||
FOR EACH ROW EXECUTE FUNCTION deposits_by_coin_insert_trigger();
|
||||
FOR EACH ROW EXECUTE FUNCTION deposits_insert_trigger();
|
||||
|
||||
CREATE OR REPLACE FUNCTION deposits_by_coin_delete_trigger()
|
||||
CREATE OR REPLACE FUNCTION deposits_update_trigger()
|
||||
RETURNS trigger
|
||||
LANGUAGE plpgsql
|
||||
AS $$
|
||||
DECLARE
|
||||
was_ready BOOLEAN;
|
||||
DECLARE
|
||||
is_ready BOOLEAN;
|
||||
DECLARE
|
||||
was_tready BOOLEAN; -- was ready, but may be tiny
|
||||
DECLARE
|
||||
is_tready BOOLEAN; -- is ready, but may be tiny
|
||||
BEGIN
|
||||
DELETE FROM deposits_by_coin
|
||||
WHERE coin_pub = OLD.coin_pub
|
||||
AND shard = OLD.shard
|
||||
AND deposit_serial_id = OLD.deposit_serial_id;
|
||||
RETURN OLD;
|
||||
was_ready = NOT (OLD.done OR OLD.tiny OR OLD.extension_blocked);
|
||||
is_ready = NOT (NEW.done OR NEW.tiny OR NEW.extension_blocked);
|
||||
was_tready = NOT (OLD.done OR OLD.extension_blocked);
|
||||
is_tready = NOT (NEW.done OR NEW.extension_blocked);
|
||||
IF (was_ready AND NOT is_ready)
|
||||
THEN
|
||||
DELETE FROM deposits_by_ready
|
||||
WHERE wire_deadline = OLD.wire_deadline
|
||||
AND shard = OLD.shard
|
||||
AND coin_pub = OLD.coin_pub
|
||||
AND deposit_serial_id = OLD.deposit_serial_id;
|
||||
END IF;
|
||||
IF (was_tready AND NOT is_tready)
|
||||
THEN
|
||||
DELETE FROM deposits_for_matching
|
||||
WHERE refund_deadline = OLD.refund_deadline
|
||||
AND shard = OLD.shard
|
||||
AND coin_pub = OLD.coin_pub
|
||||
AND deposit_serial_id = OLD.deposit_serial_id;
|
||||
END IF;
|
||||
IF (is_ready AND NOT was_ready)
|
||||
THEN
|
||||
INSERT INTO deposits_by_ready
|
||||
(wire_deadline
|
||||
,shard
|
||||
,coin_pub
|
||||
,deposit_serial_id)
|
||||
VALUES
|
||||
(NEW.wire_deadline
|
||||
,NEW.shard
|
||||
,NEW.coin_pub
|
||||
,NEW.deposit_serial_id);
|
||||
END IF;
|
||||
IF (is_tready AND NOT was_tready)
|
||||
THEN
|
||||
INSERT INTO deposits_for_matching
|
||||
(refund_deadline
|
||||
,shard
|
||||
,coin_pub
|
||||
,deposit_serial_id)
|
||||
VALUES
|
||||
(NEW.refund_deadline
|
||||
,NEW.shard
|
||||
,NEW.coin_pub
|
||||
,NEW.deposit_serial_id);
|
||||
END IF;
|
||||
RETURN NEW;
|
||||
END $$;
|
||||
COMMENT ON FUNCTION deposits_update_trigger()
|
||||
IS 'Replicate deposits changes into materialized indices.';
|
||||
|
||||
CREATE TRIGGER deposits_on_update
|
||||
AFTER UPDATE
|
||||
ON deposits
|
||||
FOR EACH ROW EXECUTE FUNCTION deposits_update_trigger();
|
||||
|
||||
CREATE OR REPLACE FUNCTION deposits_delete_trigger()
|
||||
RETURNS trigger
|
||||
LANGUAGE plpgsql
|
||||
AS $$
|
||||
DECLARE
|
||||
was_ready BOOLEAN;
|
||||
DECLARE
|
||||
was_tready BOOLEAN; -- is ready, but may be tiny
|
||||
BEGIN
|
||||
was_ready = NOT (OLD.done OR OLD.tiny OR OLD.extension_blocked);
|
||||
was_tready = NOT (OLD.done OR OLD.extension_blocked);
|
||||
|
||||
IF (was_ready)
|
||||
THEN
|
||||
DELETE FROM deposits_by_ready
|
||||
WHERE wire_deadline = OLD.wire_deadline
|
||||
AND shard = OLD.shard
|
||||
AND coin_pub = OLD.coin_pub
|
||||
AND deposit_serial_id = OLD.deposit_serial_id;
|
||||
END IF;
|
||||
IF (was_tready)
|
||||
THEN
|
||||
DELETE FROM deposits_for_matching
|
||||
WHERE refund_deadline = OLD.refund_deadline
|
||||
AND shard = OLD.shard
|
||||
AND coin_pub = OLD.coin_pub
|
||||
AND deposit_serial_id = OLD.deposit_serial_id;
|
||||
END IF;
|
||||
RETURN NEW;
|
||||
END $$;
|
||||
COMMENT ON FUNCTION deposits_by_coin_delete_trigger()
|
||||
IS 'Replicate deposits deletions into deposits_by_coin table.';
|
||||
COMMENT ON FUNCTION deposits_delete_trigger()
|
||||
IS 'Replicate deposit deletions into materialized indices.';
|
||||
|
||||
CREATE TRIGGER deposits_on_delete
|
||||
AFTER DELETE
|
||||
ON deposits
|
||||
FOR EACH ROW EXECUTE FUNCTION deposits_by_coin_delete_trigger();
|
||||
|
||||
ON deposits
|
||||
FOR EACH ROW EXECUTE FUNCTION deposits_delete_trigger();
|
||||
|
||||
|
||||
CREATE TABLE IF NOT EXISTS refunds
|
||||
@ -2011,7 +2116,7 @@ DECLARE
|
||||
BEGIN
|
||||
-- Shards: INSERT extension_details (by extension_details_serial_id)
|
||||
-- INSERT wire_targets (by h_payto), on CONFLICT DO NOTHING;
|
||||
-- INSERT deposits (by shard + merchant_pub + h_payto), ON CONFLICT DO NOTHING;
|
||||
-- INSERT deposits (by coin_pub, shard), ON CONFLICT DO NOTHING;
|
||||
-- UPDATE known_coins (by coin_pub)
|
||||
|
||||
IF NOT NULL in_extension_details
|
||||
@ -2356,27 +2461,26 @@ DECLARE
|
||||
DECLARE
|
||||
deposit_frac INT8; -- amount that was originally deposited
|
||||
BEGIN
|
||||
-- Shards: SELECT deposits (by shard, coin_pub, h_contract_terms, merchant_pub)
|
||||
-- Shards: SELECT deposits (coin_pub, shard, h_contract_terms, merchant_pub)
|
||||
-- INSERT refunds (by deposit_serial_id, rtransaction_id) ON CONFLICT DO NOTHING
|
||||
-- SELECT refunds (by deposit_serial_id)
|
||||
-- UPDATE known_coins (by coin_pub)
|
||||
|
||||
SELECT
|
||||
dep.deposit_serial_id
|
||||
,dep.amount_with_fee_val
|
||||
,dep.amount_with_fee_frac
|
||||
,dep.done
|
||||
deposit_serial_id
|
||||
,amount_with_fee_val
|
||||
,amount_with_fee_frac
|
||||
,done
|
||||
INTO
|
||||
dsi
|
||||
,deposit_val
|
||||
,deposit_frac
|
||||
,out_gone
|
||||
FROM deposits_by_coin dbc
|
||||
JOIN deposits dep USING (shard,deposit_serial_id)
|
||||
WHERE dbc.coin_pub=in_coin_pub
|
||||
AND dep.shard=in_deposit_shard
|
||||
AND dep.merchant_pub=in_merchant_pub
|
||||
AND dep.h_contract_terms=in_h_contract_terms;
|
||||
FROM deposits
|
||||
WHERE coin_pub=in_coin_pub
|
||||
AND shard=in_deposit_shard
|
||||
AND merchant_pub=in_merchant_pub
|
||||
AND h_contract_terms=in_h_contract_terms;
|
||||
|
||||
IF NOT FOUND
|
||||
THEN
|
||||
|
@ -990,12 +990,11 @@ prepare_statements (struct PostgresClosure *pg)
|
||||
",rtransaction_id "
|
||||
",amount_with_fee_val "
|
||||
",amount_with_fee_frac "
|
||||
") SELECT dbc.deposit_serial_id, $3, $5, $6, $7"
|
||||
" FROM deposits_by_coin dbc"
|
||||
" JOIN deposits dep USING (shard,deposit_serial_id)"
|
||||
" WHERE dbc.coin_pub=$1"
|
||||
" AND dep.h_contract_terms=$4"
|
||||
" AND dep.merchant_pub=$2",
|
||||
") SELECT deposit_serial_id, $3, $5, $6, $7"
|
||||
" FROM deposits" /* FIXME: check if adding additional AND on the 'shard' would help (possibly after reviewing indices on deposits!) */
|
||||
" WHERE coin_pub=$1"
|
||||
" AND h_contract_terms=$4"
|
||||
" AND merchant_pub=$2",
|
||||
7),
|
||||
/* Query the 'refunds' by coin public key */
|
||||
GNUNET_PQ_make_prepare (
|
||||
@ -1010,12 +1009,11 @@ prepare_statements (struct PostgresClosure *pg)
|
||||
",denom.fee_refund_val "
|
||||
",denom.fee_refund_frac "
|
||||
",ref.refund_serial_id"
|
||||
" FROM deposits_by_coin dbc"
|
||||
" FROM deposits dep"
|
||||
" JOIN refunds ref USING (deposit_serial_id)"
|
||||
" JOIN deposits dep ON (dbc.shard = dep.shard AND dbc.deposit_serial_id = dep.deposit_serial_id)"
|
||||
" JOIN known_coins kc ON (dbc.coin_pub = kc.coin_pub)"
|
||||
" JOIN known_coins kc ON (dep.coin_pub = kc.coin_pub)"
|
||||
" JOIN denominations denom USING (denominations_serial)"
|
||||
" WHERE dbc.coin_pub=$1;",
|
||||
" WHERE dep.coin_pub=$1;",
|
||||
1),
|
||||
/* Query the 'refunds' by coin public key, merchant_pub and contract hash */
|
||||
GNUNET_PQ_make_prepare (
|
||||
@ -1023,10 +1021,9 @@ prepare_statements (struct PostgresClosure *pg)
|
||||
"SELECT"
|
||||
" ref.amount_with_fee_val"
|
||||
",ref.amount_with_fee_frac"
|
||||
" FROM deposits_by_coin dbc"
|
||||
" FROM deposits dep"
|
||||
" JOIN refunds ref USING (shard,deposit_serial_id)"
|
||||
" JOIN deposits dep ON (dbc.shard = dep.shard AND dbc.deposit_serial_id = dep.deposit_serial_id)"
|
||||
" WHERE dbc.coin_pub=$1"
|
||||
" WHERE dep.coin_pub=$1"
|
||||
" AND dep.merchant_pub=$2"
|
||||
" AND dep.h_contract_terms=$3;",
|
||||
3),
|
||||
@ -1053,6 +1050,7 @@ prepare_statements (struct PostgresClosure *pg)
|
||||
/* Lock deposit table; NOTE: we may want to eventually shard the
|
||||
deposit table to avoid this lock being the main point of
|
||||
contention limiting transaction performance. */
|
||||
// FIXME: check if this query is even still used!
|
||||
GNUNET_PQ_make_prepare (
|
||||
"lock_deposit",
|
||||
"LOCK TABLE deposits;",
|
||||
@ -1098,12 +1096,11 @@ prepare_statements (struct PostgresClosure *pg)
|
||||
",dep.h_contract_terms"
|
||||
",dep.wire_salt"
|
||||
",wt.payto_uri AS receiver_wire_account"
|
||||
" FROM deposits_by_coin dbc"
|
||||
" JOIN deposits dep USING (shard,deposit_serial_id)"
|
||||
" JOIN known_coins kc ON (kc.coin_pub = dbc.coin_pub)"
|
||||
" FROM deposits dep"
|
||||
" JOIN known_coins kc ON (kc.coin_pub = dep.coin_pub)"
|
||||
" JOIN denominations USING (denominations_serial)"
|
||||
" JOIN wire_targets wt USING (wire_target_h_payto)"
|
||||
" WHERE dbc.coin_pub=$1"
|
||||
" WHERE dep.coin_pub=$1"
|
||||
" AND dep.merchant_pub=$3"
|
||||
" AND dep.h_contract_terms=$2;",
|
||||
3),
|
||||
@ -1150,12 +1147,11 @@ prepare_statements (struct PostgresClosure *pg)
|
||||
",denom.fee_deposit_val"
|
||||
",denom.fee_deposit_frac"
|
||||
",dep.wire_deadline"
|
||||
" FROM deposits_by_coin dbc"
|
||||
" JOIN deposits dep USING (shard,deposit_serial_id)"
|
||||
" FROM deposits dep"
|
||||
" JOIN wire_targets wt USING (wire_target_h_payto)"
|
||||
" JOIN known_coins kc ON (kc.coin_pub = dbc.coin_pub)"
|
||||
" JOIN known_coins kc ON (kc.coin_pub = dep.coin_pub)"
|
||||
" JOIN denominations denom USING (denominations_serial)"
|
||||
" WHERE dbc.coin_pub=$1"
|
||||
" WHERE dep.coin_pub=$1"
|
||||
" AND dep.merchant_pub=$3"
|
||||
" AND dep.h_contract_terms=$2;",
|
||||
3),
|
||||
@ -1163,7 +1159,7 @@ prepare_statements (struct PostgresClosure *pg)
|
||||
GNUNET_PQ_make_prepare (
|
||||
"deposits_get_ready",
|
||||
"SELECT"
|
||||
" deposit_serial_id"
|
||||
" dep.deposit_serial_id"
|
||||
",amount_with_fee_val"
|
||||
",amount_with_fee_frac"
|
||||
",denom.fee_deposit_val"
|
||||
@ -1173,47 +1169,46 @@ prepare_statements (struct PostgresClosure *pg)
|
||||
",wire_target_serial_id"
|
||||
",merchant_pub"
|
||||
",kc.coin_pub"
|
||||
" FROM deposits"
|
||||
" FROM deposits_by_ready dbr"
|
||||
" JOIN deposits dep"
|
||||
" ON (dbr.coin_pub = dep.coin_pub AND dbr.deposit_serial_id = dep.deposit_serial_id)"
|
||||
" JOIN wire_targets "
|
||||
" USING (wire_target_h_payto)"
|
||||
" JOIN known_coins kc"
|
||||
" USING (coin_pub)"
|
||||
" ON (kc.coin_pub = dep.coin_pub)"
|
||||
" JOIN denominations denom"
|
||||
" USING (denominations_serial)"
|
||||
" WHERE "
|
||||
" shard >= $2"
|
||||
" AND shard <= $3"
|
||||
" AND done=FALSE"
|
||||
" AND extension_blocked=FALSE"
|
||||
" AND tiny=FALSE"
|
||||
" AND wire_deadline<=$1"
|
||||
" WHERE dbr.wire_deadline<=$1"
|
||||
" AND dbr.shard >= $2"
|
||||
" AND dbr.shard <= $3"
|
||||
" AND (kyc_ok OR $4)"
|
||||
" ORDER BY "
|
||||
" shard ASC"
|
||||
" ,wire_deadline ASC"
|
||||
" dbr.wire_deadline ASC"
|
||||
" ,dbr.shard ASC"
|
||||
" LIMIT 1;",
|
||||
4),
|
||||
/* Used in #postgres_iterate_matching_deposits() */
|
||||
GNUNET_PQ_make_prepare (
|
||||
"deposits_iterate_matching",
|
||||
"SELECT"
|
||||
" deposit_serial_id"
|
||||
",amount_with_fee_val"
|
||||
",amount_with_fee_frac"
|
||||
" dep.deposit_serial_id"
|
||||
",dep.amount_with_fee_val"
|
||||
",dep.amount_with_fee_frac"
|
||||
",denom.fee_deposit_val"
|
||||
",denom.fee_deposit_frac"
|
||||
",h_contract_terms"
|
||||
",kc.coin_pub"
|
||||
" FROM deposits"
|
||||
" JOIN known_coins kc USING (coin_pub)"
|
||||
" JOIN denominations denom USING (denominations_serial)"
|
||||
" WHERE shard=$4"
|
||||
" AND merchant_pub=$1"
|
||||
" AND wire_target_h_payto=$2"
|
||||
" AND done=FALSE"
|
||||
" AND extension_blocked=FALSE"
|
||||
" AND refund_deadline<$3"
|
||||
" ORDER BY refund_deadline ASC"
|
||||
",dep.h_contract_terms"
|
||||
",dfm.coin_pub"
|
||||
" FROM deposits_for_matching dfm"
|
||||
" JOIN deposits dep "
|
||||
" ON (dep.coin_pub = dfm.coin_pub and dep.deposit_serial_id = dfm.deposit_serial_id)"
|
||||
" JOIN known_coins kc"
|
||||
" ON (dep.coin_pub = kc.coin_pub)"
|
||||
" JOIN denominations denom"
|
||||
" USING (denominations_serial)"
|
||||
" WHERE dfm.refund_deadline<$3"
|
||||
" AND dfm.shard=$4"
|
||||
" AND dep.merchant_pub=$1"
|
||||
" AND dep.wire_target_h_payto=$2"
|
||||
" LIMIT "
|
||||
TALER_QUOTE (
|
||||
TALER_EXCHANGEDB_MATCHING_DEPOSITS_LIMIT) ";",
|
||||
@ -1223,16 +1218,16 @@ prepare_statements (struct PostgresClosure *pg)
|
||||
"mark_deposit_tiny",
|
||||
"UPDATE deposits"
|
||||
" SET tiny=TRUE"
|
||||
" WHERE shard=$2"
|
||||
" AND deposit_serial_id=$1",
|
||||
" WHERE coin_pub=$1"
|
||||
" AND deposit_serial_id=$2",
|
||||
2),
|
||||
/* Used in #postgres_mark_deposit_done() */
|
||||
GNUNET_PQ_make_prepare (
|
||||
"mark_deposit_done",
|
||||
"UPDATE deposits"
|
||||
" SET done=TRUE"
|
||||
" WHERE shard=$2"
|
||||
" AND deposit_serial_id=$1;",
|
||||
" WHERE coin_pub=$1"
|
||||
" AND deposit_serial_id=$2;",
|
||||
2),
|
||||
/* Used in #postgres_get_coin_transactions() to obtain information
|
||||
about how a coin has been spend with /deposit requests. */
|
||||
@ -1255,16 +1250,14 @@ prepare_statements (struct PostgresClosure *pg)
|
||||
",dep.coin_sig"
|
||||
",dep.deposit_serial_id"
|
||||
",dep.done"
|
||||
" FROM deposits_by_coin dbc"
|
||||
" JOIN deposits dep"
|
||||
" USING (shard,deposit_serial_id)"
|
||||
" FROM deposits dep"
|
||||
" JOIN wire_targets wt"
|
||||
" USING (wire_target_h_payto)"
|
||||
" JOIN known_coins kc"
|
||||
" ON (kc.coin_pub = dbc.coin_pub)"
|
||||
" ON (kc.coin_pub = dep.coin_pub)"
|
||||
" JOIN denominations denoms"
|
||||
" USING (denominations_serial)"
|
||||
" WHERE dbc.coin_pub=$1;",
|
||||
" WHERE dep.coin_pub=$1;",
|
||||
1),
|
||||
|
||||
/* Used in #postgres_get_link_data(). */
|
||||
@ -1329,20 +1322,18 @@ prepare_statements (struct PostgresClosure *pg)
|
||||
",wt.payto_uri"
|
||||
",denom.fee_deposit_val"
|
||||
",denom.fee_deposit_frac"
|
||||
" FROM deposits_by_coin dbc"
|
||||
" JOIN deposits dep"
|
||||
" USING (shard,deposit_serial_id)"
|
||||
" FROM deposits dep"
|
||||
" JOIN wire_targets wt"
|
||||
" USING (wire_target_h_payto)"
|
||||
" JOIN aggregation_tracking"
|
||||
" USING (deposit_serial_id)"
|
||||
" JOIN known_coins kc"
|
||||
" ON (kc.coin_pub = dbc.coin_pub)"
|
||||
" ON (kc.coin_pub = dep.coin_pub)"
|
||||
" JOIN denominations denom"
|
||||
" USING (denominations_serial)"
|
||||
" JOIN wire_out"
|
||||
" USING (wtid_raw)"
|
||||
" WHERE dbc.coin_pub=$1"
|
||||
" WHERE dep.coin_pub=$1"
|
||||
" AND dep.merchant_pub=$3"
|
||||
" AND dep.h_contract_terms=$2",
|
||||
3),
|
||||
@ -5898,14 +5889,13 @@ postgres_have_deposit2 (
|
||||
*/
|
||||
static enum GNUNET_DB_QueryStatus
|
||||
postgres_mark_deposit_tiny (void *cls,
|
||||
const struct TALER_MerchantPublicKeyP *merchant_pub,
|
||||
const struct TALER_CoinSpendPublicKeyP *coin_pub,
|
||||
uint64_t rowid)
|
||||
{
|
||||
struct PostgresClosure *pg = cls;
|
||||
uint64_t deposit_shard = compute_shard (merchant_pub);
|
||||
struct GNUNET_PQ_QueryParam params[] = {
|
||||
GNUNET_PQ_query_param_auto_from_type (coin_pub),
|
||||
GNUNET_PQ_query_param_uint64 (&rowid),
|
||||
GNUNET_PQ_query_param_uint64 (&deposit_shard),
|
||||
GNUNET_PQ_query_param_end
|
||||
};
|
||||
|
||||
@ -5927,14 +5917,13 @@ postgres_mark_deposit_tiny (void *cls,
|
||||
*/
|
||||
static enum GNUNET_DB_QueryStatus
|
||||
postgres_mark_deposit_done (void *cls,
|
||||
const struct TALER_MerchantPublicKeyP *merchant_pub,
|
||||
const struct TALER_CoinSpendPublicKeyP *coin_pub,
|
||||
uint64_t rowid)
|
||||
{
|
||||
struct PostgresClosure *pg = cls;
|
||||
uint64_t deposit_shard = compute_shard (merchant_pub);
|
||||
struct GNUNET_PQ_QueryParam params[] = {
|
||||
GNUNET_PQ_query_param_auto_from_type (coin_pub),
|
||||
GNUNET_PQ_query_param_uint64 (&rowid),
|
||||
GNUNET_PQ_query_param_uint64 (&deposit_shard),
|
||||
GNUNET_PQ_query_param_end
|
||||
};
|
||||
|
||||
@ -6431,6 +6420,12 @@ postgres_insert_deposit (void *cls,
|
||||
GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs);
|
||||
return qs;
|
||||
}
|
||||
if (GNUNET_TIME_timestamp_cmp (deposit->wire_deadline,
|
||||
<,
|
||||
deposit->refund_deadline))
|
||||
{
|
||||
GNUNET_break (0);
|
||||
}
|
||||
{
|
||||
uint64_t shard = compute_shard (&deposit->merchant_pub);
|
||||
struct GNUNET_PQ_QueryParam params[] = {
|
||||
|
@ -2284,7 +2284,7 @@ run (void *cls)
|
||||
"test-2"));
|
||||
FAILIF (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT !=
|
||||
plugin->mark_deposit_tiny (plugin->cls,
|
||||
&deposit.merchant_pub,
|
||||
&deposit.coin.coin_pub,
|
||||
deposit_rowid));
|
||||
FAILIF (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS !=
|
||||
plugin->get_ready_deposit (plugin->cls,
|
||||
@ -2306,7 +2306,7 @@ run (void *cls)
|
||||
"test-3"));
|
||||
FAILIF (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT !=
|
||||
plugin->mark_deposit_done (plugin->cls,
|
||||
&deposit.merchant_pub,
|
||||
&deposit.coin.coin_pub,
|
||||
deposit_rowid));
|
||||
FAILIF (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS !=
|
||||
plugin->commit (plugin->cls));
|
||||
|
@ -3060,13 +3060,13 @@ struct TALER_EXCHANGEDB_Plugin
|
||||
* returned by @e iterate_ready_deposits()
|
||||
*
|
||||
* @param cls the @e cls of this struct with the plugin-specific state
|
||||
* @param merchant_pub identifies the beneficiary of the deposit
|
||||
* @param coin_pub identifies the coin of the deposit
|
||||
* @param deposit_rowid identifies the deposit row to modify
|
||||
* @return query result status
|
||||
*/
|
||||
enum GNUNET_DB_QueryStatus
|
||||
(*mark_deposit_tiny)(void *cls,
|
||||
const struct TALER_MerchantPublicKeyP *merchant_pub,
|
||||
const struct TALER_CoinSpendPublicKeyP *coin_pub,
|
||||
uint64_t rowid);
|
||||
|
||||
|
||||
@ -3076,13 +3076,13 @@ struct TALER_EXCHANGEDB_Plugin
|
||||
* @e iterate_ready_deposits() or @e iterate_matching_deposits().
|
||||
*
|
||||
* @param cls the @e cls of this struct with the plugin-specific state
|
||||
* @param merchant_pub identifies the beneficiary of the deposit
|
||||
* @param coin_pub identifies the coin of the deposit
|
||||
* @param deposit_rowid identifies the deposit row to modify
|
||||
* @return query result status
|
||||
*/
|
||||
enum GNUNET_DB_QueryStatus
|
||||
(*mark_deposit_done)(void *cls,
|
||||
const struct TALER_MerchantPublicKeyP *merchant_pub,
|
||||
const struct TALER_CoinSpendPublicKeyP *coin_pub,
|
||||
uint64_t rowid);
|
||||
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user