/*
  This file is part of TALER
  Copyright (C) 2016-2020 Taler Systems SA
  TALER is free software; you can redistribute it and/or modify it under the
  terms of the GNU Affero General Public License as published by the Free Software
  Foundation; either version 3, or (at your option) any later version.
  TALER is distributed in the hope that it will be useful, but WITHOUT ANY
  WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
  A PARTICULAR PURPOSE.  See the GNU Affero General Public License for more details.
  You should have received a copy of the GNU Affero General Public License along with
  TALER; see the file COPYING.  If not, see <http://www.gnu.org/licenses/>
*/
/**
 * @file taler-exchange-aggregator.c
 * @brief Process that aggregates outgoing transactions and prepares their execution
 * @author Christian Grothoff
 */
#include "platform.h"
#include <gnunet/gnunet_util_lib.h>
#include <jansson.h>
#include <pthread.h>
#include "taler_exchangedb_lib.h"
#include "taler_exchangedb_plugin.h"
#include "taler_json_lib.h"
#include "taler_bank_service.h"
/**
 * Information about one aggregation process to be executed.  There is
 * at most one of these around at any given point in time.
 * Note that this limits parallelism, and we might want
 * to revise this decision at a later point.
 *
 * The structure lives on the stack of #run_aggregation() and is filled
 * in by #deposit_cb() (first deposit) and #aggregate_cb() (further
 * matching deposits).  #cleanup_au() releases what it references.
 */
struct AggregationUnit
{
  /**
   * Public key of the merchant.
   */
  struct TALER_MerchantPublicKeyP merchant_pub;
  /**
   * Total amount to be transferred, before subtraction of @e wire_fee and rounding down.
   * Also used as scratch space for the contribution of the coin currently
   * being processed while refunds are subtracted in #refund_by_coin_cb().
   */
  struct TALER_Amount total_amount;
  /**
   * Final amount to be transferred (after fee and rounding down).
   */
  struct TALER_Amount final_amount;
  /**
   * Wire fee we charge for @e wa at @e execution_time.
   */
  struct TALER_Amount wire_fee;
  /**
   * Hash of @e wire.
   */
  struct GNUNET_HashCode h_wire;
  /**
   * Wire transfer identifier we use.  Randomly generated when the
   * first deposit of the aggregation is processed.
   */
  struct TALER_WireTransferIdentifierRawP wtid;
  /**
   * Row ID of the transaction that started it all.
   */
  uint64_t row_id;
  /**
   * The current time (which triggered the aggregation and
   * defines the wire fee).
   */
  struct GNUNET_TIME_Absolute execution_time;
  /**
   * Wire details of the merchant.  We hold a reference that is
   * released in #cleanup_au().
   */
  json_t *wire;
  /**
   * Exchange wire account to be used for the preparation and
   * eventual execution of the aggregate wire transfer.
   */
  struct TALER_EXCHANGEDB_WireAccount *wa;
  /**
   * Database session for all of our transactions.
   */
  struct TALER_EXCHANGEDB_Session *session;
  /**
   * Wire preparation handle.
   * NOTE(review): not referenced by the code visible in this file;
   * possibly a leftover -- confirm before relying on it.
   */
  struct TALER_BANK_PrepareHandle *ph;
  /**
   * Array of row_ids from the aggregation (in addition to @e row_id).
   */
  uint64_t additional_rows[TALER_EXCHANGEDB_MATCHING_DEPOSITS_LIMIT];
  /**
   * Offset specifying how many @e additional_rows are in use.
   */
  unsigned int rows_offset;
  /**
   * Set to #GNUNET_YES if we have to abort due to failure.
   * NOTE(review): only read (never set) by the code visible in this file.
   */
  int failed;
  /**
   * Set to #GNUNET_YES if we encountered a refund during #refund_by_coin_cb.
   * Used to waive the deposit fee.
   */
  int have_refund;
};
/**
 * What is the smallest unit we support for wire transfers?
 * We will need to round down to a multiple of this amount.
 * Read from option CURRENCY_ROUND_UNIT in section "taler".
 */
static struct TALER_Amount currency_round_unit;
/**
 * What is the base URL of this exchange?  Used in the
 * wire transfer subjects so that merchants and governments
 * can ask for the list of aggregated deposits.
 * Read from option BASE_URL in section "exchange".
 */
static char *exchange_base_url;
/**
 * The exchange's configuration.
 */
static const struct GNUNET_CONFIGURATION_Handle *cfg;
/**
 * Our database plugin.
 */
