/*
  This file is part of TALER
  Copyright (C) 2020-2022 Taler Systems SA
  TALER is free software; you can redistribute it and/or modify it under the
  terms of the GNU General Public License as published by the Free Software
  Foundation; either version 3, or (at your option) any later version.
  TALER is distributed in the hope that it will be useful, but WITHOUT ANY
  WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
  A PARTICULAR PURPOSE.  See the GNU General Public License for more details.
  You should have received a copy of the GNU General Public License along with
  TALER; see the file COPYING.  If not, see <http://www.gnu.org/licenses/>
*/
/**
 * @file taler-auditor-sync.c
 * @brief Tool used by the auditor to make a 'safe' copy of the exchanges' database.
 * @author Christian Grothoff
 */
#include 
#include "taler_exchangedb_lib.h"
/**
 * Handle to access the exchange's source database.
 * Loaded in #setup(); #transact() reads the records to replicate from it.
 */
static struct TALER_EXCHANGEDB_Plugin *src;
/**
 * Handle to access the exchange's destination database.
 * Loaded in #setup(); #do_insert() writes the replicated records into it.
 */
static struct TALER_EXCHANGEDB_Plugin *dst;
/**
 * Return value from #main().  Remains zero unless #setup() or
 * #transact() record a failure.
 */
static int global_ret;
/**
 * Main task to do synchronization.  NULL while no pass is scheduled:
 * cleared at the start of #do_sync() and by #do_shutdown().
 */
static struct GNUNET_SCHEDULER_Task *sync_task;
/**
 * What is our target transaction size (number of records)?
 * Defaults to 512 and can be changed with the "-b" option.  Within this
 * file the value only steers the delay between passes in #do_sync().
 */
static unsigned int transaction_size = 512;
/**
 * Number of records copied in the current synchronization pass.
 * Reset to zero at the start of #do_sync() and incremented by
 * #do_insert() for every record inserted.
 */
static unsigned long long actual_size;
/**
 * Terminate once synchronization is achieved.  Set by the "-t" option;
 * checked in #do_sync() after a pass that copied no records.
 */
static int exit_if_synced;
/**
 * Information we track per replicated table.  One entry per table
 * lives in the #tables array.
 */
struct Table
{
  /**
   * Which table is this record about?
   */
  enum TALER_EXCHANGEDB_ReplicatedTable rt;
  /**
   * Up to which record is the destination table synchronized.
   * Refreshed from the destination database at the start of
   * #transact() and advanced by #do_insert() after each inserted record.
   */
  uint64_t start_serial;
  /**
   * Highest serial in the source table, as looked up at the start
   * of #transact().
   */
  uint64_t end_serial;
  /**
   * Marker for the end of the list of #tables.  Only the final
   * sentinel entry has this set to true.
   */
  bool end;
};
/**
 * Information about replicated tables.
 *
 * #transact() walks this array from the first entry until it reaches
 * the entry with `.end` set, so tables are replicated in the order
 * listed here.  The serial fields of every entry start out as zero.
 * NOTE(review): the order presumably ensures that referenced tables are
 * copied before the tables referring to them -- confirm against the
 * database schema before reordering.
 */
