refactor wirewatch to enable use of batch API

This commit is contained in:
Christian Grothoff 2022-12-19 21:41:32 +01:00
parent 709ca561d2
commit b6b80e61f4
No known key found for this signature in database
GPG Key ID: 939E6BE1E29FC3CC
5 changed files with 274 additions and 82 deletions

@ -1 +1 @@
Subproject commit 20f8eb7a72e2160409f0f78264ec5198e9caa193
Subproject commit 149aa0a08d787419e02277ef231d93c6a0154a47

View File

@ -656,6 +656,185 @@ process_reply (const struct TALER_BANK_CreditDetails *details,
}
/**
* We got incoming transaction details from the bank. Add them
* to the database.
*
* @param details array of transaction details
* @param details_length length of the @a details array
*/
static void
process_reply_batched (const struct TALER_BANK_CreditDetails *details,
unsigned int details_length)
{
enum GNUNET_DB_QueryStatus qs;
bool shard_done;
uint64_t lroff = latest_row_off;
if (0 == details_length)
{
/* Server should have used 204, not 200! */
GNUNET_break_op (0);
transaction_completed ();
return;
}
/* check serial IDs for range constraints */
for (unsigned int i = 0; i<details_length; i++)
{
const struct TALER_BANK_CreditDetails *cd = &details[i];
if (cd->serial_id < lroff)
{
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
"Serial ID %llu not monotonic (got %llu before). Failing!\n",
(unsigned long long) cd->serial_id,
(unsigned long long) lroff);
db_plugin->rollback (db_plugin->cls);
GNUNET_SCHEDULER_shutdown ();
return;
}
if (cd->serial_id > shard_end)
{
/* we are *past* the current shard (likely because the serial_id of the
shard_end happens to not exist in the DB). So commit and stop this
iteration! */
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"Serial ID %llu past shard end at %llu, ending iteration early!\n",
(unsigned long long) cd->serial_id,
(unsigned long long) shard_end);
details_length = i;
progress = true;
lroff = cd->serial_id - 1;
break;
}
lroff = cd->serial_id;
}
if (0 != details_length)
{
enum GNUNET_DB_QueryStatus qss[details_length];
struct TALER_EXCHANGEDB_ReserveInInfo reserves[details_length];
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"Importing %u transactions\n",
details_length);
for (unsigned int i = 0; i<details_length; i++)
{
const struct TALER_BANK_CreditDetails *cd = &details[i];
struct TALER_EXCHANGEDB_ReserveInInfo *res = &reserves[i];
res->reserve_pub = &cd->reserve_pub;
res->balance = &cd->amount;
res->execution_time = cd->execution_date;
res->sender_account_details = cd->debit_account_uri;
res->exchange_account_name = ai->section_name;
res->wire_reference = cd->serial_id;
}
qs = db_plugin->batch_reserves_in_insert (db_plugin->cls,
reserves,
details_length,
qss);
switch (qs)
{
case GNUNET_DB_STATUS_HARD_ERROR:
GNUNET_break (0);
GNUNET_SCHEDULER_shutdown ();
return;
case GNUNET_DB_STATUS_SOFT_ERROR:
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"Got DB soft error for batch_reserves_in_insert. Rolling back.\n");
handle_soft_error ();
return;
default:
break;
}
for (unsigned int i = 0; i<details_length; i++)
{
const struct TALER_BANK_CreditDetails *cd = &details[i];
switch (qss[i])
{
case GNUNET_DB_STATUS_HARD_ERROR:
GNUNET_break (0);
GNUNET_SCHEDULER_shutdown ();
return;
case GNUNET_DB_STATUS_SOFT_ERROR:
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"Got DB soft error for batch_reserves_in_insert(%u). Rolling back.\n",
i);
handle_soft_error ();
return;
case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
/* Either wirewatch was freshly started after the system was
shutdown and we're going over an incomplete shard again
after being restarted, or the shard lock period was too
short (number of workers set incorrectly?) and a 2nd
wirewatcher has been stealing our work while we are still
at it. */
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"Attempted to import transaction %llu (%s) twice. "
"This should happen rarely (if not, ask for support).\n",
(unsigned long long) cd->serial_id,
job_name);
break;
case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"Imported transaction %llu.",
(unsigned long long) cd->serial_id);
/* normal case */
progress = true;
break;
}
}
}
latest_row_off = lroff;
shard_done = (shard_end <= latest_row_off);
if (shard_done)
{
/* shard is complete, mark this as well */
qs = db_plugin->complete_shard (db_plugin->cls,
job_name,
shard_start,
shard_end);
switch (qs)
{
case GNUNET_DB_STATUS_HARD_ERROR:
GNUNET_break (0);
GNUNET_SCHEDULER_shutdown ();
return;
case GNUNET_DB_STATUS_SOFT_ERROR:
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"Got DB soft error for complete_shard. Rolling back.\n");
handle_soft_error ();
return;
case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
GNUNET_break (0);
/* Not expected, but let's just continue */
break;
case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
/* normal case */
progress = true;
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"Completed shard %s (%llu,%llu] after %s\n",
job_name,
(unsigned long long) shard_start,
(unsigned long long) shard_end,
GNUNET_STRINGS_relative_time_to_string (
GNUNET_TIME_absolute_get_duration (shard_start_time),
true));
break;
}
shard_delay = GNUNET_TIME_absolute_get_duration (shard_start_time);
shard_open = false;
transaction_completed ();
return;
}
GNUNET_assert (NULL == task);
task = GNUNET_SCHEDULER_add_now (&continue_with_shard,
NULL);
}
/**
* Callbacks of this type are used to serve the result of asking
* the bank for the transaction history.
@ -667,7 +846,11 @@ static void
history_cb (void *cls,
const struct TALER_BANK_CreditHistoryResponse *reply)
{
static int batch_mode = -1;
(void) cls;
if (-1 == batch_mode)
batch_mode = (NULL != getenv ("TALER_USE_BATCH"));
GNUNET_assert (NULL == task);
hh = NULL;
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
@ -676,8 +859,12 @@ history_cb (void *cls,
switch (reply->http_status)
{
case MHD_HTTP_OK:
if (0 == batch_mode)
process_reply (reply->details.success.details,
reply->details.success.details_length);
else
process_reply_batched (reply->details.success.details,
reply->details.success.details_length);
return;
case MHD_HTTP_NO_CONTENT:
transaction_completed ();

View File

@ -54,7 +54,8 @@ compute_notify_on_reserve (const struct TALER_ReservePublicKeyP *reserve_pub)
enum GNUNET_DB_QueryStatus
TEH_PG_batch2_reserves_in_insert (void *cls,
const struct TALER_EXCHANGEDB_ReserveInInfo *reserves,
const struct
TALER_EXCHANGEDB_ReserveInInfo *reserves,
unsigned int reserves_length,
enum GNUNET_DB_QueryStatus *results)
{
@ -102,14 +103,14 @@ TEH_PG_batch2_reserves_in_insert (void *cls,
pg->legal_reserve_expiration_time));
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Creating reserve %s with expiration in %s\n",
TALER_B2S (&(reserves->reserve_pub)),
TALER_B2S (reserves->reserve_pub),
GNUNET_STRINGS_relative_time_to_string (
pg->idle_reserve_expiration_time,
GNUNET_NO));
{
if (GNUNET_OK !=
TEH_PG_start_read_committed(pg,
TEH_PG_start_read_committed (pg,
"READ_COMMITED"))
{
GNUNET_break (0);
@ -120,31 +121,31 @@ TEH_PG_batch2_reserves_in_insert (void *cls,
time; we do this before adding the actual transaction to "reserves_in",
as for a new reserve it can't be a duplicate 'add' operation, and as
the 'add' operation needs the reserve entry as a foreign key. */
for (unsigned int i=0;i<reserves_length;i++)
for (unsigned int i = 0; i<reserves_length; i++)
{
const struct TALER_EXCHANGEDB_ReserveInInfo *reserve = &reserves[i];
notify_s[i] = compute_notify_on_reserve (&reserve->reserve_pub);
notify_s[i] = compute_notify_on_reserve (reserve->reserve_pub);
}
for (unsigned int i=0;i<(reserves_length & ~1);i+=2)
for (unsigned int i = 0; i<(reserves_length & ~1); i += 2)
{
const struct TALER_EXCHANGEDB_ReserveInInfo *reserve0 = &reserves[i];
const struct TALER_EXCHANGEDB_ReserveInInfo *reserve1 = &reserves[i+1];
const struct TALER_EXCHANGEDB_ReserveInInfo *reserve1 = &reserves[i + 1];
struct GNUNET_PQ_QueryParam params[] = {
GNUNET_PQ_query_param_auto_from_type (&reserve0->reserve_pub),
GNUNET_PQ_query_param_auto_from_type (reserve0->reserve_pub),
GNUNET_PQ_query_param_timestamp (&expiry),
GNUNET_PQ_query_param_timestamp (&gc),
GNUNET_PQ_query_param_uint64 (&reserve0->wire_reference),
TALER_PQ_query_param_amount (&reserve0->balance),
TALER_PQ_query_param_amount (reserve0->balance),
GNUNET_PQ_query_param_string (reserve0->exchange_account_name),
GNUNET_PQ_query_param_timestamp (&reserve0->execution_time),
GNUNET_PQ_query_param_auto_from_type (&h_payto),
GNUNET_PQ_query_param_string (reserve0->sender_account_details),
GNUNET_PQ_query_param_timestamp (&reserve_expiration),
GNUNET_PQ_query_param_string (notify_s[i]),
GNUNET_PQ_query_param_auto_from_type (&reserve1->reserve_pub),
GNUNET_PQ_query_param_auto_from_type (reserve1->reserve_pub),
GNUNET_PQ_query_param_uint64 (&reserve1->wire_reference),
TALER_PQ_query_param_amount (&reserve1->balance),
TALER_PQ_query_param_amount (reserve1->balance),
GNUNET_PQ_query_param_string (reserve1->exchange_account_name),
GNUNET_PQ_query_param_timestamp (&reserve1->execution_time),
GNUNET_PQ_query_param_auto_from_type (&h_payto),
@ -190,7 +191,8 @@ TEH_PG_batch2_reserves_in_insert (void *cls,
}
if (reserves_length & 1)
{
const struct TALER_EXCHANGEDB_ReserveInInfo *reserve = &reserves[reserves_length-1];
const struct TALER_EXCHANGEDB_ReserveInInfo *reserve =
&reserves[reserves_length - 1];
// single insert logic here
}
GNUNET_assert (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS != qs1);
@ -201,7 +203,8 @@ TEH_PG_batch2_reserves_in_insert (void *cls,
conflicts2[i] = conflicted2;
// fprintf(stdout, "%d", conflicts[i]);
// fprintf(stdout, "%d", conflicts2[i]);
if ((!conflicts[i] && transaction_duplicate) ||(!conflicts2[i] && transaction_duplicate2))
if ((! conflicts[i] && transaction_duplicate) || (! conflicts2[i] &&
transaction_duplicate2))
{
GNUNET_break (0);
TEH_PG_rollback (pg);
@ -218,12 +221,12 @@ TEH_PG_batch2_reserves_in_insert (void *cls,
return cs;
}
if (!need_update)
if (! need_update)
goto exit;
// begin serializable
{
if (GNUNET_OK !=
TEH_PG_start(pg,
TEH_PG_start (pg,
"reserve-insert-continued"))
{
GNUNET_break (0);
@ -236,17 +239,17 @@ TEH_PG_batch2_reserves_in_insert (void *cls,
"reserves_in_add_transaction",
"SELECT batch_reserves_update"
" ($1,$2,$3,$4,$5,$6,$7,$8,$9);");
for (unsigned int i=0;i<reserves_length;i++)
for (unsigned int i = 0; i<reserves_length; i++)
{
if (! conflicts[i])
continue;
{
const struct TALER_EXCHANGEDB_ReserveInInfo *reserve = &reserves[i];
struct GNUNET_PQ_QueryParam params[] = {
GNUNET_PQ_query_param_auto_from_type (&reserve->reserve_pub),
GNUNET_PQ_query_param_auto_from_type (reserve->reserve_pub),
GNUNET_PQ_query_param_timestamp (&expiry),
GNUNET_PQ_query_param_uint64 (&reserve->wire_reference),
TALER_PQ_query_param_amount (&reserve->balance),
TALER_PQ_query_param_amount (reserve->balance),
GNUNET_PQ_query_param_string (reserve->exchange_account_name),
GNUNET_PQ_query_param_bool (conflicted),
GNUNET_PQ_query_param_auto_from_type (&h_payto),
@ -274,8 +277,8 @@ TEH_PG_batch2_reserves_in_insert (void *cls,
return cs;
}
exit:
for (unsigned int i=0;i<reserves_length;i++)
exit:
for (unsigned int i = 0; i<reserves_length; i++)
GNUNET_free (notify_s[i]);
return reserves_length;

View File

@ -54,9 +54,9 @@ compute_notify_on_reserve (const struct TALER_ReservePublicKeyP *reserve_pub)
enum GNUNET_DB_QueryStatus
TEH_PG_batch_reserves_in_insert (void *cls,
const struct
TALER_EXCHANGEDB_ReserveInInfo *reserves,
TEH_PG_batch_reserves_in_insert (
void *cls,
const struct TALER_EXCHANGEDB_ReserveInInfo *reserves,
unsigned int reserves_length,
enum GNUNET_DB_QueryStatus *results)
{
@ -96,14 +96,14 @@ TEH_PG_batch_reserves_in_insert (void *cls,
pg->legal_reserve_expiration_time));
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Creating reserve %s with expiration in %s\n",
TALER_B2S (&(reserves->reserve_pub)),
TALER_B2S (reserves->reserve_pub),
GNUNET_STRINGS_relative_time_to_string (
pg->idle_reserve_expiration_time,
GNUNET_NO));
{
if (GNUNET_OK !=
TEH_PG_start_read_committed(pg,
TEH_PG_start_read_committed (pg,
"READ_COMMITED"))
{
GNUNET_break (0);
@ -117,18 +117,18 @@ TEH_PG_batch_reserves_in_insert (void *cls,
for (unsigned int i = 0; i<reserves_length; i++)
{
const struct TALER_EXCHANGEDB_ReserveInInfo *reserve = &reserves[i];
notify_s[i] = compute_notify_on_reserve (&reserve->reserve_pub);
notify_s[i] = compute_notify_on_reserve (reserve->reserve_pub);
}
for (unsigned int i=0;i<reserves_length;i++)
for (unsigned int i = 0; i<reserves_length; i++)
{
const struct TALER_EXCHANGEDB_ReserveInInfo *reserve = &reserves[i];
struct GNUNET_PQ_QueryParam params[] = {
GNUNET_PQ_query_param_auto_from_type (&reserve->reserve_pub),
GNUNET_PQ_query_param_auto_from_type (reserve->reserve_pub),
GNUNET_PQ_query_param_timestamp (&expiry),
GNUNET_PQ_query_param_timestamp (&gc),
GNUNET_PQ_query_param_uint64 (&reserve->wire_reference),
TALER_PQ_query_param_amount (&reserve->balance),
TALER_PQ_query_param_amount (reserve->balance),
GNUNET_PQ_query_param_string (reserve->exchange_account_name),
GNUNET_PQ_query_param_timestamp (&reserve->execution_time),
GNUNET_PQ_query_param_auto_from_type (&h_payto),
@ -170,7 +170,7 @@ TEH_PG_batch_reserves_in_insert (void *cls,
: GNUNET_DB_STATUS_SUCCESS_ONE_RESULT;
conflicts[i] = conflicted;
// fprintf(stdout, "%d", conflicts[i]);
if (!conflicts[i] && transaction_duplicate)
if (! conflicts[i] && transaction_duplicate)
{
GNUNET_break (0);
TEH_PG_rollback (pg);
@ -187,12 +187,12 @@ TEH_PG_batch_reserves_in_insert (void *cls,
return cs;
}
if (!need_update)
if (! need_update)
goto exit;
// begin serializable
{
if (GNUNET_OK !=
TEH_PG_start(pg,
TEH_PG_start (pg,
"reserve-insert-continued"))
{
GNUNET_break (0);
@ -205,17 +205,17 @@ TEH_PG_batch_reserves_in_insert (void *cls,
"reserves_in_add_transaction",
"SELECT batch_reserves_update"
" ($1,$2,$3,$4,$5,$6,$7,$8,$9);");
for (unsigned int i=0;i<reserves_length;i++)
for (unsigned int i = 0; i<reserves_length; i++)
{
if (! conflicts[i])
continue;
{
const struct TALER_EXCHANGEDB_ReserveInInfo *reserve = &reserves[i];
struct GNUNET_PQ_QueryParam params[] = {
GNUNET_PQ_query_param_auto_from_type (&reserve->reserve_pub),
GNUNET_PQ_query_param_auto_from_type (reserve->reserve_pub),
GNUNET_PQ_query_param_timestamp (&expiry),
GNUNET_PQ_query_param_uint64 (&reserve->wire_reference),
TALER_PQ_query_param_amount (&reserve->balance),
TALER_PQ_query_param_amount (reserve->balance),
GNUNET_PQ_query_param_string (reserve->exchange_account_name),
GNUNET_PQ_query_param_bool (conflicted),
GNUNET_PQ_query_param_auto_from_type (&h_payto),
@ -243,8 +243,8 @@ TEH_PG_batch_reserves_in_insert (void *cls,
return cs;
}
exit:
for (unsigned int i=0;i<reserves_length;i++)
exit:
for (unsigned int i = 0; i<reserves_length; i++)
GNUNET_free (notify_s[i]);
return reserves_length;

View File

@ -2573,8 +2573,8 @@ struct TALER_EXCHANGEDB_KycStatus
struct TALER_EXCHANGEDB_ReserveInInfo
{
struct TALER_ReservePublicKeyP reserve_pub;
struct TALER_Amount balance;
const struct TALER_ReservePublicKeyP *reserve_pub;
const struct TALER_Amount *balance;
struct GNUNET_TIME_Timestamp execution_time;
const char *sender_account_details;
const char *exchange_account_name;
@ -3447,12 +3447,13 @@ struct TALER_EXCHANGEDB_Plugin
* set to the status of the
*/
enum GNUNET_DB_QueryStatus
(*batch_reserves_in_insert)(void *cls,
const struct
TALER_EXCHANGEDB_ReserveInInfo *reserves,
(*batch_reserves_in_insert)(
void *cls,
const struct TALER_EXCHANGEDB_ReserveInInfo *reserves,
unsigned int reserves_length,
enum GNUNET_DB_QueryStatus *results);
/**
* Insert a batch of incoming transaction into reserves. New reserves are
* also created through this function.
@ -3464,12 +3465,13 @@ struct TALER_EXCHANGEDB_Plugin
* set to the status of the
*/
enum GNUNET_DB_QueryStatus
(*batch2_reserves_in_insert)(void *cls,
const struct
TALER_EXCHANGEDB_ReserveInInfo *reserves,
(*batch2_reserves_in_insert)(
void *cls,
const struct TALER_EXCHANGEDB_ReserveInInfo *reserves,
unsigned int reserves_length,
enum GNUNET_DB_QueryStatus *results);
/**
* Locate a nonce for use with a particular public key.
*