static struct TALER_EXCHANGEDB_Plugin *db_plugin;
/**
 * Next task to run, if any.
 */
static struct GNUNET_SCHEDULER_Task *task;
/**
 * How long should we sleep when idle before trying to find more work?
 * Read from option AGGREGATOR_IDLE_SLEEP_INTERVAL in section "exchange".
 */
static struct GNUNET_TIME_Relative aggregator_idle_sleep_interval;
/**
 * Value to return from main(). 0 on success, non-zero on errors.
 *
 * Every failure reason has its own, distinct exit code so that callers
 * (scripts, test harnesses) can tell them apart.  Existing codes are
 * kept stable; only #GR_CMD_LINE_UTF8_ERROR was moved to a fresh value
 * because it used to collide with #GR_CONFIGURATION_INVALID.
 */
static enum
{
  GR_SUCCESS = 0,
  GR_DATABASE_SESSION_FAIL = 1,
  GR_DATABASE_TRANSACTION_BEGIN_FAIL = 2,
  GR_DATABASE_READY_DEPOSIT_HARD_FAIL = 3,
  GR_DATABASE_ITERATE_DEPOSIT_HARD_FAIL = 4,
  GR_DATABASE_TINY_MARK_HARD_FAIL = 5,
  GR_DATABASE_PREPARE_HARD_FAIL = 6,
  GR_DATABASE_PREPARE_COMMIT_HARD_FAIL = 7,
  GR_INVARIANT_FAILURE = 8,
  GR_CONFIGURATION_INVALID = 9,
  GR_CMD_LINE_OPTIONS_WRONG = 10,
  GR_CMD_LINE_UTF8_ERROR = 11,
} global_ret;
/**
 * #GNUNET_YES if we are in test mode and should exit when idle.
 * Set via the "-t" / "--test" command-line flag.
 */
static int test_mode;
/**
 * Main work function that queries the DB and aggregates transactions
 * into larger wire transfers.  Forward declaration; the definition
 * appears further below.
 *
 * @param cls NULL
 */
static void
run_aggregation (void *cls);
/**
 * Release everything referenced by @a au and reset the structure to
 * all-zeros.  The memory of @a au itself is not freed (it is stack
 * allocated by the caller).
 *
 * @param au aggregation unit to clean up, must not be NULL
 */
static void
cleanup_au (struct AggregationUnit *au)
{
  GNUNET_assert (NULL != au);
  /* drop our reference on the merchant's wire details, if we hold one */
  if (NULL != au->wire)
  {
    json_decref (au->wire);
  }
  memset (au, 0, sizeof (struct AggregationUnit));
}
/**
 * Shutdown handler, run when the scheduler is shut down (for example
 * on CTRL-C or SIGTERM).  Cancels the pending aggregation task (if
 * any) and unloads the database plugin and the wire accounts.
 *
 * @param cls closure, unused
 */
static void
shutdown_task (void *cls)
{
  (void) cls;
  GNUNET_log (GNUNET_ERROR_TYPE_INFO,
              "Running shutdown\n");
  /* make sure no further aggregation run is pending */
  if (NULL != task)
  {
    GNUNET_SCHEDULER_cancel (task);
    task = NULL;
  }
  /* release database plugin and account data */
  TALER_EXCHANGEDB_plugin_unload (db_plugin);
  db_plugin = NULL;
  TALER_EXCHANGEDB_unload_accounts ();
  /* the configuration is owned by the caller of run(), just forget it */
  cfg = NULL;
}
/**
 * Parse the configuration for the aggregator and initialize the
 * subsystems we depend on.
 *
 * Reads the exchange's BASE_URL, the AGGREGATOR_IDLE_SLEEP_INTERVAL
 * and the CURRENCY_ROUND_UNIT into the respective globals, then loads
 * the database plugin and the exchange's wire accounts.
 *
 * @return #GNUNET_OK on success, #GNUNET_SYSERR if an option is missing
 *         or invalid, or if the database plugin or the wire accounts
 *         could not be loaded
 */
