/*
  This file is part of TALER
  Copyright (C) 2016--2020 Taler Systems SA
  TALER is free software; you can redistribute it and/or modify it under the
  terms of the GNU Affero General Public License as published by the Free Software
  Foundation; either version 3, or (at your option) any later version.
  TALER is distributed in the hope that it will be useful, but WITHOUT ANY
  WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
  A PARTICULAR PURPOSE.  See the GNU Affero General Public License for more details.
  You should have received a copy of the GNU Affero General Public License along with
  TALER; see the file COPYING.  If not, see 
*/
/**
 * @file taler-exchange-wirewatch.c
 * @brief Process that watches for wire transfers to the exchange's bank account
 * @author Christian Grothoff
 */
#include "platform.h"
#include 
#include 
#include 
#include 
#include "taler_exchangedb_lib.h"
#include "taler_exchangedb_plugin.h"
#include "taler_json_lib.h"
#include "taler_bank_service.h"
#define DEBUG_LOGGING 0
/**
 * What is the initial batch size we use for credit history
 * requests with the bank.  See `batch_size` below.
 */
#define INITIAL_BATCH_SIZE 1024
/**
 * Information we keep for each supported account.
 */
struct WireAccount
{
  /**
   * Accounts are kept in a DLL.
   */
  struct WireAccount *next;
  /**
   * Plugins are kept in a DLL.
   */
  struct WireAccount *prev;
  /**
   * Name of the section that configures this account.
   */
  char *section_name;
  /**
   * Database session we are using for the current transaction.
   */
  struct TALER_EXCHANGEDB_Session *session;
  /**
   * Active request for history.
   */
  struct TALER_BANK_CreditHistoryHandle *hh;
  /**
   * Authentication data.
   */
  struct TALER_BANK_AuthenticationData auth;
  /**
   * Until when is processing this wire plugin delayed?
   */
  struct GNUNET_TIME_Absolute delayed_until;
  /**
   * Encoded offset in the wire transfer list from where
   * to start the next query with the bank.
   */
  uint64_t last_row_off;
  /**
   * Latest row offset seen in this transaction, becomes
   * the new #last_row_off upon commit.
   */
  uint64_t latest_row_off;
  /**
   * How many transactions do we retrieve per batch?
   */
  unsigned int batch_size;
  /**
   * How many transactions did we see in the current batch?
   */
  unsigned int current_batch_size;
  /**
   * Are we running from scratch and should re-process all transactions
   * for this account?
   */
  int reset_mode;
  /**
   * Should we delay the next request to the wire plugin a bit?  Set to
   * #GNUNET_NO if we actually did some work.
   */
  int delay;
};
/**
 * Head of list of loaded wire plugins.
 */
static struct WireAccount *wa_head;
/**
 * Tail of list of loaded wire plugins.
 */
static struct WireAccount *wa_tail;
/**
 * Wire account we are currently processing.  This would go away
 * if we ever start processing all accounts in parallel.
 */
static struct WireAccount *wa_pos;
/**
 * Handle to the context for interacting with the bank.
 */
static struct GNUNET_CURL_Context *ctx;
/**
 * Scheduler context for running the @e ctx.
 */
static struct GNUNET_CURL_RescheduleContext *rc;
/**
 * The exchange's configuration (global)
 */
static const struct GNUNET_CONFIGURATION_Handle *cfg;
/**
 * Our DB plugin.
 */
static struct TALER_EXCHANGEDB_Plugin *db_plugin;
/**
 * How long should we sleep when idle before trying to find more work?
 */
static struct GNUNET_TIME_Relative wirewatch_idle_sleep_interval;
/**
 * Value to return from main(). 0 on success, non-zero on
 * on serious errors.
 */
static enum
{
  GR_SUCCESS = 0,
  GR_DATABASE_SESSION_FAIL = 1,
  GR_DATABASE_TRANSACTION_BEGIN_FAIL = 2,
  GR_DATABASE_SELECT_LATEST_HARD_FAIL = 3,
  GR_BANK_REQUEST_HISTORY_FAIL = 4,
  GR_CONFIGURATION_INVALID = 5,
  GR_CMD_LINE_UTF8_ERROR = 6,
  GR_CMD_LINE_OPTIONS_WRONG = 7,
} global_ret;
/**
 * Are we run in testing mode and should only do one pass?
 */
static int test_mode;
/**
 * Are we running from scratch and should re-process all transactions?
 */
static int reset_mode;
/**
 * Current task waiting for execution, if any.
 */
static struct GNUNET_SCHEDULER_Task *task;
/**
 * We're being aborted with CTRL-C (or SIGTERM). Shut down.
 *
 * @param cls closure
 */
