adjust exchangedb plugin to provide wire_out tracking API

This commit is contained in:
Christian Grothoff 2017-03-18 02:40:27 +01:00
parent 543b4f7a7c
commit 5a9d7ac835
No known key found for this signature in database
GPG Key ID: 939E6BE1E29FC3CC
4 changed files with 309 additions and 142 deletions

View File

@ -1702,6 +1702,8 @@ PERF_TALER_EXCHANGEDB_interpret (struct TALER_EXCHANGEDB_Plugin *db_plugin,
if (GNUNET_SYSERR == ret) if (GNUNET_SYSERR == ret)
return ret; return ret;
state.session = db_plugin->get_session (db_plugin->cls); state.session = db_plugin->get_session (db_plugin->cls);
if (NULL == state.session)
return GNUNET_SYSERR;
GNUNET_assert (NULL != state.session); GNUNET_assert (NULL != state.session);
ret = interpret (&state); ret = interpret (&state);
cmd_clean (cmd); cmd_clean (cmd);

View File

@ -215,6 +215,8 @@ postgres_drop_tables (void *cls)
"DROP TABLE IF EXISTS prewire;"); "DROP TABLE IF EXISTS prewire;");
SQLEXEC_ (conn, SQLEXEC_ (conn,
"DROP TABLE IF EXISTS aggregation_tracking;"); "DROP TABLE IF EXISTS aggregation_tracking;");
SQLEXEC_ (conn,
"DROP TABLE IF EXISTS wire_out;");
SQLEXEC_ (conn, SQLEXEC_ (conn,
"DROP TABLE IF EXISTS wire_fee;"); "DROP TABLE IF EXISTS wire_fee;");
SQLEXEC_ (conn, SQLEXEC_ (conn,
@ -503,6 +505,17 @@ postgres_create_tables (void *cls)
SQLEXEC_INDEX("CREATE INDEX prepare_iteration_index " SQLEXEC_INDEX("CREATE INDEX prepare_iteration_index "
"ON prewire(type,finished)"); "ON prewire(type,finished)");
/* This table contains the data for
wire transfers the exchange has executed. */
SQLEXEC("CREATE TABLE IF NOT EXISTS wire_out "
"(wireout_uuid BIGSERIAL PRIMARY KEY"
",execution_date INT8 NOT NULL"
",wtid_raw BYTEA NOT NULL CHECK (LENGTH(wtid_raw)=" TALER_WIRE_TRANSFER_IDENTIFIER_LEN_STR ")"
",wire_target TEXT NOT NULL"
",amount_val INT8 NOT NULL"
",amount_frac INT4 NOT NULL"
",amount_curr VARCHAR("TALER_CURRENCY_LEN_STR") NOT NULL"
")");
#undef SQLEXEC #undef SQLEXEC
#undef SQLEXEC_INDEX #undef SQLEXEC_INDEX
@ -1291,6 +1304,18 @@ postgres_prepare (PGconn *db_conn)
"($1, $2, $3, $4, $5, $6, $7)", "($1, $2, $3, $4, $5, $6, $7)",
7, NULL); 7, NULL);
/* Used in #postgres_store_wire_transfer_out */
PREPARE ("insert_wire_out",
"INSERT INTO wire_out "
"(execution_date"
",wtid_raw"
",wire_target"
",amount_val"
",amount_frac"
",amount_curr"
") VALUES "
"($1, $2, $3, $4, $5, $6)",
6, NULL);
/* Used in #postgres_wire_prepare_data_insert() to store /* Used in #postgres_wire_prepare_data_insert() to store
wire transfer information before actually committing it with the bank */ wire transfer information before actually committing it with the bank */
@ -1328,16 +1353,19 @@ postgres_prepare (PGconn *db_conn)
" WHERE finished=true", " WHERE finished=true",
0, NULL); 0, NULL);
/* Used in #postgres_select_prepare_above_serial_id() */ /* Used in #postgres_select_wire__out_above_serial_id() */
PREPARE ("audit_get_wire_incr", PREPARE ("audit_get_wire_incr",
"SELECT" "SELECT"
" type" " wireout_uuid"
",buf" ",execution_date"
",finished" ",wtid_raw"
",prewire_uuid" ",wire_target"
" FROM prewire" ",amount_val"
" WHERE prewire_uuid>=$1" ",amount_frac"
" ORDER BY prewire_uuid ASC", ",amount_curr"
" FROM wire_out"
" WHERE wireout_uuid>=$1"
" ORDER BY wireout_uuid ASC",
1, NULL); 1, NULL);
PREPARE ("gc_denominations", PREPARE ("gc_denominations",
@ -4605,6 +4633,49 @@ postgres_wire_prepare_data_get (void *cls,
} }
/**
* Store information about an outgoing wire transfer that was executed.
*
* @param cls closure
* @param session database connection
* @param date time of the wire transfer
* @param wtid subject of the wire transfer
* @param wire details about the receiver account of the wire transfer
* @param amount amount that was transmitted
* @return #GNUNET_OK on success
* #GNUNET_SYSERR on DB errors
*/
static int
postgres_store_wire_transfer_out (void *cls,
struct TALER_EXCHANGEDB_Session *session,
struct GNUNET_TIME_Absolute date,
const struct TALER_WireTransferIdentifierRawP *wtid,
const json_t *wire,
const struct TALER_Amount *amount)
{
PGresult *result;
struct GNUNET_PQ_QueryParam params[] = {
GNUNET_PQ_query_param_absolute_time (&date),
GNUNET_PQ_query_param_auto_from_type (wtid),
TALER_PQ_query_param_json (wire),
TALER_PQ_query_param_amount (amount),
GNUNET_PQ_query_param_end
};
result = GNUNET_PQ_exec_prepared (session->conn,
"insert_wire_out",
params);
if (PGRES_COMMAND_OK != PQresultStatus (result))
{
BREAK_DB_ERR (result);
PQclear (result);
return GNUNET_SYSERR;
}
PQclear (result);
return GNUNET_OK;
}
/** /**
* Function called to perform "garbage collection" on the * Function called to perform "garbage collection" on the
* database, expiring records we no longer require. * database, expiring records we no longer require.
@ -5166,7 +5237,7 @@ postgres_select_reserves_out_above_serial_id (void *cls,
/** /**
* Function called to select all wire transfers the exchange * Function called to select all wire transfers the exchange
* executed or plans to execute. * executed.
* *
* @param cls closure * @param cls closure
* @param session database connection * @param session database connection
@ -5178,13 +5249,12 @@ postgres_select_reserves_out_above_serial_id (void *cls,
* #GNUNET_SYSERR on DB errors * #GNUNET_SYSERR on DB errors
*/ */
static int static int
postgres_select_prepare_above_serial_id (void *cls, postgres_select_wire_out_above_serial_id (void *cls,
struct TALER_EXCHANGEDB_Session *session, struct TALER_EXCHANGEDB_Session *session,
uint64_t serial_id, uint64_t serial_id,
TALER_EXCHANGEDB_WirePreparationCallback cb, TALER_EXCHANGEDB_WireTransferOutCallback cb,
void *cb_cls) void *cb_cls)
{ {
struct GNUNET_PQ_QueryParam params[] = { struct GNUNET_PQ_QueryParam params[] = {
GNUNET_PQ_query_param_uint64 (&serial_id), GNUNET_PQ_query_param_uint64 (&serial_id),
GNUNET_PQ_query_param_end GNUNET_PQ_query_param_end
@ -5201,7 +5271,6 @@ postgres_select_prepare_above_serial_id (void *cls,
return GNUNET_SYSERR; return GNUNET_SYSERR;
} }
int nrows; int nrows;
int i;
nrows = PQntuples (result); nrows = PQntuples (result);
if (0 == nrows) if (0 == nrows)
@ -5211,24 +5280,25 @@ postgres_select_prepare_above_serial_id (void *cls,
PQclear (result); PQclear (result);
return GNUNET_NO; return GNUNET_NO;
} }
for (i=0;i<nrows;i++) for (int i=0;i<nrows;i++)
{ {
char *wire_method; uint64_t rowid;
void *buf; struct GNUNET_TIME_Absolute date;
size_t buf_size; struct TALER_WireTransferIdentifierRawP wtid;
uint8_t finished; json_t *wire;
uint64_t uuid; struct TALER_Amount amount;
struct GNUNET_PQ_ResultSpec rs[] = { struct GNUNET_PQ_ResultSpec rs[] = {
GNUNET_PQ_result_spec_string ("type", GNUNET_PQ_result_spec_uint64 ("wireout_uuid",
&wire_method), &rowid),
GNUNET_PQ_result_spec_variable_size ("buf", GNUNET_PQ_result_spec_absolute_time ("execution_date",
&buf, &date),
&buf_size), GNUNET_PQ_result_spec_auto_from_type ("wtid_raw",
GNUNET_PQ_result_spec_auto_from_type ("finished", &wtid),
&finished), TALER_PQ_result_spec_json ("wire_target",
GNUNET_PQ_result_spec_uint64 ("prewire_uuid", &wire),
&uuid), TALER_PQ_result_spec_amount ("amount",
&amount),
GNUNET_PQ_result_spec_end GNUNET_PQ_result_spec_end
}; };
@ -5243,11 +5313,11 @@ postgres_select_prepare_above_serial_id (void *cls,
} }
cb (cb_cls, cb (cb_cls,
uuid, rowid,
wire_method, date,
buf, &wtid,
buf_size, wire,
finished); &amount);
GNUNET_PQ_cleanup_result (rs); GNUNET_PQ_cleanup_result (rs);
} }
@ -5347,13 +5417,14 @@ libtaler_plugin_exchangedb_postgres_init (void *cls)
plugin->wire_prepare_data_insert = &postgres_wire_prepare_data_insert; plugin->wire_prepare_data_insert = &postgres_wire_prepare_data_insert;
plugin->wire_prepare_data_mark_finished = &postgres_wire_prepare_data_mark_finished; plugin->wire_prepare_data_mark_finished = &postgres_wire_prepare_data_mark_finished;
plugin->wire_prepare_data_get = &postgres_wire_prepare_data_get; plugin->wire_prepare_data_get = &postgres_wire_prepare_data_get;
plugin->store_wire_transfer_out = &postgres_store_wire_transfer_out;
plugin->gc = &postgres_gc; plugin->gc = &postgres_gc;
plugin->select_deposits_above_serial_id = &postgres_select_deposits_above_serial_id; plugin->select_deposits_above_serial_id = &postgres_select_deposits_above_serial_id;
plugin->select_refreshs_above_serial_id = &postgres_select_refreshs_above_serial_id; plugin->select_refreshs_above_serial_id = &postgres_select_refreshs_above_serial_id;
plugin->select_refunds_above_serial_id = &postgres_select_refunds_above_serial_id; plugin->select_refunds_above_serial_id = &postgres_select_refunds_above_serial_id;
plugin->select_reserves_in_above_serial_id = &postgres_select_reserves_in_above_serial_id; plugin->select_reserves_in_above_serial_id = &postgres_select_reserves_in_above_serial_id;
plugin->select_reserves_out_above_serial_id = &postgres_select_reserves_out_above_serial_id; plugin->select_reserves_out_above_serial_id = &postgres_select_reserves_out_above_serial_id;
plugin->select_prepare_above_serial_id = &postgres_select_prepare_above_serial_id; plugin->select_wire_out_above_serial_id = &postgres_select_wire_out_above_serial_id;
return plugin; return plugin;
} }

