fix keepalive when using thread pool

This commit is contained in:
Christian Grothoff 2021-06-19 16:29:30 +02:00
parent ad8390432b
commit bbe86aee78
No known key found for this signature in database
GPG Key ID: 939E6BE1E29FC3CC
2 changed files with 182 additions and 57 deletions

View File

@ -24,6 +24,7 @@
// TODO: support long polling // TODO: support long polling
// TODO: support adding WAD transfers // TODO: support adding WAD transfers
// TODO: adapt taler-exchange-benchmark to profile bank API // TODO: adapt taler-exchange-benchmark to profile bank API
// FIXME: support 'close_connections' option
#include "platform.h" #include "platform.h"
#include <pthread.h> #include <pthread.h>
@ -945,7 +946,11 @@ TALER_FAKEBANK_stop (struct TALER_FAKEBANK_Handle *h)
h->mhd_task = NULL; h->mhd_task = NULL;
} }
#if EPOLL_SUPPORT #if EPOLL_SUPPORT
if (NULL != h->mhd_rfd)
{
GNUNET_NETWORK_socket_free_memory_only_ (h->mhd_rfd); GNUNET_NETWORK_socket_free_memory_only_ (h->mhd_rfd);
h->mhd_rfd = NULL;
}
#endif #endif
if (NULL != h->mhd_bank) if (NULL != h->mhd_bank)
{ {
@ -1890,7 +1895,7 @@ TALER_FAKEBANK_start (uint16_t port,
return TALER_FAKEBANK_start2 (port, return TALER_FAKEBANK_start2 (port,
currency, currency,
65536, /* RAM limit */ 65536, /* RAM limit */
0, 1, /* number of threads */
false); false);
} }
@ -1972,7 +1977,8 @@ TALER_FAKEBANK_start2 (uint16_t port,
#if EPOLL_SUPPORT #if EPOLL_SUPPORT
| MHD_USE_EPOLL | MHD_USE_EPOLL
#endif #endif
| MHD_USE_DUAL_STACK, | MHD_USE_DUAL_STACK
| MHD_ALLOW_SUSPEND_RESUME,
port, port,
NULL, NULL, NULL, NULL,
&handle_mhd_request, h, &handle_mhd_request, h,

View File

@ -93,6 +93,11 @@ static struct GNUNET_TIME_Relative duration;
*/ */
static struct TALER_TESTING_Command *all_commands; static struct TALER_TESTING_Command *all_commands;
/**
* Dummy keepalive task.
*/
static struct GNUNET_SCHEDULER_Task *keepalive;
/** /**
* Name of our configuration file. * Name of our configuration file.
*/ */
@ -105,6 +110,22 @@ static char *cfg_filename;
*/ */
static int use_fakebank = 1; static int use_fakebank = 1;
/**
* Launch taler-exchange-wirewatch.
*/
static int start_wirewatch;
/**
* Verbosity level.
*/
static unsigned int verbose;
/**
* Size of the transaction history the fakebank
* should keep in RAM.
*/
static unsigned long long history_size = 65536;
/** /**
* How many reserves we want to create per client. * How many reserves we want to create per client.
*/ */
@ -115,6 +136,11 @@ static unsigned int howmany_reserves = 1;
*/ */
static unsigned int howmany_clients = 1; static unsigned int howmany_clients = 1;
/**
* How many bank worker threads do we want to create.
*/
static unsigned int howmany_threads;
/** /**
* Log level used during the run. * Log level used during the run.
*/ */
@ -255,12 +281,11 @@ run (void *cls,
struct TALER_TESTING_Interpreter *is) struct TALER_TESTING_Interpreter *is)
{ {
char *total_reserve_amount; char *total_reserve_amount;
size_t len;
(void) cls; (void) cls;
// FIXME: vary user accounts more... len = howmany_reserves + 2;
all_commands = GNUNET_new_array (howmany_reserves all_commands = GNUNET_new_array (len,
+ 1 /* stat CMD */
+ 1 /* End CMD */,
struct TALER_TESTING_Command); struct TALER_TESTING_Command);
GNUNET_asprintf (&total_reserve_amount, GNUNET_asprintf (&total_reserve_amount,
"%s:5", "%s:5",
@ -270,6 +295,7 @@ run (void *cls,
char *create_reserve_label; char *create_reserve_label;
char *user_payto_uri; char *user_payto_uri;
// FIXME: vary user accounts more...
GNUNET_assert (GNUNET_OK == GNUNET_assert (GNUNET_OK ==
GNUNET_CONFIGURATION_get_value_string (cfg, GNUNET_CONFIGURATION_get_value_string (cfg,
"benchmark", "benchmark",
@ -315,6 +341,7 @@ launch_clients (void)
cfg, cfg,
NULL, NULL,
GNUNET_NO); GNUNET_NO);
if (verbose)
print_stats (); print_stats ();
return result; return result;
} }
@ -332,6 +359,7 @@ launch_clients (void)
cfg, cfg,
NULL, NULL,
GNUNET_NO); GNUNET_NO);
if (verbose)
print_stats (); print_stats ();
if (GNUNET_OK != result) if (GNUNET_OK != result)
GNUNET_log (GNUNET_ERROR_TYPE_ERROR, GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
@ -356,9 +384,18 @@ launch_clients (void)
{ {
int wstatus; int wstatus;
again:
if (cpids[i] !=
waitpid (cpids[i], waitpid (cpids[i],
&wstatus, &wstatus,
0); 0))
{
if (EINTR == errno)
goto again;
GNUNET_log_strerror (GNUNET_ERROR_TYPE_ERROR,
"waitpid");
return GNUNET_SYSERR;
}
if ( (! WIFEXITED (wstatus)) || if ( (! WIFEXITED (wstatus)) ||
(0 != WEXITSTATUS (wstatus)) ) (0 != WEXITSTATUS (wstatus)) )
{ {
@ -380,7 +417,21 @@ stop_fakebank (void *cls)
{ {
struct TALER_FAKEBANK_Handle *fakebank = cls; struct TALER_FAKEBANK_Handle *fakebank = cls;
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"Stopping fakebank\n");
TALER_FAKEBANK_stop (fakebank); TALER_FAKEBANK_stop (fakebank);
GNUNET_SCHEDULER_cancel (keepalive);
keepalive = NULL;
}
/**
* Dummy task that is never run.
*/
static void
never_task (void *cls)
{
GNUNET_assert (0);
} }
@ -393,16 +444,36 @@ static void
launch_fakebank (void *cls) launch_fakebank (void *cls)
{ {
struct TALER_FAKEBANK_Handle *fakebank; struct TALER_FAKEBANK_Handle *fakebank;
unsigned long long pnum;
(void) cls; (void) cls;
if (GNUNET_OK !=
GNUNET_CONFIGURATION_get_value_number (cfg,
"bank",
"HTTP_PORT",
&pnum))
{
GNUNET_log_config_invalid (GNUNET_ERROR_TYPE_ERROR,
"bank",
"HTTP_PORT",
"must be valid port number");
return;
}
fakebank fakebank
= TALER_TESTING_run_fakebank (exchange_bank_account.wire_gateway_url, = TALER_FAKEBANK_start2 ((uint16_t) pnum,
currency); currency,
history_size,
howmany_threads,
false);
if (NULL == fakebank) if (NULL == fakebank)
{ {
GNUNET_break (0); GNUNET_break (0);
return; return;
} }
keepalive
= GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL,
&never_task,
NULL);
GNUNET_SCHEDULER_add_shutdown (&stop_fakebank, GNUNET_SCHEDULER_add_shutdown (&stop_fakebank,
fakebank); fakebank);
} }
@ -469,6 +540,8 @@ parallel_benchmark (void)
GNUNET_OS_process_wait (dbinit)); GNUNET_OS_process_wait (dbinit));
GNUNET_OS_process_destroy (dbinit); GNUNET_OS_process_destroy (dbinit);
} }
if (start_wirewatch)
{
/* start exchange wirewatch */ /* start exchange wirewatch */
wirewatch = GNUNET_OS_start_process (GNUNET_OS_INHERIT_STD_ALL, wirewatch = GNUNET_OS_start_process (GNUNET_OS_INHERIT_STD_ALL,
NULL, NULL, NULL, NULL, NULL, NULL,
@ -478,15 +551,22 @@ parallel_benchmark (void)
NULL); NULL);
if (NULL == wirewatch) if (NULL == wirewatch)
{ {
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
"Failed to launch wirewatch, aborting benchmark\n");
if (-1 != fakebank) if (-1 != fakebank)
{ {
int wstatus; int wstatus;
kill (fakebank, kill (fakebank,
SIGTERM); SIGTERM);
if (fakebank !=
waitpid (fakebank, waitpid (fakebank,
&wstatus, &wstatus,
0); 0))
{
GNUNET_log_strerror (GNUNET_ERROR_TYPE_ERROR,
"waitpid");
}
fakebank = -1; fakebank = -1;
} }
if (NULL != bankd) if (NULL != bankd)
@ -499,6 +579,7 @@ parallel_benchmark (void)
return GNUNET_SYSERR; return GNUNET_SYSERR;
} }
} }
}
if ( (MODE_CLIENT == mode) || if ( (MODE_CLIENT == mode) ||
(MODE_BOTH == mode) ) (MODE_BOTH == mode) )
@ -513,7 +594,8 @@ parallel_benchmark (void)
if ( (MODE_BANK == mode) || if ( (MODE_BANK == mode) ||
(MODE_BOTH == mode) ) (MODE_BOTH == mode) )
{ {
GNUNET_assert (NULL != wirewatch); if (NULL != wirewatch)
{
/* stop wirewatch */ /* stop wirewatch */
GNUNET_break (0 == GNUNET_break (0 ==
GNUNET_OS_process_kill (wirewatch, GNUNET_OS_process_kill (wirewatch,
@ -521,17 +603,27 @@ parallel_benchmark (void)
GNUNET_break (GNUNET_OK == GNUNET_break (GNUNET_OK ==
GNUNET_OS_process_wait (wirewatch)); GNUNET_OS_process_wait (wirewatch));
GNUNET_OS_process_destroy (wirewatch); GNUNET_OS_process_destroy (wirewatch);
wirewatch = NULL;
}
/* stop fakebank */ /* stop fakebank */
if (-1 != fakebank) if (-1 != fakebank)
{ {
int wstatus; int wstatus;
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"Telling fakebank to shut down\n");
kill (fakebank, kill (fakebank,
SIGTERM); SIGTERM);
if (fakebank !=
waitpid (fakebank, waitpid (fakebank,
&wstatus, &wstatus,
0); 0))
{
GNUNET_log_strerror (GNUNET_ERROR_TYPE_ERROR,
"waitpid");
}
else
{
if ( (! WIFEXITED (wstatus)) || if ( (! WIFEXITED (wstatus)) ||
(0 != WEXITSTATUS (wstatus)) ) (0 != WEXITSTATUS (wstatus)) )
{ {
@ -539,6 +631,8 @@ parallel_benchmark (void)
result = GNUNET_SYSERR; result = GNUNET_SYSERR;
} }
} }
fakebank = -1;
}
if (NULL != bankd) if (NULL != bankd)
{ {
GNUNET_OS_process_kill (bankd, GNUNET_OS_process_kill (bankd,
@ -588,16 +682,31 @@ main (int argc,
"run as bank, client or both", "run as bank, client or both",
&mode_str), &mode_str),
GNUNET_GETOPT_option_uint ('p', GNUNET_GETOPT_option_uint ('p',
"parallelism", "worker-parallelism",
"NPROCS", "NPROCS",
"How many client processes we should run", "How many client processes we should run",
&howmany_clients), &howmany_clients),
GNUNET_GETOPT_option_uint ('P',
"service-parallelism",
"NTHREADS",
"How many service threads we should create",
&howmany_threads),
GNUNET_GETOPT_option_uint ('r', GNUNET_GETOPT_option_uint ('r',
"reserves", "reserves",
"NRESERVES", "NRESERVES",
"How many reserves per client we should create", "How many reserves per client we should create",
&howmany_reserves), &howmany_reserves),
GNUNET_GETOPT_option_ulong ('s',
"size",
"HISTORY_SIZE",
"Maximum history size kept in memory by the fakebank",
&history_size),
GNUNET_GETOPT_option_version (PACKAGE_VERSION " " VCS_VERSION), GNUNET_GETOPT_option_version (PACKAGE_VERSION " " VCS_VERSION),
GNUNET_GETOPT_option_verbose (&verbose),
GNUNET_GETOPT_option_flag ('w',
"wirewatch",
"run taler-exchange-wirewatch",
&start_wirewatch),
GNUNET_GETOPT_OPTION_END GNUNET_GETOPT_OPTION_END
}; };
@ -617,6 +726,12 @@ main (int argc,
GNUNET_log_setup ("taler-bank-benchmark", GNUNET_log_setup ("taler-bank-benchmark",
NULL == loglev ? "INFO" : loglev, NULL == loglev ? "INFO" : loglev,
logfile); logfile);
if (history_size < 10)
{
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
"History size too small, this can hardly work\n");
return BAD_CLI_ARG;
}
if (NULL == mode_str) if (NULL == mode_str)
mode = MODE_BOTH; mode = MODE_BOTH;
else if (0 == strcasecmp (mode_str, else if (0 == strcasecmp (mode_str,
@ -707,6 +822,7 @@ main (int argc,
if (GNUNET_OK == result) if (GNUNET_OK == result)
{ {
struct rusage usage; struct rusage usage;
unsigned long long tps;
GNUNET_assert (0 == getrusage (RUSAGE_CHILDREN, GNUNET_assert (0 == getrusage (RUSAGE_CHILDREN,
&usage)); &usage));
@ -716,11 +832,14 @@ main (int argc,
howmany_clients, howmany_clients,
GNUNET_STRINGS_relative_time_to_string (duration, GNUNET_STRINGS_relative_time_to_string (duration,
GNUNET_YES)); GNUNET_YES));
tps = ((unsigned long long) howmany_reserves) * howmany_clients * 1000LLU
/ (duration.rel_value_us / 1000LL);
fprintf (stdout, fprintf (stdout,
"RAW: %04u %04u %16llu\n", "RAW: %04u %04u %16llu (%llu TPS)\n",
howmany_reserves, howmany_reserves,
howmany_clients, howmany_clients,
(unsigned long long) duration.rel_value_us); (unsigned long long) duration.rel_value_us,
tps);
fprintf (stdout, fprintf (stdout,
"CPU time: sys %llu user %llu\n", \ "CPU time: sys %llu user %llu\n", \
(unsigned long long) (usage.ru_stime.tv_sec * 1000 * 1000 (unsigned long long) (usage.ru_stime.tv_sec * 1000 * 1000