static struct Table tables[] = {
  { .rt = TALER_EXCHANGEDB_RT_DENOMINATIONS},
  { .rt = TALER_EXCHANGEDB_RT_DENOMINATION_REVOCATIONS},
  { .rt = TALER_EXCHANGEDB_RT_WIRE_TARGETS},
  { .rt = TALER_EXCHANGEDB_RT_LEGITIMIZATION_PROCESSES},
  { .rt = TALER_EXCHANGEDB_RT_LEGITIMIZATION_REQUIREMENTS},
  { .rt = TALER_EXCHANGEDB_RT_RESERVES},
  { .rt = TALER_EXCHANGEDB_RT_RESERVES_IN},
  { .rt = TALER_EXCHANGEDB_RT_RESERVES_CLOSE},
  { .rt = TALER_EXCHANGEDB_RT_RESERVES_OPEN_REQUESTS},
  { .rt = TALER_EXCHANGEDB_RT_RESERVES_OPEN_DEPOSITS},
  { .rt = TALER_EXCHANGEDB_RT_RESERVES_OUT},
  { .rt = TALER_EXCHANGEDB_RT_AUDITORS},
  { .rt = TALER_EXCHANGEDB_RT_AUDITOR_DENOM_SIGS},
  { .rt = TALER_EXCHANGEDB_RT_EXCHANGE_SIGN_KEYS},
  { .rt = TALER_EXCHANGEDB_RT_SIGNKEY_REVOCATIONS},
  { .rt = TALER_EXCHANGEDB_RT_KNOWN_COINS},
  { .rt = TALER_EXCHANGEDB_RT_REFRESH_COMMITMENTS},
  { .rt = TALER_EXCHANGEDB_RT_REFRESH_REVEALED_COINS},
  { .rt = TALER_EXCHANGEDB_RT_REFRESH_TRANSFER_KEYS},
  { .rt = TALER_EXCHANGEDB_RT_DEPOSITS},
  { .rt = TALER_EXCHANGEDB_RT_REFUNDS},
  { .rt = TALER_EXCHANGEDB_RT_WIRE_OUT},
  { .rt = TALER_EXCHANGEDB_RT_AGGREGATION_TRACKING},
  { .rt = TALER_EXCHANGEDB_RT_WIRE_FEE},
  { .rt = TALER_EXCHANGEDB_RT_GLOBAL_FEE},
  { .rt = TALER_EXCHANGEDB_RT_RECOUP},
  { .rt = TALER_EXCHANGEDB_RT_RECOUP_REFRESH },
  { .rt = TALER_EXCHANGEDB_RT_EXTENSIONS},
  { .rt = TALER_EXCHANGEDB_RT_POLICY_DETAILS },
  { .rt = TALER_EXCHANGEDB_RT_POLICY_FULFILLMENTS },
  { .rt = TALER_EXCHANGEDB_RT_PURSE_REQUESTS},
  { .rt = TALER_EXCHANGEDB_RT_PURSE_DECISION},
  { .rt = TALER_EXCHANGEDB_RT_PURSE_MERGES},
  { .rt = TALER_EXCHANGEDB_RT_PURSE_DEPOSITS},
  { .rt = TALER_EXCHANGEDB_RT_ACCOUNT_MERGES},
  { .rt = TALER_EXCHANGEDB_RT_HISTORY_REQUESTS},
  { .rt = TALER_EXCHANGEDB_RT_CLOSE_REQUESTS},
  { .rt = TALER_EXCHANGEDB_RT_WADS_OUT},
  { .rt = TALER_EXCHANGEDB_RT_WADS_OUT_ENTRIES},
  { .rt = TALER_EXCHANGEDB_RT_WADS_IN},
  { .rt = TALER_EXCHANGEDB_RT_WADS_IN_ENTRIES},
  { .rt = TALER_EXCHANGEDB_RT_PROFIT_DRAINS},
  /* sentinel terminating the list; must remain the last entry */
  { .end = true }
};
/**
 * Closure for #do_insert.  Created by #transact() and handed to the
 * source plugin's record iteration.
 */
struct InsertContext
{
  /**
   * Table we are replicating.  #do_insert() advances the
   * `start_serial` of this table after each inserted record.
   */
  struct Table *table;
  /**
   * Set to error if insertion created an error.  Starts out positive;
   * once it is zero or negative #do_insert() refuses further records.
   */
  enum GNUNET_DB_QueryStatus qs;
};
/**
 * Function called on data to replicate in the auditor's database.
 *
 * @param cls closure, a `struct InsertContext`
 * @param td record from an exchange table
 * @return #GNUNET_OK to continue to iterate,
 *         #GNUNET_SYSERR to fail with an error
 */