static int
parse_wirewatch_config (void)
{
  if (GNUNET_OK !=
      GNUNET_CONFIGURATION_get_value_string (cfg,
                                             "exchange",
                                             "BASE_URL",
                                             &exchange_base_url))
  {
    GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
                               "exchange",
                               "BASE_URL");
    return GNUNET_SYSERR;
  }
  if (GNUNET_OK !=
      GNUNET_CONFIGURATION_get_value_time (cfg,
                                           "exchange",
                                           "AGGREGATOR_IDLE_SLEEP_INTERVAL",
                                           &aggregator_idle_sleep_interval))
  {
    GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
                               "exchange",
                               "AGGREGATOR_IDLE_SLEEP_INTERVAL");
    return GNUNET_SYSERR;
  }
  /* The rounding unit is rejected if it cannot be parsed, or if it has
     both a non-zero value and a non-zero fraction. */
  if ( (GNUNET_OK !=
        TALER_config_get_amount (cfg,
                                 "taler",
                                 "CURRENCY_ROUND_UNIT",
                                 &currency_round_unit)) ||
       ( (0 != currency_round_unit.fraction) &&
         (0 != currency_round_unit.value) ) )
  {
    GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
                "Invalid value specified in section `TALER' under `CURRENCY_ROUND_UNIT'\n");
    return GNUNET_SYSERR;
  }
  if (NULL ==
      (db_plugin = TALER_EXCHANGEDB_plugin_load (cfg)))
  {
    GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
                "Failed to initialize DB subsystem\n");
    return GNUNET_SYSERR;
  }
  if (GNUNET_OK !=
      TALER_EXCHANGEDB_load_accounts (cfg))
  {
    GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
                "No wire accounts configured for debit!\n");
    /* undo the plugin load from above, as we are failing */
    TALER_EXCHANGEDB_plugin_unload (db_plugin);
    db_plugin = NULL;
    return GNUNET_SYSERR;
  }
  return GNUNET_OK;
}
/**
 * Callback invoked once per refund that applies to a particular coin.
 * Remembers that a refund was seen and reduces the aggregation unit's
 * total amount by the refunded amount.
 *
 * @param cls closure with a `struct AggregationUnit *`
 * @param amount_with_fee what was the refunded amount with the fee
 * @return #GNUNET_OK to continue to iterate, #GNUNET_SYSERR to stop
 */
static int
refund_by_coin_cb (void *cls,
                   const struct TALER_Amount *amount_with_fee)
{
  struct AggregationUnit *au = cls;

  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
              "Aggregator subtracts applicable refund of amount %s\n",
              TALER_amount2s (amount_with_fee));
  au->have_refund = GNUNET_YES;
  if (GNUNET_OK ==
      TALER_amount_subtract (&au->total_amount,
                             &au->total_amount,
                             amount_with_fee))
    return GNUNET_OK;
  /* the subtraction did not succeed: stop the iteration */
  GNUNET_break (0);
  return GNUNET_SYSERR;
}
/**
 * Function called with details about deposits that have been made,
 * with the goal of executing the corresponding wire transaction.
 *
 * @param cls a `struct AggregationUnit`
 * @param row_id identifies database entry
 * @param merchant_pub public key of the merchant
 * @param coin_pub public key of the coin
 * @param amount_with_fee amount that was deposited including fee
 * @param deposit_fee amount the exchange gets to keep as transaction fees
 * @param h_contract_terms hash of the proposal data known to merchant and customer
 * @param wire_deadline by which the merchant adviced that he would like the
 *        wire transfer to be executed
 * @param wire wire details for the merchant
 * @return transaction status code,  #GNUNET_DB_STATUS_SUCCESS_ONE_RESULT to continue to iterate
 */