static void
shutdown_task (void *cls)
{
  (void) cls;
  {
    struct WireAccount *wa;
    while (NULL != (wa = wa_head))
    {
      if (NULL != wa->hh)
      {
        TALER_BANK_credit_history_cancel (wa->hh);
        wa->hh = NULL;
      }
      GNUNET_CONTAINER_DLL_remove (wa_head,
                                   wa_tail,
                                   wa);
      TALER_BANK_auth_free (&wa->auth);
      GNUNET_free (wa->section_name);
      GNUNET_free (wa);
    }
  }
  wa_pos = NULL;
  if (NULL != ctx)
  {
    GNUNET_CURL_fini (ctx);
    ctx = NULL;
  }
  if (NULL != rc)
  {
    GNUNET_CURL_gnunet_rc_destroy (rc);
    rc = NULL;
  }
  if (NULL != task)
  {
    GNUNET_SCHEDULER_cancel (task);
    task = NULL;
  }
  TALER_EXCHANGEDB_plugin_unload (db_plugin);
  db_plugin = NULL;
}
/**
 * Function called with information about a wire account.  Adds the
 * account to our list (if it is enabled and we can load the plugin).
 *
 * @param cls closure, NULL
 * @param ai account information
 */
static void
add_account_cb (void *cls,
                const struct TALER_EXCHANGEDB_AccountInfo *ai)
{
  struct WireAccount *wa;
  (void) cls;
  if (GNUNET_YES != ai->credit_enabled)
    return; /* not enabled for us, skip */
  wa = GNUNET_new (struct WireAccount);
  wa->reset_mode = reset_mode;
  if (GNUNET_OK !=
      TALER_BANK_auth_parse_cfg (cfg,
                                 ai->section_name,
                                 &wa->auth))
  {
    GNUNET_log (GNUNET_ERROR_TYPE_MESSAGE,
                "Failed to load account `%s'\n",
                ai->section_name);
    GNUNET_free (wa);
    return;
  }
  wa->section_name = GNUNET_strdup (ai->section_name);
  wa->batch_size = INITIAL_BATCH_SIZE;
  GNUNET_CONTAINER_DLL_insert (wa_head,
                               wa_tail,
                               wa);
}
/**
 * Parse configuration parameters for the exchange server into the
 * corresponding global variables.
 *
 * @return #GNUNET_OK on success
 */
static int
exchange_serve_process_config (void)
{
  if (GNUNET_OK !=
      GNUNET_CONFIGURATION_get_value_time (cfg,
                                           "exchange",
                                           "WIREWATCH_IDLE_SLEEP_INTERVAL",
                                           &wirewatch_idle_sleep_interval))
  {
    GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
                               "exchange",
                               "WIREWATCH_IDLE_SLEEP_INTERVAL");
    return GNUNET_SYSERR;
  }
  if (NULL ==
      (db_plugin = TALER_EXCHANGEDB_plugin_load (cfg)))
  {
    fprintf (stderr,
             "Failed to initialize DB subsystem\n");
    return GNUNET_SYSERR;
  }
  TALER_EXCHANGEDB_find_accounts (cfg,
                                  &add_account_cb,
                                  NULL);
  if (NULL == wa_head)
  {
    fprintf (stderr,
             "No wire accounts configured for credit!\n");
    TALER_EXCHANGEDB_plugin_unload (db_plugin);
    return GNUNET_SYSERR;
  }
  return GNUNET_OK;
}
/**
 * Query for incoming wire transfers.
 *
 * @param cls NULL
 */
static void
find_transfers (void *cls);
/**
 * Callbacks of this type are used to serve the result of asking
 * the bank for the transaction history.
 *
 * @param cls closure with the `struct WioreAccount *` we are processing
 * @param http_status HTTP status code from the server
 * @param ec taler error code
 * @param serial_id identification of the position at which we are querying
 * @param details details about the wire transfer
 * @param json raw JSON response
 * @return #GNUNET_OK to continue, #GNUNET_SYSERR to abort iteration
 */