static enum GNUNET_GenericReturnValue
do_insert (void *cls,
           const struct TALER_EXCHANGEDB_TableData *td)
{
  struct InsertContext *ctx = cls;
  enum GNUNET_DB_QueryStatus qs;
  if (0 >= ctx->qs)
    return GNUNET_SYSERR;
  qs = dst->insert_records_by_table (dst->cls,
                                     td);
  if (0 >= qs)
  {
    switch (qs)
    {
    case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
      GNUNET_assert (0);
      break;
    case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
      GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
                  "Failed to insert record into table %d: no change\n",
                  td->table);
      break;
    case GNUNET_DB_STATUS_SOFT_ERROR:
      GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
                  "Serialization error inserting record into table %d (will retry)\n",
                  td->table);
      break;
    case GNUNET_DB_STATUS_HARD_ERROR:
      GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
                  "Failed to insert record into table %d: hard error\n",
                  td->table);
      break;
    }
    ctx->qs = qs;
    return GNUNET_SYSERR;
  }
  actual_size++;
  ctx->table->start_serial = td->serial;
  return GNUNET_OK;
}
/**
 * Run one replication pass.
 *
 * First looks up the highest serial of every table in the source
 * (stored as `end_serial`) and in the destination (stored as
 * `start_serial`).  Then, table by table in the order of #tables,
 * copies the records after `start_serial` from the source into the
 * destination via #do_insert(), committing the destination transaction
 * after each batch.
 *
 * On permanent failures #global_ret is set to a non-zero value.  When
 * #GNUNET_SYSERR is returned, transactions may still be open on either
 * database; the caller is expected to roll both back.
 *
 * @return #GNUNET_OK on success, #GNUNET_SYSERR to rollback
 */
static enum GNUNET_GenericReturnValue
transact (void)
{
  struct InsertContext ctx = {
    .qs = GNUNET_DB_STATUS_SUCCESS_ONE_RESULT
  };

  /* Phase 1: find out how far the source tables go */
  if (GNUNET_OK !=
      src->start (src->cls,
                  "lookup src serials"))
    return GNUNET_SYSERR;
  /* NOTE(review): the lookup results are deliberately left unchecked
     here, as before; if a lookup does not yield a serial, the entry
     keeps whatever value it had. */
  for (unsigned int i = 0; ! tables[i].end; i++)
    src->lookup_serial_by_table (src->cls,
                                 tables[i].rt,
                                 &tables[i].end_serial);
  src->rollback (src->cls);

  /* Phase 2: find out how far the destination tables already are */
  if (GNUNET_OK !=
      dst->start (dst->cls,
                  "lookup dst serials"))
    return GNUNET_SYSERR;
  for (unsigned int i = 0; ! tables[i].end; i++)
    dst->lookup_serial_by_table (dst->cls,
                                 tables[i].rt,
                                 &tables[i].start_serial);
  dst->rollback (dst->cls);

  /* Phase 3: copy the missing records, one table at a time */
  for (unsigned int i = 0; ! tables[i].end; i++)
  {
    struct Table *table = &tables[i];

    if (table->start_serial == table->end_serial)
      continue;
    GNUNET_log (GNUNET_ERROR_TYPE_INFO,
                "Replicating table %d from %llu to %llu\n",
                i,
                (unsigned long long) table->start_serial,
                (unsigned long long) table->end_serial);
    ctx.table = table;
    while (table->start_serial < table->end_serial)
    {
      enum GNUNET_DB_QueryStatus qs;
      /* Progress before this batch.  do_insert() advances both values
         while the batch is still uncommitted, so they must be restored
         if the commit does not go through. */
      const uint64_t batch_start_serial = table->start_serial;
      const unsigned long long batch_start_size = actual_size;

      if (GNUNET_OK !=
          src->start (src->cls,
                      "copy table (src)"))
        return GNUNET_SYSERR;
      if (GNUNET_OK !=
          dst->start (dst->cls,
                      "copy table (dst)"))
        return GNUNET_SYSERR;
      qs = src->lookup_records_by_table (src->cls,
                                         table->rt,
                                         table->start_serial,
                                         &do_insert,
                                         &ctx);
      /* an insertion error takes precedence over the lookup status */
      if (ctx.qs < 0)
        qs = ctx.qs;
      if (GNUNET_DB_STATUS_HARD_ERROR == qs)
      {
        GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
                    "Failed to lookup records from table %d: hard error\n",
                    i);
        global_ret = EXIT_FAILURE;
        return GNUNET_SYSERR;
      }
      if (GNUNET_DB_STATUS_SOFT_ERROR == qs)
      {
        GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
                    "Serialization error looking up records from table %d (will retry)\n",
                    i);
        return GNUNET_SYSERR; /* will retry */
      }
      if (0 == qs)
      {
        GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
                    "Failed to lookup records from table %d: no results\n",
                    i);
        GNUNET_break (0); /* should be impossible */
        global_ret = EXIT_FAILURE;
        return GNUNET_SYSERR;
      }
      if (0 == ctx.qs)
        return GNUNET_SYSERR; /* insertion failed, maybe record existed? try again */
      /* we do not care about conflicting UPDATEs to src table, so safe to just rollback */
      src->rollback (src->cls);
      qs = dst->commit (dst->cls);
      if (GNUNET_DB_STATUS_SOFT_ERROR == qs)
      {
        GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
                    "Serialization error committing transaction on table %d (will retry)\n",
                    i);
        /* Nothing from this batch was persisted: rewind so that the
           retry copies the same records again instead of skipping them. */
        table->start_serial = batch_start_serial;
        actual_size = batch_start_size;
        continue;
      }
      if (GNUNET_DB_STATUS_HARD_ERROR == qs)
      {
        GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
                    "Hard error committing transaction on table %d\n",
                    i);
        global_ret = EXIT_FAILURE;
        return GNUNET_SYSERR;
      }
    }
  }
  GNUNET_log (GNUNET_ERROR_TYPE_INFO,
              "Sync pass completed successfully with %llu updates\n",
              actual_size);
  return GNUNET_OK;
}
/**
 * Task to do the actual synchronization work.
 *
 * Runs one #transact() pass and, unless the tool is done or failed
 * permanently, schedules itself again.  The delay before the next pass
 * is kept across invocations: it backs off when a pass copied fewer
 * than half of #transaction_size records, is reset to zero when a pass
 * copied at least #transaction_size records, and is left unchanged
 * otherwise.
 *
 * Returning without rescheduling ends the synchronization; in the
 * failure cases #global_ret is set so that #main() reports the error.
 *
 * @param cls NULL, unused
 */
