diff options
| -rw-r--r-- | src/exchange/taler-exchange-wirewatch.c | 814 | 
1 files changed, 358 insertions, 456 deletions
| diff --git a/src/exchange/taler-exchange-wirewatch.c b/src/exchange/taler-exchange-wirewatch.c index d84344fc..0d902bf2 100644 --- a/src/exchange/taler-exchange-wirewatch.c +++ b/src/exchange/taler-exchange-wirewatch.c @@ -13,7 +13,6 @@    You should have received a copy of the GNU Affero General Public License along with    TALER; see the file COPYING.  If not, see <http://www.gnu.org/licenses/>  */ -  /**   * @file taler-exchange-wirewatch.c   * @brief Process that watches for wire transfers to the exchange's bank account @@ -43,122 +42,88 @@  #define MAXIMUM_BATCH_SIZE 1024  /** - * Information we keep for each supported account. + * Information about our account.   */ -struct WireAccount -{ -  /** -   * Accounts are kept in a DLL. -   */ -  struct WireAccount *next; - -  /** -   * Plugins are kept in a DLL. -   */ -  struct WireAccount *prev; - -  /** -   * Information about this account. -   */ -  const struct TALER_EXCHANGEDB_AccountInfo *ai; - -  /** -   * Active request for history. -   */ -  struct TALER_BANK_CreditHistoryHandle *hh; - -  /** -   * Until when is processing this wire plugin delayed? -   */ -  struct GNUNET_TIME_Absolute delayed_until; - -  /** -   * Encoded offset in the wire transfer list from where -   * to start the next query with the bank. -   */ -  uint64_t batch_start; - -  /** -   * Latest row offset seen in this transaction, becomes -   * the new #batch_start upon commit. -   */ -  uint64_t latest_row_off; - -  /** -   * Maximum row offset this transaction may yield. If we got the -   * maximum number of rows, we must not @e delay before running -   * the next transaction. -   */ -  uint64_t max_row_off; - -  /** -   * Offset where our current shard begins (inclusive). -   */ -  uint64_t shard_start; - -  /** -   * Offset where our current shard ends (exclusive). -   */ -  uint64_t shard_end; - -  /** -   * When did we start with the shard? -   */ -  struct GNUNET_TIME_Absolute shard_start_time; - -  /** -   * For how long did we lock the shard? -   */ -  struct GNUNET_TIME_Absolute shard_end_time; - -  /** -   * How long did we take to finish the last shard -   * for this account? -   */ -  struct GNUNET_TIME_Relative shard_delay; - -  /** -   * Name of our job in the shard table. -   */ -  char *job_name; - -  /** -   * How many transactions do we retrieve per batch? -   */ -  unsigned int batch_size; - -  /** -   * How much do we increment @e batch_size on success? -   */ -  unsigned int batch_thresh; - -  /** -   * Should we delay the next request to the wire plugin a bit?  Set to -   * false if we actually did some work. -   */ -  bool delay; - -  /** -   * Did we start a transaction yet? -   */ -  bool started_transaction; - -  /** -   * Is this shard still open for processing. -   */ -  bool shard_open; -}; +static const struct TALER_EXCHANGEDB_AccountInfo *ai; + +/** + * Active request for history. + */ +static struct TALER_BANK_CreditHistoryHandle *hh; + +/** + * Until when is processing this wire plugin delayed? + */ +static struct GNUNET_TIME_Absolute delayed_until; + +/** + * Encoded offset in the wire transfer list from where + * to start the next query with the bank. + */ +static uint64_t batch_start; + +/** + * Latest row offset seen in this transaction, becomes + * the new #batch_start upon commit. + */ +static uint64_t latest_row_off; + +/** + * Offset where our current shard begins (inclusive). + */ +static uint64_t shard_start; + +/** + * Offset where our current shard ends (exclusive). + */ +static uint64_t shard_end; + +/** + * When did we start with the shard? + */ +static struct GNUNET_TIME_Absolute shard_start_time; + +/** + * For how long did we lock the shard? + */ +static struct GNUNET_TIME_Absolute shard_end_time; + +/** + * How long did we take to finish the last shard + * for this account? + */ +static struct GNUNET_TIME_Relative shard_delay; + +/** + * Name of our job in the shard table. + */ +static char *job_name; + +/** + * How many transactions do we retrieve per batch? + */ +static unsigned int batch_size; +/** + * How much do we increment @e batch_size on success? + */ +static unsigned int batch_thresh;  /** - * Head of list of loaded wire plugins. + * Did work remain in the transaction queue? Set to true + * if we did some work and thus there might be more.   */ -static struct WireAccount *wa_head; +static bool progress;  /** - * Tail of list of loaded wire plugins. + * Did we start a transaction yet?   */ -static struct WireAccount *wa_tail; +static bool started_transaction; + +/** + * Is this shard still open for processing. + */ +static bool shard_open;  /**   * Handle to the context for interacting with the bank. @@ -227,6 +192,10 @@ static int ignore_account_404;   */  static struct GNUNET_SCHEDULER_Task *task; +/** + * Name of the configuration section with the account we should watch. + */ +static char *account_section;  /**   * We're being aborted with CTRL-C (or SIGTERM). Shut down. @@ -236,38 +205,27 @@ static struct GNUNET_SCHEDULER_Task *task;  static void  shutdown_task (void *cls)  { +  enum GNUNET_DB_QueryStatus qs;    (void) cls; -  { -    struct WireAccount *wa; -    while (NULL != (wa = wa_head)) -    { -      enum GNUNET_DB_QueryStatus qs; - -      if (NULL != wa->hh) -      { -        TALER_BANK_credit_history_cancel (wa->hh); -        wa->hh = NULL; -      } -      GNUNET_CONTAINER_DLL_remove (wa_head, -                                   wa_tail, -                                   wa); -      if (wa->started_transaction) -      { -        db_plugin->rollback (db_plugin->cls); -        wa->started_transaction = false; -      } -      qs = db_plugin->abort_shard (db_plugin->cls, -                                   wa->job_name, -                                   wa->shard_start, -                                   wa->shard_end); -      if (qs <= 0) -        GNUNET_log (GNUNET_ERROR_TYPE_WARNING, -                    "Failed to abort work shard on shutdown\n"); -      GNUNET_free (wa->job_name); -      GNUNET_free (wa); -    } +  if (NULL != hh) +  { +    TALER_BANK_credit_history_cancel (hh); +    hh = NULL; +  } +  if (started_transaction) +  { +    db_plugin->rollback (db_plugin->cls); +    started_transaction = false;    } +  qs = db_plugin->abort_shard (db_plugin->cls, +                               job_name, +                               shard_start, +                               shard_end); +  if (qs <= 0) +    GNUNET_log (GNUNET_ERROR_TYPE_WARNING, +                "Failed to abort work shard on shutdown\n"); +  GNUNET_free (job_name);    if (NULL != ctx)    {      GNUNET_CURL_fini (ctx); @@ -295,28 +253,36 @@ shutdown_task (void *cls)   * account to our list (if it is enabled and we can load the plugin).   *   * @param cls closure, NULL - * @param ai account information + * @param in_ai account information   */  static void  add_account_cb (void *cls, -                const struct TALER_EXCHANGEDB_AccountInfo *ai) +                const struct TALER_EXCHANGEDB_AccountInfo *in_ai)  { -  struct WireAccount *wa; -    (void) cls; -  if (! ai->credit_enabled) +  if (! in_ai->credit_enabled) +    return; /* not enabled for us, skip */ +  if ( (NULL != account_section) && +       (0 != strcasecmp (ai->section_name, +                         account_section)) )      return; /* not enabled for us, skip */ -  wa = GNUNET_new (struct WireAccount); -  wa->ai = ai; -  GNUNET_asprintf (&wa->job_name, +  if (NULL != ai) +  { +    GNUNET_log (GNUNET_ERROR_TYPE_ERROR, +                "Multiple accounts enabled (%s and %s), use '-a' command-line option to select one!\n", +                ai->section_name, +                in_ai->section_name); +    GNUNET_SCHEDULER_shutdown (); +    global_ret = EXIT_INVALIDARGUMENT; +    return; +  } +  ai = in_ai; +  GNUNET_asprintf (&job_name,                     "wirewatch-%s",                     ai->section_name); -  wa->batch_size = MAXIMUM_BATCH_SIZE; -  if (0 != shard_size % wa->batch_size) -    wa->batch_size = shard_size; -  GNUNET_CONTAINER_DLL_insert (wa_head, -                               wa_tail, -                               wa); +  batch_size = MAXIMUM_BATCH_SIZE; +  if (0 != shard_size % batch_size) +    batch_size = shard_size;  } @@ -360,7 +326,16 @@ exchange_serve_process_config (void)    }    TALER_EXCHANGEDB_find_accounts (&add_account_cb,                                    NULL); -  GNUNET_assert (NULL != wa_head); +  if (NULL == ai) +  { +    GNUNET_log (GNUNET_ERROR_TYPE_ERROR, +                "No accounts enabled for credit!\n"); +    GNUNET_SCHEDULER_shutdown (); +    global_ret = EXIT_INVALIDARGUMENT; +    TALER_EXCHANGEDB_plugin_unload (db_plugin); +    db_plugin = NULL; +    return GNUNET_SYSERR; +  }    return GNUNET_OK;  } @@ -368,240 +343,111 @@ exchange_serve_process_config (void)  /**   * Lock a shard and then begin to query for incoming wire transfers.   * - * @param cls a `struct WireAccount` to operate on + * @param cls NULL   */  static void  lock_shard (void *cls);  /** - * Continue with the credit history of the shard - * reserved as @a wa. + * Continue with the credit history of the shard.   * - * @param[in,out] cls `struct WireAccount *` account with shard to continue processing + * @param cls NULL   */  static void  continue_with_shard (void *cls);  /** - * We encountered a serialization error. - * Rollback the transaction and try again - * - * @param wa account we are transacting on + * We encountered a serialization error.  Rollback the transaction and try + * again.   */  static void -handle_soft_error (struct WireAccount *wa) +handle_soft_error (void)  {    db_plugin->rollback (db_plugin->cls); -  wa->started_transaction = false; -  if (1 < wa->batch_size) +  started_transaction = false; +  if (1 < batch_size)    { -    wa->batch_thresh = wa->batch_size; -    wa->batch_size /= 2; +    batch_thresh = batch_size; +    batch_size /= 2;      GNUNET_log (GNUNET_ERROR_TYPE_INFO,                  "Reduced batch size to %llu due to serialization issue\n", -                (unsigned long long) wa->batch_size); +                (unsigned long long) batch_size);    }    /* Reset to beginning of transaction, and go again       from there. */ -  wa->latest_row_off = wa->batch_start; +  latest_row_off = batch_start;    GNUNET_assert (NULL == task);    task = GNUNET_SCHEDULER_add_now (&continue_with_shard, -                                   wa); +                                   NULL);  }  /** - * Schedule the #lock_shard() operation for - * @a wa. If @a wa is NULL, start with #wa_head. - * - * @param wa account to schedule #lock_shard() for, - *        possibly NULL (!). + * Schedule the #lock_shard() operation.   */  static void -schedule_transfers (struct WireAccount *wa) +schedule_transfers (void)  { -  if (NULL == wa) -  { -    wa = wa_head; -    GNUNET_assert (NULL != wa); -  } -  if (wa->shard_open) +  if (shard_open)      GNUNET_log (GNUNET_ERROR_TYPE_INFO,                  "Will retry my shard (%llu,%llu] of %s in %s\n", -                (unsigned long long) wa->shard_start, -                (unsigned long long) wa->shard_end, -                wa->job_name, +                (unsigned long long) shard_start, +                (unsigned long long) shard_end, +                job_name,                  GNUNET_STRINGS_relative_time_to_string ( -                  GNUNET_TIME_absolute_get_remaining (wa->delayed_until), -                  GNUNET_YES)); +                  GNUNET_TIME_absolute_get_remaining (delayed_until), +                  true));    else      GNUNET_log (GNUNET_ERROR_TYPE_INFO,                  "Will try to lock next shard of %s in %s\n", -                wa->job_name, +                job_name,                  GNUNET_STRINGS_relative_time_to_string ( -                  GNUNET_TIME_absolute_get_remaining (wa->delayed_until), -                  GNUNET_YES)); +                  GNUNET_TIME_absolute_get_remaining (delayed_until), +                  true));    GNUNET_assert (NULL == task); -  task = GNUNET_SCHEDULER_add_at (wa->delayed_until, +  task = GNUNET_SCHEDULER_add_at (delayed_until,                                    &lock_shard, -                                  wa); +                                  NULL);  }  /** - * We are done with the work that is possible on @a wa right now (and the - * transaction was committed, if there was one to commit). Move on to the next - * account. - * - * @param wa wire account for which we completed a shard + * We are done with the work that is possible right now (and the transaction + * was committed, if there was one to commit). Move on to the next shard.   */  static void -account_completed (struct WireAccount *wa) +transaction_completed (void)  { -  GNUNET_assert (! wa->started_transaction); -  if ( (wa->batch_start + wa->batch_size == -        wa->latest_row_off) && -       (wa->batch_size < MAXIMUM_BATCH_SIZE) ) +  GNUNET_assert (! started_transaction); +  if ( (batch_start + batch_size == +        latest_row_off) && +       (batch_size < MAXIMUM_BATCH_SIZE) )    {      /* The current batch size worked without serialization         issues, and we are allowed to grow. Do so slowly. */      int delta; -    delta = ((int) wa->batch_thresh - (int) wa->batch_size) / 4; +    delta = ((int) batch_thresh - (int) batch_size) / 4;      if (delta < 0)        delta = -delta; -    wa->batch_size = GNUNET_MIN (MAXIMUM_BATCH_SIZE, -                                 wa->batch_size + delta + 1); +    batch_size = GNUNET_MIN (MAXIMUM_BATCH_SIZE, +                             batch_size + delta + 1);      GNUNET_log (GNUNET_ERROR_TYPE_INFO,                  "Increasing batch size to %llu\n", -                (unsigned long long) wa->batch_size); -  } - -  if (wa->delay) -  { -    /* This account was finished, block this one for the -       #wirewatch_idle_sleep_interval and move on to the next one. */ -    wa->delayed_until -      = GNUNET_TIME_relative_to_absolute (wirewatch_idle_sleep_interval); -    wa = wa->next; +                (unsigned long long) batch_size);    } -  GNUNET_assert (NULL == task); -  schedule_transfers (wa); -} - -/** - * Check if we are finished with the current shard.  If so, update the - * database, marking the shard as finished. - * - * @param wa wire account to commit for - * @return true if we were indeed done with the shard - */ -static bool -check_shard_done (struct WireAccount *wa) -{ -  enum GNUNET_DB_QueryStatus qs; - -  if (wa->shard_end > wa->latest_row_off) +  if ( (! progress) && test_mode)    { -    GNUNET_log (GNUNET_ERROR_TYPE_INFO, -                "Shard %s (%llu,%llu] at %llu\n", -                wa->job_name, -                (unsigned long long) wa->shard_start, -                (unsigned long long) wa->shard_end, -                (unsigned long long) wa->latest_row_off); -    return false; /* actually, not done! */ -  } -  /* shard is complete, mark this as well */ -  qs = db_plugin->complete_shard (db_plugin->cls, -                                  wa->job_name, -                                  wa->shard_start, -                                  wa->shard_end); -  switch (qs) -  { -  case GNUNET_DB_STATUS_HARD_ERROR: -    GNUNET_break (0); -    db_plugin->rollback (db_plugin->cls); -    GNUNET_SCHEDULER_shutdown (); -    return false; -  case GNUNET_DB_STATUS_SOFT_ERROR: -    GNUNET_log (GNUNET_ERROR_TYPE_INFO, -                "Got DB soft error for complete_shard. Rolling back.\n"); -    handle_soft_error (wa); -    return false; -  case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: -    GNUNET_break (0); -    /* Not expected, but let's just continue */ -    break; -  case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT: -    /* normal case */ -    GNUNET_log (GNUNET_ERROR_TYPE_INFO, -                "Completed shard %s (%llu,%llu] after %s\n", -                wa->job_name, -                (unsigned long long) wa->shard_start, -                (unsigned long long) wa->shard_end, -                GNUNET_STRINGS_relative_time_to_string ( -                  GNUNET_TIME_absolute_get_duration (wa->shard_start_time), -                  GNUNET_YES)); -    break; -  } -  return true; -} - - -/** - * We are finished with the current transaction, try - * to commit and then schedule the next iteration. - * - * @param wa wire account to commit for - */ -static void -do_commit (struct WireAccount *wa) -{ -  enum GNUNET_DB_QueryStatus qs; -  bool shard_done; - -  GNUNET_assert (NULL == task); -  shard_done = check_shard_done (wa); -  wa->started_transaction = false; -  GNUNET_log (GNUNET_ERROR_TYPE_INFO, -              "Committing %s progress (%llu,%llu] at %llu\n (%s)", -              wa->job_name, -              (unsigned long long) wa->shard_start, -              (unsigned long long) wa->shard_end, -              (unsigned long long) wa->latest_row_off, -              shard_done -              ? "shard done" -              : "shard incomplete"); -  qs = db_plugin->commit (db_plugin->cls); -  switch (qs) -  { -  case GNUNET_DB_STATUS_HARD_ERROR: -    GNUNET_break (0); +    /* Transaction list was drained and we are in +       test mode. So we are done. */      GNUNET_SCHEDULER_shutdown ();      return; -  case GNUNET_DB_STATUS_SOFT_ERROR: -    /* reduce transaction size to reduce rollback probability */ -    handle_soft_error (wa); -    return; -  case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: -  case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT: -    /* normal case */ -    break; -  } -  if (shard_done) -  { -    wa->shard_delay = GNUNET_TIME_absolute_get_duration (wa->shard_start_time); -    wa->shard_open = false; -    account_completed (wa); -  } -  else -  { -    task = GNUNET_SCHEDULER_add_now (&continue_with_shard, -                                     wa);    } +  GNUNET_assert (NULL == task); +  schedule_transfers ();  } @@ -609,18 +455,24 @@ do_commit (struct WireAccount *wa)   * We got incoming transaction details from the bank. Add them   * to the database.   * - * @param wa wire account we are handling   * @param details array of transaction details   * @param details_length length of the @a details array - * @return true on success   */ -static bool -process_reply (struct WireAccount *wa, -               const struct TALER_BANK_CreditDetails *details, +static void +process_reply (const struct TALER_BANK_CreditDetails *details,                 unsigned int details_length)  { -  uint64_t lroff = wa->latest_row_off; +  enum GNUNET_DB_QueryStatus qs; +  bool shard_done; +  uint64_t lroff = latest_row_off; +  if (0 == details_length) +  { +    /* Server should have used 204, not 200! */ +    GNUNET_break_op (0); +    transaction_completed (); +    return; +  }    /* check serial IDs for range constraints */    for (unsigned int i = 0; i<details_length; i++)    { @@ -634,16 +486,9 @@ process_reply (struct WireAccount *wa,                    (unsigned long long) lroff);        db_plugin->rollback (db_plugin->cls);        GNUNET_SCHEDULER_shutdown (); -      wa->hh = NULL; -      return false; -    } -    if (cd->serial_id >= wa->max_row_off) -    { -      /* We got 'limit' transactions back from the bank, so we should not -         introduce any delay before the next call. */ -      wa->delay = false; +      return;      } -    if (cd->serial_id > wa->shard_end) +    if (cd->serial_id > shard_end)      {        /* we are *past* the current shard (likely because the serial_id of the           shard_end happens to not exist in the DB). So commit and stop this @@ -651,19 +496,14 @@ process_reply (struct WireAccount *wa,        GNUNET_log (GNUNET_ERROR_TYPE_INFO,                    "Serial ID %llu past shard end at %llu, ending iteration early!\n",                    (unsigned long long) cd->serial_id, -                  (unsigned long long) wa->shard_end); +                  (unsigned long long) shard_end);        details_length = i; -      wa->delay = false; +      progress = true; +      lroff = cd->serial_id - 1;        break;      }      lroff = cd->serial_id;    } -  if (0 == details_length) -  { -    /* Server should have used 204, not 200! */ -    GNUNET_break_op (0); -    return true; -  }    if (GNUNET_OK !=        db_plugin->start_read_committed (db_plugin->cls,                                         "wirewatch check for incoming wire transfers")) @@ -672,15 +512,13 @@ process_reply (struct WireAccount *wa,                  "Failed to start database transaction!\n");      global_ret = EXIT_FAILURE;      GNUNET_SCHEDULER_shutdown (); -    wa->hh = NULL; -    return false; +    return;    } -  wa->started_transaction = true; +  started_transaction = true;    for (unsigned int i = 0; i<details_length; i++)    {      const struct TALER_BANK_CreditDetails *cd = &details[i]; -    enum GNUNET_DB_QueryStatus qs;      /* FIXME #7276: Consider using Postgres multi-valued insert here,     for up to 15x speed-up according to @@ -692,23 +530,19 @@ process_reply (struct WireAccount *wa,                                          &cd->amount,                                          cd->execution_date,                                          cd->debit_account_uri, -                                        wa->ai->section_name, +                                        ai->section_name,                                          cd->serial_id);      switch (qs)      {      case GNUNET_DB_STATUS_HARD_ERROR:        GNUNET_break (0); -      db_plugin->rollback (db_plugin->cls); -      wa->started_transaction = false;        GNUNET_SCHEDULER_shutdown (); -      wa->hh = NULL; -      return false; +      return;      case GNUNET_DB_STATUS_SOFT_ERROR:        GNUNET_log (GNUNET_ERROR_TYPE_INFO,                    "Got DB soft error for reserves_in_insert. Rolling back.\n"); -      handle_soft_error (wa); -      wa->hh = NULL; -      return true; +      handle_soft_error (); +      return;      case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:        /* Either wirewatch was freshly started after the system was           shutdown and we're going over an incomplete shard again @@ -720,25 +554,92 @@ process_reply (struct WireAccount *wa,                    "Attempted to import transaction %llu (%s) twice. "                    "This should happen rarely (if not, ask for support).\n",                    (unsigned long long) cd->serial_id, -                  wa->job_name); +                  job_name);        db_plugin->rollback (db_plugin->cls); -      wa->latest_row_off = cd->serial_id; -      wa->started_transaction = false; +      started_transaction = false;        /* already existed, ok, let's just continue */ -      return true; +      return;      case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT: -      wa->latest_row_off = cd->serial_id; +      GNUNET_log (GNUNET_ERROR_TYPE_INFO, +                  "Imported transaction %llu.", +                  (unsigned long long) cd->serial_id);        /* normal case */        break;      }    } -  do_commit (wa); -  if (check_shard_done (wa)) -    account_completed (wa); -  else -    task = GNUNET_SCHEDULER_add_now (&continue_with_shard, -                                     wa); -  return true; +  latest_row_off = lroff; +  shard_done = (shard_end <= latest_row_off); +  if (shard_done) +  { +    /* shard is complete, mark this as well */ +    qs = db_plugin->complete_shard (db_plugin->cls, +                                    job_name, +                                    shard_start, +                                    shard_end); +    switch (qs) +    { +    case GNUNET_DB_STATUS_HARD_ERROR: +      GNUNET_break (0); +      GNUNET_SCHEDULER_shutdown (); +      return; +    case GNUNET_DB_STATUS_SOFT_ERROR: +      GNUNET_log (GNUNET_ERROR_TYPE_INFO, +                  "Got DB soft error for complete_shard. Rolling back.\n"); +      handle_soft_error (); +      return; +    case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: +      GNUNET_break (0); +      /* Not expected, but let's just continue */ +      break; +    case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT: +      /* normal case */ +      GNUNET_log (GNUNET_ERROR_TYPE_INFO, +                  "Completed shard %s (%llu,%llu] after %s\n", +                  job_name, +                  (unsigned long long) shard_start, +                  (unsigned long long) shard_end, +                  GNUNET_STRINGS_relative_time_to_string ( +                    GNUNET_TIME_absolute_get_duration (shard_start_time), +                    true)); +      break; +    } +  } +  GNUNET_log (GNUNET_ERROR_TYPE_INFO, +              "Committing %s progress (%llu,%llu] at %llu\n (%s)", +              job_name, +              (unsigned long long) shard_start, +              (unsigned long long) shard_end, +              (unsigned long long) latest_row_off, +              shard_done +              ? "shard done" +              : "shard incomplete"); +  qs = db_plugin->commit (db_plugin->cls); +  switch (qs) +  { +  case GNUNET_DB_STATUS_HARD_ERROR: +    GNUNET_break (0); +    GNUNET_SCHEDULER_shutdown (); +    return; +  case GNUNET_DB_STATUS_SOFT_ERROR: +    /* reduce transaction size to reduce rollback probability */ +    handle_soft_error (); +    return; +  case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: +  case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT: +    started_transaction = false; +    /* normal case */ +    break; +  } +  if (shard_done) +  { +    shard_delay = GNUNET_TIME_absolute_get_duration (shard_start_time); +    shard_open = false; +    transaction_completed (); +    return; +  } +  GNUNET_assert (NULL == task); +  task = GNUNET_SCHEDULER_add_now (&continue_with_shard, +                                   NULL);  } @@ -746,76 +647,75 @@ process_reply (struct WireAccount *wa,   * Callbacks of this type are used to serve the result of asking   * the bank for the transaction history.   * - * @param cls closure with the `struct WireAccount *` we are processing + * @param cls NULL   * @param reply response we got from the bank   */  static void  history_cb (void *cls,              const struct TALER_BANK_CreditHistoryResponse *reply)  { -  struct WireAccount *wa = cls; -  bool ok; - +  (void) cls;    GNUNET_assert (NULL == task); -  wa->hh = NULL; +  hh = NULL; +  GNUNET_log (GNUNET_ERROR_TYPE_INFO, +              "History request returned with HTTP status %u\n", +              reply->http_status);    switch (reply->http_status)    { -  case 0: -    ok = false;    case MHD_HTTP_OK: -    ok = process_reply (wa, -                        reply->details.success.details, -                        reply->details.success.details_length); -    break; +    process_reply (reply->details.success.details, +                   reply->details.success.details_length); +    return;    case MHD_HTTP_NO_CONTENT: -    ok = true; -    break; +    transaction_completed (); +    return;    case MHD_HTTP_NOT_FOUND: -    ok = ignore_account_404; +    if (ignore_account_404) +    { +      transaction_completed (); +      return; +    }      break;    default: -    ok = false;      break;    } - -  if (! ok) +  GNUNET_log (GNUNET_ERROR_TYPE_ERROR, +              "Error fetching history: %s (%u)\n", +              TALER_ErrorCode_get_hint (reply->ec), +              reply->http_status); +  if (! exit_on_error)    { -    GNUNET_log (GNUNET_ERROR_TYPE_ERROR, -                "Error fetching history: %s (%u)\n", -                TALER_ErrorCode_get_hint (reply->ec), -                reply->http_status); -    if (! (exit_on_error || test_mode) ) -    { -      account_completed (wa); -      return; -    } -    GNUNET_SCHEDULER_shutdown (); +    transaction_completed ();      return;    } +  GNUNET_SCHEDULER_shutdown ();  }  static void  continue_with_shard (void *cls)  { -  struct WireAccount *wa = cls;    unsigned int limit; +  (void) cls;    task = NULL; -  limit = GNUNET_MIN (wa->batch_size, -                      wa->shard_end - wa->latest_row_off); -  wa->max_row_off = wa->latest_row_off + limit; -  GNUNET_assert (NULL == wa->hh); -  wa->hh = TALER_BANK_credit_history (ctx, -                                      wa->ai->auth, -                                      wa->latest_row_off, -                                      limit, -                                      test_mode -                                      ? GNUNET_TIME_UNIT_ZERO -                                      : LONGPOLL_TIMEOUT, -                                      &history_cb, -                                      wa); -  if (NULL == wa->hh) +  GNUNET_assert (shard_end > latest_row_off); +  limit = GNUNET_MIN (batch_size, +                      shard_end - latest_row_off); +  GNUNET_assert (NULL == hh); +  GNUNET_log (GNUNET_ERROR_TYPE_INFO, +              "Requesting credit history staring from %llu\n", +              (unsigned long long) latest_row_off); +  hh = TALER_BANK_credit_history (ctx, +                                  ai->auth, +                                  latest_row_off, +                                  limit, +                                  test_mode +                                  ? GNUNET_TIME_UNIT_ZERO +                                  : LONGPOLL_TIMEOUT, +                                  &history_cb, +                                  NULL); +  if (NULL == hh)    {      GNUNET_log (GNUNET_ERROR_TYPE_ERROR,                  "Failed to start request for account history!\n"); @@ -829,12 +729,12 @@ continue_with_shard (void *cls)  static void  lock_shard (void *cls)  { -  struct WireAccount *wa = cls;    enum GNUNET_DB_QueryStatus qs;    struct GNUNET_TIME_Relative delay; -  uint64_t last_shard_start = wa->shard_start; -  uint64_t last_shard_end = wa->shard_end; +  uint64_t last_shard_start = shard_start; +  uint64_t last_shard_end = shard_end; +  (void) cls;    task = NULL;    if (GNUNET_SYSERR ==        db_plugin->preflight (db_plugin->cls)) @@ -845,17 +745,16 @@ lock_shard (void *cls)      GNUNET_SCHEDULER_shutdown ();      return;    } -  if ( (wa->shard_open) && -       (GNUNET_TIME_absolute_is_future (wa->shard_end_time)) ) +  if ( (shard_open) && +       (GNUNET_TIME_absolute_is_future (shard_end_time)) )    { -    wa->delay = true; /* default is to delay, unless -                         we find out that we're really busy */ -    wa->batch_start = wa->latest_row_off; +    progress = false; +    batch_start = latest_row_off;      task = GNUNET_SCHEDULER_add_now (&continue_with_shard, -                                     wa); +                                     NULL);      return;    } -  if (wa->shard_open) +  if (shard_open)      GNUNET_log (GNUNET_ERROR_TYPE_INFO,                  "Shard not completed in time, will try to re-acquire\n");    /* How long we lock a shard depends on the number of @@ -868,15 +767,15 @@ lock_shard (void *cls)        GNUNET_CRYPTO_QUALITY_WEAK,        4 * GNUNET_TIME_relative_max (          wirewatch_idle_sleep_interval, -        GNUNET_TIME_relative_multiply (wa->shard_delay, +        GNUNET_TIME_relative_multiply (shard_delay,                                         max_workers)).rel_value_us); -  wa->shard_start_time = GNUNET_TIME_absolute_get (); +  shard_start_time = GNUNET_TIME_absolute_get ();    qs = db_plugin->begin_shard (db_plugin->cls, -                               wa->job_name, +                               job_name,                                 delay,                                 shard_size, -                               &wa->shard_start, -                               &wa->shard_end); +                               &shard_start, +                               &shard_end);    switch (qs)    {    case GNUNET_DB_STATUS_HARD_ERROR: @@ -893,52 +792,51 @@ lock_shard (void *cls)        rdelay = GNUNET_TIME_randomize (wirewatch_idle_sleep_interval);        GNUNET_log (GNUNET_ERROR_TYPE_WARNING,                    "Serialization error tying to obtain shard %s, will try again in %s!\n", -                  wa->job_name, +                  job_name,                    GNUNET_STRINGS_relative_time_to_string (rdelay, -                                                          GNUNET_YES)); -      wa->delayed_until = GNUNET_TIME_relative_to_absolute (rdelay); +                                                          true)); +      delayed_until = GNUNET_TIME_relative_to_absolute (rdelay);      }      GNUNET_assert (NULL == task); -    schedule_transfers (wa->next); +    schedule_transfers ();      return;    case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:      GNUNET_break (0);      GNUNET_log (GNUNET_ERROR_TYPE_WARNING,                  "No shard available, will try again for %s in %s!\n", -                wa->job_name, +                job_name,                  GNUNET_STRINGS_relative_time_to_string (                    wirewatch_idle_sleep_interval,                    GNUNET_YES)); -    wa->delayed_until = GNUNET_TIME_relative_to_absolute ( +    delayed_until = GNUNET_TIME_relative_to_absolute (        wirewatch_idle_sleep_interval); -    wa->shard_open = false; +    shard_open = false;      GNUNET_assert (NULL == task); -    schedule_transfers (wa->next); +    schedule_transfers ();      return;    case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:      /* continued below */      break;    } -  wa->shard_end_time = GNUNET_TIME_relative_to_absolute (delay); +  shard_end_time = GNUNET_TIME_relative_to_absolute (delay);    GNUNET_log (GNUNET_ERROR_TYPE_INFO,                "Starting with shard %s at (%llu,%llu] locked for %s\n", -              wa->job_name, -              (unsigned long long) wa->shard_start, -              (unsigned long long) wa->shard_end, +              job_name, +              (unsigned long long) shard_start, +              (unsigned long long) shard_end,                GNUNET_STRINGS_relative_time_to_string (delay, -                                                      GNUNET_YES)); -  wa->delay = true; /* default is to delay, unless -                       we find out that we're really busy */ -  wa->batch_start = wa->shard_start; -  if ( (wa->shard_open) && -       (wa->shard_start == last_shard_start) && -       (wa->shard_end == last_shard_end) ) -    GNUNET_break (wa->latest_row_off >= wa->batch_start); /* resume where we left things */ +                                                      true)); +  progress = false; +  batch_start = shard_start; +  if ( (shard_open) && +       (shard_start == last_shard_start) && +       (shard_end == last_shard_end) ) +    GNUNET_break (latest_row_off >= batch_start); /* resume where we left things */    else -    wa->latest_row_off = wa->batch_start; -  wa->shard_open = true; +    latest_row_off = batch_start; +  shard_open = true;    task = GNUNET_SCHEDULER_add_now (&continue_with_shard, -                                   wa); +                                   NULL);  } @@ -961,14 +859,15 @@ run (void *cls,    (void) cfgfile;    cfg = c; +  GNUNET_SCHEDULER_add_shutdown (&shutdown_task, +                                 cls);    if (GNUNET_OK !=        exchange_serve_process_config ())    {      global_ret = EXIT_NOTCONFIGURED; +    GNUNET_SCHEDULER_shutdown ();      return;    } -  GNUNET_SCHEDULER_add_shutdown (&shutdown_task, -                                 cls);    ctx = GNUNET_CURL_init (&GNUNET_CURL_gnunet_scheduler_reschedule,                            &rc);    if (NULL == ctx) @@ -978,9 +877,7 @@ run (void *cls,      return;    }    rc = GNUNET_CURL_gnunet_rc_create (ctx); -  GNUNET_assert (NULL == task); -  task = GNUNET_SCHEDULER_add_now (&lock_shard, -                                   wa_head); +  schedule_transfers ();  } @@ -996,6 +893,11 @@ main (int argc,        char *const *argv)  {    struct GNUNET_GETOPT_CommandLineOption options[] = { +    GNUNET_GETOPT_option_string ('a', +                                 "account", +                                 "SECTION_NAME", +                                 "name of the configuration section with the account we should watch (needed if more than one is enabled for crediting)", +                                 &account_section),      GNUNET_GETOPT_option_flag ('e',                                 "exit-on-error",                                 "terminate wirewatch if we failed to download information from the bank", | 