static enum GNUNET_DB_QueryStatus
deposit_cb (void *cls,
            uint64_t row_id,
            const struct TALER_MerchantPublicKeyP *merchant_pub,
            const struct TALER_CoinSpendPublicKeyP *coin_pub,
            const struct TALER_Amount *amount_with_fee,
            const struct TALER_Amount *deposit_fee,
            const struct GNUNET_HashCode *h_contract_terms,
            struct GNUNET_TIME_Absolute wire_deadline,
            const json_t *wire)
{
  struct AggregationUnit *au = cls;
  enum GNUNET_DB_QueryStatus qs;
  (void) cls;
  /* NOTE: potential optimization: use custom SQL API to not
     fetch this one: */
  (void) wire_deadline; /* already checked by SQL query */
  au->merchant_pub = *merchant_pub;
  GNUNET_log (GNUNET_ERROR_TYPE_INFO,
              "Aggregator processing payment %s with amount %s\n",
              TALER_B2S (coin_pub),
              TALER_amount2s (amount_with_fee));
  au->row_id = row_id;
  au->total_amount = *amount_with_fee;
  au->have_refund = GNUNET_NO;
  qs = db_plugin->select_refunds_by_coin (db_plugin->cls,
                                          au->session,
                                          coin_pub,
                                          &au->merchant_pub,
                                          h_contract_terms,
                                          &refund_by_coin_cb,
                                          au);
  if (0 > qs)
  {
    GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs);
    return qs;
  }
  if (GNUNET_NO == au->have_refund)
  {
    struct TALER_Amount ntotal;
    GNUNET_log (GNUNET_ERROR_TYPE_INFO,
                "Non-refunded transaction, subtracting deposit fee %s\n",
                TALER_amount2s (deposit_fee));
    if (GNUNET_SYSERR ==
        TALER_amount_subtract (&ntotal,
                               amount_with_fee,
                               deposit_fee))
    {
      /* This should never happen, issue a warning, but continue processing
         with an amount of zero, least we hang here for good. */
      GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
                  "Fatally malformed record at row %llu over %s (deposit fee exceeds deposited value)\n",
                  (unsigned long long) row_id,
                  TALER_amount2s (amount_with_fee));
      GNUNET_assert (GNUNET_OK ==
                     TALER_amount_get_zero (au->total_amount.currency,
                                            &au->total_amount));
    }
    else
    {
      au->total_amount = ntotal;
    }
  }
  GNUNET_assert (NULL == au->wire);
  if (NULL == (au->wire = json_incref ((json_t *) wire)))
  {
    GNUNET_break (0);
    return GNUNET_DB_STATUS_HARD_ERROR;
  }
  if (GNUNET_OK !=
      TALER_JSON_merchant_wire_signature_hash (wire,
                                               &au->h_wire))
  {
    GNUNET_break (0);
    json_decref (au->wire);
    au->wire = NULL;
    return GNUNET_DB_STATUS_HARD_ERROR;
  }
  GNUNET_CRYPTO_random_block (GNUNET_CRYPTO_QUALITY_NONCE,
                              &au->wtid,
                              sizeof (au->wtid));
  GNUNET_log (GNUNET_ERROR_TYPE_INFO,
              "Starting aggregation under H(WTID)=%s, starting amount %s at %llu\n",
              TALER_B2S (&au->wtid),
              TALER_amount2s (amount_with_fee),
              (unsigned long long) row_id);
  {
    char *url;
    url = TALER_JSON_wire_to_payto (au->wire);
    au->wa = TALER_EXCHANGEDB_find_account_by_payto_uri (url);
    if (NULL == au->wa)
    {
      GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
                  "No exchange account configured for `%s', please fix your setup to continue!\n",
                  url);
      GNUNET_free (url);
      return GNUNET_DB_STATUS_HARD_ERROR;
    }
    GNUNET_free (url);
  }
  /* make sure we have current fees */
  au->execution_time = GNUNET_TIME_absolute_get ();
  (void) GNUNET_TIME_round_abs (&au->execution_time);
  {
    struct TALER_EXCHANGEDB_AggregateFees *af;
    af = TALER_EXCHANGEDB_update_fees (cfg,
                                       db_plugin,
                                       au->wa,
                                       au->execution_time,
                                       au->session);
    if (NULL == af)
    {
      GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
                  "Could not get or persist wire fees for %s. Aborting run.\n",
                  GNUNET_STRINGS_absolute_time_to_string (au->execution_time));
      return GNUNET_DB_STATUS_HARD_ERROR;
    }
    au->wire_fee = af->wire_fee;
  }
  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
              "Aggregator starts aggregation for deposit %llu to %s with wire fee %s\n",
              (unsigned long long) row_id,
              TALER_B2S (&au->wtid),
              TALER_amount2s (&au->wire_fee));
  qs = db_plugin->insert_aggregation_tracking (db_plugin->cls,
                                               au->session,
                                               &au->wtid,
                                               row_id);
  if (qs <= 0)
  {
    GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs);
    return qs;
  }
  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
              "Aggregator marks deposit %llu as done\n",
              (unsigned long long) row_id);
  qs = db_plugin->mark_deposit_done (db_plugin->cls,
                                     au->session,
                                     row_id);
  if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT != qs)
  {
    GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs);
    return qs;
  }
  return qs;
}
/**
 * Function called with details about another deposit we
 * can aggregate into an existing aggregation unit.
 *
 * @param cls a `struct AggregationUnit`
 * @param row_id identifies database entry
 * @param merchant_pub public key of the merchant
 * @param coin_pub public key of the coin
 * @param amount_with_fee amount that was deposited including fee
 * @param deposit_fee amount the exchange gets to keep as transaction fees
 * @param h_contract_terms hash of the proposal data known to merchant and customer
 * @param wire_deadline by which the merchant adviced that he would like the
 *        wire transfer to be executed
 * @param wire wire details for the merchant
 * @return transaction status code
 */