static void
do_sync (void *cls)
{
  static struct GNUNET_TIME_Relative delay;

  (void) cls;
  sync_task = NULL;
  actual_size = 0;
  if (GNUNET_SYSERR ==
      src->preflight (src->cls))
  {
    GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
                "Failed to begin transaction with data source. Exiting\n");
    /* make sure the process does not exit with a success status */
    global_ret = EXIT_FAILURE;
    return;
  }
  if (GNUNET_SYSERR ==
      dst->preflight (dst->cls))
  {
    GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
                "Failed to begin transaction with data destination. Exiting\n");
    /* make sure the process does not exit with a success status */
    global_ret = EXIT_FAILURE;
    return;
  }
  if (GNUNET_OK != transact ())
  {
    GNUNET_log (GNUNET_ERROR_TYPE_INFO,
                "Transaction failed, rolling back\n");
    /* transact() may have left a transaction open on either side */
    src->rollback (src->cls);
    dst->rollback (dst->cls);
  }
  if (0 != global_ret)
  {
    GNUNET_log (GNUNET_ERROR_TYPE_INFO,
                "Transaction failed permanently, exiting\n");
    return;
  }
  if ( (0 == actual_size) &&
       (exit_if_synced) )
  {
    GNUNET_log (GNUNET_ERROR_TYPE_INFO,
                "Databases are synchronized. Exiting\n");
    return;
  }
  if (actual_size < transaction_size / 2)
  {
    /* little to do: slow down */
    delay = GNUNET_TIME_STD_BACKOFF (delay);
  }
  else if (actual_size >= transaction_size)
  {
    /* plenty to do: continue immediately */
    delay = GNUNET_TIME_UNIT_ZERO;
  }
  GNUNET_log (GNUNET_ERROR_TYPE_INFO,
              "Next sync pass in %s\n",
              GNUNET_STRINGS_relative_time_to_string (delay,
                                                      GNUNET_YES));
  sync_task = GNUNET_SCHEDULER_add_delayed (delay,
                                            &do_sync,
                                            NULL);
}
/**
 * Command-line option processor that stores an expanded file name.
 * Any value previously stored in the target is released before the
 * result of #GNUNET_STRINGS_filename_expand() is written to it.
 *
 * @param ctx command line processing context (unused)
 * @param scls additional closure (will point to the `char *`,
 *             which will be allocated)
 * @param option name of the option (unused)
 * @param value actual value of the option (a string), must not be NULL
 * @return #GNUNET_OK
 */
