towards removing tiny bit

This commit is contained in:
Christian Grothoff 2022-03-27 10:32:28 +02:00
parent 646c9ad061
commit d0a69da895
No known key found for this signature in database
GPG Key ID: 939E6BE1E29FC3CC
5 changed files with 582 additions and 107 deletions

View File

@ -82,9 +82,9 @@ DROP TABLE IF EXISTS denominations CASCADE;
DROP TABLE IF EXISTS cs_nonce_locks CASCADE;
DROP FUNCTION IF EXISTS add_constraints_to_cs_nonce_locks_partition;
DROP TABLE IF EXISTS deposits_by_coin CASCADE;
DROP TABLE IF EXISTS global_fee CASCADE;
DROP TABLE IF EXISTS recoup_by_reserve CASCADE;
DROP TABLE IF EXISTS aggregation_transient CASCADE;
DROP TABLE IF EXISTS partners CASCADE;

View File

@ -772,7 +772,7 @@ CREATE TABLE IF NOT EXISTS deposits_by_ready_default
CREATE TABLE IF NOT EXISTS deposits_for_matching
(refund_deadline INT8 NOT NULL
,shard INT8 NOT NULL
,merchant_pub BYTEA NOT NULL CHECK (LENGTH(merchant_pub)=32)
,coin_pub BYTEA NOT NULL CHECK (LENGTH(coin_pub)=32) REFERENCES known_coins (coin_pub) ON DELETE CASCADE
,deposit_serial_id INT8
)
@ -782,7 +782,7 @@ COMMENT ON TABLE deposits_for_matching
CREATE INDEX IF NOT EXISTS deposits_for_matching_main_index
ON deposits_for_matching
(refund_deadline ASC, shard, coin_pub);
(refund_deadline ASC, merchant_pub, coin_pub);
CREATE TABLE IF NOT EXISTS deposits_for_matching_default
PARTITION OF deposits_for_matching
@ -818,12 +818,12 @@ BEGIN
THEN
INSERT INTO deposits_for_matching
(refund_deadline
,shard
,merchant_pub
,coin_pub
,deposit_serial_id)
VALUES
(NEW.refund_deadline
,NEW.shard
,NEW.merchant_pub
,NEW.coin_pub
,NEW.deposit_serial_id);
END IF;
@ -866,7 +866,7 @@ BEGIN
THEN
DELETE FROM deposits_for_matching
WHERE refund_deadline = OLD.refund_deadline
AND shard = OLD.shard
AND merchant_pub = OLD.merchant_pub
AND coin_pub = OLD.coin_pub
AND deposit_serial_id = OLD.deposit_serial_id;
END IF;
@ -887,12 +887,12 @@ BEGIN
THEN
INSERT INTO deposits_for_matching
(refund_deadline
,shard
,merchant_pub
,coin_pub
,deposit_serial_id)
VALUES
(NEW.refund_deadline
,NEW.shard
,NEW.merchant_pub
,NEW.coin_pub
,NEW.deposit_serial_id);
END IF;
@ -930,7 +930,7 @@ BEGIN
THEN
DELETE FROM deposits_for_matching
WHERE refund_deadline = OLD.refund_deadline
AND shard = OLD.shard
AND merchant_pub = OLD.merchant_pub
AND coin_pub = OLD.coin_pub
AND deposit_serial_id = OLD.deposit_serial_id;
END IF;
@ -1040,21 +1040,64 @@ $$;
SELECT add_constraints_to_wire_out_partition('default');
CREATE OR REPLACE FUNCTION wire_out_delete_trigger()
RETURNS trigger
LANGUAGE plpgsql
AS $$
BEGIN
DELETE FROM aggregation_tracking
WHERE wtid_raw = OLD.wtid_raw;
RETURN OLD;
END $$;
COMMENT ON FUNCTION wire_out_delete_trigger()
IS 'Replicate reserve_out deletions into aggregation_tracking. This replaces an earlier use of an ON DELETE CASCADE that required a DEFERRABLE constraint and conflicted with nice partitioning.';
CREATE TRIGGER wire_out_on_delete
AFTER DELETE
ON wire_out
FOR EACH ROW EXECUTE FUNCTION wire_out_delete_trigger();
-- ------------------------------ aggregation_transient ----------------------------------------
-- Note: this table is not yet used; it is designed
-- to allow us to get rid of the 'tiny BOOL' and
-- the associated need to look at tiny
-- deposits repeatedly.
CREATE TABLE IF NOT EXISTS aggregation_transient
(amount_val INT8 NOT NULL
,amount_frac INT4 NOT NULL
,wire_target_h_payto BYTEA CHECK (LENGTH(wire_target_h_payto)=32)
,exchange_account_section TEXT NOT NULL
,wtid_raw BYTEA NOT NULL CHECK (LENGTH(wtid_raw)=32)
)
PARTITION BY HASH (wire_target_h_payto);
COMMENT ON TABLE aggregation_transient
IS 'aggregations currently happening (lacking wire_out, usually because the amount is too low); this table is not replicated';
COMMENT ON COLUMN aggregation_transient.amount_val
IS 'Sum of all of the aggregated deposits (without deposit fees)';
COMMENT ON COLUMN aggregation_transient.wtid_raw
IS 'identifier of the wire transfer';
CREATE TABLE IF NOT EXISTS aggregation_transient_default
PARTITION OF aggregation_transient
FOR VALUES WITH (MODULUS 1, REMAINDER 0);
-- ------------------------------ aggregation_tracking ----------------------------------------
-- FIXME-URGENT: add colum coin_pub to select by coin_pub + deposit_serial_id for more efficient deposit lookup!?
-- Or which direction(s) is this table used? Is the partitioning sane??
CREATE TABLE IF NOT EXISTS aggregation_tracking
(aggregation_serial_id BIGINT GENERATED BY DEFAULT AS IDENTITY -- UNIQUE
,deposit_serial_id INT8 PRIMARY KEY -- REFERENCES deposits (deposit_serial_id) ON DELETE CASCADE
,wtid_raw BYTEA NOT NULL CONSTRAINT wire_out_ref REFERENCES wire_out(wtid_raw) ON DELETE CASCADE DEFERRABLE
,wtid_raw BYTEA NOT NULL CHECK (LENGTH(wtid_raw)=32)
)
PARTITION BY HASH (deposit_serial_id);
COMMENT ON TABLE aggregation_tracking
IS 'mapping from wire transfer identifiers (WTID) to deposits (and back)';
COMMENT ON COLUMN aggregation_tracking.wtid_raw
IS 'We first create entries in the aggregation_tracking table and then finally the wire_out entry once we know the total amount. Hence the constraint must be deferrable and we cannot use a wireout_uuid here, because we do not have it when these rows are created. Changing the logic to first INSERT a dummy row into wire_out and then UPDATEing that row in the same transaction would theoretically reduce per-deposit storage costs by 5 percent (24/~460 bytes).';
IS 'identifier of the wire transfer';
CREATE TABLE IF NOT EXISTS aggregation_tracking_default
PARTITION OF aggregation_tracking
@ -1070,7 +1113,7 @@ BEGIN
EXECUTE FORMAT (
'ALTER TABLE aggregation_tracking_' || partition_suffix || ' '
'ADD CONSTRAINT aggregation_tracking_' || partition_suffix || '_aggregation_serial_id_key '
'UNIQUE (aggregation_serial_id) '
'UNIQUE (aggregation_serial_id);'
);
END
$$;