static enum GNUNET_DB_QueryStatus
aggregate_cb (void *cls,
              uint64_t row_id,
              const struct TALER_MerchantPublicKeyP *merchant_pub,
              const struct TALER_CoinSpendPublicKeyP *coin_pub,
              const struct TALER_Amount *amount_with_fee,
              const struct TALER_Amount *deposit_fee,
              const struct GNUNET_HashCode *h_contract_terms,
              struct GNUNET_TIME_Absolute wire_deadline,
              const json_t *wire)
{
  struct AggregationUnit *au = cls;
  struct TALER_Amount old;
  enum GNUNET_DB_QueryStatus qs;
  /* NOTE: potential optimization: use custom SQL API to not
     fetch these: */
  (void) wire_deadline; /* checked by SQL */
  (void) wire; /* must match */
  GNUNET_break (0 == GNUNET_memcmp (&au->merchant_pub,
                                    merchant_pub));
  if (au->rows_offset >= TALER_EXCHANGEDB_MATCHING_DEPOSITS_LIMIT)
  {
    /* Bug: we asked for at most #TALER_EXCHANGEDB_MATCHING_DEPOSITS_LIMIT results! */
    GNUNET_break (0);
    /* Skip this one, but keep going with the overall transaction */
    return GNUNET_DB_STATUS_SUCCESS_ONE_RESULT;
  }
  /* add to total */
  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
              "Adding transaction amount %s from row %llu to aggregation\n",
              TALER_amount2s (amount_with_fee),
              (unsigned long long) row_id);
  /* save the existing total aggregate in 'old', for later */
  old = au->total_amount;
  /* we begin with the total contribution of the current coin */
  au->total_amount = *amount_with_fee;
  /* compute contribution of this coin (after fees) */
  au->have_refund = GNUNET_NO;
  qs = db_plugin->select_refunds_by_coin (db_plugin->cls,
                                          au->session,
                                          coin_pub,
                                          &au->merchant_pub,
                                          h_contract_terms,
                                          &refund_by_coin_cb,
                                          au);
  if (0 > qs)
  {
    GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs);
    return qs;
  }
  if (GNUNET_NO == au->have_refund)
  {
    struct TALER_Amount tmp;
    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
                "Subtracting deposit fee %s for non-refunded coin\n",
                TALER_amount2s (deposit_fee));
    if (GNUNET_SYSERR ==
        TALER_amount_subtract (&tmp,
                               &au->total_amount,
                               deposit_fee))
    {
      GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
                  "Fatally malformed record at %llu over amount %s (deposit fee exceeds deposited value)\n",
                  (unsigned long long) row_id,
                  TALER_amount2s (&au->total_amount));
      GNUNET_assert (GNUNET_OK ==
                     TALER_amount_get_zero (old.currency,
                                            &au->total_amount));
    }
    else
    {
      au->total_amount = tmp;
    }
  }
  /* now add the au->total_amount with the (remaining) contribution of
     the current coin to the 'old' value with the current aggregate value */
  {
    struct TALER_Amount tmp;
    if (GNUNET_OK !=
        TALER_amount_add (&tmp,
                          &au->total_amount,
                          &old))
    {
      GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
                  "Overflow or currency incompatibility during aggregation at %llu\n",
                  (unsigned long long) row_id);
      /* Skip this one, but keep going! */
      au->total_amount = old;
      return GNUNET_DB_STATUS_SUCCESS_ONE_RESULT;
    }
    au->total_amount = tmp;
  }
  /* "append" to our list of rows */
  au->additional_rows[au->rows_offset++] = row_id;
  /* insert into aggregation tracking table */
  qs = db_plugin->insert_aggregation_tracking (db_plugin->cls,
                                               au->session,
                                               &au->wtid,
                                               row_id);
  if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT != qs)
  {
    GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs);
    return qs;
  }
  qs = db_plugin->mark_deposit_done (db_plugin->cls,
                                     au->session,
                                     row_id);
  if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT != qs)
  {
    GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs);
    return qs;
  }
  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
              "Aggregator marked deposit %llu as DONE\n",
              (unsigned long long) row_id);
  return GNUNET_DB_STATUS_SUCCESS_ONE_RESULT;
}
/**
 * Perform a database commit. If it fails, print a warning.
 *
 * @param session session to perform the commit for.
 * @return status of commit
 */
