wirewatch spring cleaning

This commit is contained in:
Christian Grothoff 2022-05-21 21:07:24 +02:00
parent a6494f9905
commit 737937291c
No known key found for this signature in database
GPG Key ID: 939E6BE1E29FC3CC

View File

@ -106,6 +106,12 @@ struct WireAccount
*/
struct GNUNET_TIME_Absolute shard_start_time;
/**
* How long did we take to finish the last shard
* for this account?
*/
struct GNUNET_TIME_Relative shard_delay;
/**
* Name of our job in the shard table.
*/
@ -117,15 +123,10 @@ struct WireAccount
unsigned int batch_size;
/**
* How much do we incremnt @e batch_size on success?
* How much do we increment @e batch_size on success?
*/
unsigned int batch_thresh;
/**
* How many transactions did we see in the current batch?
*/
unsigned int current_batch_size;
/**
* Should we delay the next request to the wire plugin a bit? Set to
* false if we actually did some work.
@ -150,12 +151,6 @@ static struct WireAccount *wa_head;
*/
static struct WireAccount *wa_tail;
/**
* Wire account we are currently processing. This would go away
* if we ever start processing all accounts in parallel.
*/
static struct WireAccount *wa_pos;
/**
* Handle to the context for interacting with the bank.
*/
@ -184,11 +179,6 @@ static struct TALER_EXCHANGEDB_Plugin *db_plugin;
*/
static struct GNUNET_TIME_Relative wirewatch_idle_sleep_interval;
/**
* How long did we take to finish the last shard?
*/
static struct GNUNET_TIME_Relative shard_delay;
/**
* Modulus to apply to group shards. The shard size must ultimately be a
* multiple of the batch size. Thus, if this is not a multiple of the
@ -249,9 +239,9 @@ shutdown_task (void *cls)
wa->started_transaction = false;
}
qs = db_plugin->abort_shard (db_plugin->cls,
wa_pos->job_name,
wa_pos->shard_start,
wa_pos->shard_end);
wa->job_name,
wa->shard_start,
wa->shard_end);
if (qs <= 0)
GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
"Failed to abort work shard on shutdown\n");
@ -259,8 +249,6 @@ shutdown_task (void *cls)
GNUNET_free (wa);
}
}
wa_pos = NULL;
if (NULL != ctx)
{
GNUNET_CURL_fini (ctx);
@ -359,12 +347,22 @@ exchange_serve_process_config (void)
/**
* Query for incoming wire transfers.
* Lock a shard and then begin to query for incoming wire transfers.
*
* @param cls NULL
* @param cls a `struct WireAccount` to operate on
*/
static void
find_transfers (void *cls);
lock_shard (void *cls);
/**
* Continue with the credit history of the shard
* reserved as @a wa.
*
* @param[in,out] cls `struct WireAccount *` account with shard to continue processing
*/
static void
continue_with_shard (void *cls);
/**
@ -387,23 +385,59 @@ handle_soft_error (struct WireAccount *wa)
(unsigned long long) wa->batch_size);
}
GNUNET_assert (NULL == task);
task = GNUNET_SCHEDULER_add_now (&find_transfers,
NULL);
/* Reset to beginning of transaction, and go again
from there. */
wa->latest_row_off = wa->batch_start;
task = GNUNET_SCHEDULER_add_now (&continue_with_shard,
wa);
}
/**
* We are done with a shard, move on to the next one.
* Schedule the #lock_shard() operation for
* @a wa. If @a wa is NULL, start with #wa_head.
*
* @param wa account to schedule #lock_shard() for,
* possibly NULL (!).
*/
static void
schedule_transfers (struct WireAccount *wa)
{
if (NULL == wa)
{
wa = wa_head;
GNUNET_assert (NULL != wa);
}
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"Will try to lock next shard of %s in %s\n",
wa->job_name,
GNUNET_STRINGS_relative_time_to_string (
GNUNET_TIME_absolute_get_remaining (wa->delayed_until),
GNUNET_YES));
GNUNET_assert (NULL == task);
task = GNUNET_SCHEDULER_add_at (wa->delayed_until,
&lock_shard,
wa);
}
/**
* We are done with the work that is possible on @a wa right now (and the
* transaction was committed, if there was one to commit). Move on to the next
* account.
*
* @param wa wire account for which we completed a shard
*/
static void
shard_completed (struct WireAccount *wa)
account_completed (struct WireAccount *wa)
{
/* transaction success, update #last_row_off */
wa->batch_start = wa->latest_row_off;
if (wa->batch_size < MAXIMUM_BATCH_SIZE)
GNUNET_assert (! wa->started_transaction);
if ( (wa->batch_start + wa->batch_size ==
wa->latest_row_off) &&
(wa->batch_size < MAXIMUM_BATCH_SIZE) )
{
/* The current batch size worked without serialization
issues, and we are allowed to grow. Do so slowly. */
int delta;
delta = ((int) wa->batch_thresh - (int) wa->batch_size) / 4;
@ -411,45 +445,45 @@ shard_completed (struct WireAccount *wa)
delta = -delta;
wa->batch_size = GNUNET_MIN (MAXIMUM_BATCH_SIZE,
wa->batch_size + delta + 1);
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"Increasing batch size to %llu\n",
(unsigned long long) wa->batch_size);
}
if (wa->delay)
{
/* This account was finished, block this one for the
#wirewatch_idle_sleep_interval and move on to the next one. */
wa->delayed_until
= GNUNET_TIME_relative_to_absolute (wirewatch_idle_sleep_interval);
wa_pos = wa_pos->next;
if (NULL == wa_pos)
wa_pos = wa_head;
GNUNET_assert (NULL != wa_pos);
wa = wa->next;
}
GNUNET_assert (NULL == task);
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"Will look for more transfers in %s\n",
GNUNET_STRINGS_relative_time_to_string (
GNUNET_TIME_absolute_get_remaining (wa_pos->delayed_until),
GNUNET_YES));
task = GNUNET_SCHEDULER_add_at (wa_pos->delayed_until,
&find_transfers,
NULL);
schedule_transfers (wa);
}
/**
* We are finished with the current shard. Update the database, marking the
* shard as finished.
* Check if we are finished with the current shard. If so, update the
* database, marking the shard as finished.
*
* @param wa wire account to commit for
* @return true on success
* @return true if we were indeed done with the shard
*/
static bool
mark_shard_done (struct WireAccount *wa)
check_shard_done (struct WireAccount *wa)
{
enum GNUNET_DB_QueryStatus qs;
if (wa->shard_end > wa->latest_row_off)
{
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"Shard %s (%llu,%llu] at %llu\n",
wa->job_name,
(unsigned long long) wa->shard_start,
(unsigned long long) wa->shard_end,
(unsigned long long) wa->latest_row_off);
return false; /* actually, not done! */
}
/* shard is complete, mark this as well */
qs = db_plugin->complete_shard (db_plugin->cls,
wa->job_name,
@ -468,28 +502,25 @@ mark_shard_done (struct WireAccount *wa)
handle_soft_error (wa);
return false;
case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
/* already existed, ok, let's just continue */
GNUNET_break (0);
/* Not expected, but let's just continue */
break;
case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
/* normal case */
shard_delay = GNUNET_TIME_absolute_get_duration (wa->shard_start_time);
wa->shard_delay = GNUNET_TIME_absolute_get_duration (wa->shard_start_time);
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"Completed shard %s (%llu,%llu] after %s\n",
wa->job_name,
(unsigned long long) wa->shard_start,
(unsigned long long) wa->shard_end,
GNUNET_STRINGS_relative_time_to_string (wa->shard_delay,
GNUNET_YES));
break;
}
return true;
}
/**
* Continue with the credit history of the shard
* reserved as @a wa_pos.
*
* @param[in,out] wa_pos shard to continue processing
*/
static void
continue_with_shard (struct WireAccount *wa_pos);
/**
* We are finished with the current transaction, try
* to commit and then schedule the next iteration.
@ -502,8 +533,17 @@ do_commit (struct WireAccount *wa)
enum GNUNET_DB_QueryStatus qs;
bool shard_done;
shard_done = check_shard_done (wa);
wa->started_transaction = false;
shard_done = mark_shard_done (wa);
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"Committing %s progress (%llu,%llu] at %llu\n (%s)",
wa->job_name,
(unsigned long long) wa->shard_start,
(unsigned long long) wa->shard_end,
(unsigned long long) wa->latest_row_off,
shard_done
? "shard done"
: "shard incomplete");
qs = db_plugin->commit (db_plugin->cls);
switch (qs)
{
@ -521,7 +561,7 @@ do_commit (struct WireAccount *wa)
break;
}
if (shard_done)
shard_completed (wa);
account_completed (wa);
else
continue_with_shard (wa);
}
@ -568,63 +608,67 @@ history_cb (void *cls,
}
if (wa->started_transaction)
{
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"End of list. Committing progress!\n");
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"End of list. Committing progress on %s of (%llu,%llu]!\n",
wa->job_name,
(unsigned long long) wa->batch_start,
(unsigned long long) wa->latest_row_off);
do_commit (wa);
return GNUNET_OK; /* will be ignored anyway */
}
else
{
/* We did not even start a transaction. */
if ( (wa->delay) &&
(test_mode) &&
(NULL == wa->next) )
{
/* We exit on idle */
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"Shutdown due to test mode!\n");
GNUNET_SCHEDULER_shutdown ();
return GNUNET_OK;
}
else
{
shard_completed (wa);
}
}
account_completed (wa);
return GNUNET_OK; /* will be ignored anyway */
}
/* We did get 'details' from the bank. Do sanity checks before inserting. */
if (serial_id < wa->latest_row_off)
{
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
"Serial ID %llu not monotonic (got %llu before). Failing!\n",
(unsigned long long) serial_id,
(unsigned long long) wa->latest_row_off);
if (wa->started_transaction)
{
wa->started_transaction = false;
db_plugin->rollback (db_plugin->cls);
}
GNUNET_SCHEDULER_shutdown ();
wa->hh = NULL;
return GNUNET_SYSERR;
}
/* If we got 'limit' transactions back from the bank,
we should not introduce any delay before the next
call. */
if (serial_id >= wa->max_row_off)
wa->delay = false;
if (serial_id > wa->shard_end)
{
/* we are done with the current shard, commit and stop this iteration! */
/* we are *past* the current shard (likely because the serial_id of the
shard_end happens to not exist in the DB). So commit and stop this
iteration! */
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"Serial ID %llu past shard end at %llu, ending iteration early!\n",
(unsigned long long) serial_id,
(unsigned long long) wa->shard_end);
wa->latest_row_off = serial_id;
wa->latest_row_off = serial_id - 1; /* excluding serial_id! */
wa->hh = NULL;
if (wa->started_transaction)
{
do_commit (wa);
}
else
{
if (mark_shard_done (wa))
shard_completed (wa);
if (check_shard_done (wa))
account_completed (wa);
else
continue_with_shard (wa);
}
wa->hh = NULL;
return GNUNET_SYSERR;
}
if (! wa->started_transaction)
@ -640,7 +684,6 @@ history_cb (void *cls,
wa->hh = NULL;
return GNUNET_SYSERR;
}
wa_pos->shard_start_time = GNUNET_TIME_absolute_get ();
wa->started_transaction = true;
}
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
@ -675,6 +718,17 @@ history_cb (void *cls,
wa->hh = NULL;
return GNUNET_SYSERR;
case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
/* Either wirewatch was freshly started after the system was
shutdown and we're going over an incomplete shard again
after being restarted, or the shard lock period was too
short (number of workers set incorrectly?) and a 2nd
wirewatcher has been stealing our work while we are still
at it. */
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"Attempted to import transaction %llu (%s) twice. "
"This should happen rarely (if not, ask for support).\n",
(unsigned long long) serial_id,
wa->job_name);
/* already existed, ok, let's just continue */
break;
case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
@ -686,17 +740,43 @@ history_cb (void *cls,
}
/**
* Query for incoming wire transfers.
*
* @param cls NULL
*/
static void
find_transfers (void *cls)
continue_with_shard (void *cls)
{
enum GNUNET_DB_QueryStatus qs;
struct WireAccount *wa = cls;
unsigned int limit;
limit = GNUNET_MIN (wa->batch_size,
wa->shard_end - wa->latest_row_off);
wa->max_row_off = wa->latest_row_off + limit;
GNUNET_assert (NULL == wa->hh);
wa->hh = TALER_BANK_credit_history (ctx,
wa->ai->auth,
wa->latest_row_off,
limit,
test_mode
? GNUNET_TIME_UNIT_ZERO
: LONGPOLL_TIMEOUT,
&history_cb,
wa);
if (NULL == wa->hh)
{
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
"Failed to start request for account history!\n");
global_ret = EXIT_FAILURE;
GNUNET_SCHEDULER_shutdown ();
return;
}
}
static void
lock_shard (void *cls)
{
struct WireAccount *wa = cls;
enum GNUNET_DB_QueryStatus qs;
struct GNUNET_TIME_Relative delay;
(void) cls;
task = NULL;
if (GNUNET_SYSERR ==
db_plugin->preflight (db_plugin->cls))
@ -707,15 +787,9 @@ find_transfers (void *cls)
GNUNET_SCHEDULER_shutdown ();
return;
}
wa_pos->delay = true;
wa_pos->current_batch_size = 0; /* reset counter */
if (wa_pos->shard_end <= wa_pos->batch_start)
{
uint64_t start;
uint64_t end;
struct GNUNET_TIME_Relative delay;
/* advance to next shard */
/* How long we lock a shard depends on the number of
workers expected, and how long we usually took to
process a shard. */
if (0 == max_workers)
delay = GNUNET_TIME_UNIT_ZERO;
else
@ -723,14 +797,15 @@ find_transfers (void *cls)
GNUNET_CRYPTO_QUALITY_WEAK,
4 * GNUNET_TIME_relative_max (
wirewatch_idle_sleep_interval,
GNUNET_TIME_relative_multiply (shard_delay,
GNUNET_TIME_relative_multiply (wa->shard_delay,
max_workers)).rel_value_us);
wa->shard_start_time = GNUNET_TIME_absolute_get ();
qs = db_plugin->begin_shard (db_plugin->cls,
wa_pos->job_name,
wa->job_name,
delay,
shard_size,
&start,
&end);
&wa->shard_start,
&wa->shard_end);
switch (qs)
{
case GNUNET_DB_STATUS_HARD_ERROR:
@ -741,75 +816,45 @@ find_transfers (void *cls)
return;
case GNUNET_DB_STATUS_SOFT_ERROR:
/* try again */
GNUNET_break (0);
GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
"Serialization error tying to obtain shard, will try again in %s!\n",
"Serialization error tying to obtain shard %s, will try again in %s!\n",
wa->job_name,
GNUNET_STRINGS_relative_time_to_string (
wirewatch_idle_sleep_interval,
GNUNET_YES));
task = GNUNET_SCHEDULER_add_delayed (wirewatch_idle_sleep_interval,
&find_transfers,
NULL);
wa->delayed_until = GNUNET_TIME_relative_to_absolute (
wirewatch_idle_sleep_interval);
schedule_transfers (wa->next);
return;
case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
GNUNET_break (0);
GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
"No shard available, will try again in %s!\n",
"No shard available, will try again for %s in %s!\n",
wa->job_name,
GNUNET_STRINGS_relative_time_to_string (
wirewatch_idle_sleep_interval,
GNUNET_YES));
task = GNUNET_SCHEDULER_add_delayed (wirewatch_idle_sleep_interval,
&find_transfers,
NULL);
wa->delayed_until = GNUNET_TIME_relative_to_absolute (
wirewatch_idle_sleep_interval);
schedule_transfers (wa->next);
return;
case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
wa_pos->shard_start = start;
wa_pos->shard_end = end;
wa_pos->batch_start = start;
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"Starting with shard at [%llu,%llu) locked for %s\n",
(unsigned long long) start,
(unsigned long long) end,
GNUNET_STRINGS_relative_time_to_string (delay,
GNUNET_YES));
/* continued below */
break;
}
}
wa_pos->latest_row_off = wa_pos->batch_start;
continue_with_shard (wa_pos);
}
static void
continue_with_shard (struct WireAccount *wa_pos)
{
unsigned int limit;
limit = GNUNET_MIN (wa_pos->batch_size,
wa_pos->shard_end - wa_pos->latest_row_off);
GNUNET_assert (NULL == wa_pos->hh);
wa_pos->max_row_off = wa_pos->latest_row_off + limit - 1;
wa_pos->hh = TALER_BANK_credit_history (ctx,
wa_pos->ai->auth,
wa_pos->latest_row_off,
limit,
test_mode
? GNUNET_TIME_UNIT_ZERO
: LONGPOLL_TIMEOUT,
&history_cb,
wa_pos);
if (NULL == wa_pos->hh)
{
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
"Failed to start request for account history!\n");
if (wa_pos->started_transaction)
{
db_plugin->rollback (db_plugin->cls);
wa_pos->started_transaction = false;
}
global_ret = EXIT_FAILURE;
GNUNET_SCHEDULER_shutdown ();
return;
}
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"Starting with shard %s at (%llu,%llu] locked for %s\n",
wa->job_name,
(unsigned long long) wa->shard_start,
(unsigned long long) wa->shard_end,
GNUNET_STRINGS_relative_time_to_string (delay,
GNUNET_YES));
wa->delay = true; /* default is to delay, unless
we find out that we're really busy */
wa->batch_start = wa->shard_start;
wa->latest_row_off = wa->batch_start;
continue_with_shard (wa);
}
@ -838,21 +883,19 @@ run (void *cls,
global_ret = EXIT_NOTCONFIGURED;
return;
}
wa_pos = wa_head;
GNUNET_assert (NULL != wa_pos);
GNUNET_SCHEDULER_add_shutdown (&shutdown_task,
cls);
ctx = GNUNET_CURL_init (&GNUNET_CURL_gnunet_scheduler_reschedule,
&rc);
rc = GNUNET_CURL_gnunet_rc_create (ctx);
if (NULL == ctx)
{
GNUNET_break (0);
GNUNET_SCHEDULER_shutdown ();
return;
}
task = GNUNET_SCHEDULER_add_now (&find_transfers,
NULL);
rc = GNUNET_CURL_gnunet_rc_create (ctx);
task = GNUNET_SCHEDULER_add_now (&lock_shard,
wa_head);
}