View File

@ -79,13 +79,6 @@ dead_prepare_cb (void *cls,
} }
/**
* Counter used in auditor-related db functions. Used to count
* expected rows.
*/
unsigned int auditor_row_cnt;
/** /**
* Callback that is called with wire prepare data * Callback that is called with wire prepare data
* and then marks it as finished. * and then marks it as finished.
@ -111,27 +104,6 @@ mark_prepare_cb (void *cls,
rowid)); rowid));
} }
/**
* Callback with data about a prepared wire transfer.
*
* @param cls closure
* @param rowid row identifier used to mark prepared transaction as done
* @param wire_method which wire method is this preparation data for
* @param buf transaction data that was persisted, NULL on error
* @param buf_size number of bytes in @a buf, 0 on error
* @param finished did we complete the transfer yet?
*/
void
audit_wire_cb (void *cls,
uint64_t rowid,
const char *wire_method,
const char *buf,
size_t buf_size,
int finished)
{
auditor_row_cnt++;
return;
}
/** /**
* Test API relating to persisting the wire plugins preparation data. * Test API relating to persisting the wire plugins preparation data.
@ -163,14 +135,6 @@ test_wire_prepare (struct TALER_EXCHANGEDB_Session *session)
session, session,
&dead_prepare_cb, &dead_prepare_cb,
NULL)); NULL));
auditor_row_cnt = 0;
FAILIF (GNUNET_OK !=
plugin->select_prepare_above_serial_id (plugin->cls,
session,
0,
&audit_wire_cb,
NULL));
FAILIF (1 != auditor_row_cnt);
return GNUNET_OK; return GNUNET_OK;
drop: drop:
return GNUNET_SYSERR; return GNUNET_SYSERR;
@ -503,6 +467,13 @@ check_transfer_data (void *cls,
} }
/**
* Counter used in auditor-related db functions. Used to count
* expected rows.
*/
static unsigned int auditor_row_cnt;
/** /**
* Function called with details about coins that were melted, * Function called with details about coins that were melted,
* with the goal of auditing the refresh's execution. * with the goal of auditing the refresh's execution.
@ -532,6 +503,7 @@ audit_refresh_session_cb (void *cls,
return GNUNET_OK; return GNUNET_OK;
} }
/** /**
* Function to test melting of coins as part of a refresh session * Function to test melting of coins as part of a refresh session
* *
@ -1211,6 +1183,142 @@ test_wire_fees (struct TALER_EXCHANGEDB_Session *session)
} }
static struct GNUNET_TIME_Absolute wire_out_date;
static struct TALER_WireTransferIdentifierRawP wire_out_wtid;
static json_t *wire_out_account;
static struct TALER_Amount wire_out_amount;
/**
* Callback with data about an executed wire transfer.
*
* @param cls closure
* @param rowid identifier of the respective row in the database
* @param date timestamp of the wire transfer (roughly)
* @param wtid wire transfer subject
* @param wire wire transfer details of the receiver
* @param amount amount that was wired
*/
static void
audit_wire_cb (void *cls,
uint64_t rowid,
struct GNUNET_TIME_Absolute date,
const struct TALER_WireTransferIdentifierRawP *wtid,
const json_t *wire,
const struct TALER_Amount *amount)
{
auditor_row_cnt++;
GNUNET_assert (0 ==
TALER_amount_cmp (amount,
&wire_out_amount));
GNUNET_assert (0 ==
memcmp (wtid,
&wire_out_wtid,
sizeof (*wtid)));
GNUNET_assert (date.abs_value_us == wire_out_date.abs_value_us);
}
/**
* Test API relating to wire_out handling.
*
* @param session database session to use for the test
* @return #GNUNET_OK on success
*/
static int
test_wire_out (struct TALER_EXCHANGEDB_Session *session,
const struct TALER_EXCHANGEDB_Deposit *deposit)
{
auditor_row_cnt = 0;
memset (&wire_out_wtid, 42, sizeof (wire_out_wtid));
wire_out_date = GNUNET_TIME_absolute_get ();
(void) GNUNET_TIME_round_abs (&wire_out_date);
wire_out_account = json_loads ("{ \"account\":\"1\" }", 0, NULL);
GNUNET_assert (NULL != wire_out_account);
GNUNET_assert (GNUNET_OK ==
TALER_string_to_amount (CURRENCY ":1",
&wire_out_amount));
FAILIF (GNUNET_OK !=
plugin->store_wire_transfer_out (plugin->cls,
session,
wire_out_date,
&wire_out_wtid,
wire_out_account,
&wire_out_amount));
FAILIF (GNUNET_OK !=
plugin->select_wire_out_above_serial_id (plugin->cls,
session,
0,
&audit_wire_cb,
NULL));
FAILIF (1 != auditor_row_cnt);
/* setup values for wire transfer aggregation data */
merchant_pub_wt = deposit->merchant_pub;
h_wire_wt = deposit->h_wire;
h_proposal_data_wt = deposit->h_proposal_data;
coin_pub_wt = deposit->coin.coin_pub;
execution_time_wt = GNUNET_TIME_absolute_get ();
coin_value_wt = deposit->amount_with_fee;
coin_fee_wt = fee_deposit;
GNUNET_assert (GNUNET_OK ==
TALER_amount_subtract (&transfer_value_wt,
&coin_value_wt,
&coin_fee_wt));
FAILIF (GNUNET_NO !=
plugin->lookup_wire_transfer (plugin->cls,
session,
&wtid_wt,
&cb_wt_never,
NULL));
{
struct GNUNET_HashCode h_proposal_data_wt2 = h_proposal_data_wt;
h_proposal_data_wt2.bits[0]++;
FAILIF (GNUNET_NO !=
plugin->wire_lookup_deposit_wtid (plugin->cls,
session,
&h_proposal_data_wt2,
&h_wire_wt,
&coin_pub_wt,
&merchant_pub_wt,
&cb_wtid_never,
NULL));
}
/* insert WT data */
FAILIF (GNUNET_OK !=
plugin->insert_aggregation_tracking (plugin->cls,
session,
&wtid_wt,
deposit_rowid,
execution_time_wt));
FAILIF (GNUNET_OK !=
plugin->lookup_wire_transfer (plugin->cls,
session,
&wtid_wt,
&cb_wt_check,
&cb_wt_never));
FAILIF (GNUNET_OK !=
plugin->wire_lookup_deposit_wtid (plugin->cls,
session,
&h_proposal_data_wt,
&h_wire_wt,
&coin_pub_wt,
&merchant_pub_wt,
&cb_wtid_check,
&cb_wtid_never));
return GNUNET_OK;
drop:
return GNUNET_SYSERR;
}
/** /**
* Main function that will be run by the scheduler. * Main function that will be run by the scheduler.
* *
@ -1234,7 +1342,6 @@ run (void *cls)
struct TALER_EXCHANGEDB_Refund refund; struct TALER_EXCHANGEDB_Refund refund;
struct TALER_EXCHANGEDB_TransactionList *tl; struct TALER_EXCHANGEDB_TransactionList *tl;
struct TALER_EXCHANGEDB_TransactionList *tlp; struct TALER_EXCHANGEDB_TransactionList *tlp;
struct TALER_WireTransferIdentifierRawP wtid;
json_t *wire; json_t *wire;
json_t *just; json_t *just;
json_t *sndr; json_t *sndr;
@ -1296,7 +1403,9 @@ run (void *cls)
result = 4; result = 4;
sndr = json_loads ("{ \"account\":\"1\" }", 0, NULL); sndr = json_loads ("{ \"account\":\"1\" }", 0, NULL);
GNUNET_assert (NULL != sndr);
just = json_loads ("{ \"justification\":\"1\" }", 0, NULL); just = json_loads ("{ \"justification\":\"1\" }", 0, NULL);
GNUNET_assert (NULL != just);
FAILIF (GNUNET_OK != FAILIF (GNUNET_OK !=
plugin->reserves_in_insert (plugin->cls, plugin->reserves_in_insert (plugin->cls,
session, session,
@ -1631,64 +1740,11 @@ run (void *cls)
plugin->free_coin_transaction_list (plugin->cls, plugin->free_coin_transaction_list (plugin->cls,
tl); tl);
FAILIF (GNUNET_OK != test_wire_prepare (session));
/* setup values for wire transfer aggregation data */
memset (&wtid, 42, sizeof (wtid));
merchant_pub_wt = deposit.merchant_pub;
h_wire_wt = deposit.h_wire;
h_proposal_data_wt = deposit.h_proposal_data;
coin_pub_wt = deposit.coin.coin_pub;
execution_time_wt = GNUNET_TIME_absolute_get ();
coin_value_wt = deposit.amount_with_fee;
coin_fee_wt = fee_deposit;
GNUNET_assert (GNUNET_OK ==
TALER_amount_subtract (&transfer_value_wt,
&coin_value_wt,
&coin_fee_wt));
FAILIF (GNUNET_NO !=
plugin->lookup_wire_transfer (plugin->cls,
session,
&wtid_wt,
&cb_wt_never,
NULL));
{
struct GNUNET_HashCode h_proposal_data_wt2 = h_proposal_data_wt;
h_proposal_data_wt2.bits[0]++;
FAILIF (GNUNET_NO !=
plugin->wire_lookup_deposit_wtid (plugin->cls,
session,
&h_proposal_data_wt2,
&h_wire_wt,
&coin_pub_wt,
&merchant_pub_wt,
&cb_wtid_never,
NULL));
}
/* insert WT data */
FAILIF (GNUNET_OK != FAILIF (GNUNET_OK !=
plugin->insert_aggregation_tracking (plugin->cls, test_wire_prepare (session));
session,
&wtid_wt,
deposit_rowid,
execution_time_wt));
FAILIF (GNUNET_OK != FAILIF (GNUNET_OK !=
plugin->lookup_wire_transfer (plugin->cls, test_wire_out (session,
session, &deposit));
&wtid_wt,
&cb_wt_check,
&cb_wt_never));
FAILIF (GNUNET_OK !=
plugin->wire_lookup_deposit_wtid (plugin->cls,
session,
&h_proposal_data_wt,
&h_wire_wt,
&coin_pub_wt,
&merchant_pub_wt,
&cb_wtid_check,
&cb_wtid_never));
FAILIF (GNUNET_OK != FAILIF (GNUNET_OK !=
test_gc (session)); test_gc (session));
FAILIF (GNUNET_OK != FAILIF (GNUNET_OK !=

View File

@ -780,6 +780,26 @@ typedef void
const struct TALER_Amount *coin_fee); const struct TALER_Amount *coin_fee);
/**
* Function called with the results of the lookup of the
* wire transfer data of the exchange.
*
* @param cls closure
* @param rowid identifier of the respective row in the database
* @param date timestamp of the wire transfer (roughly)
* @param wtid wire transfer subject
* @param wire wire transfer details of the receiver
* @param amount amount that was wired
*/
typedef void
(*TALER_EXCHANGEDB_WireTransferOutCallback)(void *cls,
uint64_t rowid,
struct GNUNET_TIME_Absolute date,
const struct TALER_WireTransferIdentifierRawP *wtid,
const json_t *wire,
const struct TALER_Amount *amount);
/** /**
* Callback with data about a prepared wire transfer. * Callback with data about a prepared wire transfer.
* *
@ -1619,6 +1639,27 @@ struct TALER_EXCHANGEDB_Plugin
void *cb_cls); void *cb_cls);
/**
* Store information about an outgoing wire transfer that was executed.
*
* @param cls closure
* @param session database connection
* @param date time of the wire transfer
* @param wtid subject of the wire transfer
* @param wire details about the receiver account of the wire transfer
* @param amount amount that was transmitted
* @return #GNUNET_OK on success
* #GNUNET_SYSERR on DB errors
*/
int
(*store_wire_transfer_out)(void *cls,
struct TALER_EXCHANGEDB_Session *session,
struct GNUNET_TIME_Absolute date,
const struct TALER_WireTransferIdentifierRawP *wtid,
const json_t *wire,
const struct TALER_Amount *amount);
/** /**
* Function called to perform "garbage collection" on the * Function called to perform "garbage collection" on the
* database, expiring records we no longer require. * database, expiring records we no longer require.
@ -1738,15 +1779,12 @@ struct TALER_EXCHANGEDB_Plugin
/** /**
* FIXME: this is NOT the API we want here, as we cannot exactly determine the * Function called to select outgoing wire transfers the exchange
* important WTID from the callback! * executed, ordered by serial ID (monotonically increasing).
*
* Function called to select all wire transfers the exchange
* executed or plans to execute.
* *
* @param cls closure * @param cls closure
* @param session database connection * @param session database connection
* @param serial_id highest serial ID to exclude (select strictly larger) * @param serial_id lowest serial ID to include (select larger or equal)
* @param cb function to call for ONE unfinished item * @param cb function to call for ONE unfinished item
* @param cb_cls closure for @a cb * @param cb_cls closure for @a cb
* @return #GNUNET_OK on success, * @return #GNUNET_OK on success,
@ -1754,10 +1792,10 @@ struct TALER_EXCHANGEDB_Plugin
* #GNUNET_SYSERR on DB errors * #GNUNET_SYSERR on DB errors
*/ */
int int
(*select_prepare_above_serial_id)(void *cls, (*select_wire_out_above_serial_id)(void *cls,
struct TALER_EXCHANGEDB_Session *session, struct TALER_EXCHANGEDB_Session *session,
uint64_t serial_id, uint64_t serial_id,
TALER_EXCHANGEDB_WirePreparationCallback cb, TALER_EXCHANGEDB_WireTransferOutCallback cb,
void *cb_cls); void *cb_cls);
}; };