static enum GNUNET_DB_QueryStatus
commit_or_warn (struct TALER_EXCHANGEDB_Session *session)
{
  enum GNUNET_DB_QueryStatus qs;
  qs = db_plugin->commit (db_plugin->cls,
                          session);
  if (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS == qs)
    return qs;
  GNUNET_log ((GNUNET_DB_STATUS_SOFT_ERROR == qs)
              ? GNUNET_ERROR_TYPE_INFO
              : GNUNET_ERROR_TYPE_ERROR,
              "Failed to commit database transaction!\n");
  return qs;
}
/**
 * Main work function that queries the DB and aggregates transactions
 * into larger wire transfers.
 *
 * @param cls NULL
 */
static void
run_aggregation (void *cls)
{
  struct AggregationUnit au_active;
  struct TALER_EXCHANGEDB_Session *session;
  enum GNUNET_DB_QueryStatus qs;
  (void) cls;
  task = NULL;
  GNUNET_log (GNUNET_ERROR_TYPE_INFO,
              "Checking for ready deposits to aggregate\n");
  if (NULL == (session = db_plugin->get_session (db_plugin->cls)))
  {
    GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
                "Failed to obtain database session!\n");
    global_ret = GR_DATABASE_SESSION_FAIL;
    GNUNET_SCHEDULER_shutdown ();
    return;
  }
  if (GNUNET_OK !=
      db_plugin->start_deferred_wire_out (db_plugin->cls,
                                          session))
  {
    GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
                "Failed to start database transaction!\n");
    global_ret = GR_DATABASE_TRANSACTION_BEGIN_FAIL;
    GNUNET_SCHEDULER_shutdown ();
    return;
  }
  memset (&au_active,
          0,
          sizeof (au_active));
  au_active.session = session;
  qs = db_plugin->get_ready_deposit (db_plugin->cls,
                                     session,
                                     &deposit_cb,
                                     &au_active);
  if (0 >= qs)
  {
    cleanup_au (&au_active);
    db_plugin->rollback (db_plugin->cls,
                         session);
    if (GNUNET_DB_STATUS_HARD_ERROR == qs)
    {
      GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
                  "Failed to execute deposit iteration!\n");
      global_ret = GR_DATABASE_READY_DEPOSIT_HARD_FAIL;
      GNUNET_SCHEDULER_shutdown ();
      return;
    }
    if (GNUNET_DB_STATUS_SOFT_ERROR == qs)
    {
      /* should re-try immediately */
      GNUNET_assert (NULL == task);
      task = GNUNET_SCHEDULER_add_now (&run_aggregation,
                                       NULL);
      return;
    }
    GNUNET_log (GNUNET_ERROR_TYPE_INFO,
                "No more ready deposits, going to sleep\n");
    if (GNUNET_YES == test_mode)
    {
      /* in test mode, shutdown if we end up being idle */
      GNUNET_SCHEDULER_shutdown ();
    }
    else
    {
      /* nothing to do, sleep for a minute and try again */
      GNUNET_assert (NULL == task);
      task = GNUNET_SCHEDULER_add_delayed (aggregator_idle_sleep_interval,
                                           &run_aggregation,
                                           NULL);
    }
    return;
  }
  /* Now try to find other deposits to aggregate */
  GNUNET_log (GNUNET_ERROR_TYPE_INFO,
              "Found ready deposit for %s, aggregating\n",
              TALER_B2S (&au_active.merchant_pub));
  qs = db_plugin->iterate_matching_deposits (db_plugin->cls,
                                             session,
                                             &au_active.h_wire,
                                             &au_active.merchant_pub,
                                             &aggregate_cb,
                                             &au_active,
                                             TALER_EXCHANGEDB_MATCHING_DEPOSITS_LIMIT);
  if ( (GNUNET_DB_STATUS_HARD_ERROR == qs) ||
       (GNUNET_YES == au_active.failed) )
  {
    GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
                "Failed to execute deposit iteration!\n");
    cleanup_au (&au_active);
    db_plugin->rollback (db_plugin->cls,
                         session);
    global_ret = GR_DATABASE_ITERATE_DEPOSIT_HARD_FAIL;
    GNUNET_SCHEDULER_shutdown ();
    return;
  }
  if (GNUNET_DB_STATUS_SOFT_ERROR == qs)
  {
    /* serializiability issue, try again */
    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
                "Serialization issue, trying again later!\n");
    db_plugin->rollback (db_plugin->cls,
                         session);
    cleanup_au (&au_active);
    GNUNET_assert (NULL == task);
    task = GNUNET_SCHEDULER_add_now (&run_aggregation,
                                     NULL);
    return;
  }
  /* Subtract wire transfer fee and round to the unit supported by the
     wire transfer method; Check if after rounding down, we still have
     an amount to transfer, and if not mark as 'tiny'. */
  if ( (GNUNET_OK !=
        TALER_amount_subtract (&au_active.final_amount,
                               &au_active.total_amount,
                               &au_active.wire_fee)) ||
       (GNUNET_SYSERR ==
        TALER_amount_round_down (&au_active.final_amount,
                                 ¤cy_round_unit)) ||
       ( (0 == au_active.final_amount.value) &&
         (0 == au_active.final_amount.fraction) ) )
  {
    GNUNET_log (GNUNET_ERROR_TYPE_INFO,
                "Aggregate value too low for transfer (%d/%s)\n",
                qs,
                TALER_amount2s (&au_active.final_amount));
    /* Rollback ongoing transaction, as we will not use the respective
       WTID and thus need to remove the tracking data */
    db_plugin->rollback (db_plugin->cls,
                         session);
    /* There were results, just the value was too low.  Start another
       transaction to mark all* of the selected deposits as minor! */
    if (GNUNET_OK !=
        db_plugin->start (db_plugin->cls,
                          session,
                          "aggregator mark tiny transactions"))
    {
      GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
                  "Failed to start database transaction!\n");
      global_ret = GR_DATABASE_TRANSACTION_BEGIN_FAIL;
      cleanup_au (&au_active);
      GNUNET_SCHEDULER_shutdown ();
      return;
    }
    /* Mark transactions by row_id as minor */
    qs = db_plugin->mark_deposit_tiny (db_plugin->cls,
                                       session,
                                       au_active.row_id);
    if (0 <= qs)
    {
      for (unsigned int i = 0; imark_deposit_tiny (db_plugin->cls,
                                           session,
                                           au_active.additional_rows[i]);
        if (0 > qs)
          break;
      }
    }
    if (GNUNET_DB_STATUS_SOFT_ERROR == qs)
    {
      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
                  "Serialization issue, trying again later!\n");
      db_plugin->rollback (db_plugin->cls,
                           session);
      cleanup_au (&au_active);
      /* start again */
      GNUNET_assert (NULL == task);
      task = GNUNET_SCHEDULER_add_now (&run_aggregation,
                                       NULL);
      return;
    }
    if (GNUNET_DB_STATUS_HARD_ERROR == qs)
    {
      db_plugin->rollback (db_plugin->cls,
                           session);
      cleanup_au (&au_active);
      global_ret = GR_DATABASE_TINY_MARK_HARD_FAIL;
      GNUNET_SCHEDULER_shutdown ();
      return;
    }
    /* commit */
    (void) commit_or_warn (session);
    cleanup_au (&au_active);
    /* start again */
    GNUNET_assert (NULL == task);
    task = GNUNET_SCHEDULER_add_now (&run_aggregation,
                                     NULL);
    return;
  }
  {
    char *amount_s;
    amount_s = TALER_amount_to_string (&au_active.final_amount);
    GNUNET_log (GNUNET_ERROR_TYPE_INFO,
                "Preparing wire transfer of %s to %s\n",
                amount_s,
                TALER_B2S (&au_active.merchant_pub));
    GNUNET_free (amount_s);
  }
  {
    void *buf;
    size_t buf_size;
    {
      char *url;
      url = TALER_JSON_wire_to_payto (au_active.wire);
      TALER_BANK_prepare_transfer (url,
                                   &au_active.final_amount,
                                   exchange_base_url,
                                   &au_active.wtid,
                                   &buf,
                                   &buf_size);
      GNUNET_free (url);
    }
    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
                "Storing %u bytes of wire prepare data\n",
                (unsigned int) buf_size);
    /* Commit our intention to execute the wire transfer! */
    qs = db_plugin->wire_prepare_data_insert (db_plugin->cls,
                                              session,
                                              au_active.wa->method,
                                              buf,
                                              buf_size);
    GNUNET_free (buf);
  }
  /* Commit the WTID data to 'wire_out' to finally satisfy aggregation
     table constraints */
  if (qs >= 0)
    qs = db_plugin->store_wire_transfer_out (db_plugin->cls,
                                             session,
                                             au_active.execution_time,
                                             &au_active.wtid,
                                             au_active.wire,
                                             au_active.wa->section_name,
                                             &au_active.final_amount);
  cleanup_au (&au_active);
  if (GNUNET_DB_STATUS_SOFT_ERROR == qs)
  {
    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
                "Serialization issue for prepared wire data; trying again later!\n");
    db_plugin->rollback (db_plugin->cls,
                         session);
    /* start again */
    GNUNET_assert (NULL == task);
    task = GNUNET_SCHEDULER_add_now (&run_aggregation,
                                     NULL);
    return;
  }
  if (GNUNET_DB_STATUS_HARD_ERROR == qs)
  {
    GNUNET_break (0);
    db_plugin->rollback (db_plugin->cls,
                         session);
    /* die hard */
    global_ret = GR_DATABASE_PREPARE_HARD_FAIL;
    GNUNET_SCHEDULER_shutdown ();
    return;
  }
  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
              "Stored wire transfer out instructions\n");
  /* Now we can finally commit the overall transaction, as we are
     again consistent if all of this passes. */
  switch (commit_or_warn (session))
  {
  case GNUNET_DB_STATUS_SOFT_ERROR:
    /* try again */
    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
                "Commit issue for prepared wire data; trying again later!\n");
    GNUNET_assert (NULL == task);
    task = GNUNET_SCHEDULER_add_now (&run_aggregation,
                                     NULL);
    return;
  case GNUNET_DB_STATUS_HARD_ERROR:
    GNUNET_break (0);
    global_ret = GR_DATABASE_PREPARE_COMMIT_HARD_FAIL;
    GNUNET_SCHEDULER_shutdown ();
    return;
  case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
    GNUNET_log (GNUNET_ERROR_TYPE_INFO,
                "Preparation complete, going again\n");
    GNUNET_assert (NULL == task);
    task = GNUNET_SCHEDULER_add_now (&run_aggregation,
                                     NULL);
    return;
  default:
    GNUNET_break (0);
    global_ret = GR_INVARIANT_FAILURE;
    GNUNET_SCHEDULER_shutdown ();
    return;
  }
}
/**
 * First task, invoked by the program framework once the command line
 * and the configuration have been processed.  Loads our configuration
 * and, on success, schedules the first aggregation run together with
 * the shutdown handler.
 *
 * @param cls closure, NULL
 * @param args remaining command-line arguments
 * @param cfgfile name of the configuration file used (for saving, can be NULL!)
 * @param c configuration
 */