static int
history_cb (void *cls,
            unsigned int http_status,
            enum TALER_ErrorCode ec,
            uint64_t serial_id,
            const struct TALER_BANK_CreditDetails *details,
            const json_t *json)
{
  struct WireAccount *wa = cls;
  struct TALER_EXCHANGEDB_Session *session = wa->session;
  enum GNUNET_DB_QueryStatus qs;
  (void) json;
  if (NULL == details)
  {
    wa->hh = NULL;
    if (TALER_EC_NONE != ec)
    {
      GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
                  "Error fetching history: ec=%u, http_status=%u\n",
                  (unsigned int) ec,
                  http_status);
    }
    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
                "End of list. Committing progress!\n");
    qs = db_plugin->commit (db_plugin->cls,
                            session);
    if (GNUNET_DB_STATUS_SOFT_ERROR == qs)
    {
      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
                  "Got DB soft error for commit\n");
      /* reduce transaction size to reduce rollback probability */
      if (2 > wa->current_batch_size)
        wa->current_batch_size /= 2;
      /* try again */
      GNUNET_assert (NULL == task);
      task = GNUNET_SCHEDULER_add_now (&find_transfers,
                                       NULL);
      return GNUNET_OK; /* will be ignored anyway */
    }
    if (0 < qs)
    {
      /* transaction success, update #last_row_off */
      wa->last_row_off = wa->latest_row_off;
      wa->latest_row_off = 0; /* should not be needed */
      wa->session = NULL; /* should not be needed */
      /* if successful at limit, try increasing transaction batch size (AIMD) */
      if ( (wa->current_batch_size == wa->batch_size) &&
           (UINT_MAX > wa->batch_size) )
        wa->batch_size++;
    }
    GNUNET_break (0 <= qs);
    if ( (GNUNET_YES == wa->delay) &&
         (test_mode) &&
         (NULL == wa->next) )
    {
      GNUNET_log (GNUNET_ERROR_TYPE_INFO,
                  "Shutdown due to test mode!\n");
      GNUNET_SCHEDULER_shutdown ();
      return GNUNET_OK;
    }
    if (GNUNET_YES == wa->delay)
    {
      wa->delayed_until
        = GNUNET_TIME_relative_to_absolute (wirewatch_idle_sleep_interval);
      wa_pos = wa_pos->next;
      if (NULL == wa_pos)
        wa_pos = wa_head;
      GNUNET_assert (NULL != wa_pos);
    }
    task = GNUNET_SCHEDULER_add_at (wa_pos->delayed_until,
                                    &find_transfers,
                                    NULL);
    return GNUNET_OK; /* will be ignored anyway */
  }
  GNUNET_log (GNUNET_ERROR_TYPE_INFO,
              "Adding wire transfer over %s with (hashed) subject `%s'\n",
              TALER_amount2s (&details->amount),
              TALER_B2S (&details->reserve_pub));
  /**
   * Debug block.
   */
#if DEBUG_LOGGING
  {
    /** Should be 53, give 80 just to be extra conservative (and aligned).  */
#define PUBSIZE 80
    char wtid_s[PUBSIZE];
    GNUNET_break (NULL !=
                  GNUNET_STRINGS_data_to_string (&details->reserve_pub,
                                                 sizeof (details->reserve_pub),
                                                 &wtid_s[0],
                                                 PUBSIZE));
    GNUNET_log (GNUNET_ERROR_TYPE_INFO,
                "Plain text subject (= reserve_pub): %s\n",
                wtid_s);
  }
#endif
  if (wa->current_batch_size < UINT_MAX)
    wa->current_batch_size++;
  qs = db_plugin->reserves_in_insert (db_plugin->cls,
                                      session,
                                      &details->reserve_pub,
                                      &details->amount,
                                      details->execution_date,
                                      details->debit_account_url,
                                      wa->section_name,
                                      serial_id);
  if (GNUNET_DB_STATUS_HARD_ERROR == qs)
  {
    GNUNET_break (0);
    db_plugin->rollback (db_plugin->cls,
                         session);
    GNUNET_SCHEDULER_shutdown ();
    return GNUNET_SYSERR;
  }
  if (GNUNET_DB_STATUS_SOFT_ERROR == qs)
  {
    GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
                "Got DB soft error for reserves_in_insert. Rolling back.\n");
    db_plugin->rollback (db_plugin->cls,
                         session);
    /* try again */
    GNUNET_assert (NULL == task);
    task = GNUNET_SCHEDULER_add_now (&find_transfers,
                                     NULL);
    return GNUNET_SYSERR;
  }
  wa->delay = GNUNET_NO;
  wa->latest_row_off = serial_id;
  return GNUNET_OK;
}
/**
 * Query for incoming wire transfers.
 *
 * @param cls NULL
 */
