diff options
| author | Christian Grothoff <christian@grothoff.org> | 2020-03-12 10:23:26 +0100 | 
|---|---|---|
| committer | Christian Grothoff <christian@grothoff.org> | 2020-03-12 10:23:26 +0100 | 
| commit | b91fcbb92f21db498214cba38ffd6e3fe886d95e (patch) | |
| tree | 18ee0a5b8b28feef17e3ff5aedff490ddc7aebad /src | |
| parent | 83631bc98fe70dd73f212581fb54ab3a82560686 (diff) | |
finish separation of aggreator into aggregation, closing and transfer processes (test cases still need to be updated)
Diffstat (limited to 'src')
| -rw-r--r-- | src/exchange/.gitignore | 1 | ||||
| -rw-r--r-- | src/exchange/Makefile.am | 13 | ||||
| -rw-r--r-- | src/exchange/taler-exchange-aggregator.c | 338 | ||||
| -rw-r--r-- | src/exchange/taler-exchange-transfer.c | 544 | 
4 files changed, 560 insertions, 336 deletions
diff --git a/src/exchange/.gitignore b/src/exchange/.gitignore index 09cf60a8..5818f171 100644 --- a/src/exchange/.gitignore +++ b/src/exchange/.gitignore @@ -8,3 +8,4 @@ taler-exchange-wirewatch  test_taler_exchange_wirewatch-postgres  test_taler_exchange_httpd_home/.config/taler/account-1.json  taler-exchange-closer +taler-exchange-transfer diff --git a/src/exchange/Makefile.am b/src/exchange/Makefile.am index 227224d3..88753c5e 100644 --- a/src/exchange/Makefile.am +++ b/src/exchange/Makefile.am @@ -20,6 +20,7 @@ bin_PROGRAMS = \    taler-exchange-aggregator \    taler-exchange-closer \    taler-exchange-httpd \ +  taler-exchange-transfer \    taler-exchange-wirewatch  taler_exchange_aggregator_SOURCES = \ @@ -59,6 +60,18 @@ taler_exchange_wirewatch_LDADD = \    -lgnunetcurl \    -lgnunetutil +taler_exchange_transfer_SOURCES = \ +  taler-exchange-transfer.c +taler_exchange_transfer_LDADD = \ +  $(LIBGCRYPT_LIBS) \ +  $(top_builddir)/src/json/libtalerjson.la \ +  $(top_builddir)/src/util/libtalerutil.la \ +  $(top_builddir)/src/bank-lib/libtalerbank.la \ +  $(top_builddir)/src/exchangedb/libtalerexchangedb.la \ +  -ljansson \ +  -lgnunetcurl \ +  -lgnunetutil +  taler_exchange_httpd_SOURCES = \    taler-exchange-httpd.c taler-exchange-httpd.h \    taler-exchange-httpd_db.c taler-exchange-httpd_db.h \ diff --git a/src/exchange/taler-exchange-aggregator.c b/src/exchange/taler-exchange-aggregator.c index 431abea4..59db4dae 100644 --- a/src/exchange/taler-exchange-aggregator.c +++ b/src/exchange/taler-exchange-aggregator.c @@ -18,19 +18,6 @@   * @file taler-exchange-aggregator.c   * @brief Process that aggregates outgoing transactions and executes them   * @author Christian Grothoff - * - * Note: - * It might be simpler and theoretically more performant to split up - * this process into three: - * - one that runs the 'pending' wire transfers - * - one that performs aggregation - * - one that closes (expired) reserves - * - * They would have some (minor) code duplication to load the database and wire - * plugins and account data, and this would also slightly complicate - * operations by having to launch three processes. OTOH, those processes could - * then fail independently, which might also be a good thing.  In any case, - * doing this is not expected to be complicated.   */  #include "platform.h"  #include <gnunet/gnunet_util_lib.h> @@ -43,38 +30,6 @@  /** - * Data we keep to #run_transfers().  There is at most - * one of these around at any given point in time. - * Note that this limits parallelism, and we might want - * to revise this decision at a later point. - */ -struct WirePrepareData -{ - -  /** -   * Database session for all of our transactions. -   */ -  struct TALER_EXCHANGEDB_Session *session; - -  /** -   * Wire execution handle. -   */ -  struct TALER_BANK_TransferHandle *eh; - -  /** -   * Wire account used for this preparation. -   */ -  struct TALER_EXCHANGEDB_WireAccount *wa; - -  /** -   * Row ID of the transfer. -   */ -  unsigned long long row_id; - -}; - - -/**   * Information about one aggregation process to be executed.  There is   * at most one of these around at any given point in time.   * Note that this limits parallelism, and we might want @@ -202,22 +157,6 @@ static struct TALER_EXCHANGEDB_Plugin *db_plugin;  static struct GNUNET_SCHEDULER_Task *task;  /** - * If we are currently executing a transfer, information about - * the active transfer is here. Otherwise, this variable is NULL. - */ -static struct WirePrepareData *wpd; - -/** - * Handle to the context for interacting with the bank / wire gateway. - */ -static struct GNUNET_CURL_Context *ctx; - -/** - * Scheduler context for running the @e ctx. - */ -static struct GNUNET_CURL_RescheduleContext *rc; - -/**   * How long should we sleep when idle before trying to find more work?   */  static struct GNUNET_TIME_Relative aggregator_idle_sleep_interval; @@ -245,16 +184,6 @@ run_aggregation (void *cls);  /** - * Execute the wire transfers that we have committed to - * do. - * - * @param cls NULL - */ -static void -run_transfers (void *cls); - - -/**   * Free data stored in @a au, but not @a au itself (stack allocated).   *   * @param au aggreation unit to clean up @@ -281,16 +210,6 @@ static void  shutdown_task (void *cls)  {    (void) cls; -  if (NULL != ctx) -  { -    GNUNET_CURL_fini (ctx); -    ctx = NULL; -  } -  if (NULL != rc) -  { -    GNUNET_CURL_gnunet_rc_destroy (rc); -    rc = NULL; -  }    GNUNET_log (GNUNET_ERROR_TYPE_INFO,                "Running shutdown\n");    if (NULL != task) @@ -298,18 +217,6 @@ shutdown_task (void *cls)      GNUNET_SCHEDULER_cancel (task);      task = NULL;    } -  if (NULL != wpd) -  { -    if (NULL != wpd->eh) -    { -      TALER_BANK_transfer_cancel (wpd->eh); -      wpd->eh = NULL; -    } -    db_plugin->rollback (db_plugin->cls, -                         wpd->session); -    GNUNET_free (wpd); -    wpd = NULL; -  }    TALER_EXCHANGEDB_plugin_unload (db_plugin);    db_plugin = NULL;    TALER_EXCHANGEDB_unload_accounts (); @@ -1038,106 +945,11 @@ run_aggregation (void *cls)      return;    case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:      GNUNET_log (GNUNET_ERROR_TYPE_INFO, -                "Preparation complete, switching to transfer mode\n"); -    /* run alternative task: actually do wire transfer! */ -    GNUNET_assert (NULL == task); -    task = GNUNET_SCHEDULER_add_now (&run_transfers, -                                     NULL); -    return; -  default: -    GNUNET_break (0); -    global_ret = GNUNET_SYSERR; -    GNUNET_SCHEDULER_shutdown (); -    return; -  } -} - - -/** - * Function called with the result from the execute step. - * - * @param cls NULL - * @param http_status_code #MHD_HTTP_OK on success - * @param ec taler error code - * @param row_id unique ID of the wire transfer in the bank's records - * @param wire_timestamp when did the transfer happen - */ -static void -wire_confirm_cb (void *cls, -                 unsigned int http_status_code, -                 enum TALER_ErrorCode ec, -                 uint64_t row_id, -                 struct GNUNET_TIME_Absolute wire_timestamp) -{ -  struct TALER_EXCHANGEDB_Session *session = wpd->session; -  enum GNUNET_DB_QueryStatus qs; - -  (void) cls; -  (void) row_id; -  (void) wire_timestamp; -  wpd->eh = NULL; -  if (MHD_HTTP_OK != http_status_code) -  { -    GNUNET_log (GNUNET_ERROR_TYPE_ERROR, -                "Wire transaction failed: %u/%d\n", -                http_status_code, -                ec); -    db_plugin->rollback (db_plugin->cls, -                         session); -    global_ret = GNUNET_SYSERR; -    GNUNET_SCHEDULER_shutdown (); -    GNUNET_free (wpd); -    wpd = NULL; -    return; -  } -  qs = db_plugin->wire_prepare_data_mark_finished (db_plugin->cls, -                                                   session, -                                                   wpd->row_id); -  if (0 >= qs) -  { -    GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs); -    db_plugin->rollback (db_plugin->cls, -                         session); -    if (GNUNET_DB_STATUS_SOFT_ERROR == qs) -    { -      /* try again */ -      GNUNET_assert (NULL == task); -      task = GNUNET_SCHEDULER_add_now (&run_aggregation, -                                       NULL); -    } -    else -    { -      global_ret = GNUNET_SYSERR; -      GNUNET_SCHEDULER_shutdown (); -    } -    GNUNET_free (wpd); -    wpd = NULL; -    return; -  } -  GNUNET_free (wpd); -  wpd = NULL; -  switch (commit_or_warn (session)) -  { -  case GNUNET_DB_STATUS_SOFT_ERROR: -    /* try again */ +                "Preparation complete, going again\n");      GNUNET_assert (NULL == task);      task = GNUNET_SCHEDULER_add_now (&run_aggregation,                                       NULL);      return; -  case GNUNET_DB_STATUS_HARD_ERROR: -    GNUNET_break (0); -    global_ret = GNUNET_SYSERR; -    GNUNET_SCHEDULER_shutdown (); -    return; -  case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: -    GNUNET_log (GNUNET_ERROR_TYPE_INFO, -                "Wire transfer complete\n"); -    /* continue with #run_transfers(), just to guard -       against the unlikely case that there are more. */ -    GNUNET_assert (NULL == task); -    task = GNUNET_SCHEDULER_add_now (&run_transfers, -                                     NULL); -    return;    default:      GNUNET_break (0);      global_ret = GNUNET_SYSERR; @@ -1148,143 +960,6 @@ wire_confirm_cb (void *cls,  /** - * Callback with data about a prepared transaction. - * - * @param cls NULL - * @param rowid row identifier used to mark prepared transaction as done - * @param wire_method wire method the preparation was done for - * @param buf transaction data that was persisted, NULL on error - * @param buf_size number of bytes in @a buf, 0 on error - */ -static void -wire_prepare_cb (void *cls, -                 uint64_t rowid, -                 const char *wire_method, -                 const char *buf, -                 size_t buf_size) -{ -  struct TALER_EXCHANGEDB_WireAccount *wa; - -  (void) cls; -  wpd->row_id = rowid; -  GNUNET_log (GNUNET_ERROR_TYPE_INFO, -              "Starting wire transfer %llu\n", -              (unsigned long long) rowid); -  wpd->wa = TALER_EXCHANGEDB_find_account_by_method (wire_method); -  if (NULL == wpd->wa) -  { -    /* Should really never happen here, as when we get -       here the wire account should be in the cache. */ -    GNUNET_break (0); -    db_plugin->rollback (db_plugin->cls, -                         wpd->session); -    global_ret = GNUNET_SYSERR; -    GNUNET_SCHEDULER_shutdown (); -    GNUNET_free (wpd); -    wpd = NULL; -    return; -  } -  wa = wpd->wa; -  wpd->eh = TALER_BANK_transfer (ctx, -                                 &wa->auth, -                                 buf, -                                 buf_size, -                                 &wire_confirm_cb, -                                 NULL); -  if (NULL == wpd->eh) -  { -    GNUNET_break (0); /* Irrecoverable */ -    db_plugin->rollback (db_plugin->cls, -                         wpd->session); -    global_ret = GNUNET_SYSERR; -    GNUNET_SCHEDULER_shutdown (); -    GNUNET_free (wpd); -    wpd = NULL; -    return; -  } -} - - -/** - * Execute the wire transfers that we have committed to - * do. - * - * @param cls NULL - */ -static void -run_transfers (void *cls) -{ -  enum GNUNET_DB_QueryStatus qs; -  struct TALER_EXCHANGEDB_Session *session; -  const struct GNUNET_SCHEDULER_TaskContext *tc; - -  (void) cls; -  task = NULL; -  GNUNET_log (GNUNET_ERROR_TYPE_INFO, -              "Checking for pending wire transfers\n"); -  tc = GNUNET_SCHEDULER_get_task_context (); -  if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN)) -    return; -  if (NULL == (session = db_plugin->get_session (db_plugin->cls))) -  { -    GNUNET_log (GNUNET_ERROR_TYPE_ERROR, -                "Failed to obtain database session!\n"); -    global_ret = GNUNET_SYSERR; -    GNUNET_SCHEDULER_shutdown (); -    return; -  } -  if (GNUNET_OK != -      db_plugin->start (db_plugin->cls, -                        session, -                        "aggregator run transfer")) -  { -    GNUNET_log (GNUNET_ERROR_TYPE_ERROR, -                "Failed to start database transaction!\n"); -    global_ret = GNUNET_SYSERR; -    GNUNET_SCHEDULER_shutdown (); -    return; -  } -  wpd = GNUNET_new (struct WirePrepareData); -  wpd->session = session; -  qs = db_plugin->wire_prepare_data_get (db_plugin->cls, -                                         session, -                                         &wire_prepare_cb, -                                         NULL); -  if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT == qs) -    return;  /* continued via continuation set in #wire_prepare_cb() */ -  db_plugin->rollback (db_plugin->cls, -                       session); -  GNUNET_free (wpd); -  wpd = NULL; -  switch (qs) -  { -  case GNUNET_DB_STATUS_HARD_ERROR: -    GNUNET_break (0); -    global_ret = GNUNET_SYSERR; -    GNUNET_SCHEDULER_shutdown (); -    return; -  case GNUNET_DB_STATUS_SOFT_ERROR: -    /* try again */ -    GNUNET_assert (NULL == task); -    task = GNUNET_SCHEDULER_add_now (&run_transfers, -                                     NULL); -    return; -  case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: -    /* no more prepared wire transfers, go back to aggregation! */ -    GNUNET_log (GNUNET_ERROR_TYPE_INFO, -                "No more pending wire transfers, starting aggregation\n"); -    GNUNET_assert (NULL == task); -    task = GNUNET_SCHEDULER_add_now (&run_aggregation, -                                     NULL); -    return; -  case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT: -    /* should be impossible */ -    GNUNET_assert (0); -  } -} - - -/**   * First task.   *   * @param cls closure, NULL @@ -1309,17 +984,8 @@ run (void *cls,      global_ret = 1;      return;    } -  ctx = GNUNET_CURL_init (&GNUNET_CURL_gnunet_scheduler_reschedule, -                          &rc); -  rc = GNUNET_CURL_gnunet_rc_create (ctx); -  if (NULL == ctx) -  { -    GNUNET_break (0); -    return; -  } -    GNUNET_assert (NULL == task); -  task = GNUNET_SCHEDULER_add_now (&run_transfers, +  task = GNUNET_SCHEDULER_add_now (&run_aggregation,                                     NULL);    GNUNET_SCHEDULER_add_shutdown (&shutdown_task,                                   cls); diff --git a/src/exchange/taler-exchange-transfer.c b/src/exchange/taler-exchange-transfer.c new file mode 100644 index 00000000..e8c0929b --- /dev/null +++ b/src/exchange/taler-exchange-transfer.c @@ -0,0 +1,544 @@ +/* +  This file is part of TALER +  Copyright (C) 2016-2020 Taler Systems SA + +  TALER is free software; you can redistribute it and/or modify it under the +  terms of the GNU Affero General Public License as published by the Free Software +  Foundation; either version 3, or (at your option) any later version. + +  TALER is distributed in the hope that it will be useful, but WITHOUT ANY +  WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR +  A PARTICULAR PURPOSE.  See the GNU Affero General Public License for more details. + +  You should have received a copy of the GNU Affero General Public License along with +  TALER; see the file COPYING.  If not, see <http://www.gnu.org/licenses/> +*/ + +/** + * @file taler-exchange-transfer.c + * @brief Process that actually finalizes outgoing transfers with the wire gateway / bank + * @author Christian Grothoff + */ +#include "platform.h" +#include <gnunet/gnunet_util_lib.h> +#include <jansson.h> +#include <pthread.h> +#include "taler_exchangedb_lib.h" +#include "taler_exchangedb_plugin.h" +#include "taler_json_lib.h" +#include "taler_bank_service.h" + + +/** + * Data we keep to #run_transfers().  There is at most + * one of these around at any given point in time. + * Note that this limits parallelism, and we might want + * to revise this decision at a later point. + */ +struct WirePrepareData +{ + +  /** +   * Database session for all of our transactions. +   */ +  struct TALER_EXCHANGEDB_Session *session; + +  /** +   * Wire execution handle. +   */ +  struct TALER_BANK_TransferHandle *eh; + +  /** +   * Wire account used for this preparation. +   */ +  struct TALER_EXCHANGEDB_WireAccount *wa; + +  /** +   * Row ID of the transfer. +   */ +  unsigned long long row_id; + +}; + + +/** + * The exchange's configuration. + */ +static const struct GNUNET_CONFIGURATION_Handle *cfg; + +/** + * Our database plugin. + */ +static struct TALER_EXCHANGEDB_Plugin *db_plugin; + +/** + * Next task to run, if any. + */ +static struct GNUNET_SCHEDULER_Task *task; + +/** + * If we are currently executing a transfer, information about + * the active transfer is here. Otherwise, this variable is NULL. + */ +static struct WirePrepareData *wpd; + +/** + * Handle to the context for interacting with the bank / wire gateway. + */ +static struct GNUNET_CURL_Context *ctx; + +/** + * Scheduler context for running the @e ctx. + */ +static struct GNUNET_CURL_RescheduleContext *rc; + +/** + * How long should we sleep when idle before trying to find more work? + */ +static struct GNUNET_TIME_Relative aggregator_idle_sleep_interval; + +/** + * Value to return from main(). #GNUNET_OK on success, #GNUNET_SYSERR + * on serious errors. + */ +static int global_ret; + +/** + * #GNUNET_YES if we are in test mode and should exit when idle. + */ +static int test_mode; + + +/** + * Execute the wire transfers that we have committed to + * do. + * + * @param cls NULL + */ +static void +run_transfers (void *cls); + + +/** + * We're being aborted with CTRL-C (or SIGTERM). Shut down. + * + * @param cls closure + */ +static void +shutdown_task (void *cls) +{ +  (void) cls; +  if (NULL != ctx) +  { +    GNUNET_CURL_fini (ctx); +    ctx = NULL; +  } +  if (NULL != rc) +  { +    GNUNET_CURL_gnunet_rc_destroy (rc); +    rc = NULL; +  } +  GNUNET_log (GNUNET_ERROR_TYPE_INFO, +              "Running shutdown\n"); +  if (NULL != task) +  { +    GNUNET_SCHEDULER_cancel (task); +    task = NULL; +  } +  if (NULL != wpd) +  { +    if (NULL != wpd->eh) +    { +      TALER_BANK_transfer_cancel (wpd->eh); +      wpd->eh = NULL; +    } +    db_plugin->rollback (db_plugin->cls, +                         wpd->session); +    GNUNET_free (wpd); +    wpd = NULL; +  } +  TALER_EXCHANGEDB_plugin_unload (db_plugin); +  db_plugin = NULL; +  TALER_EXCHANGEDB_unload_accounts (); +  cfg = NULL; +} + + +/** + * Parse the configuration for wirewatch. + * + * @return #GNUNET_OK on success + */ +static int +parse_wirewatch_config () +{ +  if (GNUNET_OK != +      GNUNET_CONFIGURATION_get_value_time (cfg, +                                           "exchange", +                                           "AGGREGATOR_IDLE_SLEEP_INTERVAL", +                                           &aggregator_idle_sleep_interval)) +  { +    GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR, +                               "exchange", +                               "AGGREGATOR_IDLE_SLEEP_INTERVAL"); +    return GNUNET_SYSERR; +  } +  if (NULL == +      (db_plugin = TALER_EXCHANGEDB_plugin_load (cfg))) +  { +    GNUNET_log (GNUNET_ERROR_TYPE_ERROR, +                "Failed to initialize DB subsystem\n"); +    return GNUNET_SYSERR; +  } +  if (GNUNET_OK != +      TALER_EXCHANGEDB_load_accounts (cfg)) +  { +    GNUNET_log (GNUNET_ERROR_TYPE_ERROR, +                "No wire accounts configured for debit!\n"); +    TALER_EXCHANGEDB_plugin_unload (db_plugin); +    db_plugin = NULL; +    return GNUNET_SYSERR; +  } +  return GNUNET_OK; +} + + +/** + * Perform a database commit. If it fails, print a warning. + * + * @param session session to perform the commit for. + * @return status of commit + */ +static enum GNUNET_DB_QueryStatus +commit_or_warn (struct TALER_EXCHANGEDB_Session *session) +{ +  enum GNUNET_DB_QueryStatus qs; + +  qs = db_plugin->commit (db_plugin->cls, +                          session); +  if (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS == qs) +    return qs; +  GNUNET_log ((GNUNET_DB_STATUS_SOFT_ERROR == qs) +              ? GNUNET_ERROR_TYPE_INFO +              : GNUNET_ERROR_TYPE_ERROR, +              "Failed to commit database transaction!\n"); +  return qs; +} + + +/** + * Function called with the result from the execute step. + * + * @param cls NULL + * @param http_status_code #MHD_HTTP_OK on success + * @param ec taler error code + * @param row_id unique ID of the wire transfer in the bank's records + * @param wire_timestamp when did the transfer happen + */ +static void +wire_confirm_cb (void *cls, +                 unsigned int http_status_code, +                 enum TALER_ErrorCode ec, +                 uint64_t row_id, +                 struct GNUNET_TIME_Absolute wire_timestamp) +{ +  struct TALER_EXCHANGEDB_Session *session = wpd->session; +  enum GNUNET_DB_QueryStatus qs; + +  (void) cls; +  (void) row_id; +  (void) wire_timestamp; +  wpd->eh = NULL; +  if (MHD_HTTP_OK != http_status_code) +  { +    GNUNET_log (GNUNET_ERROR_TYPE_ERROR, +                "Wire transaction failed: %u/%d\n", +                http_status_code, +                ec); +    db_plugin->rollback (db_plugin->cls, +                         session); +    global_ret = GNUNET_SYSERR; +    GNUNET_SCHEDULER_shutdown (); +    GNUNET_free (wpd); +    wpd = NULL; +    return; +  } +  qs = db_plugin->wire_prepare_data_mark_finished (db_plugin->cls, +                                                   session, +                                                   wpd->row_id); +  if (0 >= qs) +  { +    GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs); +    db_plugin->rollback (db_plugin->cls, +                         session); +    if (GNUNET_DB_STATUS_SOFT_ERROR == qs) +    { +      /* try again */ +      GNUNET_assert (NULL == task); +      task = GNUNET_SCHEDULER_add_now (&run_transfers, +                                       NULL); +    } +    else +    { +      global_ret = GNUNET_SYSERR; +      GNUNET_SCHEDULER_shutdown (); +    } +    GNUNET_free (wpd); +    wpd = NULL; +    return; +  } +  GNUNET_free (wpd); +  wpd = NULL; +  switch (commit_or_warn (session)) +  { +  case GNUNET_DB_STATUS_SOFT_ERROR: +    /* try again */ +    GNUNET_assert (NULL == task); +    task = GNUNET_SCHEDULER_add_now (&run_transfers, +                                     NULL); +    return; +  case GNUNET_DB_STATUS_HARD_ERROR: +    GNUNET_break (0); +    global_ret = GNUNET_SYSERR; +    GNUNET_SCHEDULER_shutdown (); +    return; +  case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: +    GNUNET_log (GNUNET_ERROR_TYPE_INFO, +                "Wire transfer complete\n"); +    /* continue with #run_transfers(), just to guard +       against the unlikely case that there are more. */ +    GNUNET_assert (NULL == task); +    task = GNUNET_SCHEDULER_add_now (&run_transfers, +                                     NULL); +    return; +  default: +    GNUNET_break (0); +    global_ret = GNUNET_SYSERR; +    GNUNET_SCHEDULER_shutdown (); +    return; +  } +} + + +/** + * Callback with data about a prepared transaction. + * + * @param cls NULL + * @param rowid row identifier used to mark prepared transaction as done + * @param wire_method wire method the preparation was done for + * @param buf transaction data that was persisted, NULL on error + * @param buf_size number of bytes in @a buf, 0 on error + */ +static void +wire_prepare_cb (void *cls, +                 uint64_t rowid, +                 const char *wire_method, +                 const char *buf, +                 size_t buf_size) +{ +  struct TALER_EXCHANGEDB_WireAccount *wa; + +  (void) cls; +  wpd->row_id = rowid; +  GNUNET_log (GNUNET_ERROR_TYPE_INFO, +              "Starting wire transfer %llu\n", +              (unsigned long long) rowid); +  wpd->wa = TALER_EXCHANGEDB_find_account_by_method (wire_method); +  if (NULL == wpd->wa) +  { +    /* Should really never happen here, as when we get +       here the wire account should be in the cache. */ +    GNUNET_break (0); +    db_plugin->rollback (db_plugin->cls, +                         wpd->session); +    global_ret = GNUNET_SYSERR; +    GNUNET_SCHEDULER_shutdown (); +    GNUNET_free (wpd); +    wpd = NULL; +    return; +  } +  wa = wpd->wa; +  wpd->eh = TALER_BANK_transfer (ctx, +                                 &wa->auth, +                                 buf, +                                 buf_size, +                                 &wire_confirm_cb, +                                 NULL); +  if (NULL == wpd->eh) +  { +    GNUNET_break (0); /* Irrecoverable */ +    db_plugin->rollback (db_plugin->cls, +                         wpd->session); +    global_ret = GNUNET_SYSERR; +    GNUNET_SCHEDULER_shutdown (); +    GNUNET_free (wpd); +    wpd = NULL; +    return; +  } +} + + +/** + * Execute the wire transfers that we have committed to + * do. + * + * @param cls NULL + */ +static void +run_transfers (void *cls) +{ +  enum GNUNET_DB_QueryStatus qs; +  struct TALER_EXCHANGEDB_Session *session; +  const struct GNUNET_SCHEDULER_TaskContext *tc; + +  (void) cls; +  task = NULL; +  GNUNET_log (GNUNET_ERROR_TYPE_INFO, +              "Checking for pending wire transfers\n"); +  tc = GNUNET_SCHEDULER_get_task_context (); +  if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN)) +    return; +  if (NULL == (session = db_plugin->get_session (db_plugin->cls))) +  { +    GNUNET_log (GNUNET_ERROR_TYPE_ERROR, +                "Failed to obtain database session!\n"); +    global_ret = GNUNET_SYSERR; +    GNUNET_SCHEDULER_shutdown (); +    return; +  } +  if (GNUNET_OK != +      db_plugin->start (db_plugin->cls, +                        session, +                        "aggregator run transfer")) +  { +    GNUNET_log (GNUNET_ERROR_TYPE_ERROR, +                "Failed to start database transaction!\n"); +    global_ret = GNUNET_SYSERR; +    GNUNET_SCHEDULER_shutdown (); +    return; +  } +  wpd = GNUNET_new (struct WirePrepareData); +  wpd->session = session; +  qs = db_plugin->wire_prepare_data_get (db_plugin->cls, +                                         session, +                                         &wire_prepare_cb, +                                         NULL); +  if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT == qs) +    return;  /* continued via continuation set in #wire_prepare_cb() */ +  db_plugin->rollback (db_plugin->cls, +                       session); +  GNUNET_free (wpd); +  wpd = NULL; +  switch (qs) +  { +  case GNUNET_DB_STATUS_HARD_ERROR: +    GNUNET_break (0); +    global_ret = GNUNET_SYSERR; +    GNUNET_SCHEDULER_shutdown (); +    return; +  case GNUNET_DB_STATUS_SOFT_ERROR: +    /* try again */ +    GNUNET_assert (NULL == task); +    task = GNUNET_SCHEDULER_add_now (&run_transfers, +                                     NULL); +    return; +  case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: +    /* no more prepared wire transfers, go sleep a bit! */ +    GNUNET_log (GNUNET_ERROR_TYPE_INFO, +                "No more pending wire transfers, going idle\n"); +    GNUNET_assert (NULL == task); +    task = GNUNET_SCHEDULER_add_delayed (aggregator_idle_sleep_interval, +                                         &run_transfers, +                                         NULL); +    return; +  case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT: +    /* should be impossible */ +    GNUNET_assert (0); +  } +} + + +/** + * First task. + * + * @param cls closure, NULL + * @param args remaining command-line arguments + * @param cfgfile name of the configuration file used (for saving, can be NULL!) + * @param c configuration + */ +static void +run (void *cls, +     char *const *args, +     const char *cfgfile, +     const struct GNUNET_CONFIGURATION_Handle *c) +{ +  (void) cls; +  (void) args; +  (void) cfgfile; + +  cfg = c; +  if (GNUNET_OK != parse_wirewatch_config ()) +  { +    cfg = NULL; +    global_ret = 1; +    return; +  } +  ctx = GNUNET_CURL_init (&GNUNET_CURL_gnunet_scheduler_reschedule, +                          &rc); +  rc = GNUNET_CURL_gnunet_rc_create (ctx); +  if (NULL == ctx) +  { +    GNUNET_break (0); +    return; +  } + +  GNUNET_assert (NULL == task); +  task = GNUNET_SCHEDULER_add_now (&run_transfers, +                                   NULL); +  GNUNET_SCHEDULER_add_shutdown (&shutdown_task, +                                 cls); +} + + +/** + * The main function of the taler-exchange-transfer. + * + * @param argc number of arguments from the command line + * @param argv command line arguments + * @return 0 ok, 1 on error + */ +int +main (int argc, +      char *const *argv) +{ +  struct GNUNET_GETOPT_CommandLineOption options[] = { +    GNUNET_GETOPT_option_timetravel ('T', +                                     "timetravel"), +    GNUNET_GETOPT_option_flag ('t', +                               "test", +                               "run in test mode and exit when idle", +                               &test_mode), +    GNUNET_GETOPT_option_version (VERSION "-" VCS_VERSION), +    GNUNET_GETOPT_OPTION_END +  }; + +  if (GNUNET_OK != GNUNET_STRINGS_get_utf8_args (argc, argv, +                                                 &argc, &argv)) +    return 2; +  if (GNUNET_OK != +      GNUNET_PROGRAM_run (argc, argv, +                          "taler-exchange-transfers", +                          gettext_noop ( +                            "background process that executes outgoing wire transfers"), +                          options, +                          &run, NULL)) +  { +    GNUNET_free ((void *) argv); +    return 1; +  } +  GNUNET_free ((void *) argv); +  return global_ret; +} + + +/* end of taler-exchange-transfer.c */  | 
