fix idle transaction issue introduced earlier

This commit is contained in:
Christian Grothoff 2021-12-09 22:15:30 +01:00
parent 6dd4a90abd
commit 889625a90f
No known key found for this signature in database
GPG Key ID: 939E6BE1E29FC3CC

View File

@ -362,6 +362,7 @@ static void
handle_soft_error (struct WireAccount *wa)
{
db_plugin->rollback (db_plugin->cls);
wa->started_transaction = false;
if (1 < wa->batch_size)
{
wa->batch_thresh = wa->batch_size;
@ -377,18 +378,58 @@ handle_soft_error (struct WireAccount *wa)
/**
* We are finished with the current transaction, try
* to commit and then schedule the next iteration.
* We are done with a shard, move on to the next one.
*
* @param wa wire account to commit for
* @param wa wire account for which we completed a shard
*/
static void
do_commit (struct WireAccount *wa)
shard_completed (struct WireAccount *wa)
{
/* transaction success, update #last_row_off */
wa->batch_start = wa->latest_row_off;
if (wa->batch_size < MAXIMUM_BATCH_SIZE)
{
int delta;
delta = ((int) wa->batch_thresh - (int) wa->batch_size) / 4;
if (delta < 0)
delta = -delta;
wa->batch_size = GNUNET_MIN (MAXIMUM_BATCH_SIZE,
wa->batch_size + delta + 1);
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Increasing batch size to %llu\n",
(unsigned long long) wa->batch_size);
}
if (wa->delay)
{
wa->delayed_until
= GNUNET_TIME_relative_to_absolute (wirewatch_idle_sleep_interval);
wa_pos = wa_pos->next;
if (NULL == wa_pos)
wa_pos = wa_head;
GNUNET_assert (NULL != wa_pos);
}
GNUNET_assert (NULL == task);
task = GNUNET_SCHEDULER_add_at (wa_pos->delayed_until,
&find_transfers,
NULL);
}
/**
* We are finished with the current shard. Update the database, marking the
* shard as finished.
*
* @param wa wire account to commit for
* @return true on success
*/
static bool
mark_shard_done (struct WireAccount *wa)
{
enum GNUNET_DB_QueryStatus qs;
if (wa->shard_end <= wa->latest_row_off)
{
if (wa->shard_end > wa->latest_row_off)
return false; /* actually, not done! */
/* shard is complete, mark this as well */
qs = db_plugin->complete_shard (db_plugin->cls,
wa->job_name,
@ -400,12 +441,12 @@ do_commit (struct WireAccount *wa)
GNUNET_break (0);
db_plugin->rollback (db_plugin->cls);
GNUNET_SCHEDULER_shutdown ();
return;
return false;
case GNUNET_DB_STATUS_SOFT_ERROR:
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"Got DB soft error for complete_shard. Rolling back.\n");
handle_soft_error (wa);
return;
return false;
case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
/* already existed, ok, let's just continue */
break;
@ -415,7 +456,23 @@ do_commit (struct WireAccount *wa)
break;
}
}
return true;
}
/**
* We are finished with the current transaction, try
* to commit and then schedule the next iteration.
*
* @param wa wire account to commit for
*/
static void
do_commit (struct WireAccount *wa)
{
enum GNUNET_DB_QueryStatus qs;
wa->started_transaction = false;
mark_shard_done (wa);
qs = db_plugin->commit (db_plugin->cls);
switch (qs)
{
@ -432,43 +489,7 @@ do_commit (struct WireAccount *wa)
/* normal case */
break;
}
/* transaction success, update #last_row_off */
wa->batch_start = wa->latest_row_off;
if (wa->batch_size < MAXIMUM_BATCH_SIZE)
{
int delta;
delta = ((int) wa->batch_thresh - (int) wa->batch_size) / 4;
if (delta < 0)
delta = -delta;
wa->batch_size = GNUNET_MIN (MAXIMUM_BATCH_SIZE,
wa->batch_size + delta + 1);
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Increasing batch size to %llu\n",
(unsigned long long) wa->batch_size);
}
if ( (wa->delay) &&
(test_mode) &&
(NULL == wa->next) )
{
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"Shutdown due to test mode!\n");
GNUNET_SCHEDULER_shutdown ();
return;
}
if (wa->delay)
{
wa->delayed_until
= GNUNET_TIME_relative_to_absolute (wirewatch_idle_sleep_interval);
wa_pos = wa_pos->next;
if (NULL == wa_pos)
wa_pos = wa_head;
GNUNET_assert (NULL != wa_pos);
}
GNUNET_assert (NULL == task);
task = GNUNET_SCHEDULER_add_at (wa_pos->delayed_until,
&find_transfers,
NULL);
shard_completed (wa);
}
@ -510,9 +531,20 @@ history_cb (void *cls,
{
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"End of list. Committing progress!\n");
wa->started_transaction = false;
do_commit (wa);
}
else
{
if ( (wa->delay) &&
(test_mode) &&
(NULL == wa->next) )
{
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"Shutdown due to test mode!\n");
GNUNET_SCHEDULER_shutdown ();
return GNUNET_OK;
}
}
return GNUNET_OK; /* will be ignored anyway */
}
if (serial_id < wa->latest_row_off)
@ -542,11 +574,13 @@ history_cb (void *cls,
wa->delay = false;
if (wa->started_transaction)
{
wa->started_transaction = false;
do_commit (wa);
}
else
GNUNET_break (0); /* how did this happen */
{
if (mark_shard_done (wa))
shard_completed (wa);
}
wa->hh = NULL;
return GNUNET_SYSERR;
}
@ -587,6 +621,7 @@ history_cb (void *cls,
case GNUNET_DB_STATUS_HARD_ERROR:
GNUNET_break (0);
db_plugin->rollback (db_plugin->cls);
wa->started_transaction = false;
GNUNET_SCHEDULER_shutdown ();
wa->hh = NULL;
return GNUNET_SYSERR;
@ -703,7 +738,11 @@ find_transfers (void *cls)
{
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
"Failed to start request for account history!\n");
if (wa_pos->started_transaction)
{
db_plugin->rollback (db_plugin->cls);
wa_pos->started_transaction = false;
}
global_ret = EXIT_FAILURE;
GNUNET_SCHEDULER_shutdown ();
return;