diff options
| author | Christian Grothoff <christian@grothoff.org> | 2022-05-22 13:48:56 +0200 | 
|---|---|---|
| committer | Christian Grothoff <christian@grothoff.org> | 2022-05-22 13:48:56 +0200 | 
| commit | 21bcc5fa0bb4e2c101fc71d5740934d5914eb480 (patch) | |
| tree | 73c83681219894b607b97b8133331ecd0462e04f | |
| parent | 3233195d2d6c4733e6c98e754c54902f9c6d657c (diff) | |
-fix wirewatch assertion
| -rw-r--r-- | src/benchmark/taler-bank-benchmark.c | 125 | ||||
| -rw-r--r-- | src/exchange/taler-exchange-wirewatch.c | 40 | ||||
| -rw-r--r-- | src/testing/testing_api_loop.c | 5 | 
3 files changed, 124 insertions, 46 deletions
| diff --git a/src/benchmark/taler-bank-benchmark.c b/src/benchmark/taler-bank-benchmark.c index 4d7dbe35..75a7434d 100644 --- a/src/benchmark/taler-bank-benchmark.c +++ b/src/benchmark/taler-bank-benchmark.c @@ -111,9 +111,9 @@ static char *cfg_filename;  static int use_fakebank = 1;  /** - * Launch taler-exchange-wirewatch. + * Number of taler-exchange-wirewatchers to launch.   */ -static int start_wirewatch; +static unsigned int start_wirewatch;  /**   * Verbosity level. @@ -265,8 +265,9 @@ run (void *cls,    (void) cls;    len = howmany_reserves + 2; -  all_commands = GNUNET_new_array (len, -                                   struct TALER_TESTING_Command); +  all_commands = GNUNET_malloc_large (len +                                      * sizeof (struct TALER_TESTING_Command)); +  GNUNET_assert (NULL != all_commands);    GNUNET_asprintf (&total_reserve_amount,                     "%s:5",                     currency); @@ -465,14 +466,17 @@ launch_fakebank (void *cls)   *   * @return #GNUNET_OK on success   */ -static int +static enum GNUNET_GenericReturnValue  parallel_benchmark (void)  {    enum GNUNET_GenericReturnValue result = GNUNET_OK;    pid_t fakebank = -1;    struct GNUNET_OS_Process *bankd = NULL; -  struct GNUNET_OS_Process *wirewatch = NULL; +  struct GNUNET_OS_Process *wirewatch[GNUNET_NZL (start_wirewatch)]; +  memset (wirewatch, +          0, +          sizeof (wirewatch));    if ( (MODE_BANK == mode) ||         (MODE_BOTH == mode) )    { @@ -560,19 +564,30 @@ parallel_benchmark (void)                      GNUNET_OS_process_wait (dbinit));        GNUNET_OS_process_destroy (dbinit);      } -    if (start_wirewatch) +    /* start exchange wirewatch */ +    for (unsigned int w = 0; w<start_wirewatch; w++)      { -      /* start exchange wirewatch */ -      wirewatch = GNUNET_OS_start_process (GNUNET_OS_INHERIT_STD_ALL, -                                           NULL, NULL, NULL, -                                           "taler-exchange-wirewatch", -                                           "taler-exchange-wirewatch", -                                           "-c", cfg_filename, -                                           NULL); -      if (NULL == wirewatch) +      wirewatch[w] = GNUNET_OS_start_process (GNUNET_OS_INHERIT_STD_ALL, +                                              NULL, NULL, NULL, +                                              "taler-exchange-wirewatch", +                                              "taler-exchange-wirewatch", +                                              "-c", cfg_filename, +                                              "-L", loglev, +                                              NULL); +      if (NULL == wirewatch[w])        {          GNUNET_log (GNUNET_ERROR_TYPE_ERROR,                      "Failed to launch wirewatch, aborting benchmark\n"); +        for (unsigned int x = 0; x<w; x++) +        { +          GNUNET_break (0 == +                        GNUNET_OS_process_kill (wirewatch[x], +                                                SIGTERM)); +          GNUNET_break (GNUNET_OK == +                        GNUNET_OS_process_wait (wirewatch[x])); +          GNUNET_OS_process_destroy (wirewatch[x]); +          wirewatch[x] = NULL; +        }          if (-1 != fakebank)          {            int wstatus; @@ -618,17 +633,61 @@ parallel_benchmark (void)    if ( (MODE_BANK == mode) ||         (MODE_BOTH == mode) )    { -    if (NULL != wirewatch) +    /* Ensure wirewatch runs to completion! */ +    if (0 != start_wirewatch)      { -      /* stop wirewatch */ +      /* replace ONE of the wirewatchers with one that is in test-mode */        GNUNET_break (0 == -                    GNUNET_OS_process_kill (wirewatch, +                    GNUNET_OS_process_kill (wirewatch[0],                                              SIGTERM));        GNUNET_break (GNUNET_OK == -                    GNUNET_OS_process_wait (wirewatch)); -      GNUNET_OS_process_destroy (wirewatch); -      wirewatch = NULL; +                    GNUNET_OS_process_wait (wirewatch[0])); +      GNUNET_OS_process_destroy (wirewatch[0]); +      wirewatch[0] = GNUNET_OS_start_process (GNUNET_OS_INHERIT_STD_ALL, +                                              NULL, NULL, NULL, +                                              "taler-exchange-wirewatch", +                                              "taler-exchange-wirewatch", +                                              "-c", cfg_filename, +                                              "-L", loglev, +                                              "-t", +                                              NULL); +      /* wait for it to finish! */ +      GNUNET_break (GNUNET_OK == +                    GNUNET_OS_process_wait (wirewatch[0])); +      GNUNET_OS_process_destroy (wirewatch[0]); +      wirewatch[0] = NULL; +      /* Then stop the rest, which should basically also be finished */ +      for (unsigned int w = 1; w<start_wirewatch; w++) +      { +        GNUNET_break (0 == +                      GNUNET_OS_process_kill (wirewatch[w], +                                              SIGTERM)); +        GNUNET_break (GNUNET_OK == +                      GNUNET_OS_process_wait (wirewatch[w])); +        GNUNET_OS_process_destroy (wirewatch[w]); +      } + +      /* But be extra sure we did finish all shards by doing one more */ +      wirewatch[0] = GNUNET_OS_start_process (GNUNET_OS_INHERIT_STD_ALL, +                                              NULL, NULL, NULL, +                                              "taler-exchange-wirewatch", +                                              "taler-exchange-wirewatch", +                                              "-c", cfg_filename, +                                              "-L", loglev, +                                              "-t", +                                              NULL); +      /* wait for it to finish! */ +      GNUNET_break (GNUNET_OK == +                    GNUNET_OS_process_wait (wirewatch[0])); +      GNUNET_OS_process_destroy (wirewatch[0]); +      wirewatch[0] = NULL;      } + +    /* Now stop the time, if this was the right mode */ +    if ( (GNUNET_YES != linger) && +         (MODE_BANK != mode) ) +      duration = GNUNET_TIME_absolute_get_duration (start_time); +      /* stop fakebank */      if (-1 != fakebank)      { @@ -727,9 +786,10 @@ main (int argc,                                  &history_size),      GNUNET_GETOPT_option_version (PACKAGE_VERSION " " VCS_VERSION),      GNUNET_GETOPT_option_verbose (&verbose), -    GNUNET_GETOPT_option_flag ('w', +    GNUNET_GETOPT_option_uint ('w',                                 "wirewatch", -                               "run taler-exchange-wirewatch", +                               "NPROC", +                               "run NPROC taler-exchange-wirewatch processes",                                 &start_wirewatch),      GNUNET_GETOPT_OPTION_END    }; @@ -858,14 +918,17 @@ main (int argc,               howmany_clients,               GNUNET_STRINGS_relative_time_to_string (duration,                                                       GNUNET_YES)); -    tps = ((unsigned long long) howmany_reserves) * howmany_clients * 1000LLU -          / (duration.rel_value_us / 1000LL); -    fprintf (stdout, -             "RAW: %04u %04u %16llu (%llu TPS)\n", -             howmany_reserves, -             howmany_clients, -             (unsigned long long) duration.rel_value_us, -             tps); +    if (! GNUNET_TIME_relative_is_zero (duration)) +    { +      tps = ((unsigned long long) howmany_reserves) * howmany_clients * 1000LLU +            / (duration.rel_value_us / 1000LL); +      fprintf (stdout, +               "RAW: %04u %04u %16llu (%llu TPS)\n", +               howmany_reserves, +               howmany_clients, +               (unsigned long long) duration.rel_value_us, +               tps); +    }      fprintf (stdout,               "CPU time: sys %llu user %llu\n",                          \               (unsigned long long) (usage.ru_stime.tv_sec * 1000 * 1000 diff --git a/src/exchange/taler-exchange-wirewatch.c b/src/exchange/taler-exchange-wirewatch.c index 21d2df15..7cc4ac38 100644 --- a/src/exchange/taler-exchange-wirewatch.c +++ b/src/exchange/taler-exchange-wirewatch.c @@ -384,10 +384,10 @@ handle_soft_error (struct WireAccount *wa)                  "Reduced batch size to %llu due to serialization issue\n",                  (unsigned long long) wa->batch_size);    } -  GNUNET_assert (NULL == task);    /* Reset to beginning of transaction, and go again       from there. */    wa->latest_row_off = wa->batch_start; +  GNUNET_assert (NULL == task);    task = GNUNET_SCHEDULER_add_now (&continue_with_shard,                                     wa);  } @@ -458,6 +458,7 @@ account_completed (struct WireAccount *wa)        = GNUNET_TIME_relative_to_absolute (wirewatch_idle_sleep_interval);      wa = wa->next;    } +  GNUNET_assert (NULL == task);    schedule_transfers (wa);  } @@ -533,6 +534,7 @@ do_commit (struct WireAccount *wa)    enum GNUNET_DB_QueryStatus qs;    bool shard_done; +  GNUNET_assert (NULL == task);    shard_done = check_shard_done (wa);    wa->started_transaction = false;    GNUNET_log (GNUNET_ERROR_TYPE_INFO, @@ -563,7 +565,8 @@ do_commit (struct WireAccount *wa)    if (shard_done)      account_completed (wa);    else -    continue_with_shard (wa); +    task = GNUNET_SCHEDULER_add_now (&continue_with_shard, +                                     wa);  } @@ -591,6 +594,7 @@ history_cb (void *cls,    enum GNUNET_DB_QueryStatus qs;    (void) json; +  GNUNET_assert (NULL == task);    if (NULL == details)    {      wa->hh = NULL; @@ -660,14 +664,17 @@ history_cb (void *cls,      wa->hh = NULL;      if (wa->started_transaction)      { +      GNUNET_assert (NULL == task);        do_commit (wa);      }      else      { +      GNUNET_assert (NULL == task);        if (check_shard_done (wa))          account_completed (wa);        else -        continue_with_shard (wa); +        task = GNUNET_SCHEDULER_add_now (&continue_with_shard, +                                         wa);      }      return GNUNET_SYSERR;    } @@ -746,6 +753,7 @@ continue_with_shard (void *cls)    struct WireAccount *wa = cls;    unsigned int limit; +  task = NULL;    limit = GNUNET_MIN (wa->batch_size,                        wa->shard_end - wa->latest_row_off);    wa->max_row_off = wa->latest_row_off + limit; @@ -816,15 +824,18 @@ lock_shard (void *cls)      return;    case GNUNET_DB_STATUS_SOFT_ERROR:      /* try again */ -    GNUNET_break (0); -    GNUNET_log (GNUNET_ERROR_TYPE_WARNING, -                "Serialization error tying to obtain shard %s, will try again in %s!\n", -                wa->job_name, -                GNUNET_STRINGS_relative_time_to_string ( -                  wirewatch_idle_sleep_interval, -                  GNUNET_YES)); -    wa->delayed_until = GNUNET_TIME_relative_to_absolute ( -      wirewatch_idle_sleep_interval); +    { +      struct GNUNET_TIME_Relative rdelay; + +      rdelay = GNUNET_TIME_randomize (wirewatch_idle_sleep_interval); +      GNUNET_log (GNUNET_ERROR_TYPE_WARNING, +                  "Serialization error tying to obtain shard %s, will try again in %s!\n", +                  wa->job_name, +                  GNUNET_STRINGS_relative_time_to_string (rdelay, +                                                          GNUNET_YES)); +      wa->delayed_until = GNUNET_TIME_relative_to_absolute (rdelay); +    } +    GNUNET_assert (NULL == task);      schedule_transfers (wa->next);      return;    case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: @@ -837,6 +848,7 @@ lock_shard (void *cls)                    GNUNET_YES));      wa->delayed_until = GNUNET_TIME_relative_to_absolute (        wirewatch_idle_sleep_interval); +    GNUNET_assert (NULL == task);      schedule_transfers (wa->next);      return;    case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT: @@ -854,7 +866,8 @@ lock_shard (void *cls)                         we find out that we're really busy */    wa->batch_start = wa->shard_start;    wa->latest_row_off = wa->batch_start; -  continue_with_shard (wa); +  task = GNUNET_SCHEDULER_add_now (&continue_with_shard, +                                   wa);  } @@ -894,6 +907,7 @@ run (void *cls,      return;    }    rc = GNUNET_CURL_gnunet_rc_create (ctx); +  GNUNET_assert (NULL == task);    task = GNUNET_SCHEDULER_add_now (&lock_shard,                                     wa_head);  } diff --git a/src/testing/testing_api_loop.c b/src/testing/testing_api_loop.c index 1ea1d5a2..190e2092 100644 --- a/src/testing/testing_api_loop.c +++ b/src/testing/testing_api_loop.c @@ -449,8 +449,9 @@ TALER_TESTING_run2 (struct TALER_TESTING_Interpreter *is,    /* get the number of commands */    for (i = 0; NULL != commands[i].label; i++)      ; -  is->commands = GNUNET_new_array (i + 1, -                                   struct TALER_TESTING_Command); +  is->commands = GNUNET_malloc_large ( (i + 1) +                                       * sizeof (struct TALER_TESTING_Command)); +  GNUNET_assert (NULL != is->commands);    memcpy (is->commands,            commands,            sizeof (struct TALER_TESTING_Command) * i); | 
