-implement long polling support on reserve status (but not yet in C client library)

This commit is contained in:
Christian Grothoff 2021-08-22 00:12:18 +02:00
parent 9ad3469f07
commit 10f9272e45
No known key found for this signature in database
GPG Key ID: 939E6BE1E29FC3CC
6 changed files with 313 additions and 33 deletions

@ -1 +1 @@
Subproject commit 2e967c48b395a3edb85982e2e349cb82e76dcb27 Subproject commit efb2a1fd64e17159c56ff3674083837b5a657a64

View File

@ -1430,8 +1430,14 @@ run_single_request (void)
} }
MHD_run (mhd); MHD_run (mhd);
} }
TEH_resume_keys_requests (true); {
MHD_stop_daemon (mhd); MHD_socket sock = MHD_quiesce_daemon (mhd);
TEH_resume_keys_requests (true);
TEH_reserves_get_cleanup ();
MHD_stop_daemon (mhd);
GNUNET_break (0 == close (sock));
}
mhd = NULL; mhd = NULL;
if (cld != waitpid (cld, if (cld != waitpid (cld,
&status, &status,
@ -1494,8 +1500,15 @@ run_main_loop (int fh,
{ {
case GNUNET_OK: case GNUNET_OK:
case GNUNET_SYSERR: case GNUNET_SYSERR:
TEH_resume_keys_requests (true); {
MHD_stop_daemon (mhd); MHD_socket sock = MHD_quiesce_daemon (mhd);
TEH_resume_keys_requests (true);
TEH_reserves_get_cleanup ();
MHD_stop_daemon (mhd);
GNUNET_break (0 == close (sock));
}
mhd = NULL;
break; break;
case GNUNET_NO: case GNUNET_NO:
{ {
@ -1507,7 +1520,9 @@ run_main_loop (int fh,
flags = fcntl (sock, F_GETFD); flags = fcntl (sock, F_GETFD);
GNUNET_assert (-1 != flags); GNUNET_assert (-1 != flags);
flags &= ~FD_CLOEXEC; flags &= ~FD_CLOEXEC;
GNUNET_assert (-1 != fcntl (sock, F_SETFD, flags)); GNUNET_assert (-1 != fcntl (sock,
F_SETFD,
flags));
chld = fork (); chld = fork ();
if (-1 == chld) if (-1 == chld)
{ {
@ -1551,6 +1566,7 @@ run_main_loop (int fh,
sleep (1); sleep (1);
/* Now we're really done, practice clean shutdown */ /* Now we're really done, practice clean shutdown */
TEH_resume_keys_requests (true); TEH_resume_keys_requests (true);
TEH_reserves_get_cleanup ();
MHD_stop_daemon (mhd); MHD_stop_daemon (mhd);
} }
break; break;

View File

@ -1,6 +1,6 @@
/* /*
This file is part of TALER This file is part of TALER
Copyright (C) 2014-2020 Taler Systems SA Copyright (C) 2014-2021 Taler Systems SA
TALER is free software; you can redistribute it and/or modify it under the TALER is free software; you can redistribute it and/or modify it under the
terms of the GNU Affero General Public License as published by the Free Software terms of the GNU Affero General Public License as published by the Free Software
@ -25,6 +25,7 @@
#include <jansson.h> #include <jansson.h>
#include "taler_mhd_lib.h" #include "taler_mhd_lib.h"
#include "taler_json_lib.h" #include "taler_json_lib.h"
#include "taler_dbevents.h"
#include "taler-exchange-httpd_reserves_get.h" #include "taler-exchange-httpd_reserves_get.h"
#include "taler-exchange-httpd_responses.h" #include "taler-exchange-httpd_responses.h"
@ -49,25 +50,113 @@ struct ReservePoller
*/ */
struct MHD_Connection *connection; struct MHD_Connection *connection;
/**
* Entry in the timeout heap.
*/
struct GNUNET_CONTAINER_HeapNode *hn;
/** /**
* Subscription for the database event we are * Subscription for the database event we are
* waiting for. * waiting for.
*/ */
struct GNUNET_DB_EventHandler *eh; struct TALER_EXCHANGEDB_EventHandler *eh;
/** /**
* When will this request time out? * When will this request time out?
*/ */
struct GNUNET_TIME_Absolute timeout; struct GNUNET_TIME_Absolute timeout;
/**
* True if we are still suspended.
*/
bool suspended;
}; };
/**
* Head of list of requests in long polling.
*/
static struct ReservePoller *rp_head;
/**
* Tail of list of requests in long polling.
*/
static struct ReservePoller *rp_tail;
void
TEH_reserves_get_cleanup ()
{
struct ReservePoller *rp;
while (NULL != (rp = rp_head))
{
GNUNET_CONTAINER_DLL_remove (rp_head,
rp_tail,
rp);
if (rp->suspended)
{
rp->suspended = false;
MHD_resume_connection (rp->connection);
}
}
}
/**
* Function called once a connection is done to
* clean up the `struct ReservePoller` state.
*
* @param rc context to clean up for
*/
static void
rp_cleanup (struct TEH_RequestContext *rc)
{
struct ReservePoller *rp = rc->rh_ctx;
if (NULL != rp->eh)
{
TEH_plugin->event_listen_cancel (TEH_plugin->cls,
rp->eh);
rp->eh = NULL;
}
GNUNET_free (rp);
}
/**
* Function called on events received from Postgres.
* Wakes up long pollers.
*
* @param cls the `struct TEH_RequestContext *`
* @param extra additional event data provided
* @param extra_size number of bytes in @a extra
*/
static void
db_event_cb (void *cls,
const void *extra,
size_t extra_size)
{
struct TEH_RequestContext *rc = cls;
struct ReservePoller *rp = rc->rh_ctx;
struct GNUNET_AsyncScopeSave old_scope;
(void) extra;
(void) extra_size;
if (NULL == rp)
return; /* event triggered while main transaction
was still running */
if (! rp->suspended)
return; /* might get multiple wake-up events */
rp->suspended = false;
GNUNET_async_scope_enter (&rc->async_scope_id,
&old_scope);
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"Resuming from long-polling on reserve\n");
GNUNET_CONTAINER_DLL_remove (rp_head,
rp_tail,
rp);
MHD_resume_connection (rp->connection);
GNUNET_async_scope_restore (&old_scope);
}
/** /**
* Send reserve history to client. * Send reserve history to client.
* *
@ -157,6 +246,8 @@ TEH_handler_reserves_get (struct TEH_RequestContext *rc,
{ {
struct ReserveHistoryContext rsc; struct ReserveHistoryContext rsc;
MHD_RESULT mhd_ret; MHD_RESULT mhd_ret;
struct GNUNET_TIME_Relative timeout;
struct TALER_EXCHANGEDB_EventHandler *eh = NULL;
if (GNUNET_OK != if (GNUNET_OK !=
GNUNET_STRINGS_string_to_data (args[0], GNUNET_STRINGS_string_to_data (args[0],
@ -170,6 +261,47 @@ TEH_handler_reserves_get (struct TEH_RequestContext *rc,
TALER_EC_MERCHANT_GENERIC_RESERVE_PUB_MALFORMED, TALER_EC_MERCHANT_GENERIC_RESERVE_PUB_MALFORMED,
args[0]); args[0]);
} }
{
const char *long_poll_timeout_ms;
long_poll_timeout_ms
= MHD_lookup_connection_value (rc->connection,
MHD_GET_ARGUMENT_KIND,
"timeout_ms");
if (NULL != long_poll_timeout_ms)
{
unsigned int timeout_ms;
char dummy;
if (1 != sscanf (long_poll_timeout_ms,
"%u%c",
&timeout_ms,
&dummy))
{
GNUNET_break_op (0);
return TALER_MHD_reply_with_error (rc->connection,
MHD_HTTP_BAD_REQUEST,
TALER_EC_GENERIC_PARAMETER_MALFORMED,
"timeout_ms (must be non-negative number)");
}
timeout = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
timeout_ms);
}
}
if (! GNUNET_TIME_relative_is_zero (timeout))
{
struct TALER_ReserveEventP rep = {
.header.size = htons (sizeof (rep)),
.header.type = htons (TALER_DBEVENT_EXCHANGE_RESERVE_INCOMING),
.reserve_pub = rsc.reserve_pub
};
eh = TEH_plugin->event_listen (TEH_plugin->cls,
timeout,
&rep.header,
&db_event_cb,
rc);
}
rsc.rh = NULL; rsc.rh = NULL;
if (GNUNET_OK != if (GNUNET_OK !=
TEH_DB_run_transaction (rc->connection, TEH_DB_run_transaction (rc->connection,
@ -178,13 +310,33 @@ TEH_handler_reserves_get (struct TEH_RequestContext *rc,
&reserve_history_transaction, &reserve_history_transaction,
&rsc)) &rsc))
return mhd_ret; return mhd_ret;
/* generate proper response */ /* generate proper response */
if (NULL == rsc.rh) if (NULL == rsc.rh)
return TALER_MHD_reply_with_error (rc->connection, {
MHD_HTTP_NOT_FOUND, struct ReservePoller *rp = rc->rh_ctx;
TALER_EC_EXCHANGE_RESERVES_GET_STATUS_UNKNOWN,
args[0]); if ( (NULL != rp) ||
(GNUNET_TIME_relative_is_zero (timeout)) )
{
return TALER_MHD_reply_with_error (rc->connection,
MHD_HTTP_NOT_FOUND,
TALER_EC_EXCHANGE_RESERVES_GET_STATUS_UNKNOWN,
args[0]);
}
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"Long-polling on reserve for %s\n",
GNUNET_STRINGS_relative_time_to_string (timeout,
GNUNET_YES));
rp = GNUNET_new (struct ReservePoller);
rp->connection = rc->connection;
rp->timeout = GNUNET_TIME_relative_to_absolute (timeout);
rp->eh = eh;
rc->rh_ctx = rp;
rc->rh_cleaner = &rp_cleanup;
rp->suspended = true;
MHD_suspend_connection (rc->connection);
return MHD_YES;
}
mhd_ret = reply_reserve_history_success (rc->connection, mhd_ret = reply_reserve_history_success (rc->connection,
rsc.rh); rsc.rh);
TEH_plugin->free_reserve_history (TEH_plugin->cls, TEH_plugin->free_reserve_history (TEH_plugin->cls,

View File

@ -27,6 +27,15 @@
#include "taler-exchange-httpd.h" #include "taler-exchange-httpd.h"
/**
* Shutdown reserves-get subsystem. Resumes all
* suspended long-polling clients and cleans up
* data structures.
*/
void
TEH_reserves_get_cleanup (void);
/** /**
* Handle a GET "/reserves/" request. Parses the * Handle a GET "/reserves/" request. Parses the
* given "reserve_pub" in @a args (which should contain the * given "reserve_pub" in @a args (which should contain the

View File

@ -109,6 +109,39 @@ struct TALER_EXCHANGEDB_Session
}; };
/**
* Event registration record.
*/
struct TALER_EXCHANGEDB_EventHandler
{
/**
* Underlying GNUnet event handler.
*/
struct GNUNET_DB_EventHandler *geh;
/**
* Entry in the heap.
*/
struct GNUNET_CONTAINER_HeapNode *hn;
/**
* Our timeout.
*/
struct GNUNET_TIME_Absolute timeout;
/**
* Callback to invoke (on @e timeout).
*/
GNUNET_DB_EventCallback cb;
/**
* Closure for @e cb.
*/
void *cb_cls;
};
/** /**
* Type of the "cls" argument given to each of the functions in * Type of the "cls" argument given to each of the functions in
* our API. * our API.
@ -132,6 +165,12 @@ struct PostgresClosure
*/ */
char *sql_dir; char *sql_dir;
/**
* Heap of `struct TALER_EXCHANGEDB_EventHandler`
* by timeout.
*/
struct GNUNET_CONTAINER_Heap *event_heap;
/** /**
* After how long should idle reserves be closed? * After how long should idle reserves be closed?
*/ */
@ -2832,18 +2871,41 @@ handle_events (void *cls)
} }
}; };
nfds_t nfds = (-1 == pg->pg_sock) ? 1 : 2; nfds_t nfds = (-1 == pg->pg_sock) ? 1 : 2;
struct TALER_EXCHANGEDB_EventHandler *r;
GNUNET_assert (0 == GNUNET_assert (0 ==
pthread_mutex_lock (&pg->event_lock)); pthread_mutex_lock (&pg->event_lock));
while (0 != pg->listener_count) while (0 != pg->listener_count)
{ {
int ret; int ret;
int timeout = -1; /* no timeout */
GNUNET_assert (0 == GNUNET_assert (0 ==
pthread_mutex_unlock (&pg->event_lock)); pthread_mutex_unlock (&pg->event_lock));
while (1)
{
r = GNUNET_CONTAINER_heap_peek (pg->event_heap);
if (NULL == r)
break;
if (GNUNET_TIME_absolute_is_future (r->timeout))
break;
GNUNET_assert (r ==
GNUNET_CONTAINER_heap_remove_root (pg->event_heap));
r->hn = NULL;
r->cb (r->cb_cls,
NULL,
0);
}
if (NULL != r)
{
struct GNUNET_TIME_Relative rem;
rem = GNUNET_TIME_absolute_get_remaining (r->timeout);
timeout = rem.rel_value_us / GNUNET_TIME_UNIT_MILLISECONDS.rel_value_us;
}
ret = poll (pfds, ret = poll (pfds,
nfds, nfds,
-1 /* no timeout */); timeout);
if (-1 == ret) if (-1 == ret)
GNUNET_log_strerror (GNUNET_ERROR_TYPE_WARNING, GNUNET_log_strerror (GNUNET_ERROR_TYPE_WARNING,
"poll"); "poll");
@ -2909,16 +2971,30 @@ pq_socket_cb (void *cls,
* @param cb_cls closure for @a cb * @param cb_cls closure for @a cb
* @return handle useful to cancel the listener * @return handle useful to cancel the listener
*/ */
static struct GNUNET_DB_EventHandler * static struct TALER_EXCHANGEDB_EventHandler *
postgres_event_listen (void *cls, postgres_event_listen (void *cls,
struct TALER_EXCHANGEDB_Session *session, struct GNUNET_TIME_Relative timeout,
const struct GNUNET_DB_EventHeaderP *es, const struct GNUNET_DB_EventHeaderP *es,
GNUNET_DB_EventCallback cb, GNUNET_DB_EventCallback cb,
void *cb_cls) void *cb_cls)
{ {
struct PostgresClosure *pg = cls; struct PostgresClosure *pg = cls;
struct GNUNET_DB_EventHandler *eh; struct TALER_EXCHANGEDB_EventHandler *eh;
struct TALER_EXCHANGEDB_Session *session;
session = postgres_get_session (pg);
eh = GNUNET_new (struct TALER_EXCHANGEDB_EventHandler);
eh->cb = cb;
eh->cb_cls = cb_cls;
eh->timeout = GNUNET_TIME_relative_to_absolute (timeout);
eh->geh = GNUNET_PQ_event_listen (session->conn,
es,
cb,
cb_cls);
GNUNET_assert (NULL != eh->geh);
eh->hn = GNUNET_CONTAINER_heap_insert (pg->event_heap,
eh,
eh->timeout.abs_value_us);
GNUNET_assert (0 == GNUNET_assert (0 ==
pthread_mutex_lock (&pg->event_lock)); pthread_mutex_lock (&pg->event_lock));
pg->listener_count++; pg->listener_count++;
@ -2932,11 +3008,6 @@ postgres_event_listen (void *cls,
} }
GNUNET_assert (0 == GNUNET_assert (0 ==
pthread_mutex_unlock (&pg->event_lock)); pthread_mutex_unlock (&pg->event_lock));
eh = GNUNET_PQ_event_listen (session->conn,
es,
cb,
cb_cls);
GNUNET_assert (NULL != eh);
return eh; return eh;
} }
@ -2949,7 +3020,7 @@ postgres_event_listen (void *cls,
*/ */
static void static void
postgres_event_listen_cancel (void *cls, postgres_event_listen_cancel (void *cls,
struct GNUNET_DB_EventHandler *eh) struct TALER_EXCHANGEDB_EventHandler *eh)
{ {
struct PostgresClosure *pg = cls; struct PostgresClosure *pg = cls;
@ -2971,7 +3042,13 @@ postgres_event_listen_cancel (void *cls,
} }
GNUNET_assert (0 == GNUNET_assert (0 ==
pthread_mutex_unlock (&pg->event_lock)); pthread_mutex_unlock (&pg->event_lock));
GNUNET_PQ_event_listen_cancel (eh); if (NULL != eh->hn)
{
GNUNET_CONTAINER_heap_remove_node (eh->hn);
eh->hn = NULL;
}
GNUNET_PQ_event_listen_cancel (eh->geh);
GNUNET_free (eh);
} }
@ -10917,6 +10994,8 @@ libtaler_plugin_exchangedb_postgres_init (void *cls)
pg = GNUNET_new (struct PostgresClosure); pg = GNUNET_new (struct PostgresClosure);
pg->cfg = cfg; pg->cfg = cfg;
pg->event_heap = GNUNET_CONTAINER_heap_create (
GNUNET_CONTAINER_HEAP_ORDER_MIN);
pg->main_self = pthread_self (); /* loaded while single-threaded! */ pg->main_self = pthread_self (); /* loaded while single-threaded! */
if (GNUNET_OK != if (GNUNET_OK !=
GNUNET_CONFIGURATION_get_value_filename (cfg, GNUNET_CONFIGURATION_get_value_filename (cfg,
@ -11166,6 +11245,7 @@ libtaler_plugin_exchangedb_postgres_done (void *cls)
GNUNET_break (0 == GNUNET_break (0 ==
close (pg->event_fd)); close (pg->event_fd));
pthread_mutex_destroy (&pg->event_lock); pthread_mutex_destroy (&pg->event_lock);
GNUNET_CONTAINER_heap_destroy (pg->event_heap);
GNUNET_free (pg->sql_dir); GNUNET_free (pg->sql_dir);
GNUNET_free (pg->currency); GNUNET_free (pg->currency);
GNUNET_free (pg); GNUNET_free (pg);

View File

@ -73,8 +73,31 @@ struct TALER_EXCHANGEDB_DenominationKeyInformationP
}; };
/**
* Signature of events signalling a reseve got funding.
*/
struct TALER_ReserveEventP
{
/**
* Of type #TALER_DBEVENT_EXCHANGE_RESERVE_INCOMING.
*/
struct GNUNET_DB_EventHeaderP header;
/**
* Public key of the reserve the event is about.
*/
struct TALER_ReservePublicKeyP reserve_pub;
};
GNUNET_NETWORK_STRUCT_END GNUNET_NETWORK_STRUCT_END
/**
* Event registration record.
*/
struct TALER_EXCHANGEDB_EventHandler;
/** /**
* Meta data about an exchange online signing key. * Meta data about an exchange online signing key.
*/ */
@ -2149,16 +2172,16 @@ struct TALER_EXCHANGEDB_Plugin
* Register callback to be invoked on events of type @a es. * Register callback to be invoked on events of type @a es.
* *
* @param cls database context to use * @param cls database context to use
* @param session connection to use * @param timeout how long to wait at most
* @param es specification of the event to listen for * @param es specification of the event to listen for
* @param cb function to call when the event happens, possibly * @param cb function to call when the event happens, possibly
* multiple times (until cancel is invoked) * multiple times (until cancel is invoked)
* @param cb_cls closure for @a cb * @param cb_cls closure for @a cb
* @return handle useful to cancel the listener * @return handle useful to cancel the listener
*/ */
struct GNUNET_DB_EventHandler * struct TALER_EXCHANGEDB_EventHandler *
(*event_listen)(void *cls, (*event_listen)(void *cls,
struct TALER_EXCHANGEDB_Session *session, struct GNUNET_TIME_Relative timeout,
const struct GNUNET_DB_EventHeaderP *es, const struct GNUNET_DB_EventHeaderP *es,
GNUNET_DB_EventCallback cb, GNUNET_DB_EventCallback cb,
void *cb_cls); void *cb_cls);
@ -2171,7 +2194,7 @@ struct TALER_EXCHANGEDB_Plugin
*/ */
void void
(*event_listen_cancel)(void *cls, (*event_listen_cancel)(void *cls,
struct GNUNET_DB_EventHandler *eh); struct TALER_EXCHANGEDB_EventHandler *eh);
/** /**