adapt /admin/add/incoming to #5010

This commit is contained in:
Christian Grothoff 2017-06-23 14:13:54 +02:00
parent d4884c0c60
commit ff8633910d
No known key found for this signature in database
GPG Key ID: 939E6BE1E29FC3CC
4 changed files with 138 additions and 155 deletions

View File

@ -27,67 +27,97 @@
#include "taler-exchange-httpd_validation.h" #include "taler-exchange-httpd_validation.h"
/**
* Closure for #admin_add_incoming_transaction()
*/
struct AddIncomingContext
{
/**
* public key of the reserve
*/
struct TALER_ReservePublicKeyP reserve_pub;
/**
* amount to add to the reserve
*/
struct TALER_Amount amount;
/**
* When did we receive the wire transfer
*/
struct GNUNET_TIME_Absolute execution_time;
/**
* which account send the funds
*/
json_t *sender_account_details;
/**
* Information that uniquely identifies the transfer
*/
json_t *transfer_details;
/**
* Set to the transaction status.
*/
enum GNUNET_DB_QueryStatus qs;
};
/** /**
* Add an incoming transaction to the database. Checks if the * Add an incoming transaction to the database. Checks if the
* transaction is fresh (not a duplicate) and if so adds it to * transaction is fresh (not a duplicate) and if so adds it to
* the database. * the database.
* *
* @param connection the MHD connection to handle * If it returns a non-error code, the transaction logic MUST
* @param reserve_pub public key of the reserve * NOT queue a MHD response. IF it returns an hard error, the
* @param amount amount to add to the reserve * transaction logic MUST queue a MHD response and set @a mhd_ret. IF
* @param execution_time when did we receive the wire transfer * it returns the soft error code, the function MAY be called again to
* @param sender_account_details which account send the funds * retry and MUST not queue a MHD response.
* @param transfer_details information that uniquely identifies the transfer *
* @return MHD result code * @param cls closure with the `struct AddIncomingContext *`
* @param connection MHD request which triggered the transaction
* @param session database session to use
* @param[out] mhd_ret set to MHD response status for @a connection,
* if transaction failed (!)
* @return transaction status
*/ */
static int static enum GNUNET_DB_QueryStatus
execute_admin_add_incoming (struct MHD_Connection *connection, admin_add_incoming_transaction (void *cls,
const struct TALER_ReservePublicKeyP *reserve_pub, struct MHD_Connection *connection,
const struct TALER_Amount *amount, struct TALER_EXCHANGEDB_Session *session,
struct GNUNET_TIME_Absolute execution_time, int *mhd_ret)
const json_t *sender_account_details,
const json_t *transfer_details)
{ {
struct TALER_EXCHANGEDB_Session *session; struct AddIncomingContext *aic = cls;
int ret;
void *json_str; void *json_str;
if (NULL == (session = TEH_plugin->get_session (TEH_plugin->cls))) json_str = json_dumps (aic->transfer_details,
{
GNUNET_break (0);
return TEH_RESPONSE_reply_internal_db_error (connection,
TALER_EC_DB_SETUP_FAILED);
}
json_str = json_dumps (transfer_details,
JSON_INDENT(2)); JSON_INDENT(2));
if (NULL == json_str) if (NULL == json_str)
{ {
GNUNET_break (0); GNUNET_break (0);
return TEH_RESPONSE_reply_internal_db_error (connection, *mhd_ret = TEH_RESPONSE_reply_internal_db_error (connection,
TALER_EC_PARSER_OUT_OF_MEMORY); TALER_EC_PARSER_OUT_OF_MEMORY);
return GNUNET_DB_STATUS_HARD_ERROR;
} }
ret = TEH_plugin->reserves_in_insert (TEH_plugin->cls, aic->qs = TEH_plugin->reserves_in_insert (TEH_plugin->cls,
session, session,
reserve_pub, &aic->reserve_pub,
amount, &aic->amount,
execution_time, aic->execution_time,
sender_account_details, aic->sender_account_details,
json_str, json_str,
strlen (json_str)); strlen (json_str));
free (json_str); free (json_str);
if (GNUNET_SYSERR == ret)
if (GNUNET_DB_STATUS_HARD_ERROR == aic->qs)
{ {
GNUNET_break (0); GNUNET_break (0);
return TEH_RESPONSE_reply_internal_db_error (connection, *mhd_ret = TEH_RESPONSE_reply_internal_db_error (connection,
TALER_EC_ADMIN_ADD_INCOMING_DB_STORE); TALER_EC_ADMIN_ADD_INCOMING_DB_STORE);
return GNUNET_DB_STATUS_HARD_ERROR;
} }
return TEH_RESPONSE_reply_json_pack (connection, return aic->qs;
MHD_HTTP_OK,
"{s:s}",
"status",
(GNUNET_OK == ret)
? "NEW"
: "DUP");
} }
@ -110,23 +140,20 @@ TEH_ADMIN_handler_admin_add_incoming (struct TEH_RequestHandler *rh,
const char *upload_data, const char *upload_data,
size_t *upload_data_size) size_t *upload_data_size)
{ {
struct TALER_ReservePublicKeyP reserve_pub; struct AddIncomingContext aic;
struct TALER_Amount amount;
struct GNUNET_TIME_Absolute at;
enum TALER_ErrorCode ec; enum TALER_ErrorCode ec;
char *emsg; char *emsg;
json_t *sender_account_details;
json_t *transfer_details;
json_t *root; json_t *root;
struct GNUNET_JSON_Specification spec[] = { struct GNUNET_JSON_Specification spec[] = {
GNUNET_JSON_spec_fixed_auto ("reserve_pub", &reserve_pub), GNUNET_JSON_spec_fixed_auto ("reserve_pub", &aic.reserve_pub),
TALER_JSON_spec_amount ("amount", &amount), TALER_JSON_spec_amount ("amount", &aic.amount),
GNUNET_JSON_spec_absolute_time ("execution_date", &at), GNUNET_JSON_spec_absolute_time ("execution_date", &aic.execution_time),
GNUNET_JSON_spec_json ("sender_account_details", &sender_account_details), GNUNET_JSON_spec_json ("sender_account_details", &aic.sender_account_details),
GNUNET_JSON_spec_json ("transfer_details", &transfer_details), GNUNET_JSON_spec_json ("transfer_details", &aic.transfer_details),
GNUNET_JSON_spec_end () GNUNET_JSON_spec_end ()
}; };
int res; int res;
int mhd_ret;
res = TEH_PARSE_post_json (connection, res = TEH_PARSE_post_json (connection,
connection_cls, connection_cls,
@ -135,7 +162,8 @@ TEH_ADMIN_handler_admin_add_incoming (struct TEH_RequestHandler *rh,
&root); &root);
if (GNUNET_SYSERR == res) if (GNUNET_SYSERR == res)
return MHD_NO; return MHD_NO;
if ( (GNUNET_NO == res) || (NULL == root) ) if ( (GNUNET_NO == res) ||
(NULL == root) )
return MHD_YES; return MHD_YES;
res = TEH_PARSE_json_data (connection, res = TEH_PARSE_json_data (connection,
root, root,
@ -148,37 +176,43 @@ TEH_ADMIN_handler_admin_add_incoming (struct TEH_RequestHandler *rh,
return (GNUNET_SYSERR == res) ? MHD_NO : MHD_YES; return (GNUNET_SYSERR == res) ? MHD_NO : MHD_YES;
} }
if (TALER_EC_NONE != if (TALER_EC_NONE !=
(ec = TEH_json_validate_wireformat (sender_account_details, (ec = TEH_json_validate_wireformat (aic.sender_account_details,
GNUNET_NO, GNUNET_NO,
&emsg))) &emsg)))
{ {
GNUNET_JSON_parse_free (spec); GNUNET_JSON_parse_free (spec);
res = TEH_RESPONSE_reply_external_error (connection, mhd_ret = TEH_RESPONSE_reply_external_error (connection,
ec, ec,
emsg); emsg);
GNUNET_free (emsg); GNUNET_free (emsg);
return res; return mhd_ret;
} }
if (0 != strcasecmp (amount.currency, if (0 != strcasecmp (aic.amount.currency,
TEH_exchange_currency_string)) TEH_exchange_currency_string))
{ {
GNUNET_log (GNUNET_ERROR_TYPE_ERROR, GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
"Exchange uses currency `%s', but /admin/add/incoming tried to use currency `%s'\n", "Exchange uses currency `%s', but /admin/add/incoming tried to use currency `%s'\n",
TEH_exchange_currency_string, TEH_exchange_currency_string,
amount.currency); aic.amount.currency);
GNUNET_JSON_parse_free (spec); GNUNET_JSON_parse_free (spec);
return TEH_RESPONSE_reply_arg_invalid (connection, return TEH_RESPONSE_reply_arg_invalid (connection,
TALER_EC_ADMIN_ADD_INCOMING_CURRENCY_UNSUPPORTED, TALER_EC_ADMIN_ADD_INCOMING_CURRENCY_UNSUPPORTED,
"amount:currency"); "amount:currency");
} }
res = execute_admin_add_incoming (connection, res = TEH_DB_run_transaction (connection,
&reserve_pub, &mhd_ret,
&amount, &admin_add_incoming_transaction,
at, &aic);
sender_account_details,
transfer_details);
GNUNET_JSON_parse_free (spec); GNUNET_JSON_parse_free (spec);
return res; if (GNUNET_OK != res)
return mhd_ret;
return TEH_RESPONSE_reply_json_pack (connection,
MHD_HTTP_OK,
"{s:s}",
"status",
(GNUNET_DB_STATUS_SUCCESS_ONE_RESULT == aic.qs)
? "NEW"
: "DUP");
} }
/* end of taler-exchange-httpd_admin.c */ /* end of taler-exchange-httpd_admin.c */

View File

@ -1952,11 +1952,9 @@ reserves_update (void *cls,
* @param sender_account_details account information for the sender * @param sender_account_details account information for the sender
* @param wire_reference unique reference identifying the wire transfer (binary blob) * @param wire_reference unique reference identifying the wire transfer (binary blob)
* @param wire_reference_size number of bytes in @a wire_reference * @param wire_reference_size number of bytes in @a wire_reference
* @return #GNUNET_OK upon success; #GNUNET_NO if the given * @return transaction status code
* @a details are already known for this @a reserve_pub,
* #GNUNET_SYSERR upon failures (DB error, incompatible currency)
*/ */
static int static enum GNUNET_DB_QueryStatus
postgres_reserves_in_insert (void *cls, postgres_reserves_in_insert (void *cls,
struct TALER_EXCHANGEDB_Session *session, struct TALER_EXCHANGEDB_Session *session,
const struct TALER_ReservePublicKeyP *reserve_pub, const struct TALER_ReservePublicKeyP *reserve_pub,
@ -1967,18 +1965,11 @@ postgres_reserves_in_insert (void *cls,
size_t wire_reference_size) size_t wire_reference_size)
{ {
struct PostgresClosure *pg = cls; struct PostgresClosure *pg = cls;
PGresult *result;
enum GNUNET_DB_QueryStatus reserve_exists; enum GNUNET_DB_QueryStatus reserve_exists;
enum GNUNET_DB_QueryStatus qs;
struct TALER_EXCHANGEDB_Reserve reserve; struct TALER_EXCHANGEDB_Reserve reserve;
struct GNUNET_TIME_Absolute expiry; struct GNUNET_TIME_Absolute expiry;
if (GNUNET_OK !=
postgres_start (cls,
session))
{
GNUNET_break (0);
return GNUNET_SYSERR;
}
reserve.pub = *reserve_pub; reserve.pub = *reserve_pub;
reserve_exists = postgres_reserve_get (cls, reserve_exists = postgres_reserve_get (cls,
session, session,
@ -1986,7 +1977,7 @@ postgres_reserves_in_insert (void *cls,
if (0 > reserve_exists) if (0 > reserve_exists)
{ {
GNUNET_break (0); GNUNET_break (0);
goto rollback; return reserve_exists;
} }
if ( (0 == reserve.balance.value) && if ( (0 == reserve.balance.value) &&
(0 == reserve.balance.fraction) ) (0 == reserve.balance.fraction) )
@ -2030,22 +2021,22 @@ postgres_reserves_in_insert (void *cls,
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Reserve does not exist; creating a new one\n"); "Reserve does not exist; creating a new one\n");
result = GNUNET_PQ_exec_prepared (session->conn, qs = GNUNET_PQ_eval_prepared_non_select (session->conn,
"reserve_create", "reserve_create",
params); params);
if (PGRES_COMMAND_OK != PQresultStatus(result)) if (0 > qs)
return qs;
if (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS == qs)
{ {
QUERY_ERR (result, session->conn); /* Maybe DB did not detect serializiability error already,
PQclear (result); but clearly there must be one. Still odd. */
goto rollback; GNUNET_break (0);
return GNUNET_DB_STATUS_SOFT_ERROR;
} }
PQclear (result);
} }
/* Create new incoming transaction, SQL "primary key" logic /* Create new incoming transaction, SQL "primary key" logic
is used to guard against duplicates. If a duplicate is is used to guard against duplicates. If a duplicate is
detected, we rollback (which really shouldn't undo detected, we just "succeed" with no changes. */
anything) and return #GNUNET_NO to indicate that this failure
is kind-of harmless (already executed). */
{ {
struct GNUNET_PQ_QueryParam params[] = { struct GNUNET_PQ_QueryParam params[] = {
GNUNET_PQ_query_param_auto_from_type (&reserve.pub), GNUNET_PQ_query_param_auto_from_type (&reserve.pub),
@ -2057,34 +2048,12 @@ postgres_reserves_in_insert (void *cls,
GNUNET_PQ_query_param_end GNUNET_PQ_query_param_end
}; };
result = GNUNET_PQ_exec_prepared (session->conn, qs = GNUNET_PQ_eval_prepared_non_select (session->conn,
"reserves_in_add_transaction", "reserves_in_add_transaction",
params); params);
if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT != qs)
return qs;
} }
if (PGRES_COMMAND_OK != PQresultStatus(result))
{
const char *efield;
efield = PQresultErrorField (result,
PG_DIAG_SQLSTATE);
if ( (PGRES_FATAL_ERROR == PQresultStatus(result)) &&
(NULL != strstr ("23505", /* unique violation */
efield)) )
{
/* This means we had the same reserve/justification/details
before */
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
"Uniqueness violation, deposit details already known\n");
PQclear (result);
postgres_rollback (cls,
session);
return GNUNET_NO;
}
QUERY_ERR (result, session->conn);
PQclear (result);
goto rollback;
}
PQclear (result);
if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT == reserve_exists) if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT == reserve_exists)
{ {
@ -2094,7 +2063,6 @@ postgres_reserves_in_insert (void *cls,
back for duplicate transactions; like this, we should virtually back for duplicate transactions; like this, we should virtually
never actually have to rollback anything. */ never actually have to rollback anything. */
struct TALER_EXCHANGEDB_Reserve updated_reserve; struct TALER_EXCHANGEDB_Reserve updated_reserve;
enum GNUNET_DB_QueryStatus qs;
updated_reserve.pub = reserve.pub; updated_reserve.pub = reserve.pub;
if (GNUNET_OK != if (GNUNET_OK !=
@ -2105,38 +2073,15 @@ postgres_reserves_in_insert (void *cls,
/* currency overflow or incompatible currency */ /* currency overflow or incompatible currency */
GNUNET_log (GNUNET_ERROR_TYPE_WARNING, GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
"Attempt to deposit incompatible amount into reserve\n"); "Attempt to deposit incompatible amount into reserve\n");
goto rollback; return GNUNET_DB_STATUS_HARD_ERROR;
} }
updated_reserve.expiry = GNUNET_TIME_absolute_max (expiry, updated_reserve.expiry = GNUNET_TIME_absolute_max (expiry,
reserve.expiry); reserve.expiry);
qs = reserves_update (cls, return reserves_update (cls,
session, session,
&updated_reserve); &updated_reserve);
if (GNUNET_DB_STATUS_SOFT_ERROR == qs)
goto rollback; /* FIXME: #5010 */
if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT != qs)
{
postgres_rollback (cls,
session);
return GNUNET_SYSERR;
}
} }
if (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS != return GNUNET_DB_STATUS_SUCCESS_ONE_RESULT;
postgres_commit (cls,
session))
{
GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
"Failed to commit transaction adding amount to reserve\n");
return GNUNET_SYSERR;
}
return GNUNET_OK;
rollback:
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
"Transaction failed, doing rollback\n");
postgres_rollback (cls,
session);
return GNUNET_SYSERR;
} }

