clean up reserve_get logic

This commit is contained in:
Christian Grothoff 2023-04-21 22:30:37 +02:00
parent 03deaeb108
commit 5290453e36
No known key found for this signature in database
GPG Key ID: 939E6BE1E29FC3CC
3 changed files with 146 additions and 177 deletions

@ -1 +1 @@
Subproject commit 1ec4596bf4925ee24fc06d3e74d2a553b8239870 Subproject commit bf43b20a0362ac19bcf1bab9c33215e55d8d9f36

View File

@ -1,6 +1,6 @@
/* /*
This file is part of TALER This file is part of TALER
Copyright (C) 2014-2022 Taler Systems SA Copyright (C) 2014-2023 Taler Systems SA
TALER is free software; you can redistribute it and/or modify it under the TALER is free software; you can redistribute it and/or modify it under the
terms of the GNU Affero General Public License as published by the Free Software terms of the GNU Affero General Public License as published by the Free Software
@ -62,6 +62,16 @@ struct ReservePoller
*/ */
struct GNUNET_TIME_Absolute timeout; struct GNUNET_TIME_Absolute timeout;
/**
* Public key of the reserve the inquiry is about.
*/
struct TALER_ReservePublicKeyP reserve_pub;
/**
* Balance of the reserve, set in the callback.
*/
struct TALER_Amount balance;
/** /**
* True if we are still suspended. * True if we are still suspended.
*/ */
@ -84,13 +94,10 @@ static struct ReservePoller *rp_tail;
void void
TEH_reserves_get_cleanup () TEH_reserves_get_cleanup ()
{ {
struct ReservePoller *rp; for (struct ReservePoller *rp = rp_head;
NULL != rp;
while (NULL != (rp = rp_head)) rp = rp->next)
{ {
GNUNET_CONTAINER_DLL_remove (rp_head,
rp_tail,
rp);
if (rp->suspended) if (rp->suspended)
{ {
rp->suspended = false; rp->suspended = false;
@ -115,11 +122,14 @@ rp_cleanup (struct TEH_RequestContext *rc)
if (NULL != rp->eh) if (NULL != rp->eh)
{ {
GNUNET_log (GNUNET_ERROR_TYPE_INFO, GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"Cancelling DB event listening\n"); "Cancelling DB event listening on cleanup (odd unless during shutdown)\n");
TEH_plugin->event_listen_cancel (TEH_plugin->cls, TEH_plugin->event_listen_cancel (TEH_plugin->cls,
rp->eh); rp->eh);
rp->eh = NULL; rp->eh = NULL;
} }
GNUNET_CONTAINER_DLL_remove (rp_head,
rp_tail,
rp);
GNUNET_free (rp); GNUNET_free (rp);
} }
@ -143,20 +153,14 @@ db_event_cb (void *cls,
(void) extra; (void) extra;
(void) extra_size; (void) extra_size;
if (NULL == rp)
return; /* event triggered while main transaction
was still running */
if (! rp->suspended) if (! rp->suspended)
return; /* might get multiple wake-up events */ return; /* might get multiple wake-up events */
rp->suspended = false;
GNUNET_async_scope_enter (&rc->async_scope_id, GNUNET_async_scope_enter (&rc->async_scope_id,
&old_scope); &old_scope);
GNUNET_log (GNUNET_ERROR_TYPE_INFO, GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"Resuming from long-polling on reserve\n"); "Resuming from long-polling on reserve\n");
TEH_check_invariants (); TEH_check_invariants ();
GNUNET_CONTAINER_DLL_remove (rp_head, rp->suspended = false;
rp_tail,
rp);
MHD_resume_connection (rp->connection); MHD_resume_connection (rp->connection);
TALER_MHD_daemon_trigger (); TALER_MHD_daemon_trigger ();
TEH_check_invariants (); TEH_check_invariants ();
@ -164,191 +168,154 @@ db_event_cb (void *cls,
} }
/**
* Closure for #reserve_history_transaction.
*/
struct ReserveHistoryContext
{
/**
* Public key of the reserve the inquiry is about.
*/
struct TALER_ReservePublicKeyP reserve_pub;
/**
* Balance of the reserve, set in the callback.
*/
struct TALER_Amount balance;
/**
* Set to true if we did not find the reserve.
*/
bool not_found;
};
/**
* Function implementing /reserves/ GET transaction.
* Execute a /reserves/ GET. Given the public key of a reserve,
* return the associated transaction history. Runs the
* transaction logic; IF it returns a non-error code, the transaction
* logic MUST NOT queue a MHD response. IF it returns an hard error,
* the transaction logic MUST queue a MHD response and set @a mhd_ret.
* IF it returns the soft error code, the function MAY be called again
* to retry and MUST not queue a MHD response.
*
* @param cls a `struct ReserveHistoryContext *`
* @param connection MHD request which triggered the transaction
* @param[out] mhd_ret set to MHD response status for @a connection,
* if transaction failed (!)
* @return transaction status
*/
static enum GNUNET_DB_QueryStatus
reserve_balance_transaction (void *cls,
struct MHD_Connection *connection,
MHD_RESULT *mhd_ret)
{
struct ReserveHistoryContext *rsc = cls;
enum GNUNET_DB_QueryStatus qs;
qs = TEH_plugin->get_reserve_balance (TEH_plugin->cls,
&rsc->reserve_pub,
&rsc->balance);
if (GNUNET_DB_STATUS_HARD_ERROR == qs)
{
GNUNET_break (0);
*mhd_ret
= TALER_MHD_reply_with_error (connection,
MHD_HTTP_INTERNAL_SERVER_ERROR,
TALER_EC_GENERIC_DB_FETCH_FAILED,
"get_reserve_balance");
}
if (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS == qs)
rsc->not_found = true;
if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT == qs)
rsc->not_found = false;
return qs;
}
MHD_RESULT MHD_RESULT
TEH_handler_reserves_get (struct TEH_RequestContext *rc, TEH_handler_reserves_get (struct TEH_RequestContext *rc,
const char *const args[1]) const char *const args[1])
{ {
struct ReserveHistoryContext rsc; struct ReservePoller *rp = rc->rh_ctx;
struct GNUNET_TIME_Relative timeout = GNUNET_TIME_UNIT_ZERO;
struct GNUNET_DB_EventHandler *eh = NULL;
if (GNUNET_OK != if (NULL == rp)
GNUNET_STRINGS_string_to_data (args[0],
strlen (args[0]),
&rsc.reserve_pub,
sizeof (rsc.reserve_pub)))
{ {
GNUNET_break_op (0); struct GNUNET_TIME_Relative timeout
return TALER_MHD_reply_with_error (rc->connection, = GNUNET_TIME_UNIT_ZERO;
MHD_HTTP_BAD_REQUEST,
TALER_EC_GENERIC_RESERVE_PUB_MALFORMED,
args[0]);
}
{
const char *long_poll_timeout_ms;
long_poll_timeout_ms rp = GNUNET_new (struct ReservePoller);
= MHD_lookup_connection_value (rc->connection, rp->connection = rc->connection;
MHD_GET_ARGUMENT_KIND, rc->rh_ctx = rp;
"timeout_ms"); rc->rh_cleaner = &rp_cleanup;
if (NULL != long_poll_timeout_ms) GNUNET_CONTAINER_DLL_insert (rp_head,
rp_tail,
rp);
if (GNUNET_OK !=
GNUNET_STRINGS_string_to_data (args[0],
strlen (args[0]),
&rp->reserve_pub,
sizeof (rp->reserve_pub)))
{ {
unsigned int timeout_ms; GNUNET_break_op (0);
char dummy; return TALER_MHD_reply_with_error (rc->connection,
MHD_HTTP_BAD_REQUEST,
if (1 != sscanf (long_poll_timeout_ms, TALER_EC_GENERIC_RESERVE_PUB_MALFORMED,
"%u%c", args[0]);
&timeout_ms,
&dummy))
{
GNUNET_break_op (0);
return TALER_MHD_reply_with_error (rc->connection,
MHD_HTTP_BAD_REQUEST,
TALER_EC_GENERIC_PARAMETER_MALFORMED,
"timeout_ms (must be non-negative number)");
}
timeout = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
timeout_ms);
} }
{
const char *long_poll_timeout_ms;
long_poll_timeout_ms
= MHD_lookup_connection_value (rc->connection,
MHD_GET_ARGUMENT_KIND,
"timeout_ms");
if (NULL != long_poll_timeout_ms)
{
unsigned int timeout_ms;
char dummy;
if (1 != sscanf (long_poll_timeout_ms,
"%u%c",
&timeout_ms,
&dummy))
{
GNUNET_break_op (0);
return TALER_MHD_reply_with_error (rc->connection,
MHD_HTTP_BAD_REQUEST,
TALER_EC_GENERIC_PARAMETER_MALFORMED,
"timeout_ms (must be non-negative number)");
}
timeout = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
timeout_ms);
}
}
rp->timeout = GNUNET_TIME_relative_to_absolute (timeout);
} }
if ( (! GNUNET_TIME_relative_is_zero (timeout)) &&
(NULL == rc->rh_ctx) ) if ( (GNUNET_TIME_absolute_is_future (rp->timeout)) &&
(NULL == rp->eh) )
{ {
struct TALER_ReserveEventP rep = { struct TALER_ReserveEventP rep = {
.header.size = htons (sizeof (rep)), .header.size = htons (sizeof (rep)),
.header.type = htons (TALER_DBEVENT_EXCHANGE_RESERVE_INCOMING), .header.type = htons (TALER_DBEVENT_EXCHANGE_RESERVE_INCOMING),
.reserve_pub = rsc.reserve_pub .reserve_pub = rp->reserve_pub
}; };
GNUNET_log (GNUNET_ERROR_TYPE_INFO, GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"Starting DB event listening\n"); "Starting DB event listening\n");
eh = TEH_plugin->event_listen (TEH_plugin->cls, rp->eh = TEH_plugin->event_listen (
timeout, TEH_plugin->cls,
&rep.header, GNUNET_TIME_absolute_get_remaining (rp->timeout),
&db_event_cb, &rep.header,
rc); &db_event_cb,
rp);
} }
{ {
MHD_RESULT mhd_ret; enum GNUNET_DB_QueryStatus qs;
if (GNUNET_OK != qs = TEH_plugin->get_reserve_balance (TEH_plugin->cls,
TEH_DB_run_transaction (rc->connection, &rp->reserve_pub,
"get reserve balance", &rp->balance);
TEH_MT_REQUEST_OTHER, switch (qs)
&mhd_ret,
&reserve_balance_transaction,
&rsc))
{ {
if (NULL != eh) case GNUNET_DB_STATUS_SOFT_ERROR:
GNUNET_break (0); /* single-shot query should never have soft-errors */
if (NULL != rp->eh)
{
TEH_plugin->event_listen_cancel (TEH_plugin->cls, TEH_plugin->event_listen_cancel (TEH_plugin->cls,
eh); rp->eh);
return mhd_ret; rp->eh = NULL;
} }
}
/* generate proper response */
if (rsc.not_found)
{
struct ReservePoller *rp = rc->rh_ctx;
if ( (NULL != rp) ||
(GNUNET_TIME_relative_is_zero (timeout)) )
{
return TALER_MHD_reply_with_error (rc->connection, return TALER_MHD_reply_with_error (rc->connection,
MHD_HTTP_NOT_FOUND, MHD_HTTP_INTERNAL_SERVER_ERROR,
TALER_EC_EXCHANGE_RESERVES_STATUS_UNKNOWN, TALER_EC_GENERIC_DB_SOFT_FAILURE,
args[0]); "get_reserve_balance");
case GNUNET_DB_STATUS_HARD_ERROR:
GNUNET_break (0);
if (NULL != rp->eh)
{
TEH_plugin->event_listen_cancel (TEH_plugin->cls,
rp->eh);
rp->eh = NULL;
}
return TALER_MHD_reply_with_error (rc->connection,
MHD_HTTP_INTERNAL_SERVER_ERROR,
TALER_EC_GENERIC_DB_FETCH_FAILED,
"get_reserve_balance");
case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
if (NULL != rp->eh)
{
TEH_plugin->event_listen_cancel (TEH_plugin->cls,
rp->eh);
rp->eh = NULL;
}
return TALER_MHD_REPLY_JSON_PACK (rc->connection,
MHD_HTTP_OK,
TALER_JSON_pack_amount ("balance",
&rp->balance));
case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
if ( (NULL != rp) ||
(GNUNET_TIME_absolute_is_future (rp->timeout)) )
{
if (NULL != rp->eh)
{
TEH_plugin->event_listen_cancel (TEH_plugin->cls,
rp->eh);
rp->eh = NULL;
}
return TALER_MHD_reply_with_error (rc->connection,
MHD_HTTP_NOT_FOUND,
TALER_EC_EXCHANGE_RESERVES_STATUS_UNKNOWN,
args[0]);
}
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"Long-polling on reserve for %s\n",
GNUNET_STRINGS_relative_time_to_string (
GNUNET_TIME_absolute_get_remaining (rp->timeout),
true));
rp->suspended = true;
MHD_suspend_connection (rc->connection);
return MHD_YES;
} }
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"Long-polling on reserve for %s\n",
GNUNET_STRINGS_relative_time_to_string (timeout,
GNUNET_YES));
rp = GNUNET_new (struct ReservePoller);
rp->connection = rc->connection;
rp->timeout = GNUNET_TIME_relative_to_absolute (timeout);
rp->eh = eh;
rc->rh_ctx = rp;
rc->rh_cleaner = &rp_cleanup;
rp->suspended = true;
GNUNET_CONTAINER_DLL_insert (rp_head,
rp_tail,
rp);
MHD_suspend_connection (rc->connection);
return MHD_YES;
} }
if (NULL != eh) GNUNET_break (0);
TEH_plugin->event_listen_cancel (TEH_plugin->cls, return MHD_NO;
eh);
return TALER_MHD_REPLY_JSON_PACK (
rc->connection,
MHD_HTTP_OK,
TALER_JSON_pack_amount ("balance",
&rsc.balance));
} }

View File

@ -722,8 +722,6 @@ TEH_PG_reserves_in_insert (
&transaction_duplicate[i], &transaction_duplicate[i],
&conflicts[i], &conflicts[i],
&reserve_uuid[i]); &reserve_uuid[i]);
fprintf (stdout, "reserve uuid : %ld c :%d t:%d\n", reserve_uuid[i],
conflicts[i], transaction_duplicate[i]);
if (qs2<0) if (qs2<0)
{ {
GNUNET_log (GNUNET_ERROR_TYPE_WARNING, GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
@ -825,7 +823,11 @@ TEH_PG_reserves_in_insert (
cs = TEH_PG_commit (pg); cs = TEH_PG_commit (pg);
if (cs < 0) if (cs < 0)
{
for (unsigned int i = 0; i<reserves_length; i++)
GNUNET_free (notify_s[i]);
return cs; return cs;
}
} }
exit: exit: