diff options
| author | Christian Grothoff <christian@grothoff.org> | 2016-03-01 15:35:04 +0100 | 
|---|---|---|
| committer | Christian Grothoff <christian@grothoff.org> | 2016-03-01 15:35:04 +0100 | 
| commit | b5cba3251053c22bf1df46282f1dd0a4c46f6a38 (patch) | |
| tree | b7495c3e47c40c57ff81045a4e43aa07a3b6c7b1 /src/exchange/taler-exchange-aggregator.c | |
| parent | e406833eab7ca0835f9779abebada94592a85a7e (diff) | |
renaming mint->exchange
Diffstat (limited to 'src/exchange/taler-exchange-aggregator.c')
| -rw-r--r-- | src/exchange/taler-exchange-aggregator.c | 914 | 
1 files changed, 914 insertions, 0 deletions
| diff --git a/src/exchange/taler-exchange-aggregator.c b/src/exchange/taler-exchange-aggregator.c new file mode 100644 index 00000000..3ffce1d3 --- /dev/null +++ b/src/exchange/taler-exchange-aggregator.c @@ -0,0 +1,914 @@ +/* +  This file is part of TALER +  Copyright (C) 2016 GNUnet e.V. + +  TALER is free software; you can redistribute it and/or modify it under the +  terms of the GNU Affero General Public License as published by the Free Software +  Foundation; either version 3, or (at your option) any later version. + +  TALER is distributed in the hope that it will be useful, but WITHOUT ANY +  WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR +  A PARTICULAR PURPOSE.  See the GNU Affero General Public License for more details. + +  You should have received a copy of the GNU Affero General Public License along with +  TALER; see the file COPYING.  If not, If not, see <http://www.gnu.org/licenses/> +*/ + +/** + * @file taler-exchange-aggregator.c + * @brief Process that aggregates outgoing transactions and executes them + * @author Christian Grothoff + * + * TODO: + * - simplify global_ret: make it a global! + * - handle shutdown more nicely (call 'cancel' method on wire transfers) + */ +#include "platform.h" +#include <gnunet/gnunet_util_lib.h> +#include <jansson.h> +#include <pthread.h> +#include "taler_exchangedb_lib.h" +#include "taler_exchangedb_plugin.h" +#include "taler_wire_lib.h" + +/** + * Which currency is used by this exchange? + */ +static char *exchange_currency_string; + +/** + * Which wireformat should be supported by this aggregator? + */ +static char *exchange_wireformat; + +/** + * Base directory of the exchange (global) + */ +static char *exchange_directory; + +/** + * The exchange's configuration (global) + */ +static struct GNUNET_CONFIGURATION_Handle *cfg; + +/** + * Our DB plugin. + */ +static struct TALER_EXCHANGEDB_Plugin *db_plugin; + +/** + * Our wire plugin. + */ +static struct TALER_WIRE_Plugin *wire_plugin; + +/** + * Task for the main #run() function. + */ +static struct GNUNET_SCHEDULER_Task *task; + +/** + * Limit on the number of transactions we aggregate at once.  Note + * that the limit must be big enough to ensure that when transactions + * of the smallest possible unit are aggregated, they do surpass the + * "tiny" threshold beyond which we never trigger a wire transaction! + * + * TODO: make configurable (via config file or command line option) + */ +static unsigned int aggregation_limit = 10000; + + +/** + * Load configuration parameters for the exchange + * server into the corresponding global variables. + * + * @param exchange_directory the exchange's directory + * @return #GNUNET_OK on success + */ +static int +exchange_serve_process_config (const char *exchange_directory) +{ +  char *type; + +  cfg = TALER_config_load (exchange_directory); +  if (NULL == cfg) +  { +    fprintf (stderr, +             "Failed to load exchange configuration\n"); +    return GNUNET_SYSERR; +  } +  if (GNUNET_OK != +      GNUNET_CONFIGURATION_get_value_string (cfg, +                                             "exchange", +                                             "currency", +                                             &exchange_currency_string)) +  { +    GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR, +                               "exchange", +                               "currency"); +    return GNUNET_SYSERR; +  } +  if (strlen (exchange_currency_string) >= TALER_CURRENCY_LEN) +  { +    fprintf (stderr, +             "Currency `%s' longer than the allowed limit of %u characters.", +             exchange_currency_string, +             (unsigned int) TALER_CURRENCY_LEN); +    return GNUNET_SYSERR; +  } +  if (NULL != exchange_wireformat) +    GNUNET_CONFIGURATION_set_value_string (cfg, +                                           "exchange", +                                           "wireformat", +                                           exchange_wireformat); +  if (GNUNET_OK != +      GNUNET_CONFIGURATION_get_value_string (cfg, +                                             "exchange", +                                             "wireformat", +                                             &type)) +  { +    GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR, +                               "exchange", +                               "wireformat"); +    return GNUNET_SYSERR; +  } + +  if (NULL == +      (db_plugin = TALER_EXCHANGEDB_plugin_load (cfg))) +  { +    fprintf (stderr, +             "Failed to initialize DB subsystem\n"); +    GNUNET_free (type); +    return GNUNET_SYSERR; +  } + +  if (NULL == +      (wire_plugin = TALER_WIRE_plugin_load (cfg, +                                             type))) +  { +    fprintf (stderr, +             "Failed to load wire plugin for `%s'\n", +             type); +    GNUNET_free (type); +    return GNUNET_SYSERR; +  } +  GNUNET_free (type); + +  return GNUNET_OK; +} + + +/** + * Information about one aggregation process to + * be executed. + */ +struct AggregationUnit +{ +  /** +   * Public key of the merchant. +   */ +  struct TALER_MerchantPublicKeyP merchant_pub; + +  /** +   * Total amount to be transferred. +   */ +  struct TALER_Amount total_amount; + +  /** +   * Hash of @e wire. +   */ +  struct GNUNET_HashCode h_wire; + +  /** +   * Wire transfer identifier we use. +   */ +  struct TALER_WireTransferIdentifierRawP wtid; + +  /** +   * Row ID of the transaction that started it all. +   */ +  unsigned long long row_id; + +  /** +   * The current time. +   */ +  struct GNUNET_TIME_Absolute execution_time; + +  /** +   * Wire details of the merchant. +   */ +  json_t *wire; + +  /** +   * Database session for all of our transactions. +   */ +  struct TALER_EXCHANGEDB_Session *session; + +  /** +   * Wire preparation handle. +   */ +  struct TALER_WIRE_PrepareHandle *ph; + +  /** +   * Array of #aggregation_limit row_ids from the +   * aggregation. +   */ +  unsigned long long *additional_rows; + +  /** +   * Pointer to global return value. Closure for #run(). +   */ +  int *global_ret; + +  /** +   * Offset specifying how many #additional_rows are in use. +   */ +  unsigned int rows_offset; + +  /** +   * Set to #GNUNET_YES if we have to abort due to failure. +   */ +  int failed; + +}; + + +/** + * Function called with details about deposits that have been made, + * with the goal of executing the corresponding wire transaction. + * + * @param cls closure with the `struct AggregationUnit` + * @param row_id identifies database entry + * @param merchant_pub public key of the merchant + * @param coin_pub public key of the coin + * @param amount_with_fee amount that was deposited including fee + * @param deposit_fee amount the exchange gets to keep as transaction fees + * @param transaction_id unique transaction ID chosen by the merchant + * @param h_contract hash of the contract between merchant and customer + * @param wire_deadline by which the merchant adviced that he would like the + *        wire transfer to be executed + * @param wire wire details for the merchant + * @return #GNUNET_OK to continue to iterate, #GNUNET_SYSERR to stop + */ +static int +deposit_cb (void *cls, +            unsigned long long row_id, +            const struct TALER_MerchantPublicKeyP *merchant_pub, +            const struct TALER_CoinSpendPublicKeyP *coin_pub, +            const struct TALER_Amount *amount_with_fee, +            const struct TALER_Amount *deposit_fee, +            uint64_t transaction_id, +            const struct GNUNET_HashCode *h_contract, +            struct GNUNET_TIME_Absolute wire_deadline, +            const json_t *wire) +{ +  struct AggregationUnit *au = cls; + +  au->merchant_pub = *merchant_pub; +  if (GNUNET_OK != +      TALER_amount_subtract (&au->total_amount, +                             amount_with_fee, +                             deposit_fee)) +  { +    GNUNET_log (GNUNET_ERROR_TYPE_ERROR, +                "Fatally malformed record at %llu\n", +                row_id); +    return GNUNET_SYSERR; +  } +  au->row_id = row_id; +  au->wire = (json_t *) wire; +  au->execution_time = GNUNET_TIME_absolute_get (); +  TALER_hash_json (au->wire, +                   &au->h_wire); +  json_incref (au->wire); +  GNUNET_CRYPTO_random_block (GNUNET_CRYPTO_QUALITY_NONCE, +                              &au->wtid, +                              sizeof (au->wtid)); +  if (GNUNET_OK != +      db_plugin->insert_aggregation_tracking (db_plugin->cls, +                                              au->session, +                                              &au->wtid, +                                              merchant_pub, +                                              &au->h_wire, +                                              h_contract, +                                              transaction_id, +                                              au->execution_time, +                                              coin_pub, +                                              amount_with_fee, +                                              deposit_fee)) +  { +    GNUNET_break (0); +    return GNUNET_SYSERR; +  } +  if (GNUNET_OK != +      db_plugin->mark_deposit_done (db_plugin->cls, +                                    au->session, +                                    row_id)) +  { +    GNUNET_break (0); +    au->failed = GNUNET_YES; +    return GNUNET_SYSERR; +  } +  return GNUNET_OK; +} + + + +/** + * Function called with details about another deposit we + * can aggregate into an existing aggregation unit. + * + * @param cls closure with the `struct AggregationUnit` + * @param row_id identifies database entry + * @param merchant_pub public key of the merchant + * @param coin_pub public key of the coin + * @param amount_with_fee amount that was deposited including fee + * @param deposit_fee amount the exchange gets to keep as transaction fees + * @param transaction_id unique transaction ID chosen by the merchant + * @param h_contract hash of the contract between merchant and customer + * @param wire_deadline by which the merchant adviced that he would like the + *        wire transfer to be executed + * @param wire wire details for the merchant + * @return #GNUNET_OK to continue to iterate, #GNUNET_SYSERR to stop + */ +static int +aggregate_cb (void *cls, +              unsigned long long row_id, +              const struct TALER_MerchantPublicKeyP *merchant_pub, +              const struct TALER_CoinSpendPublicKeyP *coin_pub, +              const struct TALER_Amount *amount_with_fee, +              const struct TALER_Amount *deposit_fee, +              uint64_t transaction_id, +              const struct GNUNET_HashCode *h_contract, +              struct GNUNET_TIME_Absolute wire_deadline, +              const json_t *wire) +{ +  struct AggregationUnit *au = cls; +  struct TALER_Amount delta; + +  GNUNET_break (0 == +                memcmp (&au->merchant_pub, +                        merchant_pub, +                        sizeof (struct TALER_MerchantPublicKeyP))); +  /* compute contribution of this coin after fees */ +  if (GNUNET_OK != +      TALER_amount_subtract (&delta, +                             amount_with_fee, +                             deposit_fee)) +  { +    GNUNET_log (GNUNET_ERROR_TYPE_ERROR, +                "Fatally malformed record at %llu\n", +                row_id); +    return GNUNET_SYSERR; +  } +  /* add to total */ +  if (GNUNET_OK != +      TALER_amount_add (&au->total_amount, +                        &au->total_amount, +                        &delta)) +  { +    GNUNET_log (GNUNET_ERROR_TYPE_ERROR, +                "Overflow or currency incompatibility during aggregation at %llu\n", +                row_id); +    /* Skip this one, but keep going! */ +    return GNUNET_OK; +  } +  if (au->rows_offset >= aggregation_limit) +  { +    /* Bug: we asked for at most #aggregation_limit results! */ +    GNUNET_break (0); +    /* Skip this one, but keep going. */ +    return GNUNET_OK; +  } +  if (NULL == au->additional_rows) +    au->additional_rows = GNUNET_new_array (aggregation_limit, +                                            unsigned long long); +  /* "append" to our list of rows */ +  au->additional_rows[au->rows_offset++] = row_id; +  /* insert into aggregation tracking table */ +  if (GNUNET_OK != +      db_plugin->insert_aggregation_tracking (db_plugin->cls, +                                              au->session, +                                              &au->wtid, +                                              merchant_pub, +                                              &au->h_wire, +                                              h_contract, +                                              transaction_id, +                                              au->execution_time, +                                              coin_pub, +                                              amount_with_fee, +                                              deposit_fee)) +  { +    GNUNET_break (0); +    return GNUNET_SYSERR; +  } +  if (GNUNET_OK != +      db_plugin->mark_deposit_done (db_plugin->cls, +                                    au->session, +                                    row_id)) +  { +    GNUNET_break (0); +    au->failed = GNUNET_YES; +    return GNUNET_SYSERR; +  } +  return GNUNET_OK; +} + + +/** + * Function to be called with the prepared transfer data. + * + * @param cls closure with the `struct AggregationUnit` + * @param buf transaction data to persist, NULL on error + * @param buf_size number of bytes in @a buf, 0 on error + */ +static void +prepare_cb (void *cls, +            const char *buf, +            size_t buf_size); + + +/** + * Main work function that queries the DB and aggregates transactions + * into larger wire transfers. + * + * @param cls pointer to an `int` which we will return from main() + * @param tc scheduler context + */ +static void +run_aggregation (void *cls, +                 const struct GNUNET_SCHEDULER_TaskContext *tc) +{ +  int *global_ret = cls; +  struct TALER_EXCHANGEDB_Session *session; +  struct AggregationUnit *au; +  unsigned int i; +  int ret; + +  if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN)) +    return; +  if (NULL == (session = db_plugin->get_session (db_plugin->cls, +                                                 GNUNET_NO))) +  { +    GNUNET_log (GNUNET_ERROR_TYPE_ERROR, +                "Failed to obtain database session!\n"); +    *global_ret = GNUNET_SYSERR; +    return; +  } +  if (GNUNET_OK != +      db_plugin->start (db_plugin->cls, +                        session)) +  { +    GNUNET_log (GNUNET_ERROR_TYPE_ERROR, +                "Failed to start database transaction!\n"); +    *global_ret = GNUNET_SYSERR; +    return; +  } +  au = GNUNET_new (struct AggregationUnit); +  au->session = session; +  ret = db_plugin->get_ready_deposit (db_plugin->cls, +                                      session, +                                      &deposit_cb, +                                      au); +  if (GNUNET_OK != ret) +  { +    GNUNET_free (au); +    db_plugin->rollback (db_plugin->cls, +                         session); +    if (0 != ret) +    { +      GNUNET_log (GNUNET_ERROR_TYPE_ERROR, +                  "Failed to execute deposit iteration!\n"); +      *global_ret = GNUNET_SYSERR; +      return; +    } +    /* nothing to do, sleep for a minute and try again */ +    task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES, +                                         &run_aggregation, +                                         global_ret); +    return; +  } +  /* Now try to find other deposits to aggregate */ +  ret = db_plugin->iterate_matching_deposits (db_plugin->cls, +                                              session, +                                              &au->h_wire, +                                              &au->merchant_pub, +                                              &aggregate_cb, +                                              au, +                                              aggregation_limit); +  if ( (GNUNET_SYSERR == ret) || +       (GNUNET_YES == au->failed) ) +  { +    GNUNET_log (GNUNET_ERROR_TYPE_ERROR, +                "Failed to execute deposit iteration!\n"); +    GNUNET_free_non_null (au->additional_rows); +    GNUNET_free (au); +    db_plugin->rollback (db_plugin->cls, +                         session); +    *global_ret = GNUNET_SYSERR; +    return; +  } +  /* Round to the unit supported by the wire transfer method */ +  GNUNET_assert (GNUNET_SYSERR != +                 wire_plugin->amount_round (wire_plugin->cls, +                                            &au->total_amount)); +  /* Check if after rounding down, we still have an amount to transfer */ +  if ( (0 == au->total_amount.value) && +       (0 == au->total_amount.fraction) ) +  { +    /* Rollback ongoing transaction, as we will not use the respective +       WTID and thus need to remove the tracking data */ +    db_plugin->rollback (db_plugin->cls, +                         session); +    /* Start another transaction to mark all* of the selected deposits +       *as minor! */ +    if (GNUNET_OK != +        db_plugin->start (db_plugin->cls, +                          session)) +    { +      GNUNET_log (GNUNET_ERROR_TYPE_ERROR, +                  "Failed to start database transaction!\n"); +      *global_ret = GNUNET_SYSERR; +      GNUNET_free_non_null (au->additional_rows); +      GNUNET_free (au); +      return; +    } +    /* Mark transactions by row_id as minor */ +    ret = GNUNET_OK; +    if (GNUNET_OK != +        db_plugin->mark_deposit_tiny (db_plugin->cls, +                                      session, +                                      au->row_id)) +      ret = GNUNET_SYSERR; +    else +      for (i=0;i<au->rows_offset;i++) +        if (GNUNET_OK != +            db_plugin->mark_deposit_tiny (db_plugin->cls, +                                          session, +                                          au->additional_rows[i])) +          ret = GNUNET_SYSERR; +    /* commit */ +    if (GNUNET_OK != +        db_plugin->commit (db_plugin->cls, +                           session)) +    { +      GNUNET_log (GNUNET_ERROR_TYPE_WARNING, +                  "Failed to commit database transaction!\n"); +    } +    GNUNET_free_non_null (au->additional_rows); +    GNUNET_free (au); +    /* start again */ +    task = GNUNET_SCHEDULER_add_now (&run_aggregation, +                                     global_ret); +    return; +  } +  au->global_ret = global_ret; +  au->ph = wire_plugin->prepare_wire_transfer (wire_plugin->cls, +                                               au->wire, +                                               &au->total_amount, +                                               &au->wtid, +                                               &prepare_cb, +                                               au); +  /* FIXME: currently we have no clean-up plan on +     shutdown to call prepare_wire_transfer_cancel! +     Maybe make 'au' global? */ +  if (NULL == au->ph) +  { +    GNUNET_break (0); /* why? how to best recover? */ +    db_plugin->rollback (db_plugin->cls, +                         session); +    GNUNET_free_non_null (au->additional_rows); +    GNUNET_free (au); +    /* start again */ +    task = GNUNET_SCHEDULER_add_now (&run_aggregation, +                                     global_ret); +    return; +  } +  /* otherwise we continue with #prepare_cb(), see below */ +} + + +/** + * Execute the wire transfers that we have committed to + * do. + * + * @param cls pointer to an `int` which we will return from main() + * @param tc scheduler context + */ +static void +run_transfers (void *cls, +               const struct GNUNET_SCHEDULER_TaskContext *tc); + + +/** + * Function to be called with the prepared transfer data. + * + * @param cls closure with the `struct AggregationUnit` + * @param buf transaction data to persist, NULL on error + * @param buf_size number of bytes in @a buf, 0 on error + */ +static void +prepare_cb (void *cls, +            const char *buf, +            size_t buf_size) +{ +  struct AggregationUnit *au = cls; +  int *global_ret = au->global_ret; +  struct TALER_EXCHANGEDB_Session *session = au->session; + +  GNUNET_free_non_null (au->additional_rows); +  GNUNET_free (au); +  if (NULL == buf) +  { +    GNUNET_break (0); /* why? how to best recover? */ +    db_plugin->rollback (db_plugin->cls, +                         session); +    /* start again */ +    task = GNUNET_SCHEDULER_add_now (&run_aggregation, +                                     global_ret); +    return; +  } + +  /* Commit our intention to execute the wire transfer! */ +  if (GNUNET_OK != +      db_plugin->wire_prepare_data_insert (db_plugin->cls, +                                           session, +                                           exchange_wireformat, +                                           buf, +                                           buf_size)) +  { +    GNUNET_break (0); /* why? how to best recover? */ +    db_plugin->rollback (db_plugin->cls, +                         session); +    /* start again */ +    task = GNUNET_SCHEDULER_add_now (&run_aggregation, +                                     global_ret); +    return; +  } + +  /* Now we can finally commit the overall transaction, as we are +     again consistent if all of this passes. */ +  if (GNUNET_OK != +      db_plugin->commit (db_plugin->cls, +                         session)) +  { +    GNUNET_log (GNUNET_ERROR_TYPE_INFO, +                "Failed to commit database transaction!\n"); +    /* try again */ +    task = GNUNET_SCHEDULER_add_now (&run_aggregation, +                                     global_ret); +    return; +  } + +  /* run alternative task: actually do wire transfer! */ +  task = GNUNET_SCHEDULER_add_now (&run_transfers, +                                   &global_ret); +} + + +/** + * Data we keep to #run_transfers(). + */ +struct WirePrepareData +{ + +  /** +   * Database session for all of our transactions. +   */ +  struct TALER_EXCHANGEDB_Session *session; + +  /** +   * Wire execution handle. +   */ +  struct TALER_WIRE_ExecuteHandle *eh; + +  /** +   * Pointer to global return value. Closure for #run(). +   */ +  int *global_ret; + + +  /** +   * Row ID of the transfer. +   */ +  unsigned long long row_id; + +}; + + +/** + * Function called with the result from the execute step. + * + * @param cls closure with the `struct WirePrepareData` + * @param success #GNUNET_OK on success, #GNUNET_SYSERR on failure + * @param emsg NULL on success, otherwise an error message + */ +static void +wire_confirm_cb (void *cls, +                 int success, +                 const char *emsg) +{ +  struct WirePrepareData *wpd = cls; +  int *global_ret = wpd->global_ret; +  struct TALER_EXCHANGEDB_Session *session = wpd->session; + +  wpd->eh = NULL; +  if (GNUNET_SYSERR == success) +  { +    GNUNET_log (GNUNET_ERROR_TYPE_ERROR, +                "Wire transaction failed: %s\n", +                emsg); +    db_plugin->rollback (db_plugin->cls, +                         session); +    *global_ret = GNUNET_SYSERR; +    GNUNET_free (wpd); +    return; +  } +  if (GNUNET_OK != +      db_plugin->wire_prepare_data_mark_finished (db_plugin->cls, +                                                  session, +                                                  wpd->row_id)) +  { +    GNUNET_break (0); /* why!? */ +    db_plugin->rollback (db_plugin->cls, +                         session); +    *global_ret = GNUNET_SYSERR; +    GNUNET_free (wpd); +    return; +  } +  GNUNET_free (wpd); +  if (GNUNET_OK != +      db_plugin->commit (db_plugin->cls, +                         session)) +  { +    GNUNET_log (GNUNET_ERROR_TYPE_INFO, +                "Failed to commit database transaction!\n"); +    /* try again */ +    task = GNUNET_SCHEDULER_add_now (&run_aggregation, +                                     global_ret); +    return; +  } +  /* continue with #run_transfers(), just to guard +     against the unlikely case that there are more. */ +  task = GNUNET_SCHEDULER_add_now (&run_transfers, +                                   &global_ret); + +} + + +/** + * Callback with data about a prepared transaction. + * + * @param cls closure with the `struct WirePrepareData` + * @param rowid row identifier used to mark prepared transaction as done + * @param buf transaction data that was persisted, NULL on error + * @param buf_size number of bytes in @a buf, 0 on error + */ +static void +wire_prepare_cb (void *cls, +                 unsigned long long rowid, +                 const char *buf, +                 size_t buf_size) +{ +  struct WirePrepareData *wpd = cls; +  int *global_ret = wpd->global_ret; + +  wpd->row_id = rowid; +  wpd->eh = wire_plugin->execute_wire_transfer (wire_plugin->cls, +                                                buf, +                                                buf_size, +                                                &wire_confirm_cb, +                                                wpd); +  /* FIXME: currently we have no clean-up plan on +     shutdown to call execute_wire_transfer_cancel! +     Maybe make 'wpd' global? */ +  if (NULL == wpd->eh) +  { +    GNUNET_break (0); /* why? how to best recover? */ +    db_plugin->rollback (db_plugin->cls, +                         wpd->session); +    *global_ret = GNUNET_SYSERR; +    GNUNET_free (wpd); +    return; +  } +} + + +/** + * Execute the wire transfers that we have committed to + * do. + * + * @param cls pointer to an `int` which we will return from main() + * @param tc scheduler context + */ +static void +run_transfers (void *cls, +               const struct GNUNET_SCHEDULER_TaskContext *tc) +{ +  int *global_ret = cls; +  int ret; +  struct WirePrepareData *wpd; +  struct TALER_EXCHANGEDB_Session *session; + +  if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN)) +    return; +  if (NULL == (session = db_plugin->get_session (db_plugin->cls, +                                                 GNUNET_NO))) +  { +    GNUNET_log (GNUNET_ERROR_TYPE_ERROR, +                "Failed to obtain database session!\n"); +    *global_ret = GNUNET_SYSERR; +    return; +  } +  if (GNUNET_OK != +      db_plugin->start (db_plugin->cls, +                        session)) +  { +    GNUNET_log (GNUNET_ERROR_TYPE_ERROR, +                "Failed to start database transaction!\n"); +    *global_ret = GNUNET_SYSERR; +    return; +  } +  wpd = GNUNET_new (struct WirePrepareData); +  wpd->session = session; +  wpd->global_ret = global_ret; +  ret = db_plugin->wire_prepare_data_get (db_plugin->cls, +                                          session, +                                          exchange_wireformat, +                                          &wire_prepare_cb, +                                          wpd); +  if (GNUNET_SYSERR == ret) +  { +    GNUNET_break (0); /* why? how to best recover? */ +    db_plugin->rollback (db_plugin->cls, +                         session); +    *global_ret = GNUNET_SYSERR; +    GNUNET_free (wpd); +    return; +  } +  if (GNUNET_NO == ret) +  { +    /* no more prepared wire transfers, go back to aggregation! */ +    db_plugin->rollback (db_plugin->cls, +                         session); +    task = GNUNET_SCHEDULER_add_now (&run_aggregation, +                                     global_ret); +    GNUNET_free (wpd); +    return; +  } +  /* otherwise, continues in #wire_prepare_cb() */ +} + + +/** + * The main function of the taler-exchange-httpd server ("the exchange"). + * + * @param argc number of arguments from the command line + * @param argv command line arguments + * @return 0 ok, 1 on error + */ +int +main (int argc, +      char *const *argv) +{ +  static const struct GNUNET_GETOPT_CommandLineOption options[] = { +    {'d', "exchange-dir", "DIR", +     "exchange directory with configuration and keys for operating the exchange", 1, +     &GNUNET_GETOPT_set_filename, &exchange_directory}, +    {'f', "format", "WIREFORMAT", +     "wireformat to use, overrides WIREFORMAT option in [exchange] section", 1, +     &GNUNET_GETOPT_set_filename, &exchange_wireformat}, +    TALER_GETOPT_OPTION_HELP ("background process that aggregates and executes wire transfers to merchants"), +    GNUNET_GETOPT_OPTION_VERSION (VERSION "-" VCS_VERSION), +    GNUNET_GETOPT_OPTION_END +  }; +  int ret = GNUNET_OK; + +  GNUNET_assert (GNUNET_OK == +                 GNUNET_log_setup ("taler-exchange-aggregator", +                                   "INFO", +                                   NULL)); +  if (0 >= +      GNUNET_GETOPT_run ("taler-exchange-aggregator", +                         options, +                         argc, argv)) +    return 1; +  if (NULL == exchange_directory) +  { +    fprintf (stderr, +             "Exchange directory not specified\n"); +    return 1; +  } +  if (GNUNET_OK != +      exchange_serve_process_config (exchange_directory)) +  { +    return 1; +  } + +  GNUNET_SCHEDULER_run (&run_transfers, &ret); + +  TALER_EXCHANGEDB_plugin_unload (db_plugin); +  TALER_WIRE_plugin_unload (wire_plugin); +  return (GNUNET_SYSERR == ret) ? 1 : 0; +} + +/* end of taler-exchange-aggregator.c */ | 
