From 5a9d7ac8356e8c44d59de64b55a2df8f7b619e82 Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Sat, 18 Mar 2017 02:40:27 +0100 Subject: [PATCH] adjust exchangedb plugin to provide wire_out tracking API --- .../perf_taler_exchangedb_interpreter.c | 4 +- src/exchangedb/plugin_exchangedb_postgres.c | 145 ++++++++--- src/exchangedb/test_exchangedb.c | 242 +++++++++++------- src/include/taler_exchangedb_plugin.h | 60 ++++- 4 files changed, 309 insertions(+), 142 deletions(-) diff --git a/src/exchangedb/perf_taler_exchangedb_interpreter.c b/src/exchangedb/perf_taler_exchangedb_interpreter.c index b869ad413..26c2764f8 100644 --- a/src/exchangedb/perf_taler_exchangedb_interpreter.c +++ b/src/exchangedb/perf_taler_exchangedb_interpreter.c @@ -1692,7 +1692,7 @@ interpret (struct PERF_TALER_EXCHANGEDB_interpreter_state *state) */ int PERF_TALER_EXCHANGEDB_interpret (struct TALER_EXCHANGEDB_Plugin *db_plugin, - struct PERF_TALER_EXCHANGEDB_Cmd cmd[]) + struct PERF_TALER_EXCHANGEDB_Cmd cmd[]) { int ret; struct PERF_TALER_EXCHANGEDB_interpreter_state state = @@ -1702,6 +1702,8 @@ PERF_TALER_EXCHANGEDB_interpret (struct TALER_EXCHANGEDB_Plugin *db_plugin, if (GNUNET_SYSERR == ret) return ret; state.session = db_plugin->get_session (db_plugin->cls); + if (NULL == state.session) + return GNUNET_SYSERR; GNUNET_assert (NULL != state.session); ret = interpret (&state); cmd_clean (cmd); diff --git a/src/exchangedb/plugin_exchangedb_postgres.c b/src/exchangedb/plugin_exchangedb_postgres.c index b00bc7bc1..b7a3b5f70 100644 --- a/src/exchangedb/plugin_exchangedb_postgres.c +++ b/src/exchangedb/plugin_exchangedb_postgres.c @@ -215,6 +215,8 @@ postgres_drop_tables (void *cls) "DROP TABLE IF EXISTS prewire;"); SQLEXEC_ (conn, "DROP TABLE IF EXISTS aggregation_tracking;"); + SQLEXEC_ (conn, + "DROP TABLE IF EXISTS wire_out;"); SQLEXEC_ (conn, "DROP TABLE IF EXISTS wire_fee;"); SQLEXEC_ (conn, @@ -503,6 +505,17 @@ postgres_create_tables (void *cls) SQLEXEC_INDEX("CREATE INDEX prepare_iteration_index " "ON prewire(type,finished)"); + /* This table contains the data for + wire transfers the exchange has executed. */ + SQLEXEC("CREATE TABLE IF NOT EXISTS wire_out " + "(wireout_uuid BIGSERIAL PRIMARY KEY" + ",execution_date INT8 NOT NULL" + ",wtid_raw BYTEA NOT NULL CHECK (LENGTH(wtid_raw)=" TALER_WIRE_TRANSFER_IDENTIFIER_LEN_STR ")" + ",wire_target TEXT NOT NULL" + ",amount_val INT8 NOT NULL" + ",amount_frac INT4 NOT NULL" + ",amount_curr VARCHAR("TALER_CURRENCY_LEN_STR") NOT NULL" + ")"); #undef SQLEXEC #undef SQLEXEC_INDEX @@ -1291,6 +1304,18 @@ postgres_prepare (PGconn *db_conn) "($1, $2, $3, $4, $5, $6, $7)", 7, NULL); + /* Used in #postgres_store_wire_transfer_out */ + PREPARE ("insert_wire_out", + "INSERT INTO wire_out " + "(execution_date" + ",wtid_raw" + ",wire_target" + ",amount_val" + ",amount_frac" + ",amount_curr" + ") VALUES " + "($1, $2, $3, $4, $5, $6)", + 6, NULL); /* Used in #postgres_wire_prepare_data_insert() to store wire transfer information before actually committing it with the bank */ @@ -1328,16 +1353,19 @@ postgres_prepare (PGconn *db_conn) " WHERE finished=true", 0, NULL); - /* Used in #postgres_select_prepare_above_serial_id() */ + /* Used in #postgres_select_wire__out_above_serial_id() */ PREPARE ("audit_get_wire_incr", "SELECT" - " type" - ",buf" - ",finished" - ",prewire_uuid" - " FROM prewire" - " WHERE prewire_uuid>=$1" - " ORDER BY prewire_uuid ASC", + " wireout_uuid" + ",execution_date" + ",wtid_raw" + ",wire_target" + ",amount_val" + ",amount_frac" + ",amount_curr" + " FROM wire_out" + " WHERE wireout_uuid>=$1" + " ORDER BY wireout_uuid ASC", 1, NULL); PREPARE ("gc_denominations", @@ -4605,6 +4633,49 @@ postgres_wire_prepare_data_get (void *cls, } +/** + * Store information about an outgoing wire transfer that was executed. + * + * @param cls closure + * @param session database connection + * @param date time of the wire transfer + * @param wtid subject of the wire transfer + * @param wire details about the receiver account of the wire transfer + * @param amount amount that was transmitted + * @return #GNUNET_OK on success + * #GNUNET_SYSERR on DB errors + */ +static int +postgres_store_wire_transfer_out (void *cls, + struct TALER_EXCHANGEDB_Session *session, + struct GNUNET_TIME_Absolute date, + const struct TALER_WireTransferIdentifierRawP *wtid, + const json_t *wire, + const struct TALER_Amount *amount) +{ + PGresult *result; + struct GNUNET_PQ_QueryParam params[] = { + GNUNET_PQ_query_param_absolute_time (&date), + GNUNET_PQ_query_param_auto_from_type (wtid), + TALER_PQ_query_param_json (wire), + TALER_PQ_query_param_amount (amount), + GNUNET_PQ_query_param_end + }; + + result = GNUNET_PQ_exec_prepared (session->conn, + "insert_wire_out", + params); + if (PGRES_COMMAND_OK != PQresultStatus (result)) + { + BREAK_DB_ERR (result); + PQclear (result); + return GNUNET_SYSERR; + } + PQclear (result); + return GNUNET_OK; +} + + /** * Function called to perform "garbage collection" on the * database, expiring records we no longer require. @@ -5166,7 +5237,7 @@ postgres_select_reserves_out_above_serial_id (void *cls, /** * Function called to select all wire transfers the exchange - * executed or plans to execute. + * executed. * * @param cls closure * @param session database connection @@ -5178,13 +5249,12 @@ postgres_select_reserves_out_above_serial_id (void *cls, * #GNUNET_SYSERR on DB errors */ static int -postgres_select_prepare_above_serial_id (void *cls, - struct TALER_EXCHANGEDB_Session *session, - uint64_t serial_id, - TALER_EXCHANGEDB_WirePreparationCallback cb, - void *cb_cls) +postgres_select_wire_out_above_serial_id (void *cls, + struct TALER_EXCHANGEDB_Session *session, + uint64_t serial_id, + TALER_EXCHANGEDB_WireTransferOutCallback cb, + void *cb_cls) { - struct GNUNET_PQ_QueryParam params[] = { GNUNET_PQ_query_param_uint64 (&serial_id), GNUNET_PQ_query_param_end @@ -5201,7 +5271,6 @@ postgres_select_prepare_above_serial_id (void *cls, return GNUNET_SYSERR; } int nrows; - int i; nrows = PQntuples (result); if (0 == nrows) @@ -5211,24 +5280,25 @@ postgres_select_prepare_above_serial_id (void *cls, PQclear (result); return GNUNET_NO; } - for (i=0;iwire_prepare_data_insert = &postgres_wire_prepare_data_insert; plugin->wire_prepare_data_mark_finished = &postgres_wire_prepare_data_mark_finished; plugin->wire_prepare_data_get = &postgres_wire_prepare_data_get; + plugin->store_wire_transfer_out = &postgres_store_wire_transfer_out; plugin->gc = &postgres_gc; plugin->select_deposits_above_serial_id = &postgres_select_deposits_above_serial_id; plugin->select_refreshs_above_serial_id = &postgres_select_refreshs_above_serial_id; plugin->select_refunds_above_serial_id = &postgres_select_refunds_above_serial_id; plugin->select_reserves_in_above_serial_id = &postgres_select_reserves_in_above_serial_id; plugin->select_reserves_out_above_serial_id = &postgres_select_reserves_out_above_serial_id; - plugin->select_prepare_above_serial_id = &postgres_select_prepare_above_serial_id; + plugin->select_wire_out_above_serial_id = &postgres_select_wire_out_above_serial_id; return plugin; } diff --git a/src/exchangedb/test_exchangedb.c b/src/exchangedb/test_exchangedb.c index 2097b0aa5..fba162592 100644 --- a/src/exchangedb/test_exchangedb.c +++ b/src/exchangedb/test_exchangedb.c @@ -79,13 +79,6 @@ dead_prepare_cb (void *cls, } -/** - * Counter used in auditor-related db functions. Used to count - * expected rows. - */ -unsigned int auditor_row_cnt; - - /** * Callback that is called with wire prepare data * and then marks it as finished. @@ -111,27 +104,6 @@ mark_prepare_cb (void *cls, rowid)); } -/** - * Callback with data about a prepared wire transfer. - * - * @param cls closure - * @param rowid row identifier used to mark prepared transaction as done - * @param wire_method which wire method is this preparation data for - * @param buf transaction data that was persisted, NULL on error - * @param buf_size number of bytes in @a buf, 0 on error - * @param finished did we complete the transfer yet? - */ -void -audit_wire_cb (void *cls, - uint64_t rowid, - const char *wire_method, - const char *buf, - size_t buf_size, - int finished) -{ - auditor_row_cnt++; - return; -} /** * Test API relating to persisting the wire plugins preparation data. @@ -163,14 +135,6 @@ test_wire_prepare (struct TALER_EXCHANGEDB_Session *session) session, &dead_prepare_cb, NULL)); - auditor_row_cnt = 0; - FAILIF (GNUNET_OK != - plugin->select_prepare_above_serial_id (plugin->cls, - session, - 0, - &audit_wire_cb, - NULL)); - FAILIF (1 != auditor_row_cnt); return GNUNET_OK; drop: return GNUNET_SYSERR; @@ -503,6 +467,13 @@ check_transfer_data (void *cls, } +/** + * Counter used in auditor-related db functions. Used to count + * expected rows. + */ +static unsigned int auditor_row_cnt; + + /** * Function called with details about coins that were melted, * with the goal of auditing the refresh's execution. @@ -532,6 +503,7 @@ audit_refresh_session_cb (void *cls, return GNUNET_OK; } + /** * Function to test melting of coins as part of a refresh session * @@ -1211,6 +1183,142 @@ test_wire_fees (struct TALER_EXCHANGEDB_Session *session) } +static struct GNUNET_TIME_Absolute wire_out_date; + +static struct TALER_WireTransferIdentifierRawP wire_out_wtid; + +static json_t *wire_out_account; + +static struct TALER_Amount wire_out_amount; + + +/** + * Callback with data about an executed wire transfer. + * + * @param cls closure + * @param rowid identifier of the respective row in the database + * @param date timestamp of the wire transfer (roughly) + * @param wtid wire transfer subject + * @param wire wire transfer details of the receiver + * @param amount amount that was wired + */ +static void +audit_wire_cb (void *cls, + uint64_t rowid, + struct GNUNET_TIME_Absolute date, + const struct TALER_WireTransferIdentifierRawP *wtid, + const json_t *wire, + const struct TALER_Amount *amount) +{ + auditor_row_cnt++; + GNUNET_assert (0 == + TALER_amount_cmp (amount, + &wire_out_amount)); + GNUNET_assert (0 == + memcmp (wtid, + &wire_out_wtid, + sizeof (*wtid))); + GNUNET_assert (date.abs_value_us == wire_out_date.abs_value_us); +} + + +/** + * Test API relating to wire_out handling. + * + * @param session database session to use for the test + * @return #GNUNET_OK on success + */ +static int +test_wire_out (struct TALER_EXCHANGEDB_Session *session, + const struct TALER_EXCHANGEDB_Deposit *deposit) +{ + auditor_row_cnt = 0; + memset (&wire_out_wtid, 42, sizeof (wire_out_wtid)); + wire_out_date = GNUNET_TIME_absolute_get (); + (void) GNUNET_TIME_round_abs (&wire_out_date); + wire_out_account = json_loads ("{ \"account\":\"1\" }", 0, NULL); + GNUNET_assert (NULL != wire_out_account); + GNUNET_assert (GNUNET_OK == + TALER_string_to_amount (CURRENCY ":1", + &wire_out_amount)); + FAILIF (GNUNET_OK != + plugin->store_wire_transfer_out (plugin->cls, + session, + wire_out_date, + &wire_out_wtid, + wire_out_account, + &wire_out_amount)); + FAILIF (GNUNET_OK != + plugin->select_wire_out_above_serial_id (plugin->cls, + session, + 0, + &audit_wire_cb, + NULL)); + FAILIF (1 != auditor_row_cnt); + + /* setup values for wire transfer aggregation data */ + merchant_pub_wt = deposit->merchant_pub; + h_wire_wt = deposit->h_wire; + h_proposal_data_wt = deposit->h_proposal_data; + coin_pub_wt = deposit->coin.coin_pub; + execution_time_wt = GNUNET_TIME_absolute_get (); + coin_value_wt = deposit->amount_with_fee; + coin_fee_wt = fee_deposit; + GNUNET_assert (GNUNET_OK == + TALER_amount_subtract (&transfer_value_wt, + &coin_value_wt, + &coin_fee_wt)); + FAILIF (GNUNET_NO != + plugin->lookup_wire_transfer (plugin->cls, + session, + &wtid_wt, + &cb_wt_never, + NULL)); + + { + struct GNUNET_HashCode h_proposal_data_wt2 = h_proposal_data_wt; + + h_proposal_data_wt2.bits[0]++; + FAILIF (GNUNET_NO != + plugin->wire_lookup_deposit_wtid (plugin->cls, + session, + &h_proposal_data_wt2, + &h_wire_wt, + &coin_pub_wt, + &merchant_pub_wt, + &cb_wtid_never, + NULL)); + } + /* insert WT data */ + FAILIF (GNUNET_OK != + plugin->insert_aggregation_tracking (plugin->cls, + session, + &wtid_wt, + deposit_rowid, + execution_time_wt)); + FAILIF (GNUNET_OK != + plugin->lookup_wire_transfer (plugin->cls, + session, + &wtid_wt, + &cb_wt_check, + &cb_wt_never)); + FAILIF (GNUNET_OK != + plugin->wire_lookup_deposit_wtid (plugin->cls, + session, + &h_proposal_data_wt, + &h_wire_wt, + &coin_pub_wt, + &merchant_pub_wt, + &cb_wtid_check, + &cb_wtid_never)); + + + return GNUNET_OK; + drop: + return GNUNET_SYSERR; +} + + /** * Main function that will be run by the scheduler. * @@ -1234,7 +1342,6 @@ run (void *cls) struct TALER_EXCHANGEDB_Refund refund; struct TALER_EXCHANGEDB_TransactionList *tl; struct TALER_EXCHANGEDB_TransactionList *tlp; - struct TALER_WireTransferIdentifierRawP wtid; json_t *wire; json_t *just; json_t *sndr; @@ -1296,7 +1403,9 @@ run (void *cls) result = 4; sndr = json_loads ("{ \"account\":\"1\" }", 0, NULL); + GNUNET_assert (NULL != sndr); just = json_loads ("{ \"justification\":\"1\" }", 0, NULL); + GNUNET_assert (NULL != just); FAILIF (GNUNET_OK != plugin->reserves_in_insert (plugin->cls, session, @@ -1631,64 +1740,11 @@ run (void *cls) plugin->free_coin_transaction_list (plugin->cls, tl); - FAILIF (GNUNET_OK != test_wire_prepare (session)); - - /* setup values for wire transfer aggregation data */ - memset (&wtid, 42, sizeof (wtid)); - merchant_pub_wt = deposit.merchant_pub; - h_wire_wt = deposit.h_wire; - h_proposal_data_wt = deposit.h_proposal_data; - coin_pub_wt = deposit.coin.coin_pub; - execution_time_wt = GNUNET_TIME_absolute_get (); - coin_value_wt = deposit.amount_with_fee; - coin_fee_wt = fee_deposit; - GNUNET_assert (GNUNET_OK == - TALER_amount_subtract (&transfer_value_wt, - &coin_value_wt, - &coin_fee_wt)); - FAILIF (GNUNET_NO != - plugin->lookup_wire_transfer (plugin->cls, - session, - &wtid_wt, - &cb_wt_never, - NULL)); - - { - struct GNUNET_HashCode h_proposal_data_wt2 = h_proposal_data_wt; - - h_proposal_data_wt2.bits[0]++; - FAILIF (GNUNET_NO != - plugin->wire_lookup_deposit_wtid (plugin->cls, - session, - &h_proposal_data_wt2, - &h_wire_wt, - &coin_pub_wt, - &merchant_pub_wt, - &cb_wtid_never, - NULL)); - } - /* insert WT data */ FAILIF (GNUNET_OK != - plugin->insert_aggregation_tracking (plugin->cls, - session, - &wtid_wt, - deposit_rowid, - execution_time_wt)); + test_wire_prepare (session)); FAILIF (GNUNET_OK != - plugin->lookup_wire_transfer (plugin->cls, - session, - &wtid_wt, - &cb_wt_check, - &cb_wt_never)); - FAILIF (GNUNET_OK != - plugin->wire_lookup_deposit_wtid (plugin->cls, - session, - &h_proposal_data_wt, - &h_wire_wt, - &coin_pub_wt, - &merchant_pub_wt, - &cb_wtid_check, - &cb_wtid_never)); + test_wire_out (session, + &deposit)); FAILIF (GNUNET_OK != test_gc (session)); FAILIF (GNUNET_OK != diff --git a/src/include/taler_exchangedb_plugin.h b/src/include/taler_exchangedb_plugin.h index 34092789e..ef49074e4 100644 --- a/src/include/taler_exchangedb_plugin.h +++ b/src/include/taler_exchangedb_plugin.h @@ -780,6 +780,26 @@ typedef void const struct TALER_Amount *coin_fee); +/** + * Function called with the results of the lookup of the + * wire transfer data of the exchange. + * + * @param cls closure + * @param rowid identifier of the respective row in the database + * @param date timestamp of the wire transfer (roughly) + * @param wtid wire transfer subject + * @param wire wire transfer details of the receiver + * @param amount amount that was wired + */ +typedef void +(*TALER_EXCHANGEDB_WireTransferOutCallback)(void *cls, + uint64_t rowid, + struct GNUNET_TIME_Absolute date, + const struct TALER_WireTransferIdentifierRawP *wtid, + const json_t *wire, + const struct TALER_Amount *amount); + + /** * Callback with data about a prepared wire transfer. * @@ -1619,6 +1639,27 @@ struct TALER_EXCHANGEDB_Plugin void *cb_cls); + /** + * Store information about an outgoing wire transfer that was executed. + * + * @param cls closure + * @param session database connection + * @param date time of the wire transfer + * @param wtid subject of the wire transfer + * @param wire details about the receiver account of the wire transfer + * @param amount amount that was transmitted + * @return #GNUNET_OK on success + * #GNUNET_SYSERR on DB errors + */ + int + (*store_wire_transfer_out)(void *cls, + struct TALER_EXCHANGEDB_Session *session, + struct GNUNET_TIME_Absolute date, + const struct TALER_WireTransferIdentifierRawP *wtid, + const json_t *wire, + const struct TALER_Amount *amount); + + /** * Function called to perform "garbage collection" on the * database, expiring records we no longer require. @@ -1738,15 +1779,12 @@ struct TALER_EXCHANGEDB_Plugin /** - * FIXME: this is NOT the API we want here, as we cannot exactly determine the - * important WTID from the callback! - * - * Function called to select all wire transfers the exchange - * executed or plans to execute. + * Function called to select outgoing wire transfers the exchange + * executed, ordered by serial ID (monotonically increasing). * * @param cls closure * @param session database connection - * @param serial_id highest serial ID to exclude (select strictly larger) + * @param serial_id lowest serial ID to include (select larger or equal) * @param cb function to call for ONE unfinished item * @param cb_cls closure for @a cb * @return #GNUNET_OK on success, @@ -1754,11 +1792,11 @@ struct TALER_EXCHANGEDB_Plugin * #GNUNET_SYSERR on DB errors */ int - (*select_prepare_above_serial_id)(void *cls, - struct TALER_EXCHANGEDB_Session *session, - uint64_t serial_id, - TALER_EXCHANGEDB_WirePreparationCallback cb, - void *cb_cls); + (*select_wire_out_above_serial_id)(void *cls, + struct TALER_EXCHANGEDB_Session *session, + uint64_t serial_id, + TALER_EXCHANGEDB_WireTransferOutCallback cb, + void *cb_cls); };