aboutsummaryrefslogtreecommitdiff
path: root/src/exchange/taler-exchange-wirewatch.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/exchange/taler-exchange-wirewatch.c')
-rw-r--r--src/exchange/taler-exchange-wirewatch.c422
1 files changed, 13 insertions, 409 deletions
diff --git a/src/exchange/taler-exchange-wirewatch.c b/src/exchange/taler-exchange-wirewatch.c
index a1a3a4ff..a7a6b004 100644
--- a/src/exchange/taler-exchange-wirewatch.c
+++ b/src/exchange/taler-exchange-wirewatch.c
@@ -495,397 +495,14 @@ transaction_completed (void)
* We got incoming transaction details from the bank. Add them
* to the database.
*
- * @param details array of transaction details
- * @param details_length length of the @a details array
- */
-static void
-process_reply (const struct TALER_BANK_CreditDetails *details,
- unsigned int details_length)
-{
- enum GNUNET_DB_QueryStatus qs;
- bool shard_done;
- uint64_t lroff = latest_row_off;
-
- if (0 == details_length)
- {
- /* Server should have used 204, not 200! */
- GNUNET_break_op (0);
- transaction_completed ();
- return;
- }
- hh_returned_data = true;
- /* check serial IDs for range constraints */
- for (unsigned int i = 0; i<details_length; i++)
- {
- const struct TALER_BANK_CreditDetails *cd = &details[i];
-
- if (cd->serial_id < lroff)
- {
- GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
- "Serial ID %llu not monotonic (got %llu before). Failing!\n",
- (unsigned long long) cd->serial_id,
- (unsigned long long) lroff);
- db_plugin->rollback (db_plugin->cls);
- GNUNET_SCHEDULER_shutdown ();
- return;
- }
- if (cd->serial_id > shard_end)
- {
- /* we are *past* the current shard (likely because the serial_id of the
- shard_end happens to not exist in the DB). So commit and stop this
- iteration! */
- GNUNET_log (GNUNET_ERROR_TYPE_INFO,
- "Serial ID %llu past shard end at %llu, ending iteration early!\n",
- (unsigned long long) cd->serial_id,
- (unsigned long long) shard_end);
- details_length = i;
- progress = true;
- lroff = cd->serial_id - 1;
- break;
- }
- lroff = cd->serial_id;
- }
- if (GNUNET_OK !=
- db_plugin->start_read_committed (db_plugin->cls,
- "wirewatch check for incoming wire transfers"))
- {
- GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
- "Failed to start database transaction!\n");
- global_ret = EXIT_FAILURE;
- GNUNET_SCHEDULER_shutdown ();
- return;
- }
- started_transaction = true;
- GNUNET_log (GNUNET_ERROR_TYPE_INFO,
- "Importing %u transactions\n",
- details_length);
- for (unsigned int i = 0; i<details_length; i++)
- {
- const struct TALER_BANK_CreditDetails *cd = &details[i];
-
- /* FIXME #7276: Consider using Postgres multi-valued insert here,
- for up to 15x speed-up according to
- https://dba.stackexchange.com/questions/224989/multi-row-insert-vs-transactional-single-row-inserts#225006
- (Note: this may require changing both the
- plugin API as well as modifying how this function is called.) */
- qs = db_plugin->reserves_in_insert (db_plugin->cls,
- &cd->reserve_pub,
- &cd->amount,
- cd->execution_date,
- cd->debit_account_uri,
- ai->section_name,
- cd->serial_id);
- switch (qs)
- {
- case GNUNET_DB_STATUS_HARD_ERROR:
- GNUNET_break (0);
- GNUNET_SCHEDULER_shutdown ();
- return;
- case GNUNET_DB_STATUS_SOFT_ERROR:
- GNUNET_log (GNUNET_ERROR_TYPE_INFO,
- "Got DB soft error for reserves_in_insert. Rolling back.\n");
- handle_soft_error ();
- return;
- case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
- /* Either wirewatch was freshly started after the system was
- shutdown and we're going over an incomplete shard again
- after being restarted, or the shard lock period was too
- short (number of workers set incorrectly?) and a 2nd
- wirewatcher has been stealing our work while we are still
- at it. */
- GNUNET_log (GNUNET_ERROR_TYPE_INFO,
- "Attempted to import transaction %llu (%s) twice. "
- "This should happen rarely (if not, ask for support).\n",
- (unsigned long long) cd->serial_id,
- job_name);
- break;
- case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
- GNUNET_log (GNUNET_ERROR_TYPE_INFO,
- "Imported transaction %llu.",
- (unsigned long long) cd->serial_id);
- /* normal case */
- progress = true;
- break;
- }
- }
- latest_row_off = lroff;
- shard_done = (shard_end <= latest_row_off);
- if (shard_done)
- {
- /* shard is complete, mark this as well */
- qs = db_plugin->complete_shard (db_plugin->cls,
- job_name,
- shard_start,
- shard_end);
- switch (qs)
- {
- case GNUNET_DB_STATUS_HARD_ERROR:
- GNUNET_break (0);
- GNUNET_SCHEDULER_shutdown ();
- return;
- case GNUNET_DB_STATUS_SOFT_ERROR:
- GNUNET_log (GNUNET_ERROR_TYPE_INFO,
- "Got DB soft error for complete_shard. Rolling back.\n");
- handle_soft_error ();
- return;
- case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
- GNUNET_break (0);
- /* Not expected, but let's just continue */
- break;
- case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
- /* normal case */
- progress = true;
- GNUNET_log (GNUNET_ERROR_TYPE_INFO,
- "Completed shard %s (%llu,%llu] after %s\n",
- job_name,
- (unsigned long long) shard_start,
- (unsigned long long) shard_end,
- GNUNET_STRINGS_relative_time_to_string (
- GNUNET_TIME_absolute_get_duration (shard_start_time),
- true));
- break;
- }
- }
- if (! progress)
- {
- db_plugin->rollback (db_plugin->cls);
- }
- else
- {
- GNUNET_log (GNUNET_ERROR_TYPE_INFO,
- "Committing %s progress (%llu,%llu] at %llu\n (%s)",
- job_name,
- (unsigned long long) shard_start,
- (unsigned long long) shard_end,
- (unsigned long long) latest_row_off,
- shard_done
- ? "shard done"
- : "shard incomplete");
- qs = db_plugin->commit (db_plugin->cls);
- switch (qs)
- {
- case GNUNET_DB_STATUS_HARD_ERROR:
- GNUNET_break (0);
- GNUNET_SCHEDULER_shutdown ();
- return;
- case GNUNET_DB_STATUS_SOFT_ERROR:
- /* reduce transaction size to reduce rollback probability */
- GNUNET_log (GNUNET_ERROR_TYPE_INFO,
- "Got DB soft error on commit. Reducing transaction size.\n");
- handle_soft_error ();
- return;
- case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
- case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
- started_transaction = false;
- /* normal case */
- break;
- }
- }
- if (shard_done)
- {
- shard_delay = GNUNET_TIME_absolute_get_duration (shard_start_time);
- shard_open = false;
- transaction_completed ();
- return;
- }
- GNUNET_assert (NULL == task);
- task = GNUNET_SCHEDULER_add_now (&continue_with_shard,
- NULL);
-}
-
-
-/**
- * We got incoming transaction details from the bank. Add them
- * to the database.
- *
- * @param details array of transaction details
- * @param details_length length of the @a details array
- */
-static void
-process_reply_batched (const struct TALER_BANK_CreditDetails *details,
- unsigned int details_length)
-{
- enum GNUNET_DB_QueryStatus qs;
- bool shard_done;
- uint64_t lroff = latest_row_off;
-
- if (0 == details_length)
- {
- /* Server should have used 204, not 200! */
- GNUNET_break_op (0);
- transaction_completed ();
- return;
- }
- /* check serial IDs for range constraints */
- for (unsigned int i = 0; i<details_length; i++)
- {
- const struct TALER_BANK_CreditDetails *cd = &details[i];
-
- if (cd->serial_id < lroff)
- {
- GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
- "Serial ID %llu not monotonic (got %llu before). Failing!\n",
- (unsigned long long) cd->serial_id,
- (unsigned long long) lroff);
- db_plugin->rollback (db_plugin->cls);
- GNUNET_SCHEDULER_shutdown ();
- return;
- }
- if (cd->serial_id > shard_end)
- {
- /* we are *past* the current shard (likely because the serial_id of the
- shard_end happens to not exist in the DB). So commit and stop this
- iteration! */
- GNUNET_log (GNUNET_ERROR_TYPE_INFO,
- "Serial ID %llu past shard end at %llu, ending iteration early!\n",
- (unsigned long long) cd->serial_id,
- (unsigned long long) shard_end);
- details_length = i;
- progress = true;
- lroff = cd->serial_id - 1;
- break;
- }
- lroff = cd->serial_id;
- }
- if (0 != details_length)
- {
- enum GNUNET_DB_QueryStatus qss[details_length];
- struct TALER_EXCHANGEDB_ReserveInInfo reserves[details_length];
-
- hh_returned_data = true;
- GNUNET_log (GNUNET_ERROR_TYPE_INFO,
- "Importing %u transactions\n",
- details_length);
- for (unsigned int i = 0; i<details_length; i++)
- {
- const struct TALER_BANK_CreditDetails *cd = &details[i];
- struct TALER_EXCHANGEDB_ReserveInInfo *res = &reserves[i];
-
- res->reserve_pub = &cd->reserve_pub;
- res->balance = &cd->amount;
- res->execution_time = cd->execution_date;
- res->sender_account_details = cd->debit_account_uri;
- res->exchange_account_name = ai->section_name;
- res->wire_reference = cd->serial_id;
- }
- qs = db_plugin->batch_reserves_in_insert (db_plugin->cls,
- reserves,
- details_length,
- qss);
- switch (qs)
- {
- case GNUNET_DB_STATUS_HARD_ERROR:
- GNUNET_break (0);
- GNUNET_SCHEDULER_shutdown ();
- return;
- case GNUNET_DB_STATUS_SOFT_ERROR:
- GNUNET_log (GNUNET_ERROR_TYPE_INFO,
- "Got DB soft error for batch_reserves_in_insert. Rolling back.\n");
- handle_soft_error ();
- return;
- default:
- break;
- }
- for (unsigned int i = 0; i<details_length; i++)
- {
- const struct TALER_BANK_CreditDetails *cd = &details[i];
-
- switch (qss[i])
- {
- case GNUNET_DB_STATUS_HARD_ERROR:
- GNUNET_break (0);
- GNUNET_SCHEDULER_shutdown ();
- return;
- case GNUNET_DB_STATUS_SOFT_ERROR:
- GNUNET_log (GNUNET_ERROR_TYPE_INFO,
- "Got DB soft error for batch_reserves_in_insert(%u). Rolling back.\n",
- i);
- handle_soft_error ();
- return;
- case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
- /* Either wirewatch was freshly started after the system was
- shutdown and we're going over an incomplete shard again
- after being restarted, or the shard lock period was too
- short (number of workers set incorrectly?) and a 2nd
- wirewatcher has been stealing our work while we are still
- at it. */
- GNUNET_log (GNUNET_ERROR_TYPE_INFO,
- "Attempted to import transaction %llu (%s) twice. "
- "This should happen rarely (if not, ask for support).\n",
- (unsigned long long) cd->serial_id,
- job_name);
- break;
- case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
- GNUNET_log (GNUNET_ERROR_TYPE_INFO,
- "Imported transaction %llu.",
- (unsigned long long) cd->serial_id);
- /* normal case */
- progress = true;
- break;
- }
- }
- }
-
- latest_row_off = lroff;
- shard_done = (shard_end <= latest_row_off);
- if (shard_done)
- {
- /* shard is complete, mark this as well */
- qs = db_plugin->complete_shard (db_plugin->cls,
- job_name,
- shard_start,
- shard_end);
- switch (qs)
- {
- case GNUNET_DB_STATUS_HARD_ERROR:
- GNUNET_break (0);
- GNUNET_SCHEDULER_shutdown ();
- return;
- case GNUNET_DB_STATUS_SOFT_ERROR:
- GNUNET_log (GNUNET_ERROR_TYPE_INFO,
- "Got DB soft error for complete_shard. Rolling back.\n");
- handle_soft_error ();
- return;
- case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
- GNUNET_break (0);
- /* Not expected, but let's just continue */
- break;
- case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
- /* normal case */
- progress = true;
- GNUNET_log (GNUNET_ERROR_TYPE_INFO,
- "Completed shard %s (%llu,%llu] after %s\n",
- job_name,
- (unsigned long long) shard_start,
- (unsigned long long) shard_end,
- GNUNET_STRINGS_relative_time_to_string (
- GNUNET_TIME_absolute_get_duration (shard_start_time),
- true));
- break;
- }
- shard_delay = GNUNET_TIME_absolute_get_duration (shard_start_time);
- shard_open = false;
- transaction_completed ();
- return;
- }
- GNUNET_assert (NULL == task);
- task = GNUNET_SCHEDULER_add_now (&continue_with_shard,
- NULL);
-}
-
-
-/**
- * We got incoming transaction details from the bank. Add them
- * to the database.
- *
* @param batch_size desired batch size
* @param details array of transaction details
* @param details_length length of the @a details array
*/
static void
-process_reply_batched2 (unsigned int batch_size,
- const struct TALER_BANK_CreditDetails *details,
- unsigned int details_length)
+process_reply (unsigned int batch_size,
+ const struct TALER_BANK_CreditDetails *details,
+ unsigned int details_length)
{
enum GNUNET_DB_QueryStatus qs;
bool shard_done;
@@ -950,11 +567,11 @@ process_reply_batched2 (unsigned int batch_size,
res->exchange_account_name = ai->section_name;
res->wire_reference = cd->serial_id;
}
- qs = db_plugin->batch2_reserves_in_insert (db_plugin->cls,
- reserves,
- details_length,
- batch_size,
- qss);
+ qs = db_plugin->reserves_in_insert (db_plugin->cls,
+ reserves,
+ details_length,
+ batch_size,
+ qss);
switch (qs)
{
case GNUNET_DB_STATUS_HARD_ERROR:
@@ -1074,7 +691,7 @@ history_cb (void *cls,
(void) cls;
if (-2 == batch_mode)
{
- const char *mode = getenv ("TALER_USE_BATCH");
+ const char *mode = getenv ("TALER_WIREWATCH_BATCH_SIZE");
char dummy;
if ( (NULL == mode) ||
@@ -1087,7 +704,7 @@ history_cb (void *cls,
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
"Bad batch mode `%s' specified\n",
mode);
- batch_mode = -1;
+ batch_mode = 8; /* maximum supported is currently 8 */
}
}
GNUNET_assert (NULL == task);
@@ -1098,22 +715,9 @@ history_cb (void *cls,
switch (reply->http_status)
{
case MHD_HTTP_OK:
- switch (batch_mode)
- {
- case -1:
- process_reply (reply->details.success.details,
- reply->details.success.details_length);
- break;
- case 0:
- process_reply_batched (reply->details.success.details,
- reply->details.success.details_length);
- break;
- default:
- process_reply_batched2 ((unsigned int) batch_mode,
- reply->details.success.details,
- reply->details.success.details_length);
- break;
- }
+ process_reply (batch_mode,
+ reply->details.success.details,
+ reply->details.success.details_length);
return;
case MHD_HTTP_NO_CONTENT:
transaction_completed ();