diff options
Diffstat (limited to 'src/exchange/taler-exchange-wirewatch.c')
-rw-r--r-- | src/exchange/taler-exchange-wirewatch.c | 422 |
1 files changed, 13 insertions, 409 deletions
diff --git a/src/exchange/taler-exchange-wirewatch.c b/src/exchange/taler-exchange-wirewatch.c index a1a3a4ff..a7a6b004 100644 --- a/src/exchange/taler-exchange-wirewatch.c +++ b/src/exchange/taler-exchange-wirewatch.c @@ -495,397 +495,14 @@ transaction_completed (void) * We got incoming transaction details from the bank. Add them * to the database. * - * @param details array of transaction details - * @param details_length length of the @a details array - */ -static void -process_reply (const struct TALER_BANK_CreditDetails *details, - unsigned int details_length) -{ - enum GNUNET_DB_QueryStatus qs; - bool shard_done; - uint64_t lroff = latest_row_off; - - if (0 == details_length) - { - /* Server should have used 204, not 200! */ - GNUNET_break_op (0); - transaction_completed (); - return; - } - hh_returned_data = true; - /* check serial IDs for range constraints */ - for (unsigned int i = 0; i<details_length; i++) - { - const struct TALER_BANK_CreditDetails *cd = &details[i]; - - if (cd->serial_id < lroff) - { - GNUNET_log (GNUNET_ERROR_TYPE_ERROR, - "Serial ID %llu not monotonic (got %llu before). Failing!\n", - (unsigned long long) cd->serial_id, - (unsigned long long) lroff); - db_plugin->rollback (db_plugin->cls); - GNUNET_SCHEDULER_shutdown (); - return; - } - if (cd->serial_id > shard_end) - { - /* we are *past* the current shard (likely because the serial_id of the - shard_end happens to not exist in the DB). So commit and stop this - iteration! */ - GNUNET_log (GNUNET_ERROR_TYPE_INFO, - "Serial ID %llu past shard end at %llu, ending iteration early!\n", - (unsigned long long) cd->serial_id, - (unsigned long long) shard_end); - details_length = i; - progress = true; - lroff = cd->serial_id - 1; - break; - } - lroff = cd->serial_id; - } - if (GNUNET_OK != - db_plugin->start_read_committed (db_plugin->cls, - "wirewatch check for incoming wire transfers")) - { - GNUNET_log (GNUNET_ERROR_TYPE_ERROR, - "Failed to start database transaction!\n"); - global_ret = EXIT_FAILURE; - GNUNET_SCHEDULER_shutdown (); - return; - } - started_transaction = true; - GNUNET_log (GNUNET_ERROR_TYPE_INFO, - "Importing %u transactions\n", - details_length); - for (unsigned int i = 0; i<details_length; i++) - { - const struct TALER_BANK_CreditDetails *cd = &details[i]; - - /* FIXME #7276: Consider using Postgres multi-valued insert here, - for up to 15x speed-up according to - https://dba.stackexchange.com/questions/224989/multi-row-insert-vs-transactional-single-row-inserts#225006 - (Note: this may require changing both the - plugin API as well as modifying how this function is called.) */ - qs = db_plugin->reserves_in_insert (db_plugin->cls, - &cd->reserve_pub, - &cd->amount, - cd->execution_date, - cd->debit_account_uri, - ai->section_name, - cd->serial_id); - switch (qs) - { - case GNUNET_DB_STATUS_HARD_ERROR: - GNUNET_break (0); - GNUNET_SCHEDULER_shutdown (); - return; - case GNUNET_DB_STATUS_SOFT_ERROR: - GNUNET_log (GNUNET_ERROR_TYPE_INFO, - "Got DB soft error for reserves_in_insert. Rolling back.\n"); - handle_soft_error (); - return; - case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: - /* Either wirewatch was freshly started after the system was - shutdown and we're going over an incomplete shard again - after being restarted, or the shard lock period was too - short (number of workers set incorrectly?) and a 2nd - wirewatcher has been stealing our work while we are still - at it. */ - GNUNET_log (GNUNET_ERROR_TYPE_INFO, - "Attempted to import transaction %llu (%s) twice. " - "This should happen rarely (if not, ask for support).\n", - (unsigned long long) cd->serial_id, - job_name); - break; - case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT: - GNUNET_log (GNUNET_ERROR_TYPE_INFO, - "Imported transaction %llu.", - (unsigned long long) cd->serial_id); - /* normal case */ - progress = true; - break; - } - } - latest_row_off = lroff; - shard_done = (shard_end <= latest_row_off); - if (shard_done) - { - /* shard is complete, mark this as well */ - qs = db_plugin->complete_shard (db_plugin->cls, - job_name, - shard_start, - shard_end); - switch (qs) - { - case GNUNET_DB_STATUS_HARD_ERROR: - GNUNET_break (0); - GNUNET_SCHEDULER_shutdown (); - return; - case GNUNET_DB_STATUS_SOFT_ERROR: - GNUNET_log (GNUNET_ERROR_TYPE_INFO, - "Got DB soft error for complete_shard. Rolling back.\n"); - handle_soft_error (); - return; - case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: - GNUNET_break (0); - /* Not expected, but let's just continue */ - break; - case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT: - /* normal case */ - progress = true; - GNUNET_log (GNUNET_ERROR_TYPE_INFO, - "Completed shard %s (%llu,%llu] after %s\n", - job_name, - (unsigned long long) shard_start, - (unsigned long long) shard_end, - GNUNET_STRINGS_relative_time_to_string ( - GNUNET_TIME_absolute_get_duration (shard_start_time), - true)); - break; - } - } - if (! progress) - { - db_plugin->rollback (db_plugin->cls); - } - else - { - GNUNET_log (GNUNET_ERROR_TYPE_INFO, - "Committing %s progress (%llu,%llu] at %llu\n (%s)", - job_name, - (unsigned long long) shard_start, - (unsigned long long) shard_end, - (unsigned long long) latest_row_off, - shard_done - ? "shard done" - : "shard incomplete"); - qs = db_plugin->commit (db_plugin->cls); - switch (qs) - { - case GNUNET_DB_STATUS_HARD_ERROR: - GNUNET_break (0); - GNUNET_SCHEDULER_shutdown (); - return; - case GNUNET_DB_STATUS_SOFT_ERROR: - /* reduce transaction size to reduce rollback probability */ - GNUNET_log (GNUNET_ERROR_TYPE_INFO, - "Got DB soft error on commit. Reducing transaction size.\n"); - handle_soft_error (); - return; - case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: - case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT: - started_transaction = false; - /* normal case */ - break; - } - } - if (shard_done) - { - shard_delay = GNUNET_TIME_absolute_get_duration (shard_start_time); - shard_open = false; - transaction_completed (); - return; - } - GNUNET_assert (NULL == task); - task = GNUNET_SCHEDULER_add_now (&continue_with_shard, - NULL); -} - - -/** - * We got incoming transaction details from the bank. Add them - * to the database. - * - * @param details array of transaction details - * @param details_length length of the @a details array - */ -static void -process_reply_batched (const struct TALER_BANK_CreditDetails *details, - unsigned int details_length) -{ - enum GNUNET_DB_QueryStatus qs; - bool shard_done; - uint64_t lroff = latest_row_off; - - if (0 == details_length) - { - /* Server should have used 204, not 200! */ - GNUNET_break_op (0); - transaction_completed (); - return; - } - /* check serial IDs for range constraints */ - for (unsigned int i = 0; i<details_length; i++) - { - const struct TALER_BANK_CreditDetails *cd = &details[i]; - - if (cd->serial_id < lroff) - { - GNUNET_log (GNUNET_ERROR_TYPE_ERROR, - "Serial ID %llu not monotonic (got %llu before). Failing!\n", - (unsigned long long) cd->serial_id, - (unsigned long long) lroff); - db_plugin->rollback (db_plugin->cls); - GNUNET_SCHEDULER_shutdown (); - return; - } - if (cd->serial_id > shard_end) - { - /* we are *past* the current shard (likely because the serial_id of the - shard_end happens to not exist in the DB). So commit and stop this - iteration! */ - GNUNET_log (GNUNET_ERROR_TYPE_INFO, - "Serial ID %llu past shard end at %llu, ending iteration early!\n", - (unsigned long long) cd->serial_id, - (unsigned long long) shard_end); - details_length = i; - progress = true; - lroff = cd->serial_id - 1; - break; - } - lroff = cd->serial_id; - } - if (0 != details_length) - { - enum GNUNET_DB_QueryStatus qss[details_length]; - struct TALER_EXCHANGEDB_ReserveInInfo reserves[details_length]; - - hh_returned_data = true; - GNUNET_log (GNUNET_ERROR_TYPE_INFO, - "Importing %u transactions\n", - details_length); - for (unsigned int i = 0; i<details_length; i++) - { - const struct TALER_BANK_CreditDetails *cd = &details[i]; - struct TALER_EXCHANGEDB_ReserveInInfo *res = &reserves[i]; - - res->reserve_pub = &cd->reserve_pub; - res->balance = &cd->amount; - res->execution_time = cd->execution_date; - res->sender_account_details = cd->debit_account_uri; - res->exchange_account_name = ai->section_name; - res->wire_reference = cd->serial_id; - } - qs = db_plugin->batch_reserves_in_insert (db_plugin->cls, - reserves, - details_length, - qss); - switch (qs) - { - case GNUNET_DB_STATUS_HARD_ERROR: - GNUNET_break (0); - GNUNET_SCHEDULER_shutdown (); - return; - case GNUNET_DB_STATUS_SOFT_ERROR: - GNUNET_log (GNUNET_ERROR_TYPE_INFO, - "Got DB soft error for batch_reserves_in_insert. Rolling back.\n"); - handle_soft_error (); - return; - default: - break; - } - for (unsigned int i = 0; i<details_length; i++) - { - const struct TALER_BANK_CreditDetails *cd = &details[i]; - - switch (qss[i]) - { - case GNUNET_DB_STATUS_HARD_ERROR: - GNUNET_break (0); - GNUNET_SCHEDULER_shutdown (); - return; - case GNUNET_DB_STATUS_SOFT_ERROR: - GNUNET_log (GNUNET_ERROR_TYPE_INFO, - "Got DB soft error for batch_reserves_in_insert(%u). Rolling back.\n", - i); - handle_soft_error (); - return; - case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: - /* Either wirewatch was freshly started after the system was - shutdown and we're going over an incomplete shard again - after being restarted, or the shard lock period was too - short (number of workers set incorrectly?) and a 2nd - wirewatcher has been stealing our work while we are still - at it. */ - GNUNET_log (GNUNET_ERROR_TYPE_INFO, - "Attempted to import transaction %llu (%s) twice. " - "This should happen rarely (if not, ask for support).\n", - (unsigned long long) cd->serial_id, - job_name); - break; - case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT: - GNUNET_log (GNUNET_ERROR_TYPE_INFO, - "Imported transaction %llu.", - (unsigned long long) cd->serial_id); - /* normal case */ - progress = true; - break; - } - } - } - - latest_row_off = lroff; - shard_done = (shard_end <= latest_row_off); - if (shard_done) - { - /* shard is complete, mark this as well */ - qs = db_plugin->complete_shard (db_plugin->cls, - job_name, - shard_start, - shard_end); - switch (qs) - { - case GNUNET_DB_STATUS_HARD_ERROR: - GNUNET_break (0); - GNUNET_SCHEDULER_shutdown (); - return; - case GNUNET_DB_STATUS_SOFT_ERROR: - GNUNET_log (GNUNET_ERROR_TYPE_INFO, - "Got DB soft error for complete_shard. Rolling back.\n"); - handle_soft_error (); - return; - case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: - GNUNET_break (0); - /* Not expected, but let's just continue */ - break; - case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT: - /* normal case */ - progress = true; - GNUNET_log (GNUNET_ERROR_TYPE_INFO, - "Completed shard %s (%llu,%llu] after %s\n", - job_name, - (unsigned long long) shard_start, - (unsigned long long) shard_end, - GNUNET_STRINGS_relative_time_to_string ( - GNUNET_TIME_absolute_get_duration (shard_start_time), - true)); - break; - } - shard_delay = GNUNET_TIME_absolute_get_duration (shard_start_time); - shard_open = false; - transaction_completed (); - return; - } - GNUNET_assert (NULL == task); - task = GNUNET_SCHEDULER_add_now (&continue_with_shard, - NULL); -} - - -/** - * We got incoming transaction details from the bank. Add them - * to the database. - * * @param batch_size desired batch size * @param details array of transaction details * @param details_length length of the @a details array */ static void -process_reply_batched2 (unsigned int batch_size, - const struct TALER_BANK_CreditDetails *details, - unsigned int details_length) +process_reply (unsigned int batch_size, + const struct TALER_BANK_CreditDetails *details, + unsigned int details_length) { enum GNUNET_DB_QueryStatus qs; bool shard_done; @@ -950,11 +567,11 @@ process_reply_batched2 (unsigned int batch_size, res->exchange_account_name = ai->section_name; res->wire_reference = cd->serial_id; } - qs = db_plugin->batch2_reserves_in_insert (db_plugin->cls, - reserves, - details_length, - batch_size, - qss); + qs = db_plugin->reserves_in_insert (db_plugin->cls, + reserves, + details_length, + batch_size, + qss); switch (qs) { case GNUNET_DB_STATUS_HARD_ERROR: @@ -1074,7 +691,7 @@ history_cb (void *cls, (void) cls; if (-2 == batch_mode) { - const char *mode = getenv ("TALER_USE_BATCH"); + const char *mode = getenv ("TALER_WIREWATCH_BATCH_SIZE"); char dummy; if ( (NULL == mode) || @@ -1087,7 +704,7 @@ history_cb (void *cls, GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Bad batch mode `%s' specified\n", mode); - batch_mode = -1; + batch_mode = 8; /* maximum supported is currently 8 */ } } GNUNET_assert (NULL == task); @@ -1098,22 +715,9 @@ history_cb (void *cls, switch (reply->http_status) { case MHD_HTTP_OK: - switch (batch_mode) - { - case -1: - process_reply (reply->details.success.details, - reply->details.success.details_length); - break; - case 0: - process_reply_batched (reply->details.success.details, - reply->details.success.details_length); - break; - default: - process_reply_batched2 ((unsigned int) batch_mode, - reply->details.success.details, - reply->details.success.details_length); - break; - } + process_reply (batch_mode, + reply->details.success.details, + reply->details.success.details_length); return; case MHD_HTTP_NO_CONTENT: transaction_completed (); |