From 889625a90f97a23048b3c9dad418f86acb81314b Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Thu, 9 Dec 2021 22:15:30 +0100 Subject: [PATCH] fix idle transaction issue introduced earlier --- src/exchange/taler-exchange-wirewatch.c | 179 +++++++++++++++--------- 1 file changed, 109 insertions(+), 70 deletions(-) diff --git a/src/exchange/taler-exchange-wirewatch.c b/src/exchange/taler-exchange-wirewatch.c index a417342a2..eb0449942 100644 --- a/src/exchange/taler-exchange-wirewatch.c +++ b/src/exchange/taler-exchange-wirewatch.c @@ -362,6 +362,7 @@ static void handle_soft_error (struct WireAccount *wa) { db_plugin->rollback (db_plugin->cls); + wa->started_transaction = false; if (1 < wa->batch_size) { wa->batch_thresh = wa->batch_size; @@ -376,6 +377,89 @@ handle_soft_error (struct WireAccount *wa) } +/** + * We are done with a shard, move on to the next one. + * + * @param wa wire account for which we completed a shard + */ +static void +shard_completed (struct WireAccount *wa) +{ + /* transaction success, update #last_row_off */ + wa->batch_start = wa->latest_row_off; + if (wa->batch_size < MAXIMUM_BATCH_SIZE) + { + int delta; + + delta = ((int) wa->batch_thresh - (int) wa->batch_size) / 4; + if (delta < 0) + delta = -delta; + wa->batch_size = GNUNET_MIN (MAXIMUM_BATCH_SIZE, + wa->batch_size + delta + 1); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Increasing batch size to %llu\n", + (unsigned long long) wa->batch_size); + } + if (wa->delay) + { + wa->delayed_until + = GNUNET_TIME_relative_to_absolute (wirewatch_idle_sleep_interval); + wa_pos = wa_pos->next; + if (NULL == wa_pos) + wa_pos = wa_head; + GNUNET_assert (NULL != wa_pos); + } + GNUNET_assert (NULL == task); + task = GNUNET_SCHEDULER_add_at (wa_pos->delayed_until, + &find_transfers, + NULL); +} + + +/** + * We are finished with the current shard. Update the database, marking the + * shard as finished. + * + * @param wa wire account to commit for + * @return true on success + */ +static bool +mark_shard_done (struct WireAccount *wa) +{ + enum GNUNET_DB_QueryStatus qs; + + if (wa->shard_end > wa->latest_row_off) + return false; /* actually, not done! */ + /* shard is complete, mark this as well */ + qs = db_plugin->complete_shard (db_plugin->cls, + wa->job_name, + wa->shard_start, + wa->shard_end); + switch (qs) + { + case GNUNET_DB_STATUS_HARD_ERROR: + GNUNET_break (0); + db_plugin->rollback (db_plugin->cls); + GNUNET_SCHEDULER_shutdown (); + return false; + case GNUNET_DB_STATUS_SOFT_ERROR: + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Got DB soft error for complete_shard. Rolling back.\n"); + handle_soft_error (wa); + return false; + case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: + /* already existed, ok, let's just continue */ + break; + case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT: + /* normal case */ + shard_delay = GNUNET_TIME_absolute_get_duration (wa->shard_start_time); + + break; + } + return true; +} + + /** * We are finished with the current transaction, try * to commit and then schedule the next iteration. @@ -387,35 +471,8 @@ do_commit (struct WireAccount *wa) { enum GNUNET_DB_QueryStatus qs; - if (wa->shard_end <= wa->latest_row_off) - { - /* shard is complete, mark this as well */ - qs = db_plugin->complete_shard (db_plugin->cls, - wa->job_name, - wa->shard_start, - wa->shard_end); - switch (qs) - { - case GNUNET_DB_STATUS_HARD_ERROR: - GNUNET_break (0); - db_plugin->rollback (db_plugin->cls); - GNUNET_SCHEDULER_shutdown (); - return; - case GNUNET_DB_STATUS_SOFT_ERROR: - GNUNET_log (GNUNET_ERROR_TYPE_INFO, - "Got DB soft error for complete_shard. Rolling back.\n"); - handle_soft_error (wa); - return; - case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: - /* already existed, ok, let's just continue */ - break; - case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT: - /* normal case */ - shard_delay = GNUNET_TIME_absolute_get_duration (wa->shard_start_time); - - break; - } - } + wa->started_transaction = false; + mark_shard_done (wa); qs = db_plugin->commit (db_plugin->cls); switch (qs) { @@ -432,43 +489,7 @@ do_commit (struct WireAccount *wa) /* normal case */ break; } - /* transaction success, update #last_row_off */ - wa->batch_start = wa->latest_row_off; - if (wa->batch_size < MAXIMUM_BATCH_SIZE) - { - int delta; - - delta = ((int) wa->batch_thresh - (int) wa->batch_size) / 4; - if (delta < 0) - delta = -delta; - wa->batch_size = GNUNET_MIN (MAXIMUM_BATCH_SIZE, - wa->batch_size + delta + 1); - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Increasing batch size to %llu\n", - (unsigned long long) wa->batch_size); - } - if ( (wa->delay) && - (test_mode) && - (NULL == wa->next) ) - { - GNUNET_log (GNUNET_ERROR_TYPE_INFO, - "Shutdown due to test mode!\n"); - GNUNET_SCHEDULER_shutdown (); - return; - } - if (wa->delay) - { - wa->delayed_until - = GNUNET_TIME_relative_to_absolute (wirewatch_idle_sleep_interval); - wa_pos = wa_pos->next; - if (NULL == wa_pos) - wa_pos = wa_head; - GNUNET_assert (NULL != wa_pos); - } - GNUNET_assert (NULL == task); - task = GNUNET_SCHEDULER_add_at (wa_pos->delayed_until, - &find_transfers, - NULL); + shard_completed (wa); } @@ -510,9 +531,20 @@ history_cb (void *cls, { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "End of list. Committing progress!\n"); - wa->started_transaction = false; do_commit (wa); } + else + { + if ( (wa->delay) && + (test_mode) && + (NULL == wa->next) ) + { + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Shutdown due to test mode!\n"); + GNUNET_SCHEDULER_shutdown (); + return GNUNET_OK; + } + } return GNUNET_OK; /* will be ignored anyway */ } if (serial_id < wa->latest_row_off) @@ -542,11 +574,13 @@ history_cb (void *cls, wa->delay = false; if (wa->started_transaction) { - wa->started_transaction = false; do_commit (wa); } else - GNUNET_break (0); /* how did this happen */ + { + if (mark_shard_done (wa)) + shard_completed (wa); + } wa->hh = NULL; return GNUNET_SYSERR; } @@ -587,6 +621,7 @@ history_cb (void *cls, case GNUNET_DB_STATUS_HARD_ERROR: GNUNET_break (0); db_plugin->rollback (db_plugin->cls); + wa->started_transaction = false; GNUNET_SCHEDULER_shutdown (); wa->hh = NULL; return GNUNET_SYSERR; @@ -703,7 +738,11 @@ find_transfers (void *cls) { GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Failed to start request for account history!\n"); - db_plugin->rollback (db_plugin->cls); + if (wa_pos->started_transaction) + { + db_plugin->rollback (db_plugin->cls); + wa_pos->started_transaction = false; + } global_ret = EXIT_FAILURE; GNUNET_SCHEDULER_shutdown (); return;