diff options
Diffstat (limited to 'src/exchange')
| -rw-r--r-- | src/exchange/taler-exchange-wirewatch.c | 191 | 
1 files changed, 189 insertions, 2 deletions
diff --git a/src/exchange/taler-exchange-wirewatch.c b/src/exchange/taler-exchange-wirewatch.c index 47ecba68..d7eaa7e0 100644 --- a/src/exchange/taler-exchange-wirewatch.c +++ b/src/exchange/taler-exchange-wirewatch.c @@ -657,6 +657,185 @@ process_reply (const struct TALER_BANK_CreditDetails *details,  /** + * We got incoming transaction details from the bank. Add them + * to the database. + * + * @param details array of transaction details + * @param details_length length of the @a details array + */ +static void +process_reply_batched (const struct TALER_BANK_CreditDetails *details, +                       unsigned int details_length) +{ +  enum GNUNET_DB_QueryStatus qs; +  bool shard_done; +  uint64_t lroff = latest_row_off; + +  if (0 == details_length) +  { +    /* Server should have used 204, not 200! */ +    GNUNET_break_op (0); +    transaction_completed (); +    return; +  } +  /* check serial IDs for range constraints */ +  for (unsigned int i = 0; i<details_length; i++) +  { +    const struct TALER_BANK_CreditDetails *cd = &details[i]; + +    if (cd->serial_id < lroff) +    { +      GNUNET_log (GNUNET_ERROR_TYPE_ERROR, +                  "Serial ID %llu not monotonic (got %llu before). Failing!\n", +                  (unsigned long long) cd->serial_id, +                  (unsigned long long) lroff); +      db_plugin->rollback (db_plugin->cls); +      GNUNET_SCHEDULER_shutdown (); +      return; +    } +    if (cd->serial_id > shard_end) +    { +      /* we are *past* the current shard (likely because the serial_id of the +         shard_end happens to not exist in the DB). So commit and stop this +         iteration! */ +      GNUNET_log (GNUNET_ERROR_TYPE_INFO, +                  "Serial ID %llu past shard end at %llu, ending iteration early!\n", +                  (unsigned long long) cd->serial_id, +                  (unsigned long long) shard_end); +      details_length = i; +      progress = true; +      lroff = cd->serial_id - 1; +      break; +    } +    lroff = cd->serial_id; +  } +  if (0 != details_length) +  { +    enum GNUNET_DB_QueryStatus qss[details_length]; +    struct TALER_EXCHANGEDB_ReserveInInfo reserves[details_length]; + +    GNUNET_log (GNUNET_ERROR_TYPE_INFO, +                "Importing %u transactions\n", +                details_length); +    for (unsigned int i = 0; i<details_length; i++) +    { +      const struct TALER_BANK_CreditDetails *cd = &details[i]; +      struct TALER_EXCHANGEDB_ReserveInInfo *res = &reserves[i]; + +      res->reserve_pub = &cd->reserve_pub; +      res->balance = &cd->amount; +      res->execution_time = cd->execution_date; +      res->sender_account_details = cd->debit_account_uri; +      res->exchange_account_name = ai->section_name; +      res->wire_reference = cd->serial_id; +    } +    qs = db_plugin->batch_reserves_in_insert (db_plugin->cls, +                                              reserves, +                                              details_length, +                                              qss); +    switch (qs) +    { +    case GNUNET_DB_STATUS_HARD_ERROR: +      GNUNET_break (0); +      GNUNET_SCHEDULER_shutdown (); +      return; +    case GNUNET_DB_STATUS_SOFT_ERROR: +      GNUNET_log (GNUNET_ERROR_TYPE_INFO, +                  "Got DB soft error for batch_reserves_in_insert. Rolling back.\n"); +      handle_soft_error (); +      return; +    default: +      break; +    } +    for (unsigned int i = 0; i<details_length; i++) +    { +      const struct TALER_BANK_CreditDetails *cd = &details[i]; + +      switch (qss[i]) +      { +      case GNUNET_DB_STATUS_HARD_ERROR: +        GNUNET_break (0); +        GNUNET_SCHEDULER_shutdown (); +        return; +      case GNUNET_DB_STATUS_SOFT_ERROR: +        GNUNET_log (GNUNET_ERROR_TYPE_INFO, +                    "Got DB soft error for batch_reserves_in_insert(%u). Rolling back.\n", +                    i); +        handle_soft_error (); +        return; +      case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: +        /* Either wirewatch was freshly started after the system was +           shutdown and we're going over an incomplete shard again +           after being restarted, or the shard lock period was too +           short (number of workers set incorrectly?) and a 2nd +           wirewatcher has been stealing our work while we are still +           at it. */ +        GNUNET_log (GNUNET_ERROR_TYPE_INFO, +                    "Attempted to import transaction %llu (%s) twice. " +                    "This should happen rarely (if not, ask for support).\n", +                    (unsigned long long) cd->serial_id, +                    job_name); +        break; +      case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT: +        GNUNET_log (GNUNET_ERROR_TYPE_INFO, +                    "Imported transaction %llu.", +                    (unsigned long long) cd->serial_id); +        /* normal case */ +        progress = true; +        break; +      } +    } +  } + +  latest_row_off = lroff; +  shard_done = (shard_end <= latest_row_off); +  if (shard_done) +  { +    /* shard is complete, mark this as well */ +    qs = db_plugin->complete_shard (db_plugin->cls, +                                    job_name, +                                    shard_start, +                                    shard_end); +    switch (qs) +    { +    case GNUNET_DB_STATUS_HARD_ERROR: +      GNUNET_break (0); +      GNUNET_SCHEDULER_shutdown (); +      return; +    case GNUNET_DB_STATUS_SOFT_ERROR: +      GNUNET_log (GNUNET_ERROR_TYPE_INFO, +                  "Got DB soft error for complete_shard. Rolling back.\n"); +      handle_soft_error (); +      return; +    case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: +      GNUNET_break (0); +      /* Not expected, but let's just continue */ +      break; +    case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT: +      /* normal case */ +      progress = true; +      GNUNET_log (GNUNET_ERROR_TYPE_INFO, +                  "Completed shard %s (%llu,%llu] after %s\n", +                  job_name, +                  (unsigned long long) shard_start, +                  (unsigned long long) shard_end, +                  GNUNET_STRINGS_relative_time_to_string ( +                    GNUNET_TIME_absolute_get_duration (shard_start_time), +                    true)); +      break; +    } +    shard_delay = GNUNET_TIME_absolute_get_duration (shard_start_time); +    shard_open = false; +    transaction_completed (); +    return; +  } +  GNUNET_assert (NULL == task); +  task = GNUNET_SCHEDULER_add_now (&continue_with_shard, +                                   NULL); +} + + +/**   * Callbacks of this type are used to serve the result of asking   * the bank for the transaction history.   * @@ -667,7 +846,11 @@ static void  history_cb (void *cls,              const struct TALER_BANK_CreditHistoryResponse *reply)  { +  static int batch_mode = -1; +    (void) cls; +  if (-1 == batch_mode) +    batch_mode = (NULL != getenv ("TALER_USE_BATCH"));    GNUNET_assert (NULL == task);    hh = NULL;    GNUNET_log (GNUNET_ERROR_TYPE_INFO, @@ -676,8 +859,12 @@ history_cb (void *cls,    switch (reply->http_status)    {    case MHD_HTTP_OK: -    process_reply (reply->details.success.details, -                   reply->details.success.details_length); +    if (0 == batch_mode) +      process_reply (reply->details.success.details, +                     reply->details.success.details_length); +    else +      process_reply_batched (reply->details.success.details, +                             reply->details.success.details_length);      return;    case MHD_HTTP_NO_CONTENT:      transaction_completed ();  | 
