working on mintdb for #4141

This commit is contained in:
Christian Grothoff 2016-01-27 16:42:24 +01:00
parent 0554fa7691
commit 48c2edc28d
3 changed files with 284 additions and 98 deletions

View File

@ -530,22 +530,24 @@ struct TALER_MINTDB_Session;
* corresponding wire transaction. * corresponding wire transaction.
* *
* @param cls closure * @param cls closure
* @param id transaction ID (used as future `min_id` to avoid * @param rowid unique ID for the deposit in our DB, used for marking
* iterating over transactions more than once) * it as 'tiny' or 'done'
* @param merchant_pub public key of the merchant
* @param coin_pub public key of the coin
* @param amount_with_fee amount that was deposited including fee * @param amount_with_fee amount that was deposited including fee
* @param deposit_fee amount the mint gets to keep as transaction fees * @param deposit_fee amount the mint gets to keep as transaction fees
* @param transaction_id unique transaction ID chosen by the merchant * @param transaction_id unique transaction ID chosen by the merchant
* @param h_contract hash of the contract between merchant and customer * @param h_contract hash of the contract between merchant and customer
* @param wire_deadline by which the merchant adviced that he would like the * @param wire_deadline by which the merchant adviced that he would like the
* wire transfer to be executed * wire transfer to be executed
* @param wire wire details for the merchant * @param wire wire details for the merchant, NULL from iterate_matching_deposits()
* @return #GNUNET_OK to continue to iterate, #GNUNET_SYSERR to stop * @return #GNUNET_OK to continue to iterate, #GNUNET_SYSERR to stop
*/ */
typedef int typedef int
(*TALER_MINTDB_DepositIterator)(void *cls, (*TALER_MINTDB_DepositIterator)(void *cls,
// unsigned long long rowid, /* ? */ unsigned long long rowid,
// May also need/want Merchant pub!? const struct TALER_MerchantPublicKeyP *merchant_pub,
uint64_t id, const struct TALER_CoinSpendPublicKeyP *coin_pub,
const struct TALER_Amount *amount_with_fee, const struct TALER_Amount *amount_with_fee,
const struct TALER_Amount *deposit_fee, const struct TALER_Amount *deposit_fee,
uint64_t transaction_id, uint64_t transaction_id,
@ -940,10 +942,10 @@ struct TALER_MINTDB_Plugin
* #GNUNET_SYSERR on error * #GNUNET_SYSERR on error
*/ */
int int
(*iterate_ready_deposits) (void *cls, (*get_ready_deposit) (void *cls,
struct TALER_MINTDB_Session *session, struct TALER_MINTDB_Session *session,
TALER_MINTDB_DepositIterator deposit_cb, TALER_MINTDB_DepositIterator deposit_cb,
void *deposit_cb_cls); void *deposit_cb_cls);
/** /**
@ -953,9 +955,10 @@ struct TALER_MINTDB_Plugin
* @param cls the @e cls of this struct with the plugin-specific state * @param cls the @e cls of this struct with the plugin-specific state
* @param session connection to the database * @param session connection to the database
* @param h_wire destination of the wire transfer * @param h_wire destination of the wire transfer
* @param FIXME: do we also need merchant_pub here? * @param merchant_pub public key of the merchant
* @param deposit_cb function to call for each deposit * @param deposit_cb function to call for each deposit
* @param deposit_cb_cls closure for @a deposit_cb * @param deposit_cb_cls closure for @a deposit_cb
* @param limit maximum number of matching deposits to return
* @return number of rows processed, 0 if none exist, * @return number of rows processed, 0 if none exist,
* #GNUNET_SYSERR on error * #GNUNET_SYSERR on error
*/ */
@ -963,33 +966,10 @@ struct TALER_MINTDB_Plugin
(*iterate_matching_deposits) (void *cls, (*iterate_matching_deposits) (void *cls,
struct TALER_MINTDB_Session *session, struct TALER_MINTDB_Session *session,
const struct GNUNET_HashCode *h_wire, const struct GNUNET_HashCode *h_wire,
const struct TALER_MerchantPublicKeyP *merchant_pub,
TALER_MINTDB_DepositIterator deposit_cb, TALER_MINTDB_DepositIterator deposit_cb,
void *deposit_cb_cls); void *deposit_cb_cls,
uint32_t limit);
/**
* Obtain information about deposits. Iterates over all deposits
* above a certain ID. Use a @a min_id of 0 to start at the beginning.
* This operation is executed in its own transaction in transaction
* mode "REPEATABLE READ", i.e. we should only see valid deposits.
*
* @param cls the @e cls of this struct with the plugin-specific state
* @param session connection to the database
* @param min_id deposit to start at
* @param limit maximum number of transactions to fetch
* @param deposit_cb function to call for each deposit
* @param deposit_cb_cls closure for @a deposit_cb
* @return number of rows processed, 0 if none exist,
* #GNUNET_SYSERR on error
* @deprecated this is likely dead
*/
int
(*iterate_deposits) (void *cls,
struct TALER_MINTDB_Session *session,
uint64_t min_id,
uint32_t limit,
TALER_MINTDB_DepositIterator deposit_cb,
void *deposit_cb_cls);
/** /**

View File

@ -148,8 +148,9 @@ mint_serve_process_config (const char *mint_directory)
* with the goal of executing the corresponding wire transaction. * with the goal of executing the corresponding wire transaction.
* *
* @param cls closure * @param cls closure
* @param id transaction ID (used as future `min_id` to avoid * @param row_id identifies database entry
* iterating over transactions more than once) * @param merchant_pub public key of the merchant
* @param coin_pub public key of the coin
* @param amount_with_fee amount that was deposited including fee * @param amount_with_fee amount that was deposited including fee
* @param deposit_fee amount the mint gets to keep as transaction fees * @param deposit_fee amount the mint gets to keep as transaction fees
* @param transaction_id unique transaction ID chosen by the merchant * @param transaction_id unique transaction ID chosen by the merchant
@ -161,7 +162,9 @@ mint_serve_process_config (const char *mint_directory)
*/ */
static int static int
deposit_cb (void *cls, deposit_cb (void *cls,
uint64_t id, unsigned long long row_id,
const struct TALER_MerchantPublicKeyP *merchant_pub,
const struct TALER_CoinSpendPublicKeyP *coin_pub,
const struct TALER_Amount *amount_with_fee, const struct TALER_Amount *amount_with_fee,
const struct TALER_Amount *deposit_fee, const struct TALER_Amount *deposit_fee,
uint64_t transaction_id, uint64_t transaction_id,
@ -204,12 +207,12 @@ run (void *cls,
*global_ret = GNUNET_SYSERR; *global_ret = GNUNET_SYSERR;
return; return;
} }
ret = db_plugin->iterate_deposits (db_plugin->cls, ret = db_plugin->get_ready_deposit (db_plugin->cls,
session, session,
0 /* FIXME: remove? */, &deposit_cb,
128 /* FIXME: make configurable? */, NULL);
&deposit_cb, // FIXME: handle 0 == ret...
NULL);
if (GNUNET_OK != ret) if (GNUNET_OK != ret)
{ {
GNUNET_log (GNUNET_ERROR_TYPE_ERROR, GNUNET_log (GNUNET_ERROR_TYPE_ERROR,

View File

@ -425,7 +425,7 @@ postgres_create_tables (void *cls,
/* This table contains the wire transfers the mint is supposed to /* This table contains the wire transfers the mint is supposed to
execute to transmit funds to the merchants (and manage refunds). */ execute to transmit funds to the merchants (and manage refunds). */
SQLEXEC("CREATE TABLE IF NOT EXISTS deposits " SQLEXEC("CREATE TABLE IF NOT EXISTS deposits "
"(serial_id BIGSERIAL" "(serial_id BIGSERIAL PRIMARY KEY"
",coin_pub BYTEA NOT NULL CHECK (LENGTH(coin_pub)=32)" ",coin_pub BYTEA NOT NULL CHECK (LENGTH(coin_pub)=32)"
",denom_pub BYTEA NOT NULL REFERENCES denominations (pub)" ",denom_pub BYTEA NOT NULL REFERENCES denominations (pub)"
",denom_sig BYTEA NOT NULL" ",denom_sig BYTEA NOT NULL"
@ -444,6 +444,8 @@ postgres_create_tables (void *cls,
",h_wire BYTEA NOT NULL CHECK (LENGTH(h_wire)=64)" ",h_wire BYTEA NOT NULL CHECK (LENGTH(h_wire)=64)"
",coin_sig BYTEA NOT NULL CHECK (LENGTH(coin_sig)=64)" ",coin_sig BYTEA NOT NULL CHECK (LENGTH(coin_sig)=64)"
",wire TEXT NOT NULL" ",wire TEXT NOT NULL"
",tiny BOOLEAN NOT NULL DEFAULT false"
",done BOOLEAN NOT NULL DEFAULT false"
")"); ")");
/* Index for get_deposit statement on coin_pub, transaction_id and merchant_pub */ /* Index for get_deposit statement on coin_pub, transaction_id and merchant_pub */
SQLEXEC_INDEX("CREATE INDEX deposits_coin_pub_index " SQLEXEC_INDEX("CREATE INDEX deposits_coin_pub_index "
@ -899,8 +901,8 @@ postgres_prepare (PGconn *db_conn)
" )", " )",
5, NULL); 5, NULL);
/* Used in #postgres_iterate_deposits() */ /* Used in #postgres_get_ready_deposit() */
PREPARE ("deposits_iterate", PREPARE ("deposits_get_ready",
"SELECT" "SELECT"
" serial_id" " serial_id"
",amount_with_fee_val" ",amount_with_fee_val"
@ -913,11 +915,50 @@ postgres_prepare (PGconn *db_conn)
",transaction_id" ",transaction_id"
",h_contract" ",h_contract"
",wire" ",wire"
",merchant_pub"
",coin_pub"
" FROM deposits" " FROM deposits"
" WHERE serial_id>=$1" " WHERE"
" ORDER BY serial_id ASC" " tiny=false AND"
" LIMIT $2;", " done=false"
2, NULL); " ORDER BY execution_time ASC"
" LIMIT 1;",
0, NULL);
/* Used in #postgres_iterate_matching_deposits() */
PREPARE ("deposits_iterate_matching",
"SELECT"
" serial_id"
",amount_with_fee_val"
",amount_with_fee_frac"
",amount_with_fee_curr"
",deposit_fee_val"
",deposit_fee_frac"
",deposit_fee_curr"
",wire_deadline"
",transaction_id"
",h_contract"
",coin_pub"
" FROM deposits"
" WHERE"
" merchant_pub=$1 AND"
" h_wire=$2 AND"
" done=false"
" ORDER BY execution_time ASC"
" LIMIT $3",
3, NULL);
/* Used in #postgres_mark_deposit_tiny() */
PREPARE ("mark_deposit_tiny",
"UPDATE deposits"
" SET tiny=true"
" WHERE serial_id=$1",
1, NULL);
/* Used in #postgres_mark_deposit_done() */
PREPARE ("mark_deposit_done",
"UPDATE deposits"
" SET done=true"
" WHERE serial_id=$1",
1, NULL);
/* Used in #postgres_get_coin_transactions() to obtain information /* Used in #postgres_get_coin_transactions() to obtain information
about how a coin has been spend with /deposit requests. */ about how a coin has been spend with /deposit requests. */
PREPARE ("get_deposit_with_coin_pub", PREPARE ("get_deposit_with_coin_pub",
@ -2039,82 +2080,133 @@ postgres_have_deposit (void *cls,
/** /**
* Obtain information about deposits. Iterates over all deposits * Mark a deposit as tiny, thereby declaring that it cannot be
* above a certain ID. Use a @a min_id of 0 to start at the beginning. * executed by itself and should no longer be returned by
* This operation is executed in its own transaction in transaction * @e iterate_ready_deposits()
* mode "REPEATABLE READ", i.e. we should only see valid deposits.
* *
* @param cls the @e cls of this struct with the plugin-specific state * @param cls the @e cls of this struct with the plugin-specific state
* @param session connection to the database * @param session connection to the database
* @param min_id deposit to start at * @param deposit_rowid identifies the deposit row to modify
* @param limit maximum number of transactions to fetch * @return #GNUNET_OK on success, #GNUNET_SYSERR on error
* @param deposit_cb function to call for each deposit */
static int
postgres_mark_deposit_tiny (void *cls,
struct TALER_MINTDB_Session *session,
unsigned long long rowid)
{
uint64_t serial_id = rowid;
struct TALER_PQ_QueryParam params[] = {
TALER_PQ_query_param_uint64 (&serial_id),
TALER_PQ_query_param_end
};
PGresult *result;
result = TALER_PQ_exec_prepared (session->conn,
"mark_deposit_tiny",
params);
if (PGRES_COMMAND_OK !=
PQresultStatus (result))
{
BREAK_DB_ERR (result);
PQclear (result);
return GNUNET_SYSERR;
}
PQclear (result);
return GNUNET_OK;
}
/**
* Mark a deposit as done, thereby declaring that it cannot be
* executed at all anymore, and should no longer be returned by
* @e iterate_ready_deposits() or @e iterate_matching_deposits().
*
* @param cls the @e cls of this struct with the plugin-specific state
* @param session connection to the database
* @param deposit_rowid identifies the deposit row to modify
* @return #GNUNET_OK on success, #GNUNET_SYSERR on error
*/
static int
postgres_mark_deposit_done (void *cls,
struct TALER_MINTDB_Session *session,
unsigned long long rowid)
{
uint64_t serial_id = rowid;
struct TALER_PQ_QueryParam params[] = {
TALER_PQ_query_param_uint64 (&serial_id),
TALER_PQ_query_param_end
};
PGresult *result;
result = TALER_PQ_exec_prepared (session->conn,
"mark_deposit_done",
params);
if (PGRES_COMMAND_OK !=
PQresultStatus (result))
{
BREAK_DB_ERR (result);
PQclear (result);
return GNUNET_SYSERR;
}
PQclear (result);
return GNUNET_OK;
}
/**
* Obtain information about deposits that are ready to be executed.
* Such deposits must not be marked as "tiny" or "done", and the
* execution time must be in the past.
*
* @param cls the @e cls of this struct with the plugin-specific state
* @param session connection to the database
* @param deposit_cb function to call for ONE such deposit
* @param deposit_cb_cls closure for @a deposit_cb * @param deposit_cb_cls closure for @a deposit_cb
* @return number of rows processed, 0 if none exist, * @return number of rows processed, 0 if none exist,
* #GNUNET_SYSERR on error * #GNUNET_SYSERR on error
*/ */
static int static int
postgres_iterate_deposits (void *cls, postgres_get_ready_deposit (void *cls,
struct TALER_MINTDB_Session *session, struct TALER_MINTDB_Session *session,
uint64_t min_id, TALER_MINTDB_DepositIterator deposit_cb,
uint32_t limit, void *deposit_cb_cls)
TALER_MINTDB_DepositIterator deposit_cb,
void *deposit_cb_cls)
{ {
struct TALER_PQ_QueryParam params[] = { struct TALER_PQ_QueryParam params[] = {
TALER_PQ_query_param_uint64 (&min_id),
TALER_PQ_query_param_uint32 (&limit),
TALER_PQ_query_param_end TALER_PQ_query_param_end
}; };
PGresult *result; PGresult *result;
unsigned int i;
unsigned int n; unsigned int n;
int ret;
if (GNUNET_OK !=
postgres_start (cls, session))
return GNUNET_SYSERR;
result = PQexec (session->conn,
"SET TRANSACTION REPEATABLE READ");
if (PGRES_COMMAND_OK !=
PQresultStatus (result))
{
TALER_LOG_ERROR ("Failed to set transaction to REPEATABL EREAD: %s\n",
PQresultErrorMessage (result));
GNUNET_break (0);
PQclear (result);
return GNUNET_SYSERR;
}
result = TALER_PQ_exec_prepared (session->conn, result = TALER_PQ_exec_prepared (session->conn,
"deposits_iterate", "deposits_get_ready",
params); params);
if (PGRES_TUPLES_OK != if (PGRES_TUPLES_OK !=
PQresultStatus (result)) PQresultStatus (result))
{ {
BREAK_DB_ERR (result); BREAK_DB_ERR (result);
PQclear (result); PQclear (result);
postgres_rollback (cls, session);
return GNUNET_SYSERR; return GNUNET_SYSERR;
} }
if (0 == (n = PQntuples (result))) if (0 == (n = PQntuples (result)))
{ {
PQclear (result); PQclear (result);
postgres_rollback (cls, session);
return 0; return 0;
} }
for (i=0;i<n;i++) GNUNET_break (1 == n);
{ {
struct TALER_Amount amount_with_fee; struct TALER_Amount amount_with_fee;
struct TALER_Amount deposit_fee; struct TALER_Amount deposit_fee;
struct GNUNET_TIME_Absolute wire_deadline; struct GNUNET_TIME_Absolute wire_deadline;
struct GNUNET_HashCode h_contract; struct GNUNET_HashCode h_contract;
json_t *wire; struct TALER_MerchantPublicKeyP merchant_pub;
struct TALER_CoinSpendPublicKeyP coin_pub;
uint64_t transaction_id; uint64_t transaction_id;
uint64_t id; uint64_t serial_id;
int ret; json_t *wire;
struct TALER_PQ_ResultSpec rs[] = { struct TALER_PQ_ResultSpec rs[] = {
TALER_PQ_result_spec_uint64 ("id", TALER_PQ_result_spec_uint64 ("serial_id",
&id), &serial_id),
TALER_PQ_result_spec_uint64 ("transaction_id", TALER_PQ_result_spec_uint64 ("transaction_id",
&transaction_id), &transaction_id),
TALER_PQ_result_spec_amount ("amount_with_fee", TALER_PQ_result_spec_amount ("amount_with_fee",
@ -2125,20 +2217,25 @@ postgres_iterate_deposits (void *cls,
&wire_deadline), &wire_deadline),
TALER_PQ_result_spec_auto_from_type ("h_contract", TALER_PQ_result_spec_auto_from_type ("h_contract",
&h_contract), &h_contract),
TALER_PQ_result_spec_auto_from_type ("merchant_pub",
&merchant_pub),
TALER_PQ_result_spec_auto_from_type ("coin_pub",
&coin_pub),
TALER_PQ_result_spec_json ("wire", TALER_PQ_result_spec_json ("wire",
&wire), &wire),
TALER_PQ_result_spec_end TALER_PQ_result_spec_end
}; };
if (GNUNET_OK != if (GNUNET_OK !=
TALER_PQ_extract_result (result, rs, i)) TALER_PQ_extract_result (result, rs, 0))
{ {
GNUNET_break (0); GNUNET_break (0);
PQclear (result); PQclear (result);
postgres_rollback (cls, session);
return GNUNET_SYSERR; return GNUNET_SYSERR;
} }
ret = deposit_cb (deposit_cb_cls, ret = deposit_cb (deposit_cb_cls,
id, serial_id,
&merchant_pub,
&coin_pub,
&amount_with_fee, &amount_with_fee,
&deposit_fee, &deposit_fee,
transaction_id, transaction_id,
@ -2147,10 +2244,113 @@ postgres_iterate_deposits (void *cls,
wire); wire);
TALER_PQ_cleanup_result (rs); TALER_PQ_cleanup_result (rs);
PQclear (result); PQclear (result);
}
return (GNUNET_OK == ret) ? 1 : 0;
}
/**
* Obtain information about other pending deposits for the same
* destination. Those deposits must not already be "done".
*
* @param cls the @e cls of this struct with the plugin-specific state
* @param session connection to the database
* @param h_wire destination of the wire transfer
* @param merchant_pub public key of the merchant
* @param deposit_cb function to call for each deposit
* @param deposit_cb_cls closure for @a deposit_cb
* @param limit maximum number of matching deposits to return
* @return number of rows processed, 0 if none exist,
* #GNUNET_SYSERR on error
*/
static int
postgres_iterate_matching_deposits (void *cls,
struct TALER_MINTDB_Session *session,
const struct GNUNET_HashCode *h_wire,
const struct TALER_MerchantPublicKeyP *merchant_pub,
TALER_MINTDB_DepositIterator deposit_cb,
void *deposit_cb_cls,
uint32_t limit)
{
struct TALER_PQ_QueryParam params[] = {
TALER_PQ_query_param_auto_from_type (merchant_pub),
TALER_PQ_query_param_auto_from_type (h_wire),
TALER_PQ_query_param_uint32 (&limit),
TALER_PQ_query_param_end
};
PGresult *result;
unsigned int i;
unsigned int n;
result = TALER_PQ_exec_prepared (session->conn,
"deposits_iterate_matching",
params);
if (PGRES_TUPLES_OK !=
PQresultStatus (result))
{
BREAK_DB_ERR (result);
PQclear (result);
return GNUNET_SYSERR;
}
if (0 == (n = PQntuples (result)))
{
PQclear (result);
return 0;
}
if (n > limit)
n = limit;
for (i=0;i<n;i++)
{
struct TALER_Amount amount_with_fee;
struct TALER_Amount deposit_fee;
struct GNUNET_TIME_Absolute wire_deadline;
struct GNUNET_HashCode h_contract;
struct TALER_MerchantPublicKeyP merchant_pub;
struct TALER_CoinSpendPublicKeyP coin_pub;
uint64_t transaction_id;
uint64_t serial_id;
int ret;
struct TALER_PQ_ResultSpec rs[] = {
TALER_PQ_result_spec_uint64 ("serial_id",
&serial_id),
TALER_PQ_result_spec_uint64 ("transaction_id",
&transaction_id),
TALER_PQ_result_spec_amount ("amount_with_fee",
&amount_with_fee),
TALER_PQ_result_spec_amount ("deposit_fee",
&deposit_fee),
TALER_PQ_result_spec_absolute_time ("wire_deadline",
&wire_deadline),
TALER_PQ_result_spec_auto_from_type ("h_contract",
&h_contract),
TALER_PQ_result_spec_auto_from_type ("merchant_pub",
&merchant_pub),
TALER_PQ_result_spec_auto_from_type ("coin_pub",
&coin_pub),
TALER_PQ_result_spec_end
};
if (GNUNET_OK !=
TALER_PQ_extract_result (result, rs, i))
{
GNUNET_break (0);
PQclear (result);
return GNUNET_SYSERR;
}
ret = deposit_cb (deposit_cb_cls,
serial_id,
&merchant_pub,
&coin_pub,
&amount_with_fee,
&deposit_fee,
transaction_id,
&h_contract,
wire_deadline,
NULL);
TALER_PQ_cleanup_result (rs);
PQclear (result);
if (GNUNET_OK != ret) if (GNUNET_OK != ret)
break; break;
} }
postgres_rollback (cls, session);
return i; return i;
} }
@ -3838,7 +4038,10 @@ libtaler_plugin_mintdb_postgres_init (void *cls)
plugin->get_reserve_history = &postgres_get_reserve_history; plugin->get_reserve_history = &postgres_get_reserve_history;
plugin->free_reserve_history = &common_free_reserve_history; plugin->free_reserve_history = &common_free_reserve_history;
plugin->have_deposit = &postgres_have_deposit; plugin->have_deposit = &postgres_have_deposit;
plugin->iterate_deposits = &postgres_iterate_deposits; plugin->mark_deposit_tiny = &postgres_mark_deposit_tiny;
plugin->mark_deposit_done = &postgres_mark_deposit_done;
plugin->get_ready_deposit = &postgres_get_ready_deposit;
plugin->iterate_matching_deposits = &postgres_iterate_matching_deposits;
plugin->insert_deposit = &postgres_insert_deposit; plugin->insert_deposit = &postgres_insert_deposit;
plugin->get_refresh_session = &postgres_get_refresh_session; plugin->get_refresh_session = &postgres_get_refresh_session;
plugin->create_refresh_session = &postgres_create_refresh_session; plugin->create_refresh_session = &postgres_create_refresh_session;