static enum GNUNET_GenericReturnValue
set_filename (struct GNUNET_GETOPT_CommandLineProcessorContext *ctx,
              void *scls,
              const char *option,
              const char *value)
{
  char **target = scls;

  (void) option;
  (void) ctx;
  GNUNET_assert (NULL != value);
  /* drop the value of an earlier occurrence of the option, if any */
  GNUNET_free (*target);
  *target = GNUNET_STRINGS_filename_expand (value);
  return GNUNET_OK;
}
/**
 * Allow user to specify configuration file name (-s option)
 *
 * @param[out] fn set to the name of the configuration file
 */
static struct GNUNET_GETOPT_CommandLineOption
option_cfgfile_src (char **fn)
{
  struct GNUNET_GETOPT_CommandLineOption clo = {
    .shortName = 's',
    .name = "source-configuration",
    .argumentHelp = "FILENAME",
    .description = gettext_noop (
      "use configuration file FILENAME for the SOURCE database"),
    .require_argument = 1,
    .processor = &set_filename,
    .scls = (void *) fn
  };
  return clo;
}
/**
 * Allow user to specify configuration file name (-d option)
 *
 * @param[out] fn set to the name of the configuration file
 */
static struct GNUNET_GETOPT_CommandLineOption
option_cfgfile_dst (char **fn)
{
  struct GNUNET_GETOPT_CommandLineOption clo = {
    .shortName = 'd',
    .name = "destination-configuration",
    .argumentHelp = "FILENAME",
    .description = gettext_noop (
      "use configuration file FILENAME for the DESTINATION database"),
    .require_argument = 1,
    .processor = &set_filename,
    .scls = (void *) fn
  };
  return clo;
}
/**
 * Create a configuration handle and load @a cfgfile into it.
 *
 * @param cfgfile name of the configuration file to load
 * @return the loaded configuration (to be released by the caller with
 *         #GNUNET_CONFIGURATION_destroy()), or NULL if loading failed
 */
static struct GNUNET_CONFIGURATION_Handle *
load_config (const char *cfgfile)
{
  struct GNUNET_CONFIGURATION_Handle *handle;

  handle = GNUNET_CONFIGURATION_create ();
  GNUNET_log (GNUNET_ERROR_TYPE_INFO,
              "Loading config file: %s\n",
              cfgfile);
  if (GNUNET_SYSERR !=
      GNUNET_CONFIGURATION_load (handle,
                                 cfgfile))
    return handle;
  /* loading failed: report it and do not leak the handle */
  GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
              "Malformed configuration file `%s', exit ...\n",
              cfgfile);
  GNUNET_CONFIGURATION_destroy (handle);
  return NULL;
}
/**
 * Shutdown task.  Cancels the pending synchronization task, if any.
 *
 * @param cls NULL, unused
 */
static void
do_shutdown (void *cls)
{
  (void) cls;
  if (NULL == sync_task)
    return; /* nothing scheduled */
  GNUNET_SCHEDULER_cancel (sync_task);
  sync_task = NULL;
}
/**
 * Initial task.  Registers the shutdown handler and schedules the
 * first synchronization pass to run right away.
 *
 * @param cls NULL, unused
 */
static void
run (void *cls)
{
  (void) cls;
  GNUNET_SCHEDULER_add_shutdown (&do_shutdown, NULL);
  sync_task = GNUNET_SCHEDULER_add_now (&do_sync, NULL);
}
/**
 * Load the database plugins into #src and #dst, #run() the main logic
 * under the scheduler with those plugins, and unload them again once
 * the scheduler returns.  If a plugin cannot be loaded, #global_ret is
 * set to #EXIT_NOTINSTALLED and nothing is run.
 *
 * @param src_cfg configuration to load the source plugin with
 * @param dst_cfg configuration to load the destination plugin with
 */
