diff options
| author | Christian Grothoff <christian@grothoff.org> | 2022-05-21 21:07:24 +0200 | 
|---|---|---|
| committer | Christian Grothoff <christian@grothoff.org> | 2022-05-21 21:07:24 +0200 | 
| commit | 737937291cceddd81e0dac676d3cb909250f628a (patch) | |
| tree | 94d62258750b4e261c14f8f84f58380f1a206e79 | |
| parent | a6494f9905f88245a4dbb6830dfd531c0badf17e (diff) | |
wirewatch spring cleaning
| -rw-r--r-- | src/exchange/taler-exchange-wirewatch.c | 445 | 
1 files changed, 244 insertions, 201 deletions
| diff --git a/src/exchange/taler-exchange-wirewatch.c b/src/exchange/taler-exchange-wirewatch.c index 898d678a..21d2df15 100644 --- a/src/exchange/taler-exchange-wirewatch.c +++ b/src/exchange/taler-exchange-wirewatch.c @@ -107,6 +107,12 @@ struct WireAccount    struct GNUNET_TIME_Absolute shard_start_time;    /** +   * How long did we take to finish the last shard +   * for this account? +   */ +  struct GNUNET_TIME_Relative shard_delay; + +  /**     * Name of our job in the shard table.     */    char *job_name; @@ -117,16 +123,11 @@ struct WireAccount    unsigned int batch_size;    /** -   * How much do we incremnt @e batch_size on success? +   * How much do we increment @e batch_size on success?     */    unsigned int batch_thresh;    /** -   * How many transactions did we see in the current batch? -   */ -  unsigned int current_batch_size; - -  /**     * Should we delay the next request to the wire plugin a bit?  Set to     * false if we actually did some work.     */ @@ -151,12 +152,6 @@ static struct WireAccount *wa_head;  static struct WireAccount *wa_tail;  /** - * Wire account we are currently processing.  This would go away - * if we ever start processing all accounts in parallel. - */ -static struct WireAccount *wa_pos; - -/**   * Handle to the context for interacting with the bank.   */  static struct GNUNET_CURL_Context *ctx; @@ -185,11 +180,6 @@ static struct TALER_EXCHANGEDB_Plugin *db_plugin;  static struct GNUNET_TIME_Relative wirewatch_idle_sleep_interval;  /** - * How long did we take to finish the last shard? - */ -static struct GNUNET_TIME_Relative shard_delay; - -/**   * Modulus to apply to group shards.  The shard size must ultimately be a   * multiple of the batch size. Thus, if this is not a multiple of the   * #MAXIMUM_BATCH_SIZE, the batch size will be set to the #shard_size. @@ -249,9 +239,9 @@ shutdown_task (void *cls)          wa->started_transaction = false;        }        qs = db_plugin->abort_shard (db_plugin->cls, -                                   wa_pos->job_name, -                                   wa_pos->shard_start, -                                   wa_pos->shard_end); +                                   wa->job_name, +                                   wa->shard_start, +                                   wa->shard_end);        if (qs <= 0)          GNUNET_log (GNUNET_ERROR_TYPE_WARNING,                      "Failed to abort work shard on shutdown\n"); @@ -259,8 +249,6 @@ shutdown_task (void *cls)        GNUNET_free (wa);      }    } -  wa_pos = NULL; -    if (NULL != ctx)    {      GNUNET_CURL_fini (ctx); @@ -359,12 +347,22 @@ exchange_serve_process_config (void)  /** - * Query for incoming wire transfers. + * Lock a shard and then begin to query for incoming wire transfers.   * - * @param cls NULL + * @param cls a `struct WireAccount` to operate on   */  static void -find_transfers (void *cls); +lock_shard (void *cls); + + +/** + * Continue with the credit history of the shard + * reserved as @a wa. + * + * @param[in,out] cls `struct WireAccount *` account with shard to continue processing + */ +static void +continue_with_shard (void *cls);  /** @@ -387,23 +385,59 @@ handle_soft_error (struct WireAccount *wa)                  (unsigned long long) wa->batch_size);    }    GNUNET_assert (NULL == task); -  task = GNUNET_SCHEDULER_add_now (&find_transfers, -                                   NULL); +  /* Reset to beginning of transaction, and go again +     from there. */ +  wa->latest_row_off = wa->batch_start; +  task = GNUNET_SCHEDULER_add_now (&continue_with_shard, +                                   wa);  }  /** - * We are done with a shard, move on to the next one. + * Schedule the #lock_shard() operation for + * @a wa. If @a wa is NULL, start with #wa_head. + * + * @param wa account to schedule #lock_shard() for, + *        possibly NULL (!). + */ +static void +schedule_transfers (struct WireAccount *wa) +{ +  if (NULL == wa) +  { +    wa = wa_head; +    GNUNET_assert (NULL != wa); +  } +  GNUNET_log (GNUNET_ERROR_TYPE_INFO, +              "Will try to lock next shard of %s in %s\n", +              wa->job_name, +              GNUNET_STRINGS_relative_time_to_string ( +                GNUNET_TIME_absolute_get_remaining (wa->delayed_until), +                GNUNET_YES)); +  GNUNET_assert (NULL == task); +  task = GNUNET_SCHEDULER_add_at (wa->delayed_until, +                                  &lock_shard, +                                  wa); +} + + +/** + * We are done with the work that is possible on @a wa right now (and the + * transaction was committed, if there was one to commit). Move on to the next + * account.   *   * @param wa wire account for which we completed a shard   */  static void -shard_completed (struct WireAccount *wa) +account_completed (struct WireAccount *wa)  { -  /* transaction success, update #last_row_off */ -  wa->batch_start = wa->latest_row_off; -  if (wa->batch_size < MAXIMUM_BATCH_SIZE) +  GNUNET_assert (! wa->started_transaction); +  if ( (wa->batch_start + wa->batch_size == +        wa->latest_row_off) && +       (wa->batch_size < MAXIMUM_BATCH_SIZE) )    { +    /* The current batch size worked without serialization +       issues, and we are allowed to grow. Do so slowly. */      int delta;      delta = ((int) wa->batch_thresh - (int) wa->batch_size) / 4; @@ -411,45 +445,45 @@ shard_completed (struct WireAccount *wa)        delta = -delta;      wa->batch_size = GNUNET_MIN (MAXIMUM_BATCH_SIZE,                                   wa->batch_size + delta + 1); -    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, +    GNUNET_log (GNUNET_ERROR_TYPE_INFO,                  "Increasing batch size to %llu\n",                  (unsigned long long) wa->batch_size);    } +    if (wa->delay)    { +    /* This account was finished, block this one for the +       #wirewatch_idle_sleep_interval and move on to the next one. */      wa->delayed_until        = GNUNET_TIME_relative_to_absolute (wirewatch_idle_sleep_interval); -    wa_pos = wa_pos->next; -    if (NULL == wa_pos) -      wa_pos = wa_head; -    GNUNET_assert (NULL != wa_pos); +    wa = wa->next;    } -  GNUNET_assert (NULL == task); -  GNUNET_log (GNUNET_ERROR_TYPE_INFO, -              "Will look for more transfers in %s\n", -              GNUNET_STRINGS_relative_time_to_string ( -                GNUNET_TIME_absolute_get_remaining (wa_pos->delayed_until), -                GNUNET_YES)); -  task = GNUNET_SCHEDULER_add_at (wa_pos->delayed_until, -                                  &find_transfers, -                                  NULL); +  schedule_transfers (wa);  }  /** - * We are finished with the current shard. Update the database, marking the - * shard as finished. + * Check if we are finished with the current shard.  If so, update the + * database, marking the shard as finished.   *   * @param wa wire account to commit for - * @return true on success + * @return true if we were indeed done with the shard   */  static bool -mark_shard_done (struct WireAccount *wa) +check_shard_done (struct WireAccount *wa)  {    enum GNUNET_DB_QueryStatus qs;    if (wa->shard_end > wa->latest_row_off) +  { +    GNUNET_log (GNUNET_ERROR_TYPE_INFO, +                "Shard %s (%llu,%llu] at %llu\n", +                wa->job_name, +                (unsigned long long) wa->shard_start, +                (unsigned long long) wa->shard_end, +                (unsigned long long) wa->latest_row_off);      return false; /* actually, not done! */ +  }    /* shard is complete, mark this as well */    qs = db_plugin->complete_shard (db_plugin->cls,                                    wa->job_name, @@ -468,12 +502,19 @@ mark_shard_done (struct WireAccount *wa)      handle_soft_error (wa);      return false;    case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: -    /* already existed, ok, let's just continue */ +    GNUNET_break (0); +    /* Not expected, but let's just continue */      break;    case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:      /* normal case */ -    shard_delay = GNUNET_TIME_absolute_get_duration (wa->shard_start_time); - +    wa->shard_delay = GNUNET_TIME_absolute_get_duration (wa->shard_start_time); +    GNUNET_log (GNUNET_ERROR_TYPE_INFO, +                "Completed shard %s (%llu,%llu] after %s\n", +                wa->job_name, +                (unsigned long long) wa->shard_start, +                (unsigned long long) wa->shard_end, +                GNUNET_STRINGS_relative_time_to_string (wa->shard_delay, +                                                        GNUNET_YES));      break;    }    return true; @@ -481,16 +522,6 @@ mark_shard_done (struct WireAccount *wa)  /** - * Continue with the credit history of the shard - * reserved as @a wa_pos. - * - * @param[in,out] wa_pos shard to continue processing - */ -static void -continue_with_shard (struct WireAccount *wa_pos); - - -/**   * We are finished with the current transaction, try   * to commit and then schedule the next iteration.   * @@ -502,8 +533,17 @@ do_commit (struct WireAccount *wa)    enum GNUNET_DB_QueryStatus qs;    bool shard_done; +  shard_done = check_shard_done (wa);    wa->started_transaction = false; -  shard_done = mark_shard_done (wa); +  GNUNET_log (GNUNET_ERROR_TYPE_INFO, +              "Committing %s progress (%llu,%llu] at %llu\n (%s)", +              wa->job_name, +              (unsigned long long) wa->shard_start, +              (unsigned long long) wa->shard_end, +              (unsigned long long) wa->latest_row_off, +              shard_done +              ? "shard done" +              : "shard incomplete");    qs = db_plugin->commit (db_plugin->cls);    switch (qs)    { @@ -521,7 +561,7 @@ do_commit (struct WireAccount *wa)      break;    }    if (shard_done) -    shard_completed (wa); +    account_completed (wa);    else      continue_with_shard (wa);  } @@ -568,63 +608,67 @@ history_cb (void *cls,      }      if (wa->started_transaction)      { -      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, -                  "End of list. Committing progress!\n"); +      GNUNET_log (GNUNET_ERROR_TYPE_INFO, +                  "End of list. Committing progress on %s of (%llu,%llu]!\n", +                  wa->job_name, +                  (unsigned long long) wa->batch_start, +                  (unsigned long long) wa->latest_row_off);        do_commit (wa); +      return GNUNET_OK; /* will be ignored anyway */      } -    else +    /* We did not even start a transaction. */ +    if ( (wa->delay) && +         (test_mode) && +         (NULL == wa->next) )      { -      if ( (wa->delay) && -           (test_mode) && -           (NULL == wa->next) ) -      { -        GNUNET_log (GNUNET_ERROR_TYPE_INFO, -                    "Shutdown due to test mode!\n"); -        GNUNET_SCHEDULER_shutdown (); -        return GNUNET_OK; -      } -      else -      { -        shard_completed (wa); -      } +      /* We exit on idle */ +      GNUNET_log (GNUNET_ERROR_TYPE_INFO, +                  "Shutdown due to test mode!\n"); +      GNUNET_SCHEDULER_shutdown (); +      return GNUNET_OK;      } +    account_completed (wa);      return GNUNET_OK; /* will be ignored anyway */    } + +  /* We did get 'details' from the bank. Do sanity checks before inserting. */    if (serial_id < wa->latest_row_off)    {      GNUNET_log (GNUNET_ERROR_TYPE_ERROR,                  "Serial ID %llu not monotonic (got %llu before). Failing!\n",                  (unsigned long long) serial_id,                  (unsigned long long) wa->latest_row_off); -    if (wa->started_transaction) -    { -      wa->started_transaction = false; -      db_plugin->rollback (db_plugin->cls); -    }      GNUNET_SCHEDULER_shutdown ();      wa->hh = NULL;      return GNUNET_SYSERR;    } +  /* If we got 'limit' transactions back from the bank, +     we should not introduce any delay before the next +     call. */    if (serial_id >= wa->max_row_off)      wa->delay = false;    if (serial_id > wa->shard_end)    { -    /* we are done with the current shard, commit and stop this iteration! */ +    /* we are *past* the current shard (likely because the serial_id of the +       shard_end happens to not exist in the DB). So commit and stop this +       iteration! */      GNUNET_log (GNUNET_ERROR_TYPE_INFO,                  "Serial ID %llu past shard end at %llu, ending iteration early!\n",                  (unsigned long long) serial_id,                  (unsigned long long) wa->shard_end); -    wa->latest_row_off = serial_id; +    wa->latest_row_off = serial_id - 1; /* excluding serial_id! */ +    wa->hh = NULL;      if (wa->started_transaction)      {        do_commit (wa);      }      else      { -      if (mark_shard_done (wa)) -        shard_completed (wa); +      if (check_shard_done (wa)) +        account_completed (wa); +      else +        continue_with_shard (wa);      } -    wa->hh = NULL;      return GNUNET_SYSERR;    }    if (! wa->started_transaction) @@ -640,7 +684,6 @@ history_cb (void *cls,        wa->hh = NULL;        return GNUNET_SYSERR;      } -    wa_pos->shard_start_time = GNUNET_TIME_absolute_get ();      wa->started_transaction = true;    }    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, @@ -675,6 +718,17 @@ history_cb (void *cls,      wa->hh = NULL;      return GNUNET_SYSERR;    case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: +    /* Either wirewatch was freshly started after the system was +       shutdown and we're going over an incomplete shard again +       after being restarted, or the shard lock period was too +       short (number of workers set incorrectly?) and a 2nd +       wirewatcher has been stealing our work while we are still +       at it. */ +    GNUNET_log (GNUNET_ERROR_TYPE_INFO, +                "Attempted to import transaction %llu (%s) twice. " +                "This should happen rarely (if not, ask for support).\n", +                (unsigned long long) serial_id, +                wa->job_name);      /* already existed, ok, let's just continue */      break;    case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT: @@ -686,130 +740,121 @@ history_cb (void *cls,  } -/** - * Query for incoming wire transfers. - * - * @param cls NULL - */  static void -find_transfers (void *cls) +continue_with_shard (void *cls)  { -  enum GNUNET_DB_QueryStatus qs; +  struct WireAccount *wa = cls; +  unsigned int limit; -  (void) cls; -  task = NULL; -  if (GNUNET_SYSERR == -      db_plugin->preflight (db_plugin->cls)) +  limit = GNUNET_MIN (wa->batch_size, +                      wa->shard_end - wa->latest_row_off); +  wa->max_row_off = wa->latest_row_off + limit; +  GNUNET_assert (NULL == wa->hh); +  wa->hh = TALER_BANK_credit_history (ctx, +                                      wa->ai->auth, +                                      wa->latest_row_off, +                                      limit, +                                      test_mode +                                      ? GNUNET_TIME_UNIT_ZERO +                                      : LONGPOLL_TIMEOUT, +                                      &history_cb, +                                      wa); +  if (NULL == wa->hh)    {      GNUNET_log (GNUNET_ERROR_TYPE_ERROR, -                "Failed to obtain database connection!\n"); +                "Failed to start request for account history!\n");      global_ret = EXIT_FAILURE;      GNUNET_SCHEDULER_shutdown ();      return;    } -  wa_pos->delay = true; -  wa_pos->current_batch_size = 0; /* reset counter */ -  if (wa_pos->shard_end <= wa_pos->batch_start) -  { -    uint64_t start; -    uint64_t end; -    struct GNUNET_TIME_Relative delay; -    /* advance to next shard */ - -    if (0 == max_workers) -      delay = GNUNET_TIME_UNIT_ZERO; -    else -      delay.rel_value_us = GNUNET_CRYPTO_random_u64 ( -        GNUNET_CRYPTO_QUALITY_WEAK, -        4 * GNUNET_TIME_relative_max ( -          wirewatch_idle_sleep_interval, -          GNUNET_TIME_relative_multiply (shard_delay, -                                         max_workers)).rel_value_us); -    qs = db_plugin->begin_shard (db_plugin->cls, -                                 wa_pos->job_name, -                                 delay, -                                 shard_size, -                                 &start, -                                 &end); -    switch (qs) -    { -    case GNUNET_DB_STATUS_HARD_ERROR: -      GNUNET_log (GNUNET_ERROR_TYPE_ERROR, -                  "Failed to obtain starting point for montoring from database!\n"); -      global_ret = EXIT_FAILURE; -      GNUNET_SCHEDULER_shutdown (); -      return; -    case GNUNET_DB_STATUS_SOFT_ERROR: -      /* try again */ -      GNUNET_log (GNUNET_ERROR_TYPE_WARNING, -                  "Serialization error tying to obtain shard, will try again in %s!\n", -                  GNUNET_STRINGS_relative_time_to_string ( -                    wirewatch_idle_sleep_interval, -                    GNUNET_YES)); -      task = GNUNET_SCHEDULER_add_delayed (wirewatch_idle_sleep_interval, -                                           &find_transfers, -                                           NULL); -      return; -    case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: -      GNUNET_break (0); -      GNUNET_log (GNUNET_ERROR_TYPE_WARNING, -                  "No shard available, will try again in %s!\n", -                  GNUNET_STRINGS_relative_time_to_string ( -                    wirewatch_idle_sleep_interval, -                    GNUNET_YES)); -      task = GNUNET_SCHEDULER_add_delayed (wirewatch_idle_sleep_interval, -                                           &find_transfers, -                                           NULL); -      return; -    case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT: -      wa_pos->shard_start = start; -      wa_pos->shard_end = end; -      wa_pos->batch_start = start; -      GNUNET_log (GNUNET_ERROR_TYPE_INFO, -                  "Starting with shard at [%llu,%llu) locked for %s\n", -                  (unsigned long long) start, -                  (unsigned long long) end, -                  GNUNET_STRINGS_relative_time_to_string (delay, -                                                          GNUNET_YES)); -      break; -    } -  } -  wa_pos->latest_row_off = wa_pos->batch_start; -  continue_with_shard (wa_pos);  }  static void -continue_with_shard (struct WireAccount *wa_pos) +lock_shard (void *cls)  { -  unsigned int limit; +  struct WireAccount *wa = cls; +  enum GNUNET_DB_QueryStatus qs; +  struct GNUNET_TIME_Relative delay; -  limit = GNUNET_MIN (wa_pos->batch_size, -                      wa_pos->shard_end - wa_pos->latest_row_off); -  GNUNET_assert (NULL == wa_pos->hh); -  wa_pos->max_row_off = wa_pos->latest_row_off + limit - 1; -  wa_pos->hh = TALER_BANK_credit_history (ctx, -                                          wa_pos->ai->auth, -                                          wa_pos->latest_row_off, -                                          limit, -                                          test_mode -                                          ? GNUNET_TIME_UNIT_ZERO -                                          : LONGPOLL_TIMEOUT, -                                          &history_cb, -                                          wa_pos); -  if (NULL == wa_pos->hh) +  task = NULL; +  if (GNUNET_SYSERR == +      db_plugin->preflight (db_plugin->cls))    {      GNUNET_log (GNUNET_ERROR_TYPE_ERROR, -                "Failed to start request for account history!\n"); -    if (wa_pos->started_transaction) -    { -      db_plugin->rollback (db_plugin->cls); -      wa_pos->started_transaction = false; -    } +                "Failed to obtain database connection!\n"); +    global_ret = EXIT_FAILURE; +    GNUNET_SCHEDULER_shutdown (); +    return; +  } +  /* How long we lock a shard depends on the number of +     workers expected, and how long we usually took to +     process a shard. */ +  if (0 == max_workers) +    delay = GNUNET_TIME_UNIT_ZERO; +  else +    delay.rel_value_us = GNUNET_CRYPTO_random_u64 ( +      GNUNET_CRYPTO_QUALITY_WEAK, +      4 * GNUNET_TIME_relative_max ( +        wirewatch_idle_sleep_interval, +        GNUNET_TIME_relative_multiply (wa->shard_delay, +                                       max_workers)).rel_value_us); +  wa->shard_start_time = GNUNET_TIME_absolute_get (); +  qs = db_plugin->begin_shard (db_plugin->cls, +                               wa->job_name, +                               delay, +                               shard_size, +                               &wa->shard_start, +                               &wa->shard_end); +  switch (qs) +  { +  case GNUNET_DB_STATUS_HARD_ERROR: +    GNUNET_log (GNUNET_ERROR_TYPE_ERROR, +                "Failed to obtain starting point for montoring from database!\n");      global_ret = EXIT_FAILURE;      GNUNET_SCHEDULER_shutdown ();      return; +  case GNUNET_DB_STATUS_SOFT_ERROR: +    /* try again */ +    GNUNET_break (0); +    GNUNET_log (GNUNET_ERROR_TYPE_WARNING, +                "Serialization error tying to obtain shard %s, will try again in %s!\n", +                wa->job_name, +                GNUNET_STRINGS_relative_time_to_string ( +                  wirewatch_idle_sleep_interval, +                  GNUNET_YES)); +    wa->delayed_until = GNUNET_TIME_relative_to_absolute ( +      wirewatch_idle_sleep_interval); +    schedule_transfers (wa->next); +    return; +  case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: +    GNUNET_break (0); +    GNUNET_log (GNUNET_ERROR_TYPE_WARNING, +                "No shard available, will try again for %s in %s!\n", +                wa->job_name, +                GNUNET_STRINGS_relative_time_to_string ( +                  wirewatch_idle_sleep_interval, +                  GNUNET_YES)); +    wa->delayed_until = GNUNET_TIME_relative_to_absolute ( +      wirewatch_idle_sleep_interval); +    schedule_transfers (wa->next); +    return; +  case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT: +    /* continued below */ +    break;    } +  GNUNET_log (GNUNET_ERROR_TYPE_INFO, +              "Starting with shard %s at (%llu,%llu] locked for %s\n", +              wa->job_name, +              (unsigned long long) wa->shard_start, +              (unsigned long long) wa->shard_end, +              GNUNET_STRINGS_relative_time_to_string (delay, +                                                      GNUNET_YES)); +  wa->delay = true; /* default is to delay, unless +                       we find out that we're really busy */ +  wa->batch_start = wa->shard_start; +  wa->latest_row_off = wa->batch_start; +  continue_with_shard (wa);  } @@ -838,21 +883,19 @@ run (void *cls,      global_ret = EXIT_NOTCONFIGURED;      return;    } -  wa_pos = wa_head; -  GNUNET_assert (NULL != wa_pos);    GNUNET_SCHEDULER_add_shutdown (&shutdown_task,                                   cls);    ctx = GNUNET_CURL_init (&GNUNET_CURL_gnunet_scheduler_reschedule,                            &rc); -  rc = GNUNET_CURL_gnunet_rc_create (ctx);    if (NULL == ctx)    {      GNUNET_break (0); +    GNUNET_SCHEDULER_shutdown ();      return;    } - -  task = GNUNET_SCHEDULER_add_now (&find_transfers, -                                   NULL); +  rc = GNUNET_CURL_gnunet_rc_create (ctx); +  task = GNUNET_SCHEDULER_add_now (&lock_shard, +                                   wa_head);  } | 