View File

@ -1188,7 +1188,7 @@ prepare_statements (struct PostgresClosure *pg)
" ,dbr.shard ASC"
" LIMIT 1;",
4),
/* Used in #postgres_iterate_matching_deposits() */
/* FIXME: deprecated; Used in #postgres_iterate_matching_deposits() */
GNUNET_PQ_make_prepare (
"deposits_iterate_matching",
"SELECT"
@ -1207,14 +1207,115 @@ prepare_statements (struct PostgresClosure *pg)
" JOIN denominations denom"
" USING (denominations_serial)"
" WHERE dfm.refund_deadline<$3"
" AND dfm.shard=$4"
" AND dfm.merchant_pub=$1"
" AND dep.merchant_pub=$1"
" AND dep.wire_target_h_payto=$2"
" LIMIT "
TALER_QUOTE (
TALER_EXCHANGEDB_MATCHING_DEPOSITS_LIMIT) ";",
3),
/* Used in #postgres_aggregate() */
GNUNET_PQ_make_prepare (
"aggregate",
"WITH rdy AS (" /* find deposits ready */
" SELECT"
" coin_pub"
" FROM deposits_for_matching"
" WHERE refund_deadline<$1"
" AND merchant_pub=$2"
" ORDER BY refund_deadline ASC" /* ordering is not critical */
" LIMIT "
TALER_QUOTE (TALER_EXCHANGEDB_MATCHING_DEPOSITS_LIMIT)
" )"
" ,dep AS (" /* restrict to our merchant and account */
" UPDATE deposits"
" SET done=TRUE"
" WHERE coin_pub IN (SELECT coin_pub FROM rdy)"
" AND merchant_pub=$2"
" AND wire_target_h_payto=$3"
" RETURNING"
" deposit_serial_id"
" ,coin_pub"
" ,amount_with_fee_val AS amount_val"
" ,amount_with_fee_frac AS amount_frac)"
" ,ref AS (" /* find applicable refunds */
" SELECT"
" amount_with_fee_val AS refund_val"
" ,amount_with_fee_frac AS refund_frac"
" ,coin_pub"
" FROM refunds"
" WHERE coin_pub IN (SELECT coin_pub FROM dep)"
" AND deposit_serial_id IN (SELECT deposit_serial_id FROM dep))"
" ,fees AS (" /* find deposit fees for non-refunded deposits */
" SELECT"
" denom.fee_deposit_val AS fee_val"
" ,denom.fee_deposit_frac AS fee_frac"
" FROM known_coins kc"
" JOIN denominations denom"
" USING (denominations_serial)"
" WHERE coin_pub IN (SELECT coin_pub FROM dep)"
" AND coin_pub NOT IN (SELECT coin_pub FROM ref))"
" ,dummy AS (" /* add deposits to aggregation_tracking */
" INSERT INTO aggregation_tracking"
" (deposit_serial_id"
" ,wtid_raw)"
" SELECT deposit_serial_id,$4"
" FROM dep)"
"SELECT" /* calculate totals (deposits, refunds and fees) */
" CAST(COALESCE(SUM(dep.amount_val),0) AS INT8) AS sum_deposit_value"
" ,COALESCE(SUM(dep.amount_frac),0) AS sum_deposit_fraction"
" ,CAST(COALESCE(SUM(ref.refund_val),0) AS INT8) AS sum_refund_value"
" ,COALESCE(SUM(ref.refund_frac),0) AS sum_refund_fraction"
" ,CAST(COALESCE(SUM(fees.fee_val),0) AS INT8) AS sum_fee_value"
" ,COALESCE(SUM(fees.fee_frac),0) AS sum_fee_fraction"
" FROM dep "
" FULL OUTER JOIN ref ON (FALSE)"
" FULL OUTER JOIN fees ON (FALSE);",
4),
/* Used in #postgres_mark_deposit_tiny() */
/* Used in #postgres_create_aggregation_transient() */
GNUNET_PQ_make_prepare (
"create_aggregation_transient",
"INSERT INTO aggregation_transient"
" (amount_val"
" ,amount_frac"
" ,wire_target_h_payto"
" ,exchange_account_section"
" ,wtid_raw)"
" VALUES ($1, $2, $3, $4, $5);",
5),
/* Used in #postgres_select_aggregation_transient() */
GNUNET_PQ_make_prepare (
"select_aggregation_transient",
"SELECT"
" amount_val"
" ,amount_frac"
" ,wtid_raw"
" FROM aggregation_transient"
" WHERE wire_target_h_payto=$1"
" AND exchange_account_section=$2;",
2),
/* Used in #postgres_update_aggregation_transient() */
GNUNET_PQ_make_prepare (
"update_aggregation_transient",
"UPDATE aggregation_transient"
" SET amount_val=$1"
" ,amount_frac=$2"
" WHERE wire_target_h_payto=$3"
" AND wtid_raw=$4",
4),
/* Used in #postgres_delete_aggregation_transient() */
GNUNET_PQ_make_prepare (
"delete_aggregation_transient",
"DELETE FROM aggregation_transient"
" WHERE wire_target_h_payto=$1"
" AND wtid_raw=$2",
2),
/* FIXME-deprecated: Used in #postgres_mark_deposit_tiny() */
GNUNET_PQ_make_prepare (
"mark_deposit_tiny",
"UPDATE deposits"
@ -1222,7 +1323,7 @@ prepare_statements (struct PostgresClosure *pg)
" WHERE coin_pub=$1"
" AND deposit_serial_id=$2",
2),
/* Used in #postgres_mark_deposit_done() */
/* FIXME-deprecated: Used in #postgres_mark_deposit_done() */
GNUNET_PQ_make_prepare (
"mark_deposit_done",
"UPDATE deposits"
@ -1230,6 +1331,7 @@ prepare_statements (struct PostgresClosure *pg)
" WHERE coin_pub=$1"
" AND deposit_serial_id=$2;",
2),
/* Used in #postgres_get_coin_transactions() to obtain information
about how a coin has been spend with /deposit requests. */
GNUNET_PQ_make_prepare (
@ -2835,8 +2937,8 @@ prepare_statements (struct PostgresClosure *pg)
GNUNET_PQ_make_prepare (
"insert_into_table_refunds",
"INSERT INTO refunds"
"(coin_pub"
",refund_serial_id"
"(refund_serial_id"
",coin_pub"
",merchant_sig"
",rtransaction_id"
",amount_with_fee_val"
@ -5868,6 +5970,240 @@ postgres_have_deposit2 (
}
/**
* Aggregate all matching deposits for @a h_payto and
* @a merchant_pub, returning the total amounts.
*
* @param cls the @e cls of this struct with the plugin-specific state
* @param h_payto destination of the wire transfer
* @param merchant_pub public key of the merchant
* @param wtid wire transfer ID to set for the aggregate
* @param[out] total set to the sum of the total deposits minus applicable deposit fees and refunds
* @return transaction status
*/
static enum GNUNET_DB_QueryStatus
postgres_aggregate (
void *cls,
const struct TALER_PaytoHashP *h_payto,
const struct TALER_MerchantPublicKeyP *merchant_pub,
const struct TALER_WireTransferIdentifierRawP *wtid,
struct TALER_Amount *total)
{
struct PostgresClosure *pg = cls;
struct GNUNET_TIME_Absolute now = {0};
struct GNUNET_PQ_QueryParam params[] = {
GNUNET_PQ_query_param_absolute_time (&now),
GNUNET_PQ_query_param_auto_from_type (merchant_pub),
GNUNET_PQ_query_param_auto_from_type (h_payto),
GNUNET_PQ_query_param_auto_from_type (wtid),
GNUNET_PQ_query_param_end
};
uint64_t sum_deposit_value;
uint64_t sum_deposit_frac;
uint64_t sum_refund_value;
uint64_t sum_refund_frac;
uint64_t sum_fee_value;
uint64_t sum_fee_frac;
struct GNUNET_PQ_ResultSpec rs[] = {
GNUNET_PQ_result_spec_uint64 ("sum_deposit_value",
&sum_deposit_value),
GNUNET_PQ_result_spec_uint64 ("sum_deposit_fraction",
&sum_deposit_frac),
GNUNET_PQ_result_spec_uint64 ("sum_refund_value",
&sum_refund_value),
GNUNET_PQ_result_spec_uint64 ("sum_refund_fraction",
&sum_refund_frac),
GNUNET_PQ_result_spec_uint64 ("sum_fee_value",
&sum_fee_value),
GNUNET_PQ_result_spec_uint64 ("sum_fee_fraction",
&sum_fee_frac),
GNUNET_PQ_result_spec_end
};
enum GNUNET_DB_QueryStatus qs;
struct TALER_Amount sum_deposit;
struct TALER_Amount sum_refund;
struct TALER_Amount sum_fee;
struct TALER_Amount delta;
now = GNUNET_TIME_absolute_round_down (GNUNET_TIME_absolute_get (),
pg->aggregator_shift);
qs = GNUNET_PQ_eval_prepared_singleton_select (pg->conn,
"aggregate",
params,
rs);
if (qs < 0)
{
GNUNET_assert (GNUNET_DB_STATUS_SOFT_ERROR == qs);
return qs;
}
if (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS == qs)
{
GNUNET_assert (GNUNET_OK ==
TALER_amount_set_zero (pg->currency,
total));
return qs;
}
GNUNET_assert (GNUNET_OK ==
TALER_amount_set_zero (pg->currency,
&sum_deposit));
GNUNET_assert (GNUNET_OK ==
TALER_amount_set_zero (pg->currency,
&sum_refund));
GNUNET_assert (GNUNET_OK ==
TALER_amount_set_zero (pg->currency,
&sum_fee));
sum_deposit.value = sum_deposit_frac / TALER_AMOUNT_FRAC_BASE
+ sum_deposit_value;
sum_deposit.fraction = sum_deposit_frac % TALER_AMOUNT_FRAC_BASE;
sum_refund.value = sum_refund_frac / TALER_AMOUNT_FRAC_BASE
+ sum_refund_value;
sum_refund.fraction = sum_refund_frac % TALER_AMOUNT_FRAC_BASE;
sum_fee.value = sum_fee_frac / TALER_AMOUNT_FRAC_BASE
+ sum_fee_value;
sum_fee.fraction = sum_fee_frac % TALER_AMOUNT_FRAC_BASE; \
GNUNET_assert (0 <=
TALER_amount_subtract (&delta,
&sum_deposit,
&sum_refund));
GNUNET_assert (0 <=
TALER_amount_subtract (total,
&delta,
&sum_fee));
return qs;
}
/**
* Create a new entry in the transient aggregation table.
*
* @param cls the @e cls of this struct with the plugin-specific state
* @param h_payto destination of the wire transfer
* @param exchange_account_section exchange account to use
* @param wtid the raw wire transfer identifier to be used
* @param total amount to be wired in the future
* @return transaction status
*/
static enum GNUNET_DB_QueryStatus
postgres_create_aggregation_transient (
void *cls,
const struct TALER_PaytoHashP *h_payto,
const char *exchange_account_section,
const struct TALER_WireTransferIdentifierRawP *wtid,
const struct TALER_Amount *total)
{
struct PostgresClosure *pg = cls;
struct GNUNET_PQ_QueryParam params[] = {
TALER_PQ_query_param_amount (total),
GNUNET_PQ_query_param_auto_from_type (h_payto),
GNUNET_PQ_query_param_string (exchange_account_section),
GNUNET_PQ_query_param_auto_from_type (wtid),
GNUNET_PQ_query_param_end
};
return GNUNET_PQ_eval_prepared_non_select (pg->conn,
"create_aggregation_transient",
params);
}
/**
* Find existing entry in the transient aggregation table.
*
* @param cls the @e cls of this struct with the plugin-specific state
* @param h_payto destination of the wire transfer
* @param exchange_account_section exchange account to use
* @param[out] wtid set to the raw wire transfer identifier to be used
* @param[out] total existing amount to be wired in the future
* @return transaction status
*/
static enum GNUNET_DB_QueryStatus
postgres_select_aggregation_transient (
void *cls,
const struct TALER_PaytoHashP *h_payto,
const char *exchange_account_section,
struct TALER_WireTransferIdentifierRawP *wtid,
struct TALER_Amount *total)
{
struct PostgresClosure *pg = cls;
struct GNUNET_PQ_QueryParam params[] = {
GNUNET_PQ_query_param_auto_from_type (h_payto),
GNUNET_PQ_query_param_string (exchange_account_section),
GNUNET_PQ_query_param_end
};
struct GNUNET_PQ_ResultSpec rs[] = {
TALER_PQ_RESULT_SPEC_AMOUNT ("amount",
total),
GNUNET_PQ_result_spec_auto_from_type ("wtid_raw",
wtid),
GNUNET_PQ_result_spec_end
};
return GNUNET_PQ_eval_prepared_singleton_select (pg->conn,
"select_aggregation_transient",
params,
rs);
}
/**
* Update existing entry in the transient aggregation table.
* @a h_payto is only needed for query performance.
*
* @param cls the @e cls of this struct with the plugin-specific state
* @param h_payto destination of the wire transfer
* @param wtid the raw wire transfer identifier to update
* @param total new total amount to be wired in the future
* @return transaction status
*/
static enum GNUNET_DB_QueryStatus
postgres_update_aggregation_transient (
void *cls,
const struct TALER_PaytoHashP *h_payto,
const struct TALER_WireTransferIdentifierRawP *wtid,
const struct TALER_Amount *total)
{
struct PostgresClosure *pg = cls;
struct GNUNET_PQ_QueryParam params[] = {
TALER_PQ_query_param_amount (total),
GNUNET_PQ_query_param_auto_from_type (h_payto),
GNUNET_PQ_query_param_auto_from_type (wtid),
GNUNET_PQ_query_param_end
};
return GNUNET_PQ_eval_prepared_non_select (pg->conn,
"update_aggregation_transient",
params);
}
/**
* Delete existing entry in the transient aggregation table.
* @a h_payto is only needed for query performance.
*
* @param cls the @e cls of this struct with the plugin-specific state
* @param h_payto destination of the wire transfer
* @param wtid the raw wire transfer identifier to update
* @return transaction status
*/
static enum GNUNET_DB_QueryStatus
postgres_delete_aggregation_transient (
void *cls,
const struct TALER_PaytoHashP *h_payto,
const struct TALER_WireTransferIdentifierRawP *wtid)
{
struct PostgresClosure *pg = cls;
struct GNUNET_PQ_QueryParam params[] = {
GNUNET_PQ_query_param_auto_from_type (h_payto),
GNUNET_PQ_query_param_auto_from_type (wtid),
GNUNET_PQ_query_param_end
};
return GNUNET_PQ_eval_prepared_non_select (pg->conn,
"delete_aggregation_transient",
params);
}
/**
* Mark a deposit as tiny, thereby declaring that it cannot be
* executed by itself and should no longer be returned by
@ -6147,12 +6483,10 @@ postgres_iterate_matching_deposits (
{
struct PostgresClosure *pg = cls;
struct GNUNET_TIME_Absolute now = {0};
uint64_t shard = compute_shard (merchant_pub);
struct GNUNET_PQ_QueryParam params[] = {
GNUNET_PQ_query_param_auto_from_type (merchant_pub),
GNUNET_PQ_query_param_auto_from_type (h_payto),
GNUNET_PQ_query_param_absolute_time (&now),
GNUNET_PQ_query_param_uint64 (&shard),
GNUNET_PQ_query_param_end
};
struct MatchingDepositContext mdc = {
@ -9299,7 +9633,6 @@ refunds_serial_helper_cb (void *cls,
struct RefundsSerialContext *rsc = cls;
struct PostgresClosure *pg = rsc->pg;
fprintf (stderr, "Got %u results\n", num_results);
for (unsigned int i = 0; i<num_results; i++)
{
struct TALER_EXCHANGEDB_Refund refund;
@ -13081,6 +13414,15 @@ libtaler_plugin_exchangedb_postgres_init (void *cls)
plugin->get_known_coin = &postgres_get_known_coin;
plugin->get_coin_denomination = &postgres_get_coin_denomination;
plugin->have_deposit2 = &postgres_have_deposit2;
plugin->aggregate = &postgres_aggregate;
plugin->create_aggregation_transient
= &postgres_create_aggregation_transient;
plugin->select_aggregation_transient
= &postgres_select_aggregation_transient;
plugin->update_aggregation_transient
= &postgres_update_aggregation_transient;
plugin->delete_aggregation_transient
= &postgres_delete_aggregation_transient;
plugin->mark_deposit_tiny = &postgres_mark_deposit_tiny;
plugin->mark_deposit_done = &postgres_mark_deposit_done;
plugin->get_ready_deposit = &postgres_get_ready_deposit;

View File

@ -674,48 +674,6 @@ deposit_cb (void *cls,
}
/**
* Function called with details about deposits that
* have been made. Called in the test on the
* deposit given in @a cls.
*
* @param cls closure a `struct TALER_EXCHANGEDB_Deposit *`
* @param rowid unique ID for the deposit in our DB, used for marking
* it as 'tiny' or 'done'
* @param coin_pub public key of the coin
* @param amount_with_fee amount that was deposited including fee
* @param deposit_fee amount the exchange gets to keep as transaction fees
* @param h_contract_terms hash of the proposal data known to merchant and customer
* @return transaction status code, #GNUNET_DB_STATUS_SUCCESS_ONE_RESULT to continue to iterate
*/
static enum GNUNET_DB_QueryStatus
matching_deposit_cb (void *cls,
uint64_t rowid,
const struct TALER_CoinSpendPublicKeyP *coin_pub,
const struct TALER_Amount *amount_with_fee,
const struct TALER_Amount *deposit_fee,
const struct TALER_PrivateContractHashP *h_contract_terms)
{
struct TALER_EXCHANGEDB_Deposit *deposit = cls;
deposit_rowid = rowid;
if ( (0 != TALER_amount_cmp (amount_with_fee,
&deposit->amount_with_fee)) ||
(0 != TALER_amount_cmp (deposit_fee,
&deposit->deposit_fee)) ||
(0 != GNUNET_memcmp (h_contract_terms,
&deposit->h_contract_terms)) ||
(0 != GNUNET_memcmp (coin_pub,
&deposit->coin.coin_pub)) )
{
GNUNET_break (0);
return GNUNET_DB_STATUS_HARD_ERROR;
}
return GNUNET_DB_STATUS_SUCCESS_ONE_RESULT;
}
/**
* Callback for #select_deposits_above_serial_id ()
*
@ -1055,7 +1013,7 @@ test_wire_out (const struct TALER_EXCHANGEDB_Deposit *deposit)
&h_payto);
auditor_row_cnt = 0;
memset (&wire_out_wtid,
42,
41,
sizeof (wire_out_wtid));
wire_out_date = GNUNET_TIME_timestamp_get ();
GNUNET_assert (GNUNET_OK ==
@ -1109,14 +1067,6 @@ test_wire_out (const struct TALER_EXCHANGEDB_Deposit *deposit)
&coin_fee2,
&kyc));
}
/* insert WT data */
FAILIF (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT !=
plugin->insert_aggregation_tracking (plugin->cls,
&wire_out_wtid,
deposit_rowid));
/* Now let's fix the transient constraint violation by
putting in the WTID into the wire_out table */
{
struct TALER_ReservePublicKeyP rpub;
struct TALER_EXCHANGEDB_KycStatus kyc;
@ -2270,44 +2220,92 @@ run (void *cls)
&deposit_cb,
&deposit));
FAILIF (8 == result);
FAILIF (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT !=
plugin->iterate_matching_deposits (plugin->cls,
&wire_target_h_payto,
&deposit.merchant_pub,
&matching_deposit_cb,
&deposit,
2));
{
struct TALER_Amount total;
struct TALER_WireTransferIdentifierRawP wtid;
memset (&wtid,
41,
sizeof (wtid));
FAILIF (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT !=
plugin->aggregate (plugin->cls,
&wire_target_h_payto,
&deposit.merchant_pub,
&wtid,
&total));
}
FAILIF (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS !=
plugin->commit (plugin->cls));
FAILIF (GNUNET_OK !=
plugin->start (plugin->cls,
"test-2"));
FAILIF (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT !=
plugin->mark_deposit_tiny (plugin->cls,
&deposit.coin.coin_pub,
deposit_rowid));
FAILIF (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS !=
plugin->get_ready_deposit (plugin->cls,
0,
INT32_MAX,
true,
&deposit_cb,
&deposit));
plugin->rollback (plugin->cls);
FAILIF (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT !=
plugin->get_ready_deposit (plugin->cls,
0,
INT32_MAX,
true,
&deposit_cb,
&deposit));
FAILIF (GNUNET_OK !=
plugin->start (plugin->cls,
"test-3"));
FAILIF (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT !=
plugin->mark_deposit_done (plugin->cls,
&deposit.coin.coin_pub,
deposit_rowid));
{
struct TALER_WireTransferIdentifierRawP wtid;
struct TALER_Amount total;
struct TALER_WireTransferIdentifierRawP wtid2;
struct TALER_Amount total2;
memset (&wtid,
42,
sizeof (wtid));
GNUNET_assert (GNUNET_OK ==
TALER_string_to_amount (CURRENCY ":42",
&total));
FAILIF (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS !=
plugin->select_aggregation_transient (plugin->cls,
&wire_target_h_payto,
"x-bank",
&wtid2,
&total2));
FAILIF (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT !=
plugin->create_aggregation_transient (plugin->cls,
&wire_target_h_payto,
"x-bank",
&wtid,
&total));
FAILIF (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT !=
plugin->select_aggregation_transient (plugin->cls,
&wire_target_h_payto,
"x-bank",
&wtid2,
&total2));
FAILIF (0 !=
GNUNET_memcmp (&wtid2,
&wtid));
FAILIF (0 !=
TALER_amount_cmp (&total2,
&total));
GNUNET_assert (GNUNET_OK ==
TALER_string_to_amount (CURRENCY ":43",
&total));
FAILIF (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT !=
plugin->update_aggregation_transient (plugin->cls,
&wire_target_h_payto,
&wtid,
&total));
FAILIF (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT !=
plugin->select_aggregation_transient (plugin->cls,
&wire_target_h_payto,
"x-bank",
&wtid2,
&total2));
FAILIF (0 !=
GNUNET_memcmp (&wtid2,
&wtid));
FAILIF (0 !=
TALER_amount_cmp (&total2,
&total));
FAILIF (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT !=
plugin->delete_aggregation_transient (plugin->cls,
&wire_target_h_payto,
&wtid));
FAILIF (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS !=
plugin->select_aggregation_transient (plugin->cls,
&wire_target_h_payto,
"x-bank",
&wtid2,
&total2));
}
FAILIF (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS !=
plugin->commit (plugin->cls));

View File

@ -3145,6 +3145,98 @@ struct TALER_EXCHANGEDB_Plugin
uint32_t limit);
/**
* Aggregate all matching deposits for @a h_payto and
* @a merchant_pub, returning the total amounts.
*
* @param cls the @e cls of this struct with the plugin-specific state
* @param h_payto destination of the wire transfer
* @param merchant_pub public key of the merchant
* @param wtid wire transfer ID to set for the aggregate
* @param[out] total set to the sum of the total deposits minus applicable deposit fees and refunds
* @return transaction status
*/
enum GNUNET_DB_QueryStatus
(*aggregate)(
void *cls,
const struct TALER_PaytoHashP *h_payto,
const struct TALER_MerchantPublicKeyP *merchant_pub,
const struct TALER_WireTransferIdentifierRawP *wtid,
struct TALER_Amount *total);
/**
* Create a new entry in the transient aggregation table.
*
* @param cls the @e cls of this struct with the plugin-specific state
* @param h_payto destination of the wire transfer
* @param exchange_account_section exchange account to use
* @param wtid the raw wire transfer identifier to be used
* @param total amount to be wired in the future
* @return transaction status
*/
enum GNUNET_DB_QueryStatus
(*create_aggregation_transient)(
void *cls,
const struct TALER_PaytoHashP *h_payto,
const char *exchange_account_section,
const struct TALER_WireTransferIdentifierRawP *wtid,
const struct TALER_Amount *total);
/**
* Find existing entry in the transient aggregation table.
*
* @param cls the @e cls of this struct with the plugin-specific state
* @param h_payto destination of the wire transfer
* @param exchange_account_section exchange account to use
* @param[out] wtid set to the raw wire transfer identifier to be used
* @param[out] total existing amount to be wired in the future
* @return transaction status
*/
enum GNUNET_DB_QueryStatus
(*select_aggregation_transient)(
void *cls,
const struct TALER_PaytoHashP *h_payto,
const char *exchange_account_section,
struct TALER_WireTransferIdentifierRawP *wtid,
struct TALER_Amount *total);
/**
* Update existing entry in the transient aggregation table.
* @a h_payto is only needed for query performance.
*
* @param cls the @e cls of this struct with the plugin-specific state
* @param h_payto destination of the wire transfer
* @param wtid the raw wire transfer identifier to update
* @param total new total amount to be wired in the future
* @return transaction status
*/
enum GNUNET_DB_QueryStatus
(*update_aggregation_transient)(
void *cls,
const struct TALER_PaytoHashP *h_payto,
const struct TALER_WireTransferIdentifierRawP *wtid,
const struct TALER_Amount *total);
/**
* Delete existing entry in the transient aggregation table.
* @a h_payto is only needed for query performance.
*
* @param cls the @e cls of this struct with the plugin-specific state
* @param h_payto destination of the wire transfer
* @param wtid the raw wire transfer identifier to update
* @return transaction status
*/
enum GNUNET_DB_QueryStatus
(*delete_aggregation_transient)(
void *cls,
const struct TALER_PaytoHashP *h_payto,
const struct TALER_WireTransferIdentifierRawP *wtid);
/**
* Lookup melt commitment data under the given @a rc.
*