modify wire auditor to deal with asynchrony of WIRE plugin API

This commit is contained in:
Christian Grothoff 2017-09-30 21:28:17 +02:00
parent 6a4f6b1836
commit 85a2d3dc0e
No known key found for this signature in database
GPG Key ID: 939E6BE1E29FC3CC

View File

@ -87,11 +87,68 @@ static struct TALER_MasterPublicKeyP master_pub;
*/
static struct TALER_WIRE_Plugin *wp;
/**
* Active wire request for the transaction history.
*/
static struct TALER_WIRE_HistoryHandle *hh;
/**
* Query status for the incremental processing status in the auditordb.
*/
static enum GNUNET_DB_QueryStatus qsx;
/**
* Last reserve_in / reserve_out serial IDs seen.
*/
static struct TALER_AUDITORDB_WireProgressPoint pp;
/**
* Where we are in the inbound (CREDIT) transaction history.
*/
static void *in_wire_off;
/**
* Where we are in the inbound (DEBIT) transaction history.
*/
static void *out_wire_off;
/**
* Number of bytes in #in_wire_off and #out_wire_off.
*/
static size_t wire_off_size;
/* ***************************** Shutdown **************************** */
/**
* Task run on shutdown.
*/
static void
do_shutdown ()
{
if (NULL != hh)
{
wp->get_history_cancel (wp->cls,
hh);
hh = NULL;
}
if (NULL != wp)
{
TALER_WIRE_plugin_unload (wp);
wp = NULL;
}
if (NULL != adb)
{
TALER_AUDITORDB_plugin_unload (adb);
adb = NULL;
}
if (NULL != edb)
{
TALER_EXCHANGEDB_plugin_unload (edb);
edb = NULL;
}
}
/* ***************************** Report logic **************************** */
@ -140,107 +197,18 @@ report_row_minor_inconsistency (const char *table,
#endif
/* ***************************** Analyze reserves_in ************************ */
/* This logic checks the reserves_in table */
/**
* Analyze reserves for being well-formed.
*
* @param cls NULL
* @return transaction status code
*/
static enum GNUNET_DB_QueryStatus
analyze_reserves_in (void *cls)
{
/* FIXME: #4958 */
return GNUNET_DB_STATUS_SUCCESS_ONE_RESULT;
}
/* ***************************** Analyze reserves_out ************************ */
/* This logic checks the reserves_out table */
/**
* Analyze reserves for being well-formed.
*
* @param cls NULL
* @return transaction status code
*/
static enum GNUNET_DB_QueryStatus
analyze_reserves_out (void *cls)
{
#if 0
// FIXME: start_off != rowid!
hh = wp->get_history (wp->cls,
TALER_BANK_DIRECTION_CREDIT,
&start_off,
sizeof (start_off),
INT64_MAX,
&history_cb,
NULL);
#endif
/* FIXME: #4958 */
return GNUNET_DB_STATUS_SUCCESS_ONE_RESULT;
}
/* *************************** General transaction logic ****************** */
/**
* Type of an analysis function. Each analysis function runs in
* its own transaction scope and must thus be internally consistent.
* Commit the transaction, checkpointing our progress in the auditor
* DB.
*
* @param cls closure
* @return transaction status code
*/
typedef enum GNUNET_DB_QueryStatus
(*Analysis)(void *cls);
/**
* Perform the given @a analysis incrementally, checkpointing our
* progress in the auditor DB.
*
* @param analysis analysis to run
* @param analysis_cls closure for @a analysis
* @param qs transaction status so far
* @return transaction status code
*/
static enum GNUNET_DB_QueryStatus
incremental_processing (Analysis analysis,
void *analysis_cls)
commit (enum GNUNET_DB_QueryStatus qs)
{
enum GNUNET_DB_QueryStatus qs;
enum GNUNET_DB_QueryStatus qsx;
void *in_wire_off;
void *out_wire_off;
size_t wire_off_size;
qsx = adb->get_wire_auditor_progress (adb->cls,
asession,
&master_pub,
&pp,
&in_wire_off,
&out_wire_off,
&wire_off_size);
if (0 > qsx)
{
GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qsx);
return qsx;
}
if (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS == qsx)
{
GNUNET_log (GNUNET_ERROR_TYPE_MESSAGE,
_("First analysis using this auditor, starting audit from scratch\n"));
}
else
{
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
_("Resuming audit at %llu/%llu\n"),
(unsigned long long) pp.last_reserve_in_serial_id,
(unsigned long long) pp.last_reserve_out_serial_id);
}
qs = analysis (analysis_cls);
// FIXME: wire plugin does NOT support synchronous activity!
if (0 > qs)
{
if (GNUNET_DB_STATUS_SOFT_ERROR == qs)
@ -248,7 +216,11 @@ incremental_processing (Analysis analysis,
"Serialization issue, not recording progress\n");
else
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
"Hard database error, not recording progress\n");
"Hard error, not recording progress\n");
adb->rollback (adb->cls,
asession);
edb->rollback (edb->cls,
esession);
return qs;
}
if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT == qsx)
@ -279,43 +251,7 @@ incremental_processing (Analysis analysis,
_("Concluded audit step at %llu/%llu\n"),
(unsigned long long) pp.last_reserve_in_serial_id,
(unsigned long long) pp.last_reserve_out_serial_id);
return qs;
}
/**
* Perform the given @a analysis within a transaction scope.
* Commit on success.
*
* @param analysis analysis to run
* @param analysis_cls closure for @a analysis
* @return #GNUNET_OK if @a analysis succeessfully committed,
* #GNUNET_NO if we had an error on commit (retry may help)
* #GNUNET_SYSERR on hard errors
*/
static int
transact (Analysis analysis,
void *analysis_cls)
{
int ret;
enum GNUNET_DB_QueryStatus qs;
ret = adb->start (adb->cls,
asession);
if (GNUNET_OK != ret)
{
GNUNET_break (0);
return GNUNET_SYSERR;
}
ret = edb->start (edb->cls,
esession);
if (GNUNET_OK != ret)
{
GNUNET_break (0);
return GNUNET_SYSERR;
}
qs = incremental_processing (analysis,
analysis_cls);
if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT == qs)
{
qs = edb->commit (edb->cls,
@ -353,46 +289,45 @@ transact (Analysis analysis,
}
/* ***************************** Analyze reserves_in ************************ */
/**
* Initialize DB sessions and run the analysis.
* Callbacks of this type are used to serve the result of asking
* the bank for the transaction history.
*
* @param cls closure
* @param dir direction of the transfer
* @param row_off identification of the position at which we are querying
* @param row_off_size number of bytes in @a row_off
* @param details details about the wire transfer
* @return #GNUNET_OK to continue, #GNUNET_SYSERR to abort iteration
*/
static void
setup_sessions_and_run ()
static int
history_credit_cb (void *cls,
enum TALER_BANK_Direction dir,
const void *row_off,
size_t row_off_size,
const struct TALER_WIRE_TransferDetails *details)
{
esession = edb->get_session (edb->cls);
if (NULL == esession)
if (NULL == details)
{
fprintf (stderr,
"Failed to initialize exchange session.\n");
global_ret = 1;
return;
/* end of operation */
hh = NULL;
/* TODO: also check DEBITs! */
commit (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT);
GNUNET_SCHEDULER_shutdown ();
return GNUNET_SYSERR;
}
asession = adb->get_session (adb->cls);
if (NULL == asession)
{
fprintf (stderr,
"Failed to initialize auditor session.\n");
global_ret = 1;
return;
}
wp = TALER_WIRE_plugin_load (cfg,
wire_plugin);
if (NULL == wp)
{
fprintf (stderr,
"Failed to load wire plugin `%s'\n",
wire_plugin);
global_ret = 1;
return;
}
// FIXME: wire plugin does NOT support synchronous activity!
transact (&analyze_reserves_in,
NULL);
transact (&analyze_reserves_out,
NULL);
/* TODO: implement actual checks! */
return GNUNET_OK;
}
/* ***************************** Setup logic ************************ */
/**
* Main function that will be run.
*
@ -407,6 +342,8 @@ run (void *cls,
const char *cfgfile,
const struct GNUNET_CONFIGURATION_Handle *c)
{
int ret;
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Launching auditor\n");
cfg = c;
@ -458,18 +395,100 @@ run (void *cls,
GNUNET_break (GNUNET_OK ==
adb->create_tables (adb->cls));
}
GNUNET_SCHEDULER_add_shutdown (&do_shutdown,
NULL);
esession = edb->get_session (edb->cls);
if (NULL == esession)
{
fprintf (stderr,
"Failed to initialize exchange session.\n");
global_ret = 1;
GNUNET_SCHEDULER_shutdown ();
return;
}
asession = adb->get_session (adb->cls);
if (NULL == asession)
{
fprintf (stderr,
"Failed to initialize auditor session.\n");
global_ret = 1;
GNUNET_SCHEDULER_shutdown ();
return;
}
wp = TALER_WIRE_plugin_load (cfg,
wire_plugin);
if (NULL == wp)
{
fprintf (stderr,
"Failed to load wire plugin `%s'\n",
wire_plugin);
global_ret = 1;
GNUNET_SCHEDULER_shutdown ();
return;
}
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Starting audit\n");
setup_sessions_and_run ();
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Audit complete\n");
if (NULL != wp)
TALER_WIRE_plugin_unload (wp);
if (NULL != adb)
TALER_AUDITORDB_plugin_unload (adb);
if (NULL != edb)
TALER_EXCHANGEDB_plugin_unload (edb);
ret = adb->start (adb->cls,
asession);
if (GNUNET_OK != ret)
{
GNUNET_break (0);
global_ret = 1;
GNUNET_SCHEDULER_shutdown ();
return;
}
ret = edb->start (edb->cls,
esession);
if (GNUNET_OK != ret)
{
GNUNET_break (0);
global_ret = 1;
GNUNET_SCHEDULER_shutdown ();
return;
}
qsx = adb->get_wire_auditor_progress (adb->cls,
asession,
&master_pub,
&pp,
&in_wire_off,
&out_wire_off,
&wire_off_size);
if (0 > qsx)
{
GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qsx);
global_ret = 1;
GNUNET_SCHEDULER_shutdown ();
return;
}
if (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS == qsx)
{
GNUNET_log (GNUNET_ERROR_TYPE_MESSAGE,
_("First analysis using this auditor, starting audit from scratch\n"));
}
else
{
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
_("Resuming audit at %llu/%llu\n"),
(unsigned long long) pp.last_reserve_in_serial_id,
(unsigned long long) pp.last_reserve_out_serial_id);
}
hh = wp->get_history (wp->cls,
TALER_BANK_DIRECTION_CREDIT,
in_wire_off,
wire_off_size,
INT64_MAX,
&history_credit_cb,
NULL);
if (NULL == hh)
{
fprintf (stderr,
"Failed to obtain bank transaction history\n");
commit (GNUNET_DB_STATUS_HARD_ERROR);
global_ret = 1;
GNUNET_SCHEDULER_shutdown ();
return;
}
}