static void
setup (struct GNUNET_CONFIGURATION_Handle *src_cfg,
       struct GNUNET_CONFIGURATION_Handle *dst_cfg)
{
  src = TALER_EXCHANGEDB_plugin_load (src_cfg);
  if (NULL == src)
  {
    global_ret = EXIT_NOTINSTALLED;
    return;
  }
  dst = TALER_EXCHANGEDB_plugin_load (dst_cfg);
  if (NULL != dst)
  {
    /* both plugins available: do the work, then clean up */
    GNUNET_SCHEDULER_run (&run, NULL);
    TALER_EXCHANGEDB_plugin_unload (src);
    src = NULL;
    TALER_EXCHANGEDB_plugin_unload (dst);
    dst = NULL;
    return;
  }
  /* destination plugin missing: release the source plugin again */
  global_ret = EXIT_NOTINSTALLED;
  TALER_EXCHANGEDB_plugin_unload (src);
  src = NULL;
}
/**
 * The main function of the taler-auditor-sync tool.  This tool copies
 * the records of the replicated exchange tables from the database named
 * by the source configuration into the database named by the
 * destination configuration.
 *
 * @param argc number of arguments from the command line
 * @param argv command line arguments
 * @return 0 ok, non-zero on error
 */
int
main (int argc,
      char *const *argv)
{
  char *src_cfgfile = NULL;
  char *dst_cfgfile = NULL;
  char *level = GNUNET_strdup ("WARNING");
  struct GNUNET_CONFIGURATION_Handle *src_cfg;
  struct GNUNET_CONFIGURATION_Handle *dst_cfg;
  const struct GNUNET_GETOPT_CommandLineOption options[] = {
    GNUNET_GETOPT_option_mandatory (
      option_cfgfile_src (&src_cfgfile)),
    GNUNET_GETOPT_option_mandatory (
      option_cfgfile_dst (&dst_cfgfile)),
    GNUNET_GETOPT_option_help (
      gettext_noop ("Make a safe copy of an exchange database")),
    GNUNET_GETOPT_option_uint (
      'b',
      "batch",
      "SIZE",
      gettext_noop (
        "target SIZE for a the number of records to copy in one transaction"),
      &transaction_size),
    GNUNET_GETOPT_option_flag (
      't',
      "terminate-when-synchronized",
      gettext_noop (
        "terminate as soon as the databases are synchronized"),
      &exit_if_synced),
    GNUNET_GETOPT_option_version (VERSION "-" VCS_VERSION),
    GNUNET_GETOPT_option_loglevel (&level),
    GNUNET_GETOPT_OPTION_END
  };

  TALER_OS_init ();
  TALER_gcrypt_init (); /* must trigger initialization manually at this point! */
  {
    int ret;

    ret = GNUNET_GETOPT_run ("taler-auditor-sync",
                             options,
                             argc, argv);
    if ( (GNUNET_NO == ret) ||
         (GNUNET_SYSERR == ret) )
    {
      /* options may have been parsed partially: release what was set */
      GNUNET_free (level);
      GNUNET_free (src_cfgfile);
      GNUNET_free (dst_cfgfile);
      return (GNUNET_NO == ret)
             ? EXIT_SUCCESS
             : EXIT_INVALIDARGUMENT;
    }
  }
  GNUNET_assert (GNUNET_OK ==
                 GNUNET_log_setup ("taler-auditor-sync",
                                   level,
                                   NULL));
  GNUNET_free (level);
  /* both file names are set here: the options are mandatory */
  if (0 == strcmp (src_cfgfile,
                   dst_cfgfile))
  {
    GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
                "Source and destination configuration files must differ!\n");
    GNUNET_free (src_cfgfile);
    GNUNET_free (dst_cfgfile);
    return EXIT_INVALIDARGUMENT;
  }
  src_cfg = load_config (src_cfgfile);
  if (NULL == src_cfg)
  {
    GNUNET_free (src_cfgfile);
    GNUNET_free (dst_cfgfile);
    return EXIT_NOTCONFIGURED;
  }
  dst_cfg = load_config (dst_cfgfile);
  if (NULL == dst_cfg)
  {
    GNUNET_CONFIGURATION_destroy (src_cfg);
    GNUNET_free (src_cfgfile);
    GNUNET_free (dst_cfgfile);
    return EXIT_NOTCONFIGURED;
  }
  setup (src_cfg,
         dst_cfg);
  GNUNET_CONFIGURATION_destroy (src_cfg);
  GNUNET_CONFIGURATION_destroy (dst_cfg);
  GNUNET_free (src_cfgfile);
  GNUNET_free (dst_cfgfile);
  return global_ret;
}
/* end of taler-auditor-sync.c */