diff --git a/src/exchangedb/Makefile.am b/src/exchangedb/Makefile.am index d46b64564..1bd781c48 100644 --- a/src/exchangedb/Makefile.am +++ b/src/exchangedb/Makefile.am @@ -75,6 +75,7 @@ libtaler_plugin_exchangedb_postgres_la_SOURCES = \ pg_insert_reserve_open_deposit.c pg_insert_reserve_open_deposit.h \ pg_iterate_kyc_reference.c pg_iterate_kyc_reference.h \ pg_iterate_reserve_close_info.c pg_iterate_reserve_close_info.h \ + pg_lookup_records_by_table.c pg_lookup_records_by_table.h \ pg_select_reserve_close_info.c pg_select_reserve_close_info.h libtaler_plugin_exchangedb_postgres_la_LIBADD = \ $(LTLIBINTL) diff --git a/src/exchangedb/lrbt_callbacks.c b/src/exchangedb/pg_lookup_records_by_table.c similarity index 89% rename from src/exchangedb/lrbt_callbacks.c rename to src/exchangedb/pg_lookup_records_by_table.c index a8f68f4c2..13d94e747 100644 --- a/src/exchangedb/lrbt_callbacks.c +++ b/src/exchangedb/pg_lookup_records_by_table.c @@ -18,11 +18,43 @@ SPDX-License-Identifier: AGPL3.0-or-later */ /** - * @file exchangedb/lrbt_callbacks.c - * @brief callbacks used by postgres_lookup_records_by_table, to be - * inlined into the plugin + * @file exchangedb/pg_lookup_records_by_table.c + * @brief implementation of lookup_records_by_table * @author Christian Grothoff */ +#include "platform.h" +#include "taler_error_codes.h" +#include "taler_dbevents.h" +#include "taler_pq_lib.h" +#include "pg_lookup_records_by_table.h" +#include "pg_helper.h" + + +/** + * Closure for callbacks used by #postgres_lookup_records_by_table. + */ +struct LookupRecordsByTableContext +{ + /** + * Plugin context. + */ + struct PostgresClosure *pg; + + /** + * Function to call with the results. + */ + TALER_EXCHANGEDB_ReplicationCallback cb; + + /** + * Closure for @a cb. + */ + void *cb_cls; + + /** + * Set to true on errors. + */ + bool error; +}; /** @@ -2174,4 +2206,201 @@ lrbt_cb_table_profit_drains (void *cls, } +enum GNUNET_DB_QueryStatus +TEH_PG_lookup_records_by_table (void *cls, + enum TALER_EXCHANGEDB_ReplicatedTable table, + uint64_t serial, + TALER_EXCHANGEDB_ReplicationCallback cb, + void *cb_cls) +{ + struct PostgresClosure *pg = cls; + struct GNUNET_PQ_QueryParam params[] = { + GNUNET_PQ_query_param_uint64 (&serial), + GNUNET_PQ_query_param_end + }; + struct LookupRecordsByTableContext ctx = { + .pg = pg, + .cb = cb, + .cb_cls = cb_cls + }; + GNUNET_PQ_PostgresResultHandler rh; + const char *statement; + enum GNUNET_DB_QueryStatus qs; + + switch (table) + { + case TALER_EXCHANGEDB_RT_DENOMINATIONS: + statement = "select_above_serial_by_table_denominations"; + rh = &lrbt_cb_table_denominations; + break; + case TALER_EXCHANGEDB_RT_DENOMINATION_REVOCATIONS: + statement = "select_above_serial_by_table_denomination_revocations"; + rh = &lrbt_cb_table_denomination_revocations; + break; + case TALER_EXCHANGEDB_RT_WIRE_TARGETS: + statement = "select_above_serial_by_table_wire_targets"; + rh = &lrbt_cb_table_wire_targets; + break; + case TALER_EXCHANGEDB_RT_RESERVES: + statement = "select_above_serial_by_table_reserves"; + rh = &lrbt_cb_table_reserves; + break; + case TALER_EXCHANGEDB_RT_RESERVES_IN: + statement = "select_above_serial_by_table_reserves_in"; + rh = &lrbt_cb_table_reserves_in; + break; + case TALER_EXCHANGEDB_RT_RESERVES_CLOSE: + statement = "select_above_serial_by_table_reserves_close"; + rh = &lrbt_cb_table_reserves_close; + break; + case TALER_EXCHANGEDB_RT_RESERVES_OUT: + statement = "select_above_serial_by_table_reserves_out"; + rh = &lrbt_cb_table_reserves_out; + break; + case TALER_EXCHANGEDB_RT_AUDITORS: + statement = "select_above_serial_by_table_auditors"; + rh = &lrbt_cb_table_auditors; + break; + case TALER_EXCHANGEDB_RT_AUDITOR_DENOM_SIGS: + statement = "select_above_serial_by_table_auditor_denom_sigs"; + rh = &lrbt_cb_table_auditor_denom_sigs; + break; + case TALER_EXCHANGEDB_RT_EXCHANGE_SIGN_KEYS: + statement = "select_above_serial_by_table_exchange_sign_keys"; + rh = &lrbt_cb_table_exchange_sign_keys; + break; + case TALER_EXCHANGEDB_RT_SIGNKEY_REVOCATIONS: + statement = "select_above_serial_by_table_signkey_revocations"; + rh = &lrbt_cb_table_signkey_revocations; + break; + case TALER_EXCHANGEDB_RT_KNOWN_COINS: + statement = "select_above_serial_by_table_known_coins"; + rh = &lrbt_cb_table_known_coins; + break; + case TALER_EXCHANGEDB_RT_REFRESH_COMMITMENTS: + statement = "select_above_serial_by_table_refresh_commitments"; + rh = &lrbt_cb_table_refresh_commitments; + break; + case TALER_EXCHANGEDB_RT_REFRESH_REVEALED_COINS: + statement = "select_above_serial_by_table_refresh_revealed_coins"; + rh = &lrbt_cb_table_refresh_revealed_coins; + break; + case TALER_EXCHANGEDB_RT_REFRESH_TRANSFER_KEYS: + statement = "select_above_serial_by_table_refresh_transfer_keys"; + rh = &lrbt_cb_table_refresh_transfer_keys; + break; + case TALER_EXCHANGEDB_RT_DEPOSITS: + statement = "select_above_serial_by_table_deposits"; + rh = &lrbt_cb_table_deposits; + break; + case TALER_EXCHANGEDB_RT_REFUNDS: + statement = "select_above_serial_by_table_refunds"; + rh = &lrbt_cb_table_refunds; + break; + case TALER_EXCHANGEDB_RT_WIRE_OUT: + statement = "select_above_serial_by_table_wire_out"; + rh = &lrbt_cb_table_wire_out; + break; + case TALER_EXCHANGEDB_RT_AGGREGATION_TRACKING: + statement = "select_above_serial_by_table_aggregation_tracking"; + rh = &lrbt_cb_table_aggregation_tracking; + break; + case TALER_EXCHANGEDB_RT_WIRE_FEE: + statement = "select_above_serial_by_table_wire_fee"; + rh = &lrbt_cb_table_wire_fee; + break; + case TALER_EXCHANGEDB_RT_GLOBAL_FEE: + statement = "select_above_serial_by_table_global_fee"; + rh = &lrbt_cb_table_global_fee; + break; + case TALER_EXCHANGEDB_RT_RECOUP: + statement = "select_above_serial_by_table_recoup"; + rh = &lrbt_cb_table_recoup; + break; + case TALER_EXCHANGEDB_RT_RECOUP_REFRESH: + statement = "select_above_serial_by_table_recoup_refresh"; + rh = &lrbt_cb_table_recoup_refresh; + break; + case TALER_EXCHANGEDB_RT_EXTENSIONS: + statement = "select_above_serial_by_table_extensions"; + rh = &lrbt_cb_table_extensions; + break; + case TALER_EXCHANGEDB_RT_EXTENSION_DETAILS: + statement = "select_above_serial_by_table_extension_details"; + rh = &lrbt_cb_table_extension_details; + break; + case TALER_EXCHANGEDB_RT_PURSE_REQUESTS: + statement = "select_above_serial_by_table_purse_requests"; + rh = &lrbt_cb_table_purse_requests; + break; + case TALER_EXCHANGEDB_RT_PURSE_REFUNDS: + statement = "select_above_serial_by_table_purse_refunds"; + rh = &lrbt_cb_table_purse_refunds; + break; + case TALER_EXCHANGEDB_RT_PURSE_MERGES: + statement = "select_above_serial_by_table_purse_merges"; + rh = &lrbt_cb_table_purse_merges; + break; + case TALER_EXCHANGEDB_RT_PURSE_DEPOSITS: + statement = "select_above_serial_by_table_purse_deposits"; + rh = &lrbt_cb_table_purse_deposits; + break; + case TALER_EXCHANGEDB_RT_ACCOUNT_MERGES: + statement = "select_above_serial_by_table_account_merges"; + rh = &lrbt_cb_table_account_merges; + break; + case TALER_EXCHANGEDB_RT_HISTORY_REQUESTS: + statement = "select_above_serial_by_table_history_requests"; + rh = &lrbt_cb_table_history_requests; + break; + case TALER_EXCHANGEDB_RT_CLOSE_REQUESTS: + statement = "select_above_serial_by_table_close_requests"; + rh = &lrbt_cb_table_close_requests; + break; + case TALER_EXCHANGEDB_RT_WADS_OUT: + statement = "select_above_serial_by_table_wads_out"; + rh = &lrbt_cb_table_wads_out; + break; + case TALER_EXCHANGEDB_RT_WADS_OUT_ENTRIES: + statement = "select_above_serial_by_table_wads_out_entries"; + rh = &lrbt_cb_table_wads_out_entries; + break; + case TALER_EXCHANGEDB_RT_WADS_IN: + statement = "select_above_serial_by_table_wads_in"; + rh = &lrbt_cb_table_wads_in; + break; + case TALER_EXCHANGEDB_RT_WADS_IN_ENTRIES: + statement = "select_above_serial_by_table_wads_in_entries"; + rh = &lrbt_cb_table_wads_in_entries; + break; + case TALER_EXCHANGEDB_RT_PROFIT_DRAINS: + statement = "select_above_serial_by_table_profit_drains"; + rh = &lrbt_cb_table_profit_drains; + break; + default: + GNUNET_break (0); + return GNUNET_DB_STATUS_HARD_ERROR; + } + + qs = GNUNET_PQ_eval_prepared_multi_select (pg->conn, + statement, + params, + rh, + &ctx); + if (qs < 0) + { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Failed to run `%s'\n", + statement); + return qs; + } + if (ctx.error) + { + GNUNET_break (0); + return GNUNET_DB_STATUS_HARD_ERROR; + } + return qs; +} + + /* end of lrbt_callbacks.c */ diff --git a/src/exchangedb/plugin_exchangedb_postgres.c b/src/exchangedb/plugin_exchangedb_postgres.c index b828bb284..77f7efc9b 100644 --- a/src/exchangedb/plugin_exchangedb_postgres.c +++ b/src/exchangedb/plugin_exchangedb_postgres.c @@ -35,6 +35,7 @@ #include "pg_insert_reserve_open_deposit.h" #include "pg_iterate_kyc_reference.h" #include "pg_iterate_reserve_close_info.h" +#include "pg_lookup_records_by_table.h" #include "pg_select_reserve_close_info.h" #include #include @@ -13797,245 +13798,6 @@ postgres_lookup_serial_by_table (void *cls, } -/** - * Closure for callbacks used by #postgres_lookup_records_by_table. - */ -struct LookupRecordsByTableContext -{ - /** - * Plugin context. - */ - struct PostgresClosure *pg; - - /** - * Function to call with the results. - */ - TALER_EXCHANGEDB_ReplicationCallback cb; - - /** - * Closure for @a cb. - */ - void *cb_cls; - - /** - * Set to true on errors. - */ - bool error; -}; - - -#include "lrbt_callbacks.c" - - -/** - * Lookup records above @a serial number in @a table. Used in - * exchange-auditor database replication. - * - * @param cls closure - * @param table table for which we should return the serial - * @param serial largest serial number to exclude - * @param cb function to call on the records - * @param cb_cls closure for @a cb - * @return transaction status code, GNUNET_DB_STATUS_HARD_ERROR if - * @a table does not have a serial number - */ -static enum GNUNET_DB_QueryStatus -postgres_lookup_records_by_table (void *cls, - enum TALER_EXCHANGEDB_ReplicatedTable table, - uint64_t serial, - TALER_EXCHANGEDB_ReplicationCallback cb, - void *cb_cls) -{ - struct PostgresClosure *pg = cls; - struct GNUNET_PQ_QueryParam params[] = { - GNUNET_PQ_query_param_uint64 (&serial), - GNUNET_PQ_query_param_end - }; - struct LookupRecordsByTableContext ctx = { - .pg = pg, - .cb = cb, - .cb_cls = cb_cls - }; - GNUNET_PQ_PostgresResultHandler rh; - const char *statement; - enum GNUNET_DB_QueryStatus qs; - - switch (table) - { - case TALER_EXCHANGEDB_RT_DENOMINATIONS: - statement = "select_above_serial_by_table_denominations"; - rh = &lrbt_cb_table_denominations; - break; - case TALER_EXCHANGEDB_RT_DENOMINATION_REVOCATIONS: - statement = "select_above_serial_by_table_denomination_revocations"; - rh = &lrbt_cb_table_denomination_revocations; - break; - case TALER_EXCHANGEDB_RT_WIRE_TARGETS: - statement = "select_above_serial_by_table_wire_targets"; - rh = &lrbt_cb_table_wire_targets; - break; - case TALER_EXCHANGEDB_RT_RESERVES: - statement = "select_above_serial_by_table_reserves"; - rh = &lrbt_cb_table_reserves; - break; - case TALER_EXCHANGEDB_RT_RESERVES_IN: - statement = "select_above_serial_by_table_reserves_in"; - rh = &lrbt_cb_table_reserves_in; - break; - case TALER_EXCHANGEDB_RT_RESERVES_CLOSE: - statement = "select_above_serial_by_table_reserves_close"; - rh = &lrbt_cb_table_reserves_close; - break; - case TALER_EXCHANGEDB_RT_RESERVES_OUT: - statement = "select_above_serial_by_table_reserves_out"; - rh = &lrbt_cb_table_reserves_out; - break; - case TALER_EXCHANGEDB_RT_AUDITORS: - statement = "select_above_serial_by_table_auditors"; - rh = &lrbt_cb_table_auditors; - break; - case TALER_EXCHANGEDB_RT_AUDITOR_DENOM_SIGS: - statement = "select_above_serial_by_table_auditor_denom_sigs"; - rh = &lrbt_cb_table_auditor_denom_sigs; - break; - case TALER_EXCHANGEDB_RT_EXCHANGE_SIGN_KEYS: - statement = "select_above_serial_by_table_exchange_sign_keys"; - rh = &lrbt_cb_table_exchange_sign_keys; - break; - case TALER_EXCHANGEDB_RT_SIGNKEY_REVOCATIONS: - statement = "select_above_serial_by_table_signkey_revocations"; - rh = &lrbt_cb_table_signkey_revocations; - break; - case TALER_EXCHANGEDB_RT_KNOWN_COINS: - statement = "select_above_serial_by_table_known_coins"; - rh = &lrbt_cb_table_known_coins; - break; - case TALER_EXCHANGEDB_RT_REFRESH_COMMITMENTS: - statement = "select_above_serial_by_table_refresh_commitments"; - rh = &lrbt_cb_table_refresh_commitments; - break; - case TALER_EXCHANGEDB_RT_REFRESH_REVEALED_COINS: - statement = "select_above_serial_by_table_refresh_revealed_coins"; - rh = &lrbt_cb_table_refresh_revealed_coins; - break; - case TALER_EXCHANGEDB_RT_REFRESH_TRANSFER_KEYS: - statement = "select_above_serial_by_table_refresh_transfer_keys"; - rh = &lrbt_cb_table_refresh_transfer_keys; - break; - case TALER_EXCHANGEDB_RT_DEPOSITS: - statement = "select_above_serial_by_table_deposits"; - rh = &lrbt_cb_table_deposits; - break; - case TALER_EXCHANGEDB_RT_REFUNDS: - statement = "select_above_serial_by_table_refunds"; - rh = &lrbt_cb_table_refunds; - break; - case TALER_EXCHANGEDB_RT_WIRE_OUT: - statement = "select_above_serial_by_table_wire_out"; - rh = &lrbt_cb_table_wire_out; - break; - case TALER_EXCHANGEDB_RT_AGGREGATION_TRACKING: - statement = "select_above_serial_by_table_aggregation_tracking"; - rh = &lrbt_cb_table_aggregation_tracking; - break; - case TALER_EXCHANGEDB_RT_WIRE_FEE: - statement = "select_above_serial_by_table_wire_fee"; - rh = &lrbt_cb_table_wire_fee; - break; - case TALER_EXCHANGEDB_RT_GLOBAL_FEE: - statement = "select_above_serial_by_table_global_fee"; - rh = &lrbt_cb_table_global_fee; - break; - case TALER_EXCHANGEDB_RT_RECOUP: - statement = "select_above_serial_by_table_recoup"; - rh = &lrbt_cb_table_recoup; - break; - case TALER_EXCHANGEDB_RT_RECOUP_REFRESH: - statement = "select_above_serial_by_table_recoup_refresh"; - rh = &lrbt_cb_table_recoup_refresh; - break; - case TALER_EXCHANGEDB_RT_EXTENSIONS: - statement = "select_above_serial_by_table_extensions"; - rh = &lrbt_cb_table_extensions; - break; - case TALER_EXCHANGEDB_RT_EXTENSION_DETAILS: - statement = "select_above_serial_by_table_extension_details"; - rh = &lrbt_cb_table_extension_details; - break; - case TALER_EXCHANGEDB_RT_PURSE_REQUESTS: - statement = "select_above_serial_by_table_purse_requests"; - rh = &lrbt_cb_table_purse_requests; - break; - case TALER_EXCHANGEDB_RT_PURSE_REFUNDS: - statement = "select_above_serial_by_table_purse_refunds"; - rh = &lrbt_cb_table_purse_refunds; - break; - case TALER_EXCHANGEDB_RT_PURSE_MERGES: - statement = "select_above_serial_by_table_purse_merges"; - rh = &lrbt_cb_table_purse_merges; - break; - case TALER_EXCHANGEDB_RT_PURSE_DEPOSITS: - statement = "select_above_serial_by_table_purse_deposits"; - rh = &lrbt_cb_table_purse_deposits; - break; - case TALER_EXCHANGEDB_RT_ACCOUNT_MERGES: - statement = "select_above_serial_by_table_account_merges"; - rh = &lrbt_cb_table_account_merges; - break; - case TALER_EXCHANGEDB_RT_HISTORY_REQUESTS: - statement = "select_above_serial_by_table_history_requests"; - rh = &lrbt_cb_table_history_requests; - break; - case TALER_EXCHANGEDB_RT_CLOSE_REQUESTS: - statement = "select_above_serial_by_table_close_requests"; - rh = &lrbt_cb_table_close_requests; - break; - case TALER_EXCHANGEDB_RT_WADS_OUT: - statement = "select_above_serial_by_table_wads_out"; - rh = &lrbt_cb_table_wads_out; - break; - case TALER_EXCHANGEDB_RT_WADS_OUT_ENTRIES: - statement = "select_above_serial_by_table_wads_out_entries"; - rh = &lrbt_cb_table_wads_out_entries; - break; - case TALER_EXCHANGEDB_RT_WADS_IN: - statement = "select_above_serial_by_table_wads_in"; - rh = &lrbt_cb_table_wads_in; - break; - case TALER_EXCHANGEDB_RT_WADS_IN_ENTRIES: - statement = "select_above_serial_by_table_wads_in_entries"; - rh = &lrbt_cb_table_wads_in_entries; - break; - case TALER_EXCHANGEDB_RT_PROFIT_DRAINS: - statement = "select_above_serial_by_table_profit_drains"; - rh = &lrbt_cb_table_profit_drains; - break; - default: - GNUNET_break (0); - return GNUNET_DB_STATUS_HARD_ERROR; - } - - qs = GNUNET_PQ_eval_prepared_multi_select (pg->conn, - statement, - params, - rh, - &ctx); - if (qs < 0) - { - GNUNET_log (GNUNET_ERROR_TYPE_ERROR, - "Failed to run `%s'\n", - statement); - return qs; - } - if (ctx.error) - { - GNUNET_break (0); - return GNUNET_DB_STATUS_HARD_ERROR; - } - return qs; -} - - /** * Function called to grab a work shard on an operation @a op. Runs in its * own transaction. @@ -16584,7 +16346,7 @@ libtaler_plugin_exchangedb_postgres_init (void *cls) plugin->lookup_serial_by_table = &postgres_lookup_serial_by_table; plugin->lookup_records_by_table - = &postgres_lookup_records_by_table; + = &TEH_PG_lookup_records_by_table; plugin->begin_shard = &postgres_begin_shard; plugin->abort_shard