static void
find_transfers (void *cls)
{
  struct TALER_EXCHANGEDB_Session *session;
  enum GNUNET_DB_QueryStatus qs;
  (void) cls;
  task = NULL;
  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
              "Checking for incoming wire transfers\n");
  if (NULL == (session = db_plugin->get_session (db_plugin->cls)))
  {
    GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
                "Failed to obtain database session!\n");
    global_ret = GR_DATABASE_SESSION_FAIL;
    GNUNET_SCHEDULER_shutdown ();
    return;
  }
  db_plugin->preflight (db_plugin->cls,
                        session);
  if (GNUNET_OK !=
      db_plugin->start (db_plugin->cls,
                        session,
                        "wirewatch check for incoming wire transfers"))
  {
    GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
                "Failed to start database transaction!\n");
    global_ret = GR_DATABASE_TRANSACTION_BEGIN_FAIL;
    GNUNET_SCHEDULER_shutdown ();
    return;
  }
  if (! wa_pos->reset_mode)
  {
    qs = db_plugin->get_latest_reserve_in_reference (db_plugin->cls,
                                                     session,
                                                     wa_pos->section_name,
                                                     &wa_pos->last_row_off);
    if (GNUNET_DB_STATUS_HARD_ERROR == qs)
    {
      GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
                  "Failed to obtain starting point for montoring from database!\n");
      db_plugin->rollback (db_plugin->cls,
                           session);
      global_ret = GR_DATABASE_SELECT_LATEST_HARD_FAIL;
      GNUNET_SCHEDULER_shutdown ();
      return;
    }
    if (GNUNET_DB_STATUS_SOFT_ERROR == qs)
    {
      /* try again */
      db_plugin->rollback (db_plugin->cls,
                           session);
      task = GNUNET_SCHEDULER_add_now (&find_transfers,
                                       NULL);
      return;
    }
    wa_pos->reset_mode = GNUNET_NO;
  }
  wa_pos->delay = GNUNET_YES;
  wa_pos->current_batch_size = 0; /* reset counter */
  GNUNET_log (GNUNET_ERROR_TYPE_INFO,
              "wirewatch: requesting incoming history from %s\n",
              wa_pos->auth.wire_gateway_url);
  wa_pos->session = session;
  wa_pos->hh = TALER_BANK_credit_history (ctx,
                                          &wa_pos->auth,
                                          wa_pos->last_row_off,
                                          wa_pos->batch_size,
                                          &history_cb,
                                          wa_pos);
  if (NULL == wa_pos->hh)
  {
    GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
                "Failed to start request for account history!\n");
    db_plugin->rollback (db_plugin->cls,
                         session);
    global_ret = GR_BANK_REQUEST_HISTORY_FAIL;
    GNUNET_SCHEDULER_shutdown ();
    return;
  }
}
/**
 * First task.
 *
 * @param cls closure, NULL
 * @param args remaining command-line arguments
 * @param cfgfile name of the configuration file used (for saving, can be NULL!)
 * @param c configuration
 */
static void
run (void *cls,
     char *const *args,
     const char *cfgfile,
     const struct GNUNET_CONFIGURATION_Handle *c)
{
  (void) cls;
  (void) args;
  (void) cfgfile;
  cfg = c;
  if (GNUNET_OK !=
      exchange_serve_process_config ())
  {
    global_ret = GR_CONFIGURATION_INVALID;
    return;
  }
  wa_pos = wa_head;
  GNUNET_assert (NULL != wa_pos);
  task = GNUNET_SCHEDULER_add_now (&find_transfers,
                                   NULL);
  GNUNET_SCHEDULER_add_shutdown (&shutdown_task,
                                 cls);
  ctx = GNUNET_CURL_init (&GNUNET_CURL_gnunet_scheduler_reschedule,
                          &rc);
  rc = GNUNET_CURL_gnunet_rc_create (ctx);
  if (NULL == ctx)
  {
    GNUNET_break (0);
    return;
  }
}
/**
 * The main function of taler-exchange-wirewatch
 *
 * @param argc number of arguments from the command line
 * @param argv command line arguments
 * @return 0 ok, non-zero on error
 */
int
main (int argc,
      char *const *argv)
{
  struct GNUNET_GETOPT_CommandLineOption options[] = {
    GNUNET_GETOPT_option_timetravel ('T',
                                     "timetravel"),
    GNUNET_GETOPT_option_flag ('t',
                               "test",
                               "run in test mode and exit when idle",
                               &test_mode),
    GNUNET_GETOPT_option_flag ('r',
                               "reset",
                               "start fresh with all transactions in the history",
                               &reset_mode),
    GNUNET_GETOPT_OPTION_END
  };
  if (GNUNET_OK !=
      GNUNET_STRINGS_get_utf8_args (argc, argv,
                                    &argc, &argv))
    return GR_CMD_LINE_UTF8_ERROR;
  if (GNUNET_OK !=
      GNUNET_PROGRAM_run (argc, argv,
                          "taler-exchange-wirewatch",
                          gettext_noop (
                            "background process that watches for incomming wire transfers from customers"),
                          options,
                          &run, NULL))
  {
    GNUNET_free ((void *) argv);
    return GR_CMD_LINE_OPTIONS_WRONG;
  }
  GNUNET_free ((void *) argv);
  return global_ret;
}
/* end of taler-exchange-wirewatch.c */