static void
run (void *cls,
     char *const *args,
     const char *cfgfile,
     const struct GNUNET_CONFIGURATION_Handle *c)
{
  (void) cls;
  (void) args;
  (void) cfgfile;

  cfg = c;
  if (GNUNET_OK == parse_wirewatch_config ())
  {
    GNUNET_assert (NULL == task);
    task = GNUNET_SCHEDULER_add_now (&run_aggregation,
                                     NULL);
    GNUNET_SCHEDULER_add_shutdown (&shutdown_task,
                                   cls);
    return;
  }
  /* configuration unusable: report failure via the exit code */
  cfg = NULL;
  global_ret = GR_CONFIGURATION_INVALID;
}
/**
 * The main function of the taler-exchange-aggregator.  Converts the
 * command line to UTF-8, runs the program via the GNUnet program
 * framework and reports the outcome as the exit code.
 *
 * @param argc number of arguments from the command line
 * @param argv command line arguments
 * @return 0 ok, non-zero on error, see #global_ret
 */
int
main (int argc,
      char *const *argv)
{
  struct GNUNET_GETOPT_CommandLineOption options[] = {
    GNUNET_GETOPT_option_timetravel ('T',
                                     "timetravel"),
    GNUNET_GETOPT_option_flag ('t',
                               "test",
                               "run in test mode and exit when idle",
                               &test_mode),
    GNUNET_GETOPT_option_version (VERSION "-" VCS_VERSION),
    GNUNET_GETOPT_OPTION_END
  };
  int run_result;

  if (GNUNET_OK !=
      GNUNET_STRINGS_get_utf8_args (argc, argv,
                                    &argc, &argv))
    return GR_CMD_LINE_UTF8_ERROR;
  run_result = GNUNET_PROGRAM_run (argc, argv,
                                   "taler-exchange-aggregator",
                                   gettext_noop (
                                     "background process that aggregates and executes wire transfers"),
                                   options,
                                   &run, NULL);
  /* the UTF-8 converted argument vector is released on every path */
  GNUNET_free ((void *) argv);
  if (GNUNET_OK != run_result)
    return GR_CMD_LINE_OPTIONS_WRONG;
  return global_ret;
}
/* end of taler-exchange-aggregator.c */