-fix wirewatch assertion

This commit is contained in:
Christian Grothoff 2022-05-22 13:48:56 +02:00
parent 3233195d2d
commit 21bcc5fa0b
No known key found for this signature in database
GPG Key ID: 939E6BE1E29FC3CC
3 changed files with 124 additions and 46 deletions

View File

@ -111,9 +111,9 @@ static char *cfg_filename;
static int use_fakebank = 1; static int use_fakebank = 1;
/** /**
* Launch taler-exchange-wirewatch. * Number of taler-exchange-wirewatchers to launch.
*/ */
static int start_wirewatch; static unsigned int start_wirewatch;
/** /**
* Verbosity level. * Verbosity level.
@ -265,8 +265,9 @@ run (void *cls,
(void) cls; (void) cls;
len = howmany_reserves + 2; len = howmany_reserves + 2;
all_commands = GNUNET_new_array (len, all_commands = GNUNET_malloc_large (len
struct TALER_TESTING_Command); * sizeof (struct TALER_TESTING_Command));
GNUNET_assert (NULL != all_commands);
GNUNET_asprintf (&total_reserve_amount, GNUNET_asprintf (&total_reserve_amount,
"%s:5", "%s:5",
currency); currency);
@ -465,14 +466,17 @@ launch_fakebank (void *cls)
* *
* @return #GNUNET_OK on success * @return #GNUNET_OK on success
*/ */
static int static enum GNUNET_GenericReturnValue
parallel_benchmark (void) parallel_benchmark (void)
{ {
enum GNUNET_GenericReturnValue result = GNUNET_OK; enum GNUNET_GenericReturnValue result = GNUNET_OK;
pid_t fakebank = -1; pid_t fakebank = -1;
struct GNUNET_OS_Process *bankd = NULL; struct GNUNET_OS_Process *bankd = NULL;
struct GNUNET_OS_Process *wirewatch = NULL; struct GNUNET_OS_Process *wirewatch[GNUNET_NZL (start_wirewatch)];
memset (wirewatch,
0,
sizeof (wirewatch));
if ( (MODE_BANK == mode) || if ( (MODE_BANK == mode) ||
(MODE_BOTH == mode) ) (MODE_BOTH == mode) )
{ {
@ -560,19 +564,30 @@ parallel_benchmark (void)
GNUNET_OS_process_wait (dbinit)); GNUNET_OS_process_wait (dbinit));
GNUNET_OS_process_destroy (dbinit); GNUNET_OS_process_destroy (dbinit);
} }
if (start_wirewatch) /* start exchange wirewatch */
for (unsigned int w = 0; w<start_wirewatch; w++)
{ {
/* start exchange wirewatch */ wirewatch[w] = GNUNET_OS_start_process (GNUNET_OS_INHERIT_STD_ALL,
wirewatch = GNUNET_OS_start_process (GNUNET_OS_INHERIT_STD_ALL, NULL, NULL, NULL,
NULL, NULL, NULL, "taler-exchange-wirewatch",
"taler-exchange-wirewatch", "taler-exchange-wirewatch",
"taler-exchange-wirewatch", "-c", cfg_filename,
"-c", cfg_filename, "-L", loglev,
NULL); NULL);
if (NULL == wirewatch) if (NULL == wirewatch[w])
{ {
GNUNET_log (GNUNET_ERROR_TYPE_ERROR, GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
"Failed to launch wirewatch, aborting benchmark\n"); "Failed to launch wirewatch, aborting benchmark\n");
for (unsigned int x = 0; x<w; x++)
{
GNUNET_break (0 ==
GNUNET_OS_process_kill (wirewatch[x],
SIGTERM));
GNUNET_break (GNUNET_OK ==
GNUNET_OS_process_wait (wirewatch[x]));
GNUNET_OS_process_destroy (wirewatch[x]);
wirewatch[x] = NULL;
}
if (-1 != fakebank) if (-1 != fakebank)
{ {
int wstatus; int wstatus;
@ -618,17 +633,61 @@ parallel_benchmark (void)
if ( (MODE_BANK == mode) || if ( (MODE_BANK == mode) ||
(MODE_BOTH == mode) ) (MODE_BOTH == mode) )
{ {
if (NULL != wirewatch) /* Ensure wirewatch runs to completion! */
if (0 != start_wirewatch)
{ {
/* stop wirewatch */ /* replace ONE of the wirewatchers with one that is in test-mode */
GNUNET_break (0 == GNUNET_break (0 ==
GNUNET_OS_process_kill (wirewatch, GNUNET_OS_process_kill (wirewatch[0],
SIGTERM)); SIGTERM));
GNUNET_break (GNUNET_OK == GNUNET_break (GNUNET_OK ==
GNUNET_OS_process_wait (wirewatch)); GNUNET_OS_process_wait (wirewatch[0]));
GNUNET_OS_process_destroy (wirewatch); GNUNET_OS_process_destroy (wirewatch[0]);
wirewatch = NULL; wirewatch[0] = GNUNET_OS_start_process (GNUNET_OS_INHERIT_STD_ALL,
NULL, NULL, NULL,
"taler-exchange-wirewatch",
"taler-exchange-wirewatch",
"-c", cfg_filename,
"-L", loglev,
"-t",
NULL);
/* wait for it to finish! */
GNUNET_break (GNUNET_OK ==
GNUNET_OS_process_wait (wirewatch[0]));
GNUNET_OS_process_destroy (wirewatch[0]);
wirewatch[0] = NULL;
/* Then stop the rest, which should basically also be finished */
for (unsigned int w = 1; w<start_wirewatch; w++)
{
GNUNET_break (0 ==
GNUNET_OS_process_kill (wirewatch[w],
SIGTERM));
GNUNET_break (GNUNET_OK ==
GNUNET_OS_process_wait (wirewatch[w]));
GNUNET_OS_process_destroy (wirewatch[w]);
}
/* But be extra sure we did finish all shards by doing one more */
wirewatch[0] = GNUNET_OS_start_process (GNUNET_OS_INHERIT_STD_ALL,
NULL, NULL, NULL,
"taler-exchange-wirewatch",
"taler-exchange-wirewatch",
"-c", cfg_filename,
"-L", loglev,
"-t",
NULL);
/* wait for it to finish! */
GNUNET_break (GNUNET_OK ==
GNUNET_OS_process_wait (wirewatch[0]));
GNUNET_OS_process_destroy (wirewatch[0]);
wirewatch[0] = NULL;
} }
/* Now stop the time, if this was the right mode */
if ( (GNUNET_YES != linger) &&
(MODE_BANK != mode) )
duration = GNUNET_TIME_absolute_get_duration (start_time);
/* stop fakebank */ /* stop fakebank */
if (-1 != fakebank) if (-1 != fakebank)
{ {
@ -727,9 +786,10 @@ main (int argc,
&history_size), &history_size),
GNUNET_GETOPT_option_version (PACKAGE_VERSION " " VCS_VERSION), GNUNET_GETOPT_option_version (PACKAGE_VERSION " " VCS_VERSION),
GNUNET_GETOPT_option_verbose (&verbose), GNUNET_GETOPT_option_verbose (&verbose),
GNUNET_GETOPT_option_flag ('w', GNUNET_GETOPT_option_uint ('w',
"wirewatch", "wirewatch",
"run taler-exchange-wirewatch", "NPROC",
"run NPROC taler-exchange-wirewatch processes",
&start_wirewatch), &start_wirewatch),
GNUNET_GETOPT_OPTION_END GNUNET_GETOPT_OPTION_END
}; };
@ -858,14 +918,17 @@ main (int argc,
howmany_clients, howmany_clients,
GNUNET_STRINGS_relative_time_to_string (duration, GNUNET_STRINGS_relative_time_to_string (duration,
GNUNET_YES)); GNUNET_YES));
tps = ((unsigned long long) howmany_reserves) * howmany_clients * 1000LLU if (! GNUNET_TIME_relative_is_zero (duration))
/ (duration.rel_value_us / 1000LL); {
fprintf (stdout, tps = ((unsigned long long) howmany_reserves) * howmany_clients * 1000LLU
"RAW: %04u %04u %16llu (%llu TPS)\n", / (duration.rel_value_us / 1000LL);
howmany_reserves, fprintf (stdout,
howmany_clients, "RAW: %04u %04u %16llu (%llu TPS)\n",
(unsigned long long) duration.rel_value_us, howmany_reserves,
tps); howmany_clients,
(unsigned long long) duration.rel_value_us,
tps);
}
fprintf (stdout, fprintf (stdout,
"CPU time: sys %llu user %llu\n", \ "CPU time: sys %llu user %llu\n", \
(unsigned long long) (usage.ru_stime.tv_sec * 1000 * 1000 (unsigned long long) (usage.ru_stime.tv_sec * 1000 * 1000

View File

@ -384,10 +384,10 @@ handle_soft_error (struct WireAccount *wa)
"Reduced batch size to %llu due to serialization issue\n", "Reduced batch size to %llu due to serialization issue\n",
(unsigned long long) wa->batch_size); (unsigned long long) wa->batch_size);
} }
GNUNET_assert (NULL == task);
/* Reset to beginning of transaction, and go again /* Reset to beginning of transaction, and go again
from there. */ from there. */
wa->latest_row_off = wa->batch_start; wa->latest_row_off = wa->batch_start;
GNUNET_assert (NULL == task);
task = GNUNET_SCHEDULER_add_now (&continue_with_shard, task = GNUNET_SCHEDULER_add_now (&continue_with_shard,
wa); wa);
} }
@ -458,6 +458,7 @@ account_completed (struct WireAccount *wa)
= GNUNET_TIME_relative_to_absolute (wirewatch_idle_sleep_interval); = GNUNET_TIME_relative_to_absolute (wirewatch_idle_sleep_interval);
wa = wa->next; wa = wa->next;
} }
GNUNET_assert (NULL == task);
schedule_transfers (wa); schedule_transfers (wa);
} }
@ -533,6 +534,7 @@ do_commit (struct WireAccount *wa)
enum GNUNET_DB_QueryStatus qs; enum GNUNET_DB_QueryStatus qs;
bool shard_done; bool shard_done;
GNUNET_assert (NULL == task);
shard_done = check_shard_done (wa); shard_done = check_shard_done (wa);
wa->started_transaction = false; wa->started_transaction = false;
GNUNET_log (GNUNET_ERROR_TYPE_INFO, GNUNET_log (GNUNET_ERROR_TYPE_INFO,
@ -563,7 +565,8 @@ do_commit (struct WireAccount *wa)
if (shard_done) if (shard_done)
account_completed (wa); account_completed (wa);
else else
continue_with_shard (wa); task = GNUNET_SCHEDULER_add_now (&continue_with_shard,
wa);
} }
@ -591,6 +594,7 @@ history_cb (void *cls,
enum GNUNET_DB_QueryStatus qs; enum GNUNET_DB_QueryStatus qs;
(void) json; (void) json;
GNUNET_assert (NULL == task);
if (NULL == details) if (NULL == details)
{ {
wa->hh = NULL; wa->hh = NULL;
@ -660,14 +664,17 @@ history_cb (void *cls,
wa->hh = NULL; wa->hh = NULL;
if (wa->started_transaction) if (wa->started_transaction)
{ {
GNUNET_assert (NULL == task);
do_commit (wa); do_commit (wa);
} }
else else
{ {
GNUNET_assert (NULL == task);
if (check_shard_done (wa)) if (check_shard_done (wa))
account_completed (wa); account_completed (wa);
else else
continue_with_shard (wa); task = GNUNET_SCHEDULER_add_now (&continue_with_shard,
wa);
} }
return GNUNET_SYSERR; return GNUNET_SYSERR;
} }
@ -746,6 +753,7 @@ continue_with_shard (void *cls)
struct WireAccount *wa = cls; struct WireAccount *wa = cls;
unsigned int limit; unsigned int limit;
task = NULL;
limit = GNUNET_MIN (wa->batch_size, limit = GNUNET_MIN (wa->batch_size,
wa->shard_end - wa->latest_row_off); wa->shard_end - wa->latest_row_off);
wa->max_row_off = wa->latest_row_off + limit; wa->max_row_off = wa->latest_row_off + limit;
@ -816,15 +824,18 @@ lock_shard (void *cls)
return; return;
case GNUNET_DB_STATUS_SOFT_ERROR: case GNUNET_DB_STATUS_SOFT_ERROR:
/* try again */ /* try again */
GNUNET_break (0); {
GNUNET_log (GNUNET_ERROR_TYPE_WARNING, struct GNUNET_TIME_Relative rdelay;
"Serialization error tying to obtain shard %s, will try again in %s!\n",
wa->job_name, rdelay = GNUNET_TIME_randomize (wirewatch_idle_sleep_interval);
GNUNET_STRINGS_relative_time_to_string ( GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
wirewatch_idle_sleep_interval, "Serialization error tying to obtain shard %s, will try again in %s!\n",
GNUNET_YES)); wa->job_name,
wa->delayed_until = GNUNET_TIME_relative_to_absolute ( GNUNET_STRINGS_relative_time_to_string (rdelay,
wirewatch_idle_sleep_interval); GNUNET_YES));
wa->delayed_until = GNUNET_TIME_relative_to_absolute (rdelay);
}
GNUNET_assert (NULL == task);
schedule_transfers (wa->next); schedule_transfers (wa->next);
return; return;
case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
@ -837,6 +848,7 @@ lock_shard (void *cls)
GNUNET_YES)); GNUNET_YES));
wa->delayed_until = GNUNET_TIME_relative_to_absolute ( wa->delayed_until = GNUNET_TIME_relative_to_absolute (
wirewatch_idle_sleep_interval); wirewatch_idle_sleep_interval);
GNUNET_assert (NULL == task);
schedule_transfers (wa->next); schedule_transfers (wa->next);
return; return;
case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT: case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
@ -854,7 +866,8 @@ lock_shard (void *cls)
we find out that we're really busy */ we find out that we're really busy */
wa->batch_start = wa->shard_start; wa->batch_start = wa->shard_start;
wa->latest_row_off = wa->batch_start; wa->latest_row_off = wa->batch_start;
continue_with_shard (wa); task = GNUNET_SCHEDULER_add_now (&continue_with_shard,
wa);
} }
@ -894,6 +907,7 @@ run (void *cls,
return; return;
} }
rc = GNUNET_CURL_gnunet_rc_create (ctx); rc = GNUNET_CURL_gnunet_rc_create (ctx);
GNUNET_assert (NULL == task);
task = GNUNET_SCHEDULER_add_now (&lock_shard, task = GNUNET_SCHEDULER_add_now (&lock_shard,
wa_head); wa_head);
} }

View File

@ -449,8 +449,9 @@ TALER_TESTING_run2 (struct TALER_TESTING_Interpreter *is,
/* get the number of commands */ /* get the number of commands */
for (i = 0; NULL != commands[i].label; i++) for (i = 0; NULL != commands[i].label; i++)
; ;
is->commands = GNUNET_new_array (i + 1, is->commands = GNUNET_malloc_large ( (i + 1)
struct TALER_TESTING_Command); * sizeof (struct TALER_TESTING_Command));
GNUNET_assert (NULL != is->commands);
memcpy (is->commands, memcpy (is->commands,
commands, commands,
sizeof (struct TALER_TESTING_Command) * i); sizeof (struct TALER_TESTING_Command) * i);