View File

@ -162,7 +162,6 @@ check_reserve (struct TALER_EXCHANGEDB_Session *session,
struct TALER_EXCHANGEDB_Reserve reserve; struct TALER_EXCHANGEDB_Reserve reserve;
reserve.pub = *pub; reserve.pub = *pub;
FAILIF (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT != FAILIF (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT !=
plugin->reserve_get (plugin->cls, plugin->reserve_get (plugin->cls,
session, session,
@ -1485,6 +1484,10 @@ run (void *cls)
goto drop; goto drop;
} }
FAILIF (GNUNET_OK !=
plugin->start (plugin->cls,
session));
/* test DB is empty */ /* test DB is empty */
FAILIF (GNUNET_NO != FAILIF (GNUNET_NO !=
plugin->select_payback_above_serial_id (plugin->cls, plugin->select_payback_above_serial_id (plugin->cls,
@ -1520,7 +1523,7 @@ run (void *cls)
session, session,
&rr, &rr,
&rr_size)); &rr_size));
FAILIF (GNUNET_OK != FAILIF (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT !=
plugin->reserves_in_insert (plugin->cls, plugin->reserves_in_insert (plugin->cls,
session, session,
&reserve_pub, &reserve_pub,
@ -1543,7 +1546,7 @@ run (void *cls)
value.value, value.value,
value.fraction, value.fraction,
value.currency)); value.currency));
FAILIF (GNUNET_OK != FAILIF (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT !=
plugin->reserves_in_insert (plugin->cls, plugin->reserves_in_insert (plugin->cls,
session, session,
&reserve_pub, &reserve_pub,
@ -1810,6 +1813,9 @@ run (void *cls)
session, session,
&deposit_cb, &deposit_cb,
&deposit)); &deposit));
FAILIF (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS !=
plugin->commit (plugin->cls,
session));
FAILIF (GNUNET_OK != FAILIF (GNUNET_OK !=
plugin->start (plugin->cls, plugin->start (plugin->cls,
session)); session));

View File

@ -1190,11 +1190,9 @@ struct TALER_EXCHANGEDB_Plugin
* @param sender_account_details information about the sender's bank account * @param sender_account_details information about the sender's bank account
* @param wire_reference unique reference identifying the wire transfer (binary blob) * @param wire_reference unique reference identifying the wire transfer (binary blob)
* @param wire_reference_size number of bytes in @a wire_reference * @param wire_reference_size number of bytes in @a wire_reference
* @return #GNUNET_OK upon success; #GNUNET_NO if the given * @return transaction status code
* @a details are already known for this @a reserve_pub,
* #GNUNET_SYSERR upon failures (DB error, incompatible currency)
*/ */
int enum GNUNET_DB_QueryStatus
(*reserves_in_insert) (void *cls, (*reserves_in_insert) (void *cls,
struct TALER_EXCHANGEDB_Session *db, struct TALER_EXCHANGEDB_Session *db,
const struct TALER_ReservePublicKeyP *reserve_pub, const struct TALER_ReservePublicKeyP *reserve_pub,