diff --git a/src/exchangedb/Makefile.am b/src/exchangedb/Makefile.am index 4d892efef..59aeb3212 100644 --- a/src/exchangedb/Makefile.am +++ b/src/exchangedb/Makefile.am @@ -239,6 +239,7 @@ libtaler_plugin_exchangedb_postgres_la_SOURCES = \ pg_select_purse_deposits_above_serial_id.h pg_select_purse_deposits_above_serial_id.c \ pg_select_account_merges_above_serial_id.h pg_select_account_merges_above_serial_id.c \ pg_select_all_purse_decisions_above_serial_id.h pg_select_all_purse_decisions_above_serial_id.c \ + pg_batch_reserves_in_insert.h pg_batch_reserves_in_insert.c \ pg_select_reserve_open_above_serial_id.c pg_select_reserve_open_above_serial_id.h libtaler_plugin_exchangedb_postgres_la_LIBADD = \ $(LTLIBINTL) @@ -275,11 +276,13 @@ libtalerexchangedb_la_LDFLAGS = \ check_PROGRAMS = \ test-exchangedb-postgres \ bench-db-postgres\ + perf-exchangedb-reserves-in-insert-postgres\ test-exchangedb-by-j-postgres AM_TESTS_ENVIRONMENT=export TALER_PREFIX=$${TALER_PREFIX:-@libdir@};export PATH=$${TALER_PREFIX:-@prefix@}/bin:$$PATH; TESTS = \ test-exchangedb-postgres\ - test-exchangedb-by-j-postgres + test-exchangedb-by-j-postgres\ + perf-exchangedb-reserves-in-insert-postgres test_exchangedb_postgres_SOURCES = \ test_exchangedb.c test_exchangedb_postgres_LDADD = \ @@ -304,6 +307,19 @@ test_exchangedb_by_j_postgres_LDADD = \ -lgnunetutil \ $(XLIB) + +perf_exchangedb_reserves_in_insert_postgres_SOURCES = \ + perf_exchangedb_reserves_in_insert.c +perf_exchangedb_reserves_in_insert_postgres_LDADD = \ + libtalerexchangedb.la \ + $(top_builddir)/src/json/libtalerjson.la \ + $(top_builddir)/src/util/libtalerutil.la \ + $(top_builddir)/src/pq/libtalerpq.la \ + -ljansson \ + -lgnunetjson \ + -lgnunetutil \ + $(XLIB) + bench_db_postgres_SOURCES = \ bench_db.c bench_db_postgres_LDADD = \ diff --git a/src/exchangedb/pg_batch_reserves_in_insert.c b/src/exchangedb/pg_batch_reserves_in_insert.c new file mode 100644 index 000000000..d7ce47dc2 --- /dev/null +++ b/src/exchangedb/pg_batch_reserves_in_insert.c @@ -0,0 +1,284 @@ +/* + This file is part of TALER + Copyright (C) 2022 Taler Systems SA + + TALER is free software; you can redistribute it and/or modify it under the + terms of the GNU General Public License as published by the Free Software + Foundation; either version 3, or (at your option) any later version. + + TALER is distributed in the hope that it will be useful, but WITHOUT ANY + WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR + A PARTICULAR PURPOSE. See the GNU General Public License for more details. + + You should have received a copy of the GNU General Public License along with + TALER; see the file COPYING. If not, see + */ +/** + * @file exchangedb/pg_bash_reserves_in_insert.c + * @brief Implementation of the reserves_in_insert function for Postgres + * @author JOSEPHxu + */ +#include "platform.h" +#include "taler_error_codes.h" +#include "taler_dbevents.h" +#include "taler_pq_lib.h" +#include "pg_batch_reserves_in_insert.h" +#include "pg_helper.h" +#include "pg_start.h" +#include "pg_start_read_committed.h" +#include "pg_commit.h" +#include "pg_reserves_get.h" +#include "pg_reserves_update.h" +#include "pg_setup_wire_target.h" +#include "pg_event_notify.h" + + +/** + * Generate event notification for the reserve + * change. + * + * @param pg plugin state + * @param reserve_pub reserve to notfiy on + */ +static void +notify_on_reserve (struct PostgresClosure *pg, + const struct TALER_ReservePublicKeyP *reserve_pub) +{ + struct TALER_ReserveEventP rep = { + .header.size = htons (sizeof (rep)), + .header.type = htons (TALER_DBEVENT_EXCHANGE_RESERVE_INCOMING), + .reserve_pub = *reserve_pub + }; + + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Notifying on reserve!\n"); + TEH_PG_event_notify (pg, + &rep.header, + NULL, + 0); +} + + +enum GNUNET_DB_QueryStatus +TEH_PG_batch_reserves_in_insert (void *cls, + const struct TALER_EXCHANGEDB_ReserveInInfo *reserves, + unsigned int reserves_length, + enum GNUNET_DB_QueryStatus *results) +{ + struct PostgresClosure *pg = cls; + enum GNUNET_DB_QueryStatus qs1; + struct TALER_EXCHANGEDB_Reserve reserve; + struct GNUNET_TIME_Timestamp expiry; + struct GNUNET_TIME_Timestamp gc; + uint64_t reserve_uuid; + + reserve.pub = reserves->reserve_pub; + expiry = GNUNET_TIME_absolute_to_timestamp ( + GNUNET_TIME_absolute_add (reserves->execution_time.abs_time, + pg->idle_reserve_expiration_time)); + gc = GNUNET_TIME_absolute_to_timestamp ( + GNUNET_TIME_absolute_add (GNUNET_TIME_absolute_get (), + pg->legal_reserve_expiration_time)); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Creating reserve %s with expiration in %s\n", + TALER_B2S (&(reserves->reserve_pub)), + GNUNET_STRINGS_relative_time_to_string ( + pg->idle_reserve_expiration_time, + GNUNET_NO)); + /* Optimistically assume this is a new reserve, create balance for the first + time; we do this before adding the actual transaction to "reserves_in", + as for a new reserve it can't be a duplicate 'add' operation, and as + the 'add' operation needs the reserve entry as a foreign key. */ + { + struct GNUNET_PQ_QueryParam params[] = { + GNUNET_PQ_query_param_auto_from_type (&(reserves->reserve_pub)), + TALER_PQ_query_param_amount (&(reserves->balance)), + GNUNET_PQ_query_param_timestamp (&expiry), + GNUNET_PQ_query_param_timestamp (&gc), + GNUNET_PQ_query_param_end + }; + struct GNUNET_PQ_ResultSpec rs[] = { + GNUNET_PQ_result_spec_uint64 ("reserve_uuid", + &reserve_uuid), + GNUNET_PQ_result_spec_end + }; + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Reserve does not exist; creating a new one\n"); + /* Note: query uses 'on conflict do nothing' */ + PREPARE (pg, + "reserve_create", + "SELECT bash_reserves_in('34', '20','//asdddfs3', '60', '20'),bash_reserves_in('24', '10','//dfs3', '40', '50'),bash_reserves_in('42', '40','//d43', '40', '50'),bash_reserves_in('44', '10','//ghs3', '40', '50') AS existed from reserves;"); + + qs1 = GNUNET_PQ_eval_prepared_singleton_select (pg->conn, + "reserve_create", + params, + rs); + if (qs1 < 0) + return qs1; + } + + /* Create new incoming transaction, "ON CONFLICT DO NOTHING" + is again used to guard against duplicates. */ + { + enum GNUNET_DB_QueryStatus qs2; + enum GNUNET_DB_QueryStatus qs3; + struct TALER_PaytoHashP h_payto; + + qs3 = TEH_PG_setup_wire_target (pg, + reserves->sender_account_details, + &h_payto); + if (qs3 < 0) + return qs3; + /* We do not have the UUID, so insert by public key */ + struct GNUNET_PQ_QueryParam params[] = { + GNUNET_PQ_query_param_auto_from_type (&reserve.pub), + GNUNET_PQ_query_param_uint64 (&(reserves->wire_reference)), + TALER_PQ_query_param_amount (&(reserves->balance)), + GNUNET_PQ_query_param_string (reserves->exchange_account_name), + GNUNET_PQ_query_param_auto_from_type (&h_payto), + GNUNET_PQ_query_param_timestamp (&reserves->execution_time), + GNUNET_PQ_query_param_end + }; + + PREPARE (pg, + "reserves_in_add_transaction", + "INSERT INTO reserves_in " + "(reserve_pub" + ",wire_reference" + ",credit_val" + ",credit_frac" + ",exchange_account_section" + ",wire_source_h_payto" + ",execution_date" + ") VALUES ($1, $2, $3, $4, $5, $6, $7)" + " ON CONFLICT DO NOTHING;"); + qs2 = GNUNET_PQ_eval_prepared_non_select (pg->conn, + "reserves_in_add_transaction", + params); + /* qs2 could be 0 as statement used 'ON CONFLICT DO NOTHING' */ + if (0 >= qs2) + { + if ( (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS == qs2) && + (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS != qs1) ) + { + /* Conflict for the transaction, but the reserve was + just now created, that should be impossible. */ + GNUNET_break (0); /* should be impossible: reserve was fresh, + but transaction already known */ + return GNUNET_DB_STATUS_HARD_ERROR; + } + /* Transaction was already known or error. We are finished. */ + return qs2; + } + } + if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT == qs1) + { + /* New reserve, we are finished */ + notify_on_reserve (pg, + &(reserves->reserve_pub)); + return GNUNET_DB_STATUS_SUCCESS_ONE_RESULT; + } + + /* we were wrong with our optimistic assumption: + reserve did already exist, need to do an update instead */ + { + /* We need to move away from 'read committed' to serializable. + Also, we know that it should be safe to commit at this point. + (We are only run in a larger transaction for performance.) */ + enum GNUNET_DB_QueryStatus cs; + + cs = TEH_PG_commit(pg); + if (cs < 0) + return cs; + if (GNUNET_OK != + TEH_PG_start (pg, + "reserve-update-serializable")) + { + GNUNET_break (0); + return GNUNET_DB_STATUS_HARD_ERROR; + } + } + { + enum GNUNET_DB_QueryStatus reserve_exists; + + reserve_exists = TEH_PG_reserves_get (pg, + &reserve); + switch (reserve_exists) + { + case GNUNET_DB_STATUS_HARD_ERROR: + GNUNET_break (0); + return reserve_exists; + case GNUNET_DB_STATUS_SOFT_ERROR: + return reserve_exists; + case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: + /* First we got a conflict, but then we cannot select? Very strange. */ + GNUNET_break (0); + return GNUNET_DB_STATUS_SOFT_ERROR; + case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT: + /* continued below */ + break; + } + } + + { + struct TALER_EXCHANGEDB_Reserve updated_reserve; + enum GNUNET_DB_QueryStatus qs3; + + /* If the reserve already existed, we need to still update the + balance; we do this after checking for duplication, as + otherwise we might have to actually pay the cost to roll this + back for duplicate transactions; like this, we should virtually + never actually have to rollback anything. */ + updated_reserve.pub = reserve.pub; + if (0 > + TALER_amount_add (&updated_reserve.balance, + &reserve.balance, + &reserves->balance)) + { + /* currency overflow or incompatible currency */ + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Attempt to deposit incompatible amount into reserve\n"); + return GNUNET_DB_STATUS_HARD_ERROR; + } + updated_reserve.expiry = GNUNET_TIME_timestamp_max (expiry, + reserve.expiry); + updated_reserve.gc = GNUNET_TIME_timestamp_max (gc, + reserve.gc); + qs3 = TEH_PG_reserves_update (pg, + &updated_reserve); + switch (qs3) + { + case GNUNET_DB_STATUS_HARD_ERROR: + GNUNET_break (0); + return qs3; + case GNUNET_DB_STATUS_SOFT_ERROR: + return qs3; + case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: + /* How can the UPDATE not work here? Very strange. */ + GNUNET_break (0); + return GNUNET_DB_STATUS_HARD_ERROR; + case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT: + /* continued below */ + break; + } + } + notify_on_reserve (pg, + &reserves->reserve_pub); + /* Go back to original transaction mode */ + { + enum GNUNET_DB_QueryStatus cs; + + cs = TEH_PG_commit (pg); + if (cs < 0) + return cs; + if (GNUNET_OK != + TEH_PG_start_read_committed (pg, + "reserve-insert-continued")) + { + GNUNET_break (0); + return GNUNET_DB_STATUS_HARD_ERROR; + } + } + return GNUNET_DB_STATUS_SUCCESS_ONE_RESULT; +} diff --git a/src/exchangedb/pg_batch_reserves_in_insert.h b/src/exchangedb/pg_batch_reserves_in_insert.h new file mode 100644 index 000000000..9422096db --- /dev/null +++ b/src/exchangedb/pg_batch_reserves_in_insert.h @@ -0,0 +1,34 @@ +/* + This file is part of TALER + Copyright (C) 2022 Taler Systems SA + + TALER is free software; you can redistribute it and/or modify it under the + terms of the GNU General Public License as published by the Free Software + Foundation; either version 3, or (at your option) any later version. + + TALER is distributed in the hope that it will be useful, but WITHOUT ANY + WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR + A PARTICULAR PURPOSE. See the GNU General Public License for more details. + + You should have received a copy of the GNU General Public License along with + TALER; see the file COPYING. If not, see + */ +/** + * @file exchangedb/pg_batch_reserves_in_insert.h + * @brief implementation of the batch_reserves_in_insert function for Postgres + * @author Christian Grothoff + */ +#ifndef PG_BATCH_RESERVES_IN_INSERT_H +#define PG_BATCH_RESERVES_IN_INSERT_H + +#include "taler_util.h" +#include "taler_json_lib.h" +#include "taler_exchangedb_plugin.h" + + +enum GNUNET_DB_QueryStatus +TEH_PG_batch_reserves_in_insert (void *cls, + const struct TALER_EXCHANGEDB_ReserveInInfo *reserves, + unsigned int reserves_length, + enum GNUNET_DB_QueryStatus *results); +#endif diff --git a/src/exchangedb/pg_reserves_in_insert.c b/src/exchangedb/pg_reserves_in_insert.c index 2fcca241d..428e19231 100644 --- a/src/exchangedb/pg_reserves_in_insert.c +++ b/src/exchangedb/pg_reserves_in_insert.c @@ -31,6 +31,8 @@ #include "pg_reserves_update.h" #include "pg_setup_wire_target.h" #include "pg_event_notify.h" + + /** * Generate event notification for the reserve * change. @@ -38,7 +40,6 @@ * @param pg plugin state * @param reserve_pub reserve to notfiy on */ - static void notify_on_reserve (struct PostgresClosure *pg, const struct TALER_ReservePublicKeyP *reserve_pub) @@ -52,11 +53,12 @@ notify_on_reserve (struct PostgresClosure *pg, GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Notifying on reserve!\n"); TEH_PG_event_notify (pg, - &rep.header, - NULL, - 0); + &rep.header, + NULL, + 0); } + enum GNUNET_DB_QueryStatus TEH_PG_reserves_in_insert (void *cls, const struct TALER_ReservePublicKeyP *reserve_pub, @@ -135,8 +137,8 @@ TEH_PG_reserves_in_insert (void *cls, struct TALER_PaytoHashP h_payto; qs3 = TEH_PG_setup_wire_target (pg, - sender_account_details, - &h_payto); + sender_account_details, + &h_payto); if (qs3 < 0) return qs3; /* We do not have the UUID, so insert by public key */ diff --git a/src/exchangedb/plugin_exchangedb_postgres.c b/src/exchangedb/plugin_exchangedb_postgres.c index 6a2a473c1..114217004 100644 --- a/src/exchangedb/plugin_exchangedb_postgres.c +++ b/src/exchangedb/plugin_exchangedb_postgres.c @@ -207,7 +207,7 @@ #include "pg_reserves_update.h" #include "pg_setup_wire_target.h" #include "pg_compute_shard.h" - +#include "pg_batch_reserves_in_insert.h" /** * Set to 1 to enable Postgres auto_explain module. This will * slow down things a _lot_, but also provide extensive logging @@ -5446,7 +5446,8 @@ libtaler_plugin_exchangedb_postgres_init (void *cls) = &TEH_PG_select_purse_by_merge_pub; plugin->set_purse_balance = &TEH_PG_set_purse_balance; - + plugin->batch_reserves_in_insert + = &TEH_PG_batch_reserves_in_insert; return plugin; } diff --git a/src/exchangedb/procedures.sql b/src/exchangedb/procedures.sql index a732ef75f..a9d90294e 100644 --- a/src/exchangedb/procedures.sql +++ b/src/exchangedb/procedures.sql @@ -2521,3 +2521,61 @@ BEGIN END $$; COMMIT; + +/*************************************************************/ + + +CREATE OR REPLACE FUNCTION bash_reserves_in( + IN amount_val INT8, + IN amount_frac INT4, + IN rpub BYTEA, + IN now INT8, + IN min_reserve_gc INT8, + OUT reserve_found BOOLEAN, + OUT ruuid INT8) +LANGUAGE plpgsql +AS $$ +DECLARE + existed BOOLEAN; + not_existed BOOLEAN; +BEGIN + SELECT reserves.reserve_uuid into ruuid from reserves + where reserves.reserve_pub = rpub; + IF ruuid IS NOT NULL + THEN + existed = TRUE; + UPDATE reserves + SET (current_balance_val + ,current_balance_frac + ,expiration_date + ,gc_date) = + (amount_val + ,amount_frac + ,now + ,min_reserve_gc) + WHERE + reserve_pub = rpub + RETURNING existed into reserve_found; + END IF; + IF NOT FOUND + THEN + SELECT MAX(reserve_uuid)+1 into ruuid from reserves; + existed = FALSE; + INSERT INTO reserves + (reserve_uuid + ,reserve_pub + ,current_balance_val + ,current_balance_frac + ,expiration_date + ,gc_date) + VALUES + (ruuid + ,rpub + ,amount_val + ,amount_frac + ,now + ,min_reserve_gc) RETURNING existed into reserve_found; + + END IF; + +END $$; diff --git a/src/exchangedb/test_exchangedb_by_j.c b/src/exchangedb/test_exchangedb_by_j.c index b81d82d65..175691e92 100644 --- a/src/exchangedb/test_exchangedb_by_j.c +++ b/src/exchangedb/test_exchangedb_by_j.c @@ -98,46 +98,46 @@ run (void *cls) goto cleanup; } - for (unsigned int i = 0; i< 7; i++) + for (unsigned int i = 0; i< 8; i++) { - static unsigned int batches[] = {1, 1, 0, 2, 4, 16, 64}; + static unsigned int batches[] = {1, 1, 0, 2, 4, 16, 64, 256}; const char *sndr = "payto://x-taler-bank/localhost:8080/1"; struct TALER_Amount value; unsigned int batch_size = batches[i]; struct GNUNET_TIME_Absolute now; struct GNUNET_TIME_Timestamp ts; struct GNUNET_TIME_Relative duration; - struct TALER_ReservePublicKeyP reserve_pub; - + struct TALER_EXCHANGEDB_ReserveInInfo reserves[batch_size]; + enum GNUNET_DB_QueryStatus *results; GNUNET_assert (GNUNET_OK == TALER_string_to_amount (CURRENCY ":1.000010", &value)); now = GNUNET_TIME_absolute_get (); ts = GNUNET_TIME_timestamp_get (); - fprintf (stdout, - "Now: %llu\n", - now.abs_value_us); plugin->start (plugin->cls, "test_by_exchange_j"); for (unsigned int k = 0; kreserves_in_insert (plugin->cls, - &reserve_pub, - &value, - ts, - sndr, - "section", - 4)); + RND_BLK (&reserves[k].reserve_pub); + reserves[k].balance = value; + reserves[k].execution_time = ts; + reserves[k].sender_account_details = sndr; + reserves[k].exchange_account_name = "name"; } + FAILIF (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT != + plugin->batch_reserves_in_insert (plugin->cls, + reserves, + batch_size, + &results)); + + plugin->commit (plugin->cls); duration = GNUNET_TIME_absolute_get_duration (now); fprintf (stdout, "for a batchsize equal to %d it took %s\n", batch_size, GNUNET_STRINGS_relative_time_to_string (duration, - GNUNET_YES) ); + GNUNET_NO) ); } drop: GNUNET_break (GNUNET_OK == @@ -174,7 +174,7 @@ main (int argc, (void) GNUNET_asprintf (&config_filename, "%s.conf", testname); - fprintf (stderr, + fprintf (stdout, "Using config: %s\n", config_filename); cfg = GNUNET_CONFIGURATION_create (); @@ -195,5 +195,4 @@ main (int argc, return result; } - -/* end of test_exchangedb.c */ +/* end of test_exchangedb_by_j.c */ diff --git a/src/include/taler_exchangedb_plugin.h b/src/include/taler_exchangedb_plugin.h index f21301e7e..6f5dedd05 100644 --- a/src/include/taler_exchangedb_plugin.h +++ b/src/include/taler_exchangedb_plugin.h @@ -2571,6 +2571,17 @@ struct TALER_EXCHANGEDB_KycStatus }; +struct TALER_EXCHANGEDB_ReserveInInfo +{ + struct TALER_ReservePublicKeyP reserve_pub; + struct TALER_Amount balance; + struct GNUNET_TIME_Timestamp execution_time; + const char *sender_account_details; + const char *exchange_account_name; + uint64_t wire_reference; +}; + + /** * Function called on each @a amount that was found to * be relevant for a KYC check. @@ -3457,6 +3468,23 @@ struct TALER_EXCHANGEDB_Plugin uint64_t wire_reference); + /** + * Insert a batch of incoming transaction into reserves. New reserves are + * also created through this function. + * + * @param cls the @e cls of this struct with the plugin-specific state + * @param reserves + * @param reserves_length length of the @a reserves array + * @param[out] results array of transaction status codes of length @a reserves_length, + * set to the status of the + */ + enum GNUNET_DB_QueryStatus + (*batch_reserves_in_insert)(void *cls, + const struct TALER_EXCHANGEDB_ReserveInInfo *reserves, + unsigned int reserves_length, + enum GNUNET_DB_QueryStatus *results); + + /** * Locate a nonce for use with a particular public key. *