diff options
Diffstat (limited to 'src/exchangedb/plugin_exchangedb_postgres.c')
-rw-r--r-- | src/exchangedb/plugin_exchangedb_postgres.c | 258 |
1 files changed, 198 insertions, 60 deletions
diff --git a/src/exchangedb/plugin_exchangedb_postgres.c b/src/exchangedb/plugin_exchangedb_postgres.c index 63ed8874..e72e96cc 100644 --- a/src/exchangedb/plugin_exchangedb_postgres.c +++ b/src/exchangedb/plugin_exchangedb_postgres.c @@ -127,6 +127,7 @@ postgres_drop_tables (void *cls) PGconn *conn; int ret; + /* FIXME: use GNUNET_PQ_connect_with_cfg instead? */ conn = GNUNET_PQ_connect (pc->connection_cfg_str); if (NULL == conn) return GNUNET_SYSERR; @@ -218,12 +219,13 @@ postgres_create_tables (void *cls) ",credit_frac INT4 NOT NULL" ",credit_curr VARCHAR("TALER_CURRENCY_LEN_STR") NOT NULL" ",sender_account_details TEXT NOT NULL" + ",exchange_account_section TEXT NOT NULL" ",execution_date INT8 NOT NULL" ",PRIMARY KEY (reserve_pub, wire_reference)" ");"), /* Create indices on reserves_in */ GNUNET_PQ_make_try_execute ("CREATE INDEX reserves_in_execution_index" - " ON reserves_in (execution_date);"), + " ON reserves_in (exchange_account_section,execution_date);"), /* This table contains the data for wire transfers the exchange has executed to close a reserve. */ GNUNET_PQ_make_execute("CREATE TABLE IF NOT EXISTS reserves_close " @@ -366,6 +368,7 @@ postgres_create_tables (void *cls) ",execution_date INT8 NOT NULL" ",wtid_raw BYTEA UNIQUE NOT NULL CHECK (LENGTH(wtid_raw)=" TALER_WIRE_TRANSFER_IDENTIFIER_LEN_STR ")" ",wire_target TEXT NOT NULL" + ",exchange_account_section TEXT NOT NULL" ",amount_val INT8 NOT NULL" ",amount_frac INT4 NOT NULL" ",amount_curr VARCHAR("TALER_CURRENCY_LEN_STR") NOT NULL" @@ -433,6 +436,7 @@ postgres_create_tables (void *cls) PGconn *conn; int ret; + /* FIXME: use GNUNET_PQ_connect_with_cfg instead? */ conn = GNUNET_PQ_connect (pc->connection_cfg_str); if (NULL == conn) return GNUNET_SYSERR; @@ -585,21 +589,23 @@ postgres_prepare (PGconn *db_conn) ",credit_val" ",credit_frac" ",credit_curr" + ",exchange_account_section" ",sender_account_details" ",execution_date" ") VALUES " - "($1, $2, $3, $4, $5, $6, $7) " + "($1, $2, $3, $4, $5, $6, $7, $8) " "ON CONFLICT DO NOTHING;", - 7), + 8), /* Used in postgres_select_reserves_in_above_serial_id() to obtain inbound transactions for reserves with serial id '\geq' the given parameter */ GNUNET_PQ_make_prepare ("reserves_in_get_latest_wire_reference", "SELECT" " wire_reference" " FROM reserves_in" + " WHERE exchange_account_section=$1" " ORDER BY reserve_in_serial_id DESC" " LIMIT 1;", - 0), + 1), /* Used in postgres_select_reserves_in_above_serial_id() to obtain inbound transactions for reserves with serial id '\geq' the given parameter */ GNUNET_PQ_make_prepare ("audit_reserves_in_get_transactions_incr", @@ -616,6 +622,22 @@ postgres_prepare (PGconn *db_conn) " WHERE reserve_in_serial_id>=$1" " ORDER BY reserve_in_serial_id;", 1), + /* Used in postgres_select_reserves_in_above_serial_id() to obtain inbound + transactions for reserves with serial id '\geq' the given parameter */ + GNUNET_PQ_make_prepare ("audit_reserves_in_get_transactions_incr_by_account", + "SELECT" + " reserve_pub" + ",wire_reference" + ",credit_val" + ",credit_frac" + ",credit_curr" + ",execution_date" + ",sender_account_details" + ",reserve_in_serial_id" + " FROM reserves_in" + " WHERE reserve_in_serial_id>=$1 AND exchange_account_section=$2" + " ORDER BY reserve_in_serial_id;", + 2), /* Used in #postgres_get_reserve_history() to obtain inbound transactions for a reserve */ GNUNET_PQ_make_prepare ("reserves_in_get_transactions", @@ -1210,12 +1232,13 @@ postgres_prepare (PGconn *db_conn) "(execution_date" ",wtid_raw" ",wire_target" + ",exchange_account_section" ",amount_val" ",amount_frac" ",amount_curr" ") VALUES " - "($1, $2, $3, $4, $5, $6);", - 6), + "($1, $2, $3, $4, $5, $6, $7);", + 7), /* Used in #postgres_wire_prepare_data_insert() to store wire transfer information before actually committing it with the bank */ GNUNET_PQ_make_prepare ("wire_prepare_data_insert", @@ -1285,6 +1308,20 @@ postgres_prepare (PGconn *db_conn) " WHERE wireout_uuid>=$1" " ORDER BY wireout_uuid ASC;", 1), + /* Used in #postgres_select_wire_out_above_serial_id_by_account() */ + GNUNET_PQ_make_prepare ("audit_get_wire_incr_by_account", + "SELECT" + " wireout_uuid" + ",execution_date" + ",wtid_raw" + ",wire_target" + ",amount_val" + ",amount_frac" + ",amount_curr" + " FROM wire_out" + " WHERE wireout_uuid>=$1 AND exchange_account_section=$2" + " ORDER BY wireout_uuid ASC;", + 2), /* Used in #postgres_insert_payback_request() to store payback information */ GNUNET_PQ_make_prepare ("payback_insert", @@ -1509,6 +1546,7 @@ postgres_get_session (void *cls) return session; } } + /* FIXME: use GNUNET_PQ_connect_with_cfg instead? */ db_conn = GNUNET_PQ_connect (pc->connection_cfg_str); if (NULL == db_conn) return NULL; @@ -1836,7 +1874,9 @@ reserves_update (void *cls, * @param reserve_pub public key of the reserve * @param balance the amount that has to be added to the reserve * @param execution_time when was the amount added - * @param sender_account_details account information for the sender + * @param sender_account_details account information for the sender (payto://-URL) + * @param exchange_account_section name of the section in the configuration for the exchange's + * account into which the deposit was made * @param wire_reference unique reference identifying the wire transfer (binary blob) * @param wire_reference_size number of bytes in @a wire_reference * @return transaction status code @@ -1847,7 +1887,8 @@ postgres_reserves_in_insert (void *cls, const struct TALER_ReservePublicKeyP *reserve_pub, const struct TALER_Amount *balance, struct GNUNET_TIME_Absolute execution_time, - const json_t *sender_account_details, + const char *sender_account_details, + const char *exchange_account_section, const void *wire_reference, size_t wire_reference_size) { @@ -1900,7 +1941,7 @@ postgres_reserves_in_insert (void *cls, as a foreign key. */ struct GNUNET_PQ_QueryParam params[] = { GNUNET_PQ_query_param_auto_from_type (reserve_pub), - TALER_PQ_query_param_json (sender_account_details), + GNUNET_PQ_query_param_string (sender_account_details), TALER_PQ_query_param_amount (balance), TALER_PQ_query_param_absolute_time (&expiry), GNUNET_PQ_query_param_end @@ -1932,7 +1973,8 @@ postgres_reserves_in_insert (void *cls, GNUNET_PQ_query_param_fixed_size (wire_reference, wire_reference_size), TALER_PQ_query_param_amount (balance), - TALER_PQ_query_param_json (sender_account_details), + GNUNET_PQ_query_param_string (exchange_account_section), + GNUNET_PQ_query_param_string (sender_account_details), TALER_PQ_query_param_absolute_time (&execution_time), GNUNET_PQ_query_param_end }; @@ -1982,6 +2024,8 @@ postgres_reserves_in_insert (void *cls, * * @param cls the @e cls of this struct with the plugin-specific state * @param session the database session handle + * @param exchange_account_name name of the section in the exchange's configuration + * for the account that we are tracking here * @param[out] wire_reference set to unique reference identifying the wire transfer (binary blob) * @param[out] wire_reference_size set to number of bytes in @a wire_reference * @return transaction status code @@ -1989,10 +2033,12 @@ postgres_reserves_in_insert (void *cls, static enum GNUNET_DB_QueryStatus postgres_get_latest_reserve_in_reference (void *cls, struct TALER_EXCHANGEDB_Session *session, + const char *exchange_account_name, void **wire_reference, size_t *wire_reference_size) { struct GNUNET_PQ_QueryParam params[] = { + GNUNET_PQ_query_param_string (exchange_account_name), GNUNET_PQ_query_param_end }; struct GNUNET_PQ_ResultSpec rs[] = { @@ -2228,8 +2274,8 @@ add_bank_to_exchange (void *cls, &bt->amount), TALER_PQ_result_spec_absolute_time ("execution_date", &bt->execution_date), - TALER_PQ_result_spec_json ("sender_account_details", - &bt->sender_account_details), + GNUNET_PQ_result_spec_string ("sender_account_details", + &bt->sender_account_details), GNUNET_PQ_result_spec_end }; @@ -2395,8 +2441,8 @@ add_exchange_to_bank (void *cls, &closing->closing_fee), TALER_PQ_result_spec_absolute_time ("execution_date", &closing->execution_date), - TALER_PQ_result_spec_json ("receiver_account", - &closing->receiver_account_details), + GNUNET_PQ_result_spec_string ("receiver_account", + &closing->receiver_account_details), GNUNET_PQ_result_spec_auto_from_type ("wtid", &closing->wtid), GNUNET_PQ_result_spec_end @@ -2716,7 +2762,7 @@ postgres_get_ready_deposit (void *cls, GNUNET_PQ_result_spec_auto_from_type ("coin_pub", &coin_pub), TALER_PQ_result_spec_json ("wire", - &wire), + &wire), GNUNET_PQ_result_spec_end }; enum GNUNET_DB_QueryStatus qs; @@ -3817,7 +3863,7 @@ add_coin_deposit (void *cls, GNUNET_PQ_result_spec_auto_from_type ("h_wire", &deposit->h_wire), TALER_PQ_result_spec_json ("wire", - &deposit->receiver_wire_account), + &deposit->receiver_wire_account), GNUNET_PQ_result_spec_auto_from_type ("coin_sig", &deposit->csig), GNUNET_PQ_result_spec_end @@ -4184,8 +4230,6 @@ handle_wt_result (void *cls, struct TALER_Amount amount_with_fee; struct TALER_Amount deposit_fee; json_t *wire; - json_t *t; - const char *wire_method; struct GNUNET_PQ_ResultSpec rs[] = { GNUNET_PQ_result_spec_uint64 ("aggregation_serial_id", &rowid), GNUNET_PQ_result_spec_auto_from_type ("h_contract_terms", &h_contract_terms), @@ -4208,25 +4252,11 @@ handle_wt_result (void *cls, ctx->status = GNUNET_SYSERR; return; } - t = json_object_get (wire, "type"); - if (NULL == t) - { - GNUNET_break (0); - ctx->status = GNUNET_SYSERR; - return; - } - wire_method = json_string_value (t); - if (NULL == wire_method) - { - GNUNET_break (0); - ctx->status = GNUNET_SYSERR; - return; - } ctx->cb (ctx->cb_cls, rowid, &merchant_pub, - wire_method, &h_wire, + wire, exec_time, &h_contract_terms, &coin_pub, @@ -4586,14 +4616,14 @@ reserve_expired_cb (void *cls, for (unsigned int i=0;i<num_results;i++) { struct GNUNET_TIME_Absolute exp_date; - json_t *account_details; + char *account_details; struct TALER_ReservePublicKeyP reserve_pub; struct TALER_Amount remaining_balance; struct GNUNET_PQ_ResultSpec rs[] = { TALER_PQ_result_spec_absolute_time ("expiration_date", &exp_date), - TALER_PQ_result_spec_json ("account_details", - &account_details), + GNUNET_PQ_result_spec_string ("account_details", + &account_details), GNUNET_PQ_result_spec_auto_from_type ("reserve_pub", &reserve_pub), TALER_PQ_result_spec_amount ("current_balance", @@ -4680,7 +4710,7 @@ postgres_insert_reserve_closed (void *cls, struct TALER_EXCHANGEDB_Session *session, const struct TALER_ReservePublicKeyP *reserve_pub, struct GNUNET_TIME_Absolute execution_date, - const json_t *receiver_account, + const char *receiver_account, const struct TALER_WireTransferIdentifierRawP *wtid, const struct TALER_Amount *amount_with_fee, const struct TALER_Amount *closing_fee) @@ -4690,7 +4720,7 @@ postgres_insert_reserve_closed (void *cls, GNUNET_PQ_query_param_auto_from_type (reserve_pub), TALER_PQ_query_param_absolute_time (&execution_date), GNUNET_PQ_query_param_auto_from_type (wtid), - TALER_PQ_query_param_json (receiver_account), + GNUNET_PQ_query_param_string (receiver_account), TALER_PQ_query_param_amount (amount_with_fee), TALER_PQ_query_param_amount (closing_fee), GNUNET_PQ_query_param_end @@ -4891,6 +4921,8 @@ postgres_start_deferred_wire_out (void *cls, * @param date time of the wire transfer * @param wtid subject of the wire transfer * @param wire_account details about the receiver account of the wire transfer + * @param exchange_account_section configuration section of the exchange specifying the + * exchange's bank account being used * @param amount amount that was transmitted * @return transaction status code */ @@ -4900,12 +4932,14 @@ postgres_store_wire_transfer_out (void *cls, struct GNUNET_TIME_Absolute date, const struct TALER_WireTransferIdentifierRawP *wtid, const json_t *wire_account, + const char *exchange_account_section, const struct TALER_Amount *amount) { struct GNUNET_PQ_QueryParam params[] = { TALER_PQ_query_param_absolute_time (&date), GNUNET_PQ_query_param_auto_from_type (wtid), TALER_PQ_query_param_json (wire_account), + GNUNET_PQ_query_param_string (exchange_account_section), TALER_PQ_query_param_amount (amount), GNUNET_PQ_query_param_end }; @@ -4952,6 +4986,7 @@ postgres_gc (void *cls) long_ago = GNUNET_TIME_absolute_subtract (now, GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_YEARS, 10)); + /* FIXME: use GNUNET_PQ_connect_with_cfg instead? */ conn = GNUNET_PQ_connect (pc->connection_cfg_str); if (NULL == conn) return GNUNET_SYSERR; @@ -5427,7 +5462,7 @@ reserves_in_serial_helper_cb (void *cls, { struct TALER_ReservePublicKeyP reserve_pub; struct TALER_Amount credit; - json_t *sender_account_details; + char *sender_account_details; struct GNUNET_TIME_Absolute execution_date; uint64_t rowid; void *wire_reference; @@ -5442,8 +5477,8 @@ reserves_in_serial_helper_cb (void *cls, &credit), TALER_PQ_result_spec_absolute_time("execution_date", &execution_date), - TALER_PQ_result_spec_json ("sender_account_details", - &sender_account_details), + GNUNET_PQ_result_spec_string ("sender_account_details", + &sender_account_details), GNUNET_PQ_result_spec_uint64 ("reserve_in_serial_id", &rowid), GNUNET_PQ_result_spec_end @@ -5515,6 +5550,49 @@ postgres_select_reserves_in_above_serial_id (void *cls, /** + * Select inbound wire transfers into reserves_in above @a serial_id + * in monotonically increasing order by account. + * + * @param cls closure + * @param session database connection + * @param account_name name of the account to select by + * @param serial_id highest serial ID to exclude (select strictly larger) + * @param cb function to call on each result + * @param cb_cls closure for @a cb + * @return transaction status code + */ +static enum GNUNET_DB_QueryStatus +postgres_select_reserves_in_above_serial_id_by_account (void *cls, + struct TALER_EXCHANGEDB_Session *session, + const char *account_name, + uint64_t serial_id, + TALER_EXCHANGEDB_ReserveInCallback cb, + void *cb_cls) +{ + struct GNUNET_PQ_QueryParam params[] = { + GNUNET_PQ_query_param_uint64 (&serial_id), + GNUNET_PQ_query_param_string (account_name), + GNUNET_PQ_query_param_end + }; + struct ReservesInSerialContext risc = { + .cb = cb, + .cb_cls = cb_cls, + .status = GNUNET_OK + }; + enum GNUNET_DB_QueryStatus qs; + + qs = GNUNET_PQ_eval_prepared_multi_select (session->conn, + "audit_reserves_in_get_transactions_incr_by_account", + params, + &reserves_in_serial_helper_cb, + &risc); + if (GNUNET_OK != risc.status) + return GNUNET_DB_STATUS_HARD_ERROR; + return qs; +} + + +/** * Closure for #reserves_out_serial_helper_cb(). */ struct ReservesOutSerialContext @@ -5771,6 +5849,49 @@ postgres_select_wire_out_above_serial_id (void *cls, /** + * Function called to select all wire transfers the exchange + * executed by account. + * + * @param cls closure + * @param session database connection + * @param account_name account to select + * @param serial_id highest serial ID to exclude (select strictly larger) + * @param cb function to call for ONE unfinished item + * @param cb_cls closure for @a cb + * @return transaction status code + */ +static enum GNUNET_DB_QueryStatus +postgres_select_wire_out_above_serial_id_by_account (void *cls, + struct TALER_EXCHANGEDB_Session *session, + const char *account_name, + uint64_t serial_id, + TALER_EXCHANGEDB_WireTransferOutCallback cb, + void *cb_cls) +{ + struct GNUNET_PQ_QueryParam params[] = { + GNUNET_PQ_query_param_uint64 (&serial_id), + GNUNET_PQ_query_param_string (account_name), + GNUNET_PQ_query_param_end + }; + struct WireOutSerialContext wosc = { + .cb = cb, + .cb_cls = cb_cls, + .status = GNUNET_OK + }; + enum GNUNET_DB_QueryStatus qs; + + qs = GNUNET_PQ_eval_prepared_multi_select (session->conn, + "audit_get_wire_incr_by_account", + params, + &wire_out_serial_helper_cb, + &wosc); + if (GNUNET_OK != wosc.status) + return GNUNET_DB_STATUS_HARD_ERROR; + return qs; +} + + +/** * Closure for #payback_serial_helper_cb(). */ struct PaybackSerialContext @@ -5949,7 +6070,7 @@ reserve_closed_serial_helper_cb (void *cls, { uint64_t rowid; struct TALER_ReservePublicKeyP reserve_pub; - json_t *receiver_account; + char *receiver_account; struct TALER_WireTransferIdentifierRawP wtid; struct TALER_Amount amount_with_fee; struct TALER_Amount closing_fee; @@ -5963,8 +6084,8 @@ reserve_closed_serial_helper_cb (void *cls, &execution_date), GNUNET_PQ_result_spec_auto_from_type ("wtid", &wtid), - TALER_PQ_result_spec_json ("receiver_account", - &receiver_account), + GNUNET_PQ_result_spec_string ("receiver_account", + &receiver_account), TALER_PQ_result_spec_amount ("amount", &amount_with_fee), TALER_PQ_result_spec_amount ("closing_fee", @@ -6277,7 +6398,7 @@ missing_wire_cb (void *cls, TALER_PQ_result_spec_amount ("amount_with_fee", &amount), TALER_PQ_result_spec_json ("wire", - &wire), + &wire), TALER_PQ_result_spec_absolute_time ("wire_deadline", &deadline), GNUNET_PQ_result_spec_auto_from_type ("tiny", @@ -6386,12 +6507,12 @@ libtaler_plugin_exchangedb_postgres_init (void *cls) if (GNUNET_OK != GNUNET_CONFIGURATION_get_value_string (cfg, "exchangedb-postgres", - "db_conn_str", + "CONFIG", &pg->connection_cfg_str)) { GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR, "exchangedb-postgres", - "db_conn_str"); + "CONFIG"); GNUNET_free (pg); return NULL; } @@ -6455,19 +6576,36 @@ libtaler_plugin_exchangedb_postgres_init (void *cls) plugin->start_deferred_wire_out = &postgres_start_deferred_wire_out; plugin->store_wire_transfer_out = &postgres_store_wire_transfer_out; plugin->gc = &postgres_gc; - plugin->select_deposits_above_serial_id = &postgres_select_deposits_above_serial_id; - plugin->select_refreshs_above_serial_id = &postgres_select_refreshs_above_serial_id; - plugin->select_refunds_above_serial_id = &postgres_select_refunds_above_serial_id; - plugin->select_reserves_in_above_serial_id = &postgres_select_reserves_in_above_serial_id; - plugin->select_reserves_out_above_serial_id = &postgres_select_reserves_out_above_serial_id; - plugin->select_wire_out_above_serial_id = &postgres_select_wire_out_above_serial_id; - plugin->select_payback_above_serial_id = &postgres_select_payback_above_serial_id; - plugin->select_reserve_closed_above_serial_id = &postgres_select_reserve_closed_above_serial_id; - plugin->insert_payback_request = &postgres_insert_payback_request; - plugin->get_reserve_by_h_blind = &postgres_get_reserve_by_h_blind; - plugin->insert_denomination_revocation = &postgres_insert_denomination_revocation; - plugin->get_denomination_revocation = &postgres_get_denomination_revocation; - plugin->select_deposits_missing_wire = &postgres_select_deposits_missing_wire; + plugin->select_deposits_above_serial_id + = &postgres_select_deposits_above_serial_id; + plugin->select_refreshs_above_serial_id + = &postgres_select_refreshs_above_serial_id; + plugin->select_refunds_above_serial_id + = &postgres_select_refunds_above_serial_id; + plugin->select_reserves_in_above_serial_id + = &postgres_select_reserves_in_above_serial_id; + plugin->select_reserves_in_above_serial_id_by_account + = &postgres_select_reserves_in_above_serial_id_by_account; + plugin->select_reserves_out_above_serial_id + = &postgres_select_reserves_out_above_serial_id; + plugin->select_wire_out_above_serial_id + = &postgres_select_wire_out_above_serial_id; + plugin->select_wire_out_above_serial_id_by_account + = &postgres_select_wire_out_above_serial_id_by_account; + plugin->select_payback_above_serial_id + = &postgres_select_payback_above_serial_id; + plugin->select_reserve_closed_above_serial_id + = &postgres_select_reserve_closed_above_serial_id; + plugin->insert_payback_request + = &postgres_insert_payback_request; + plugin->get_reserve_by_h_blind + = &postgres_get_reserve_by_h_blind; + plugin->insert_denomination_revocation + = &postgres_insert_denomination_revocation; + plugin->get_denomination_revocation + = &postgres_get_denomination_revocation; + plugin->select_deposits_missing_wire + = &postgres_select_deposits_missing_wire; return plugin; } |