diff options
| author | Christian Grothoff <grothoff@gnunet.org> | 2022-10-08 21:09:13 +0200 | 
|---|---|---|
| committer | Christian Grothoff <grothoff@gnunet.org> | 2022-10-08 21:09:13 +0200 | 
| commit | 04cf1dc08858940b7f6be02f5b28aa0f7e3c41ba (patch) | |
| tree | 783b458afbe4f1ea764b06e0406edba049cf2b4a /src/exchangedb | |
| parent | f2a3a28d46e8b61ff4c68c639156be7b00dd6f51 (diff) | |
more db refactoring
Diffstat (limited to 'src/exchangedb')
| -rw-r--r-- | src/exchangedb/Makefile.am | 1 | ||||
| -rw-r--r-- | src/exchangedb/pg_lookup_records_by_table.c (renamed from src/exchangedb/lrbt_callbacks.c) | 235 | ||||
| -rw-r--r-- | src/exchangedb/plugin_exchangedb_postgres.c | 242 | 
3 files changed, 235 insertions, 243 deletions
| diff --git a/src/exchangedb/Makefile.am b/src/exchangedb/Makefile.am index d46b6456..1bd781c4 100644 --- a/src/exchangedb/Makefile.am +++ b/src/exchangedb/Makefile.am @@ -75,6 +75,7 @@ libtaler_plugin_exchangedb_postgres_la_SOURCES = \    pg_insert_reserve_open_deposit.c pg_insert_reserve_open_deposit.h \    pg_iterate_kyc_reference.c pg_iterate_kyc_reference.h \    pg_iterate_reserve_close_info.c pg_iterate_reserve_close_info.h \ +  pg_lookup_records_by_table.c pg_lookup_records_by_table.h \    pg_select_reserve_close_info.c pg_select_reserve_close_info.h  libtaler_plugin_exchangedb_postgres_la_LIBADD = \    $(LTLIBINTL) diff --git a/src/exchangedb/lrbt_callbacks.c b/src/exchangedb/pg_lookup_records_by_table.c index a8f68f4c..13d94e74 100644 --- a/src/exchangedb/lrbt_callbacks.c +++ b/src/exchangedb/pg_lookup_records_by_table.c @@ -18,11 +18,43 @@       SPDX-License-Identifier: AGPL3.0-or-later   */  /** - * @file exchangedb/lrbt_callbacks.c - * @brief callbacks used by postgres_lookup_records_by_table, to be - *        inlined into the plugin + * @file exchangedb/pg_lookup_records_by_table.c + * @brief implementation of lookup_records_by_table   * @author Christian Grothoff   */ +#include "platform.h" +#include "taler_error_codes.h" +#include "taler_dbevents.h" +#include "taler_pq_lib.h" +#include "pg_lookup_records_by_table.h" +#include "pg_helper.h" + + +/** + * Closure for callbacks used by #postgres_lookup_records_by_table. + */ +struct LookupRecordsByTableContext +{ +  /** +   * Plugin context. +   */ +  struct PostgresClosure *pg; + +  /** +   * Function to call with the results. +   */ +  TALER_EXCHANGEDB_ReplicationCallback cb; + +  /** +   * Closure for @a cb. +   */ +  void *cb_cls; + +  /** +   * Set to true on errors. +   */ +  bool error; +};  /** @@ -2174,4 +2206,201 @@ lrbt_cb_table_profit_drains (void *cls,  } +enum GNUNET_DB_QueryStatus +TEH_PG_lookup_records_by_table (void *cls, +                                enum TALER_EXCHANGEDB_ReplicatedTable table, +                                uint64_t serial, +                                TALER_EXCHANGEDB_ReplicationCallback cb, +                                void *cb_cls) +{ +  struct PostgresClosure *pg = cls; +  struct GNUNET_PQ_QueryParam params[] = { +    GNUNET_PQ_query_param_uint64 (&serial), +    GNUNET_PQ_query_param_end +  }; +  struct LookupRecordsByTableContext ctx = { +    .pg = pg, +    .cb = cb, +    .cb_cls = cb_cls +  }; +  GNUNET_PQ_PostgresResultHandler rh; +  const char *statement; +  enum GNUNET_DB_QueryStatus qs; + +  switch (table) +  { +  case TALER_EXCHANGEDB_RT_DENOMINATIONS: +    statement = "select_above_serial_by_table_denominations"; +    rh = &lrbt_cb_table_denominations; +    break; +  case TALER_EXCHANGEDB_RT_DENOMINATION_REVOCATIONS: +    statement = "select_above_serial_by_table_denomination_revocations"; +    rh = &lrbt_cb_table_denomination_revocations; +    break; +  case TALER_EXCHANGEDB_RT_WIRE_TARGETS: +    statement = "select_above_serial_by_table_wire_targets"; +    rh = &lrbt_cb_table_wire_targets; +    break; +  case TALER_EXCHANGEDB_RT_RESERVES: +    statement = "select_above_serial_by_table_reserves"; +    rh = &lrbt_cb_table_reserves; +    break; +  case TALER_EXCHANGEDB_RT_RESERVES_IN: +    statement = "select_above_serial_by_table_reserves_in"; +    rh = &lrbt_cb_table_reserves_in; +    break; +  case TALER_EXCHANGEDB_RT_RESERVES_CLOSE: +    statement = "select_above_serial_by_table_reserves_close"; +    rh = &lrbt_cb_table_reserves_close; +    break; +  case TALER_EXCHANGEDB_RT_RESERVES_OUT: +    statement = "select_above_serial_by_table_reserves_out"; +    rh = &lrbt_cb_table_reserves_out; +    break; +  case TALER_EXCHANGEDB_RT_AUDITORS: +    statement = "select_above_serial_by_table_auditors"; +    rh = &lrbt_cb_table_auditors; +    break; +  case TALER_EXCHANGEDB_RT_AUDITOR_DENOM_SIGS: +    statement = "select_above_serial_by_table_auditor_denom_sigs"; +    rh = &lrbt_cb_table_auditor_denom_sigs; +    break; +  case TALER_EXCHANGEDB_RT_EXCHANGE_SIGN_KEYS: +    statement = "select_above_serial_by_table_exchange_sign_keys"; +    rh = &lrbt_cb_table_exchange_sign_keys; +    break; +  case TALER_EXCHANGEDB_RT_SIGNKEY_REVOCATIONS: +    statement = "select_above_serial_by_table_signkey_revocations"; +    rh = &lrbt_cb_table_signkey_revocations; +    break; +  case TALER_EXCHANGEDB_RT_KNOWN_COINS: +    statement = "select_above_serial_by_table_known_coins"; +    rh = &lrbt_cb_table_known_coins; +    break; +  case TALER_EXCHANGEDB_RT_REFRESH_COMMITMENTS: +    statement = "select_above_serial_by_table_refresh_commitments"; +    rh = &lrbt_cb_table_refresh_commitments; +    break; +  case TALER_EXCHANGEDB_RT_REFRESH_REVEALED_COINS: +    statement = "select_above_serial_by_table_refresh_revealed_coins"; +    rh = &lrbt_cb_table_refresh_revealed_coins; +    break; +  case TALER_EXCHANGEDB_RT_REFRESH_TRANSFER_KEYS: +    statement = "select_above_serial_by_table_refresh_transfer_keys"; +    rh = &lrbt_cb_table_refresh_transfer_keys; +    break; +  case TALER_EXCHANGEDB_RT_DEPOSITS: +    statement = "select_above_serial_by_table_deposits"; +    rh = &lrbt_cb_table_deposits; +    break; +  case TALER_EXCHANGEDB_RT_REFUNDS: +    statement = "select_above_serial_by_table_refunds"; +    rh = &lrbt_cb_table_refunds; +    break; +  case TALER_EXCHANGEDB_RT_WIRE_OUT: +    statement = "select_above_serial_by_table_wire_out"; +    rh = &lrbt_cb_table_wire_out; +    break; +  case TALER_EXCHANGEDB_RT_AGGREGATION_TRACKING: +    statement = "select_above_serial_by_table_aggregation_tracking"; +    rh = &lrbt_cb_table_aggregation_tracking; +    break; +  case TALER_EXCHANGEDB_RT_WIRE_FEE: +    statement = "select_above_serial_by_table_wire_fee"; +    rh = &lrbt_cb_table_wire_fee; +    break; +  case TALER_EXCHANGEDB_RT_GLOBAL_FEE: +    statement = "select_above_serial_by_table_global_fee"; +    rh = &lrbt_cb_table_global_fee; +    break; +  case TALER_EXCHANGEDB_RT_RECOUP: +    statement = "select_above_serial_by_table_recoup"; +    rh = &lrbt_cb_table_recoup; +    break; +  case TALER_EXCHANGEDB_RT_RECOUP_REFRESH: +    statement = "select_above_serial_by_table_recoup_refresh"; +    rh = &lrbt_cb_table_recoup_refresh; +    break; +  case TALER_EXCHANGEDB_RT_EXTENSIONS: +    statement = "select_above_serial_by_table_extensions"; +    rh = &lrbt_cb_table_extensions; +    break; +  case TALER_EXCHANGEDB_RT_EXTENSION_DETAILS: +    statement = "select_above_serial_by_table_extension_details"; +    rh = &lrbt_cb_table_extension_details; +    break; +  case TALER_EXCHANGEDB_RT_PURSE_REQUESTS: +    statement = "select_above_serial_by_table_purse_requests"; +    rh = &lrbt_cb_table_purse_requests; +    break; +  case TALER_EXCHANGEDB_RT_PURSE_REFUNDS: +    statement = "select_above_serial_by_table_purse_refunds"; +    rh = &lrbt_cb_table_purse_refunds; +    break; +  case TALER_EXCHANGEDB_RT_PURSE_MERGES: +    statement = "select_above_serial_by_table_purse_merges"; +    rh = &lrbt_cb_table_purse_merges; +    break; +  case TALER_EXCHANGEDB_RT_PURSE_DEPOSITS: +    statement = "select_above_serial_by_table_purse_deposits"; +    rh = &lrbt_cb_table_purse_deposits; +    break; +  case TALER_EXCHANGEDB_RT_ACCOUNT_MERGES: +    statement = "select_above_serial_by_table_account_merges"; +    rh = &lrbt_cb_table_account_merges; +    break; +  case TALER_EXCHANGEDB_RT_HISTORY_REQUESTS: +    statement = "select_above_serial_by_table_history_requests"; +    rh = &lrbt_cb_table_history_requests; +    break; +  case TALER_EXCHANGEDB_RT_CLOSE_REQUESTS: +    statement = "select_above_serial_by_table_close_requests"; +    rh = &lrbt_cb_table_close_requests; +    break; +  case TALER_EXCHANGEDB_RT_WADS_OUT: +    statement = "select_above_serial_by_table_wads_out"; +    rh = &lrbt_cb_table_wads_out; +    break; +  case TALER_EXCHANGEDB_RT_WADS_OUT_ENTRIES: +    statement = "select_above_serial_by_table_wads_out_entries"; +    rh = &lrbt_cb_table_wads_out_entries; +    break; +  case TALER_EXCHANGEDB_RT_WADS_IN: +    statement = "select_above_serial_by_table_wads_in"; +    rh = &lrbt_cb_table_wads_in; +    break; +  case TALER_EXCHANGEDB_RT_WADS_IN_ENTRIES: +    statement = "select_above_serial_by_table_wads_in_entries"; +    rh = &lrbt_cb_table_wads_in_entries; +    break; +  case TALER_EXCHANGEDB_RT_PROFIT_DRAINS: +    statement = "select_above_serial_by_table_profit_drains"; +    rh = &lrbt_cb_table_profit_drains; +    break; +  default: +    GNUNET_break (0); +    return GNUNET_DB_STATUS_HARD_ERROR; +  } + +  qs = GNUNET_PQ_eval_prepared_multi_select (pg->conn, +                                             statement, +                                             params, +                                             rh, +                                             &ctx); +  if (qs < 0) +  { +    GNUNET_log (GNUNET_ERROR_TYPE_ERROR, +                "Failed to run `%s'\n", +                statement); +    return qs; +  } +  if (ctx.error) +  { +    GNUNET_break (0); +    return GNUNET_DB_STATUS_HARD_ERROR; +  } +  return qs; +} + +  /* end of lrbt_callbacks.c */ diff --git a/src/exchangedb/plugin_exchangedb_postgres.c b/src/exchangedb/plugin_exchangedb_postgres.c index b828bb28..77f7efc9 100644 --- a/src/exchangedb/plugin_exchangedb_postgres.c +++ b/src/exchangedb/plugin_exchangedb_postgres.c @@ -35,6 +35,7 @@  #include "pg_insert_reserve_open_deposit.h"  #include "pg_iterate_kyc_reference.h"  #include "pg_iterate_reserve_close_info.h" +#include "pg_lookup_records_by_table.h"  #include "pg_select_reserve_close_info.h"  #include <poll.h>  #include <pthread.h> @@ -13798,245 +13799,6 @@ postgres_lookup_serial_by_table (void *cls,  /** - * Closure for callbacks used by #postgres_lookup_records_by_table. - */ -struct LookupRecordsByTableContext -{ -  /** -   * Plugin context. -   */ -  struct PostgresClosure *pg; - -  /** -   * Function to call with the results. -   */ -  TALER_EXCHANGEDB_ReplicationCallback cb; - -  /** -   * Closure for @a cb. -   */ -  void *cb_cls; - -  /** -   * Set to true on errors. -   */ -  bool error; -}; - - -#include "lrbt_callbacks.c" - - -/** - * Lookup records above @a serial number in @a table. Used in - * exchange-auditor database replication. - * - * @param cls closure - * @param table table for which we should return the serial - * @param serial largest serial number to exclude - * @param cb function to call on the records - * @param cb_cls closure for @a cb - * @return transaction status code, GNUNET_DB_STATUS_HARD_ERROR if - *         @a table does not have a serial number - */ -static enum GNUNET_DB_QueryStatus -postgres_lookup_records_by_table (void *cls, -                                  enum TALER_EXCHANGEDB_ReplicatedTable table, -                                  uint64_t serial, -                                  TALER_EXCHANGEDB_ReplicationCallback cb, -                                  void *cb_cls) -{ -  struct PostgresClosure *pg = cls; -  struct GNUNET_PQ_QueryParam params[] = { -    GNUNET_PQ_query_param_uint64 (&serial), -    GNUNET_PQ_query_param_end -  }; -  struct LookupRecordsByTableContext ctx = { -    .pg = pg, -    .cb = cb, -    .cb_cls = cb_cls -  }; -  GNUNET_PQ_PostgresResultHandler rh; -  const char *statement; -  enum GNUNET_DB_QueryStatus qs; - -  switch (table) -  { -  case TALER_EXCHANGEDB_RT_DENOMINATIONS: -    statement = "select_above_serial_by_table_denominations"; -    rh = &lrbt_cb_table_denominations; -    break; -  case TALER_EXCHANGEDB_RT_DENOMINATION_REVOCATIONS: -    statement = "select_above_serial_by_table_denomination_revocations"; -    rh = &lrbt_cb_table_denomination_revocations; -    break; -  case TALER_EXCHANGEDB_RT_WIRE_TARGETS: -    statement = "select_above_serial_by_table_wire_targets"; -    rh = &lrbt_cb_table_wire_targets; -    break; -  case TALER_EXCHANGEDB_RT_RESERVES: -    statement = "select_above_serial_by_table_reserves"; -    rh = &lrbt_cb_table_reserves; -    break; -  case TALER_EXCHANGEDB_RT_RESERVES_IN: -    statement = "select_above_serial_by_table_reserves_in"; -    rh = &lrbt_cb_table_reserves_in; -    break; -  case TALER_EXCHANGEDB_RT_RESERVES_CLOSE: -    statement = "select_above_serial_by_table_reserves_close"; -    rh = &lrbt_cb_table_reserves_close; -    break; -  case TALER_EXCHANGEDB_RT_RESERVES_OUT: -    statement = "select_above_serial_by_table_reserves_out"; -    rh = &lrbt_cb_table_reserves_out; -    break; -  case TALER_EXCHANGEDB_RT_AUDITORS: -    statement = "select_above_serial_by_table_auditors"; -    rh = &lrbt_cb_table_auditors; -    break; -  case TALER_EXCHANGEDB_RT_AUDITOR_DENOM_SIGS: -    statement = "select_above_serial_by_table_auditor_denom_sigs"; -    rh = &lrbt_cb_table_auditor_denom_sigs; -    break; -  case TALER_EXCHANGEDB_RT_EXCHANGE_SIGN_KEYS: -    statement = "select_above_serial_by_table_exchange_sign_keys"; -    rh = &lrbt_cb_table_exchange_sign_keys; -    break; -  case TALER_EXCHANGEDB_RT_SIGNKEY_REVOCATIONS: -    statement = "select_above_serial_by_table_signkey_revocations"; -    rh = &lrbt_cb_table_signkey_revocations; -    break; -  case TALER_EXCHANGEDB_RT_KNOWN_COINS: -    statement = "select_above_serial_by_table_known_coins"; -    rh = &lrbt_cb_table_known_coins; -    break; -  case TALER_EXCHANGEDB_RT_REFRESH_COMMITMENTS: -    statement = "select_above_serial_by_table_refresh_commitments"; -    rh = &lrbt_cb_table_refresh_commitments; -    break; -  case TALER_EXCHANGEDB_RT_REFRESH_REVEALED_COINS: -    statement = "select_above_serial_by_table_refresh_revealed_coins"; -    rh = &lrbt_cb_table_refresh_revealed_coins; -    break; -  case TALER_EXCHANGEDB_RT_REFRESH_TRANSFER_KEYS: -    statement = "select_above_serial_by_table_refresh_transfer_keys"; -    rh = &lrbt_cb_table_refresh_transfer_keys; -    break; -  case TALER_EXCHANGEDB_RT_DEPOSITS: -    statement = "select_above_serial_by_table_deposits"; -    rh = &lrbt_cb_table_deposits; -    break; -  case TALER_EXCHANGEDB_RT_REFUNDS: -    statement = "select_above_serial_by_table_refunds"; -    rh = &lrbt_cb_table_refunds; -    break; -  case TALER_EXCHANGEDB_RT_WIRE_OUT: -    statement = "select_above_serial_by_table_wire_out"; -    rh = &lrbt_cb_table_wire_out; -    break; -  case TALER_EXCHANGEDB_RT_AGGREGATION_TRACKING: -    statement = "select_above_serial_by_table_aggregation_tracking"; -    rh = &lrbt_cb_table_aggregation_tracking; -    break; -  case TALER_EXCHANGEDB_RT_WIRE_FEE: -    statement = "select_above_serial_by_table_wire_fee"; -    rh = &lrbt_cb_table_wire_fee; -    break; -  case TALER_EXCHANGEDB_RT_GLOBAL_FEE: -    statement = "select_above_serial_by_table_global_fee"; -    rh = &lrbt_cb_table_global_fee; -    break; -  case TALER_EXCHANGEDB_RT_RECOUP: -    statement = "select_above_serial_by_table_recoup"; -    rh = &lrbt_cb_table_recoup; -    break; -  case TALER_EXCHANGEDB_RT_RECOUP_REFRESH: -    statement = "select_above_serial_by_table_recoup_refresh"; -    rh = &lrbt_cb_table_recoup_refresh; -    break; -  case TALER_EXCHANGEDB_RT_EXTENSIONS: -    statement = "select_above_serial_by_table_extensions"; -    rh = &lrbt_cb_table_extensions; -    break; -  case TALER_EXCHANGEDB_RT_EXTENSION_DETAILS: -    statement = "select_above_serial_by_table_extension_details"; -    rh = &lrbt_cb_table_extension_details; -    break; -  case TALER_EXCHANGEDB_RT_PURSE_REQUESTS: -    statement = "select_above_serial_by_table_purse_requests"; -    rh = &lrbt_cb_table_purse_requests; -    break; -  case TALER_EXCHANGEDB_RT_PURSE_REFUNDS: -    statement = "select_above_serial_by_table_purse_refunds"; -    rh = &lrbt_cb_table_purse_refunds; -    break; -  case TALER_EXCHANGEDB_RT_PURSE_MERGES: -    statement = "select_above_serial_by_table_purse_merges"; -    rh = &lrbt_cb_table_purse_merges; -    break; -  case TALER_EXCHANGEDB_RT_PURSE_DEPOSITS: -    statement = "select_above_serial_by_table_purse_deposits"; -    rh = &lrbt_cb_table_purse_deposits; -    break; -  case TALER_EXCHANGEDB_RT_ACCOUNT_MERGES: -    statement = "select_above_serial_by_table_account_merges"; -    rh = &lrbt_cb_table_account_merges; -    break; -  case TALER_EXCHANGEDB_RT_HISTORY_REQUESTS: -    statement = "select_above_serial_by_table_history_requests"; -    rh = &lrbt_cb_table_history_requests; -    break; -  case TALER_EXCHANGEDB_RT_CLOSE_REQUESTS: -    statement = "select_above_serial_by_table_close_requests"; -    rh = &lrbt_cb_table_close_requests; -    break; -  case TALER_EXCHANGEDB_RT_WADS_OUT: -    statement = "select_above_serial_by_table_wads_out"; -    rh = &lrbt_cb_table_wads_out; -    break; -  case TALER_EXCHANGEDB_RT_WADS_OUT_ENTRIES: -    statement = "select_above_serial_by_table_wads_out_entries"; -    rh = &lrbt_cb_table_wads_out_entries; -    break; -  case TALER_EXCHANGEDB_RT_WADS_IN: -    statement = "select_above_serial_by_table_wads_in"; -    rh = &lrbt_cb_table_wads_in; -    break; -  case TALER_EXCHANGEDB_RT_WADS_IN_ENTRIES: -    statement = "select_above_serial_by_table_wads_in_entries"; -    rh = &lrbt_cb_table_wads_in_entries; -    break; -  case TALER_EXCHANGEDB_RT_PROFIT_DRAINS: -    statement = "select_above_serial_by_table_profit_drains"; -    rh = &lrbt_cb_table_profit_drains; -    break; -  default: -    GNUNET_break (0); -    return GNUNET_DB_STATUS_HARD_ERROR; -  } - -  qs = GNUNET_PQ_eval_prepared_multi_select (pg->conn, -                                             statement, -                                             params, -                                             rh, -                                             &ctx); -  if (qs < 0) -  { -    GNUNET_log (GNUNET_ERROR_TYPE_ERROR, -                "Failed to run `%s'\n", -                statement); -    return qs; -  } -  if (ctx.error) -  { -    GNUNET_break (0); -    return GNUNET_DB_STATUS_HARD_ERROR; -  } -  return qs; -} - - -/**   * Function called to grab a work shard on an operation @a op. Runs in its   * own transaction.   * @@ -16584,7 +16346,7 @@ libtaler_plugin_exchangedb_postgres_init (void *cls)    plugin->lookup_serial_by_table      = &postgres_lookup_serial_by_table;    plugin->lookup_records_by_table -    = &postgres_lookup_records_by_table; +    = &TEH_PG_lookup_records_by_table;    plugin->begin_shard      = &postgres_begin_shard;    plugin->abort_shard | 
