diff options
| -rw-r--r-- | src/include/taler_mint_service.h | 5 | ||||
| -rw-r--r-- | src/include/taler_mintdb_plugin.h | 6 | ||||
| -rw-r--r-- | src/include/taler_signatures.h | 14 | ||||
| -rw-r--r-- | src/mint-lib/mint_api_deposit_wtid.c | 9 | ||||
| -rw-r--r-- | src/mint/taler-mint-aggregator.c | 661 | ||||
| -rw-r--r-- | src/mint/taler-mint-httpd_db.c | 3 | ||||
| -rw-r--r-- | src/mint/taler-mint-httpd_responses.c | 5 | ||||
| -rw-r--r-- | src/mint/taler-mint-httpd_responses.h | 2 | ||||
| -rw-r--r-- | src/mintdb/plugin_mintdb_postgres.c | 25 | 
9 files changed, 640 insertions, 90 deletions
diff --git a/src/include/taler_mint_service.h b/src/include/taler_mint_service.h index b151cb00..1502edfb 100644 --- a/src/include/taler_mint_service.h +++ b/src/include/taler_mint_service.h @@ -1173,8 +1173,6 @@ struct TALER_MINT_DepositWtidHandle;   *                  yet execute the transaction   * @param execution_time actual or planned execution time for the wire transfer   * @param coin_contribution contribution to the @a total_amount of the deposited coin (may be NULL) - * @param total_amount total amount of the wire transfer, or NULL if the mint could - *             not provide any @a wtid (set only if @a http_status is #MHD_HTTP_OK)   */  typedef void  (*TALER_MINT_DepositWtidCallback)(void *cls, @@ -1182,8 +1180,7 @@ typedef void                                    json_t *json,                                    const struct TALER_WireTransferIdentifierRawP *wtid,                                    struct GNUNET_TIME_Absolute execution_time, -                                  const struct TALER_Amount *coin_contribution, -                                  const struct TALER_Amount *total_amount); +                                  const struct TALER_Amount *coin_contribution);  /** diff --git a/src/include/taler_mintdb_plugin.h b/src/include/taler_mintdb_plugin.h index 7c48114b..d2cc3d76 100644 --- a/src/include/taler_mintdb_plugin.h +++ b/src/include/taler_mintdb_plugin.h @@ -583,7 +583,6 @@ typedef void   * @param coin_contribution how much did the coin we asked about   *        contribute to the total transfer value? (deposit value including fee)   * @param coin_fee how much did the mint charge for the deposit fee - * @param total_amount how much was the total wire transfer?   * @param execution_time when was the transaction done, or   *         when we expect it to be done (if @a wtid was NULL)   */ @@ -592,7 +591,6 @@ typedef void  				    const struct TALER_WireTransferIdentifierRawP *wtid,                                      const struct TALER_Amount *coin_contribution,                                      const struct TALER_Amount *coin_fee, -                                    const struct TALER_Amount *total_amount,  				    struct GNUNET_TIME_Absolute execution_time); @@ -1360,7 +1358,6 @@ struct TALER_MINTDB_Plugin     * @param coin_pub which public key was this payment about     * @param coin_value amount contributed by this coin in total     * @param coin_fee deposit fee charged by mint for this coin -   * @param transfer_value total amount of the wire transfer     * @return #GNUNET_OK on success, #GNUNET_SYSERR on DB errors     */    int @@ -1374,8 +1371,7 @@ struct TALER_MINTDB_Plugin                                   struct GNUNET_TIME_Absolute execution_time,                                   const struct TALER_CoinSpendPublicKeyP *coin_pub,                                   const struct TALER_Amount *coin_value, -                                 const struct TALER_Amount *coin_fee, -                                 const struct TALER_Amount *transfer_value); +                                 const struct TALER_Amount *coin_fee);    /** diff --git a/src/include/taler_signatures.h b/src/include/taler_signatures.h index 85c681da..2526597e 100644 --- a/src/include/taler_signatures.h +++ b/src/include/taler_signatures.h @@ -953,20 +953,6 @@ struct TALER_ConfirmWirePS     */    struct TALER_AmountNBO coin_contribution; -  /** -   * The total amount the mint transferred in the transaction. -   * Note that we may be aggregating multiple coin's @e coin_contribution -   * values into a single wire transfer, so this value may be larger -   * than that of @e coin_contribution.  It may also be smaller, as -   * @e coin_contribution may be say "1.123456" but the wire unit may -   * be rounded down, i.e. to "1.12" (depending on the transfer method). -   * -   * Note that the mint books the deltas from rounding down as profit, -   * so aggregating transfers is a good thing for the merchant (as it -   * reduces rounding down expenses). -   */ -  struct TALER_AmountNBO total_amount; -  };  GNUNET_NETWORK_STRUCT_END diff --git a/src/mint-lib/mint_api_deposit_wtid.c b/src/mint-lib/mint_api_deposit_wtid.c index 50f9c55d..d29f406e 100644 --- a/src/mint-lib/mint_api_deposit_wtid.c +++ b/src/mint-lib/mint_api_deposit_wtid.c @@ -148,9 +148,7 @@ handle_deposit_wtid_finished (void *cls,    const struct TALER_WireTransferIdentifierRawP *wtid = NULL;    struct GNUNET_TIME_Absolute execution_time = GNUNET_TIME_UNIT_FOREVER_ABS;    const struct TALER_Amount *coin_contribution = NULL; -  const struct TALER_Amount *total_amount = NULL;    struct TALER_Amount coin_contribution_s; -  struct TALER_Amount total_amount_s;    dwh->job = NULL;    json = MAC_download_get_result (&dwh->db, @@ -166,7 +164,6 @@ handle_deposit_wtid_finished (void *cls,          MAJ_spec_fixed_auto ("wtid", &dwh->depconf.wtid),          MAJ_spec_absolute_time ("execution_time", &execution_time),          MAJ_spec_amount ("coin_contribution", &coin_contribution_s), -        MAJ_spec_amount ("total_amount", &total_amount_s),          MAJ_spec_end        }; @@ -183,9 +180,6 @@ handle_deposit_wtid_finished (void *cls,        TALER_amount_hton (&dwh->depconf.coin_contribution,                           &coin_contribution_s);        coin_contribution = &coin_contribution_s; -      TALER_amount_hton (&dwh->depconf.total_amount, -                         &total_amount_s); -      total_amount = &total_amount_s;        if (GNUNET_OK !=            verify_deposit_wtid_signature_ok (dwh,                                              json)) @@ -244,8 +238,7 @@ handle_deposit_wtid_finished (void *cls,             json,             wtid,             execution_time, -           coin_contribution, -           total_amount); +           coin_contribution);    json_decref (json);    TALER_MINT_deposit_wtid_cancel (dwh);  } diff --git a/src/mint/taler-mint-aggregator.c b/src/mint/taler-mint-aggregator.c index ee0f6ab2..5e05c867 100644 --- a/src/mint/taler-mint-aggregator.c +++ b/src/mint/taler-mint-aggregator.c @@ -18,6 +18,10 @@   * @file taler-mint-aggregator.c   * @brief Process that aggregates outgoing transactions and executes them   * @author Christian Grothoff + * + * TODO: + * - simplify global_ret: make it a global! + * - handle shutdown more nicely (call 'cancel' method on wire transfers)   */  #include "platform.h"  #include <gnunet/gnunet_util_lib.h> @@ -62,6 +66,16 @@ static struct TALER_WIRE_Plugin *wire_plugin;   */  static struct GNUNET_SCHEDULER_Task *task; +/** + * Limit on the number of transactions we aggregate at once.  Note + * that the limit must be big enough to ensure that when transactions + * of the smallest possible unit are aggregated, they do surpass the + * "tiny" threshold beyond which we never trigger a wire transaction! + * + * TODO: make configurable (via config file or command line option) + */ +static unsigned int aggregation_limit = 10000; +  /**   * Load configuration parameters for the mint @@ -144,10 +158,85 @@ mint_serve_process_config (const char *mint_directory)  /** + * Information about one aggregation process to + * be executed. + */ +struct AggregationUnit +{ +  /** +   * Public key of the merchant. +   */ +  struct TALER_MerchantPublicKeyP merchant_pub; + +  /** +   * Total amount to be transferred. +   */ +  struct TALER_Amount total_amount; + +  /** +   * Hash of @e wire. +   */ +  struct GNUNET_HashCode h_wire; + +  /** +   * Wire transfer identifier we use. +   */ +  struct TALER_WireTransferIdentifierRawP wtid; + +  /** +   * Row ID of the transaction that started it all. +   */ +  unsigned long long row_id; + +  /** +   * The current time. +   */ +  struct GNUNET_TIME_Absolute execution_time; + +  /** +   * Wire details of the merchant. +   */ +  json_t *wire; + +  /** +   * Database session for all of our transactions. +   */ +  struct TALER_MINTDB_Session *session; + +  /** +   * Wire preparation handle. +   */ +  struct TALER_WIRE_PrepareHandle *ph; + +  /** +   * Array of #aggregation_limit row_ids from the +   * aggregation. +   */ +  unsigned long long *additional_rows; + +  /** +   * Pointer to global return value. Closure for #run(). +   */ +  int *global_ret; + +  /** +   * Offset specifying how many #additional_rows are in use. +   */ +  unsigned int rows_offset; + +  /** +   * Set to #GNUNET_YES if we have to abort due to failure. +   */ +  int failed; + +}; + + +/**   * Function called with details about deposits that have been made,   * with the goal of executing the corresponding wire transaction.   * - * @param cls closure + * @param cls closure with the `struct AggregationUnit`   * @param row_id identifies database entry   * @param merchant_pub public key of the merchant   * @param coin_pub public key of the coin @@ -172,20 +261,187 @@ deposit_cb (void *cls,              struct GNUNET_TIME_Absolute wire_deadline,              const json_t *wire)  { -  /* FIXME: compute aggregates, etc. */ +  struct AggregationUnit *au = cls; + +  au->merchant_pub = *merchant_pub; +  if (GNUNET_OK != +      TALER_amount_subtract (&au->total_amount, +                             amount_with_fee, +                             deposit_fee)) +  { +    GNUNET_log (GNUNET_ERROR_TYPE_ERROR, +                "Fatally malformed record at %llu\n", +                row_id); +    return GNUNET_SYSERR; +  } +  au->row_id = row_id; +  au->wire = (json_t *) wire; +  au->execution_time = GNUNET_TIME_absolute_get (); +  TALER_hash_json (au->wire, +                   &au->h_wire); +  json_incref (au->wire); +  GNUNET_CRYPTO_random_block (GNUNET_CRYPTO_QUALITY_NONCE, +                              &au->wtid, +                              sizeof (au->wtid)); +  if (GNUNET_OK != +      db_plugin->insert_aggregation_tracking (db_plugin->cls, +                                              au->session, +                                              &au->wtid, +                                              merchant_pub, +                                              &au->h_wire, +                                              h_contract, +                                              transaction_id, +                                              au->execution_time, +                                              coin_pub, +                                              amount_with_fee, +                                              deposit_fee)) +  { +    GNUNET_break (0); +    return GNUNET_SYSERR; +  } +  if (GNUNET_OK != +      db_plugin->mark_deposit_done (db_plugin->cls, +                                    au->session, +                                    row_id)) +  { +    GNUNET_break (0); +    au->failed = GNUNET_YES; +    return GNUNET_SYSERR; +  } +  return GNUNET_OK; +} + + + +/** + * Function called with details about another deposit we + * can aggregate into an existing aggregation unit. + * + * @param cls closure with the `struct AggregationUnit` + * @param row_id identifies database entry + * @param merchant_pub public key of the merchant + * @param coin_pub public key of the coin + * @param amount_with_fee amount that was deposited including fee + * @param deposit_fee amount the mint gets to keep as transaction fees + * @param transaction_id unique transaction ID chosen by the merchant + * @param h_contract hash of the contract between merchant and customer + * @param wire_deadline by which the merchant adviced that he would like the + *        wire transfer to be executed + * @param wire wire details for the merchant + * @return #GNUNET_OK to continue to iterate, #GNUNET_SYSERR to stop + */ +static int +aggregate_cb (void *cls, +              unsigned long long row_id, +              const struct TALER_MerchantPublicKeyP *merchant_pub, +              const struct TALER_CoinSpendPublicKeyP *coin_pub, +              const struct TALER_Amount *amount_with_fee, +              const struct TALER_Amount *deposit_fee, +              uint64_t transaction_id, +              const struct GNUNET_HashCode *h_contract, +              struct GNUNET_TIME_Absolute wire_deadline, +              const json_t *wire) +{ +  struct AggregationUnit *au = cls; +  struct TALER_Amount delta; + +  GNUNET_break (0 == +                memcmp (&au->merchant_pub, +                        merchant_pub, +                        sizeof (struct TALER_MerchantPublicKeyP))); +  /* compute contribution of this coin after fees */ +  if (GNUNET_OK != +      TALER_amount_subtract (&delta, +                             amount_with_fee, +                             deposit_fee)) +  { +    GNUNET_log (GNUNET_ERROR_TYPE_ERROR, +                "Fatally malformed record at %llu\n", +                row_id); +    return GNUNET_SYSERR; +  } +  /* add to total */ +  if (GNUNET_OK != +      TALER_amount_add (&au->total_amount, +                        &au->total_amount, +                        &delta)) +  { +    GNUNET_log (GNUNET_ERROR_TYPE_ERROR, +                "Overflow or currency incompatibility during aggregation at %llu\n", +                row_id); +    /* Skip this one, but keep going! */ +    return GNUNET_OK; +  } +  if (au->rows_offset >= aggregation_limit) +  { +    /* Bug: we asked for at most #aggregation_limit results! */ +    GNUNET_break (0); +    /* Skip this one, but keep going. */ +    return GNUNET_OK; +  } +  if (NULL == au->additional_rows) +    au->additional_rows = GNUNET_new_array (aggregation_limit, +                                            unsigned long long); +  /* "append" to our list of rows */ +  au->additional_rows[au->rows_offset++] = row_id; +  /* insert into aggregation tracking table */ +  if (GNUNET_OK != +      db_plugin->insert_aggregation_tracking (db_plugin->cls, +                                              au->session, +                                              &au->wtid, +                                              merchant_pub, +                                              &au->h_wire, +                                              h_contract, +                                              transaction_id, +                                              au->execution_time, +                                              coin_pub, +                                              amount_with_fee, +                                              deposit_fee)) +  { +    GNUNET_break (0); +    return GNUNET_SYSERR; +  } +  if (GNUNET_OK != +      db_plugin->mark_deposit_done (db_plugin->cls, +                                    au->session, +                                    row_id)) +  { +    GNUNET_break (0); +    au->failed = GNUNET_YES; +    return GNUNET_SYSERR; +  }    return GNUNET_OK;  }  /** - * Main work function that queries the DB and executes transactions. + * Function to be called with the prepared transfer data. + * + * @param cls closure with the `struct AggregationUnit` + * @param buf transaction data to persist, NULL on error + * @param buf_size number of bytes in @a buf, 0 on error + */ +static void +prepare_cb (void *cls, +            const char *buf, +            size_t buf_size); + + +/** + * Main work function that queries the DB and aggregates transactions + * into larger wire transfers. + * + * @param cls pointer to an `int` which we will return from main() + * @param tc scheduler context   */  static void -run (void *cls, -     const struct GNUNET_SCHEDULER_TaskContext *tc) +run_aggregation (void *cls, +                 const struct GNUNET_SCHEDULER_TaskContext *tc)  {    int *global_ret = cls;    struct TALER_MINTDB_Session *session; +  struct AggregationUnit *au; +  unsigned int i;    int ret;    if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN)) @@ -207,50 +463,399 @@ run (void *cls,      *global_ret = GNUNET_SYSERR;      return;    } +  au = GNUNET_new (struct AggregationUnit); +  au->session = session;    ret = db_plugin->get_ready_deposit (db_plugin->cls,                                        session,                                        &deposit_cb, -                                      NULL); -  // FIXME: handle 0 == ret... - +                                      au);    if (GNUNET_OK != ret)    { +    GNUNET_free (au); +    db_plugin->rollback (db_plugin->cls, +                         session); +    if (0 != ret) +    { +      GNUNET_log (GNUNET_ERROR_TYPE_ERROR, +                  "Failed to execute deposit iteration!\n"); +      *global_ret = GNUNET_SYSERR; +      return; +    } +    /* nothing to do, sleep for a minute and try again */ +    task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES, +                                         &run_aggregation, +                                         global_ret); +    return; +  } +  /* Now try to find other deposits to aggregate */ +  ret = db_plugin->iterate_matching_deposits (db_plugin->cls, +                                              session, +                                              &au->h_wire, +                                              &au->merchant_pub, +                                              &aggregate_cb, +                                              au, +                                              aggregation_limit); +  if ( (GNUNET_SYSERR == ret) || +       (GNUNET_YES == au->failed) ) +  {      GNUNET_log (GNUNET_ERROR_TYPE_ERROR,                  "Failed to execute deposit iteration!\n"); +    GNUNET_free_non_null (au->additional_rows); +    GNUNET_free (au); +    db_plugin->rollback (db_plugin->cls, +                         session);      *global_ret = GNUNET_SYSERR; +    return; +  } +  /* Round to the unit supported by the wire transfer method */ +  GNUNET_assert (GNUNET_SYSERR != +                 wire_plugin->amount_round (wire_plugin->cls, +                                            &au->total_amount)); +  /* Check if after rounding down, we still have an amount to transfer */ +  if ( (0 == au->total_amount.value) && +       (0 == au->total_amount.fraction) ) +  { +    /* Rollback ongoing transaction, as we will not use the respective +       WTID and thus need to remove the tracking data */ +    db_plugin->rollback (db_plugin->cls, +                         session); +    /* Start another transaction to mark all* of the selected deposits +       *as minor! */ +    if (GNUNET_OK != +        db_plugin->start (db_plugin->cls, +                          session)) +    { +      GNUNET_log (GNUNET_ERROR_TYPE_ERROR, +                  "Failed to start database transaction!\n"); +      *global_ret = GNUNET_SYSERR; +      GNUNET_free_non_null (au->additional_rows); +      GNUNET_free (au); +      return; +    } +    /* Mark transactions by row_id as minor */ +    ret = GNUNET_OK; +    if (GNUNET_OK != +        db_plugin->mark_deposit_tiny (db_plugin->cls, +                                      session, +                                      au->row_id)) +      ret = GNUNET_SYSERR; +    else +      for (i=0;i<au->rows_offset;i++) +        if (GNUNET_OK != +            db_plugin->mark_deposit_tiny (db_plugin->cls, +                                          session, +                                          au->additional_rows[i])) +          ret = GNUNET_SYSERR; +    /* commit */ +    if (GNUNET_OK != +        db_plugin->commit (db_plugin->cls, +                           session)) +    { +      GNUNET_log (GNUNET_ERROR_TYPE_WARNING, +                  "Failed to commit database transaction!\n"); +    } +    GNUNET_free_non_null (au->additional_rows); +    GNUNET_free (au); +    /* start again */ +    task = GNUNET_SCHEDULER_add_now (&run_aggregation, +                                     global_ret); +    return; +  } +  au->global_ret = global_ret; +  au->ph = wire_plugin->prepare_wire_transfer (wire_plugin->cls, +                                               au->wire, +                                               &au->total_amount, +                                               &au->wtid, +                                               &prepare_cb, +                                               au); +  /* FIXME: currently we have no clean-up plan on +     shutdown to call prepare_wire_transfer_cancel! +     Maybe make 'au' global? */ +  if (NULL == au->ph) +  { +    GNUNET_break (0); /* why? how to best recover? */      db_plugin->rollback (db_plugin->cls,                           session); +    GNUNET_free_non_null (au->additional_rows); +    GNUNET_free (au); +    /* start again */ +    task = GNUNET_SCHEDULER_add_now (&run_aggregation, +                                     global_ret);      return;    } -  /* FIXME: finish aggregate computation */ -  /* wire_plugin->prepare_wire_transfer () -- ASYNC! */ -  /* db_plugin->wire_prepare_data_insert () -- transactional! */ -  /* db_plugin->XXX () -- mark transactions selected for aggregate as finished */ +  /* otherwise we continue with #prepare_cb(), see below */ +} + -  /* then finally: commit! */ +/** + * Execute the wire transfers that we have committed to + * do. + * + * @param cls pointer to an `int` which we will return from main() + * @param tc scheduler context + */ +static void +run_transfers (void *cls, +               const struct GNUNET_SCHEDULER_TaskContext *tc); + + +/** + * Function to be called with the prepared transfer data. + * + * @param cls closure with the `struct AggregationUnit` + * @param buf transaction data to persist, NULL on error + * @param buf_size number of bytes in @a buf, 0 on error + */ +static void +prepare_cb (void *cls, +            const char *buf, +            size_t buf_size) +{ +  struct AggregationUnit *au = cls; +  int *global_ret = au->global_ret; +  struct TALER_MINTDB_Session *session = au->session; + +  GNUNET_free_non_null (au->additional_rows); +  GNUNET_free (au); +  if (NULL == buf) +  { +    GNUNET_break (0); /* why? how to best recover? */ +    db_plugin->rollback (db_plugin->cls, +                         session); +    /* start again */ +    task = GNUNET_SCHEDULER_add_now (&run_aggregation, +                                     global_ret); +    return; +  } + +  /* Commit our intention to execute the wire transfer! */ +  if (GNUNET_OK != +      db_plugin->wire_prepare_data_insert (db_plugin->cls, +                                           session, +                                           mint_wireformat, +                                           buf, +                                           buf_size)) +  { +    GNUNET_break (0); /* why? how to best recover? */ +    db_plugin->rollback (db_plugin->cls, +                         session); +    /* start again */ +    task = GNUNET_SCHEDULER_add_now (&run_aggregation, +                                     global_ret); +    return; +  } + +  /* Now we can finally commit the overall transaction, as we are +     again consistent if all of this passes. */    if (GNUNET_OK !=        db_plugin->commit (db_plugin->cls,                           session))    { -    GNUNET_log (GNUNET_ERROR_TYPE_WARNING, +    GNUNET_log (GNUNET_ERROR_TYPE_INFO,                  "Failed to commit database transaction!\n"); +    /* try again */ +    task = GNUNET_SCHEDULER_add_now (&run_aggregation, +                                     global_ret); +    return;    } -  /* While possible, run 2nd type of transaction: -     db_plugin->start() -     - select pre-commit data from DB: -     db_plugin->wire_prepare_data_iterate () -     - execute wire transfer (successfully!) -     wire_plugin->execute_wire_transfer() # ASYNC! -     db_plugin->wire_prepare_data_mark_finished () -     db_plugin->insert_aggregation_tracking () -     db_plugin->commit() -  */ +  /* run alternative task: actually do wire transfer! */ +  task = GNUNET_SCHEDULER_add_now (&run_transfers, +                                   &global_ret); +} + + +/** + * Data we keep to #run_transfers(). + */ +struct WirePrepareData +{ + +  /** +   * Database session for all of our transactions. +   */ +  struct TALER_MINTDB_Session *session; + +  /** +   * Wire execution handle. +   */ +  struct TALER_WIRE_ExecuteHandle *eh; + +  /** +   * Pointer to global return value. Closure for #run(). +   */ +  int *global_ret; + + +  /** +   * Row ID of the transfer. +   */ +  unsigned long long row_id; +}; -  task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_SECONDS /* FIXME: adjust! */, -                                       &run, -                                       global_ret); + +/** + * Function called with the result from the execute step. + * + * @param cls closure with the `struct WirePrepareData` + * @param success #GNUNET_OK on success, #GNUNET_SYSERR on failure + * @param emsg NULL on success, otherwise an error message + */ +static void +wire_confirm_cb (void *cls, +                 int success, +                 const char *emsg) +{ +  struct WirePrepareData *wpd = cls; +  int *global_ret = wpd->global_ret; +  struct TALER_MINTDB_Session *session = wpd->session; + +  wpd->eh = NULL; +  if (GNUNET_SYSERR == success) +  { +    GNUNET_log (GNUNET_ERROR_TYPE_ERROR, +                "Wire transaction failed: %s\n", +                emsg); +    db_plugin->rollback (db_plugin->cls, +                         session); +    *global_ret = GNUNET_SYSERR; +    GNUNET_free (wpd); +    return; +  } +  if (GNUNET_OK != +      db_plugin->wire_prepare_data_mark_finished (db_plugin->cls, +                                                  session, +                                                  wpd->row_id)) +  { +    GNUNET_break (0); /* why!? */ +    db_plugin->rollback (db_plugin->cls, +                         session); +    *global_ret = GNUNET_SYSERR; +    GNUNET_free (wpd); +    return; +  } +  GNUNET_free (wpd); +  if (GNUNET_OK != +      db_plugin->commit (db_plugin->cls, +                         session)) +  { +    GNUNET_log (GNUNET_ERROR_TYPE_INFO, +                "Failed to commit database transaction!\n"); +    /* try again */ +    task = GNUNET_SCHEDULER_add_now (&run_aggregation, +                                     global_ret); +    return; +  } +  /* continue with #run_transfers(), just to guard +     against the unlikely case that there are more. */ +  task = GNUNET_SCHEDULER_add_now (&run_transfers, +                                   &global_ret); + +} + + +/** + * Callback with data about a prepared transaction. + * + * @param cls closure with the `struct WirePrepareData` + * @param rowid row identifier used to mark prepared transaction as done + * @param buf transaction data that was persisted, NULL on error + * @param buf_size number of bytes in @a buf, 0 on error + */ +static void +wire_prepare_cb (void *cls, +                 unsigned long long rowid, +                 const char *buf, +                 size_t buf_size) +{ +  struct WirePrepareData *wpd = cls; +  int *global_ret = wpd->global_ret; + +  wpd->row_id = rowid; +  wpd->eh = wire_plugin->execute_wire_transfer (wire_plugin->cls, +                                                buf, +                                                buf_size, +                                                &wire_confirm_cb, +                                                wpd); +  /* FIXME: currently we have no clean-up plan on +     shutdown to call execute_wire_transfer_cancel! +     Maybe make 'wpd' global? */ +  if (NULL == wpd->eh) +  { +    GNUNET_break (0); /* why? how to best recover? */ +    db_plugin->rollback (db_plugin->cls, +                         wpd->session); +    *global_ret = GNUNET_SYSERR; +    GNUNET_free (wpd); +    return; +  } +} + + +/** + * Execute the wire transfers that we have committed to + * do. + * + * @param cls pointer to an `int` which we will return from main() + * @param tc scheduler context + */ +static void +run_transfers (void *cls, +               const struct GNUNET_SCHEDULER_TaskContext *tc) +{ +  int *global_ret = cls; +  int ret; +  struct WirePrepareData *wpd; +  struct TALER_MINTDB_Session *session; + +  if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN)) +    return; +  if (NULL == (session = db_plugin->get_session (db_plugin->cls, +                                                 GNUNET_NO))) +  { +    GNUNET_log (GNUNET_ERROR_TYPE_ERROR, +                "Failed to obtain database session!\n"); +    *global_ret = GNUNET_SYSERR; +    return; +  } +  if (GNUNET_OK != +      db_plugin->start (db_plugin->cls, +                        session)) +  { +    GNUNET_log (GNUNET_ERROR_TYPE_ERROR, +                "Failed to start database transaction!\n"); +    *global_ret = GNUNET_SYSERR; +    return; +  } +  wpd = GNUNET_new (struct WirePrepareData); +  wpd->session = session; +  wpd->global_ret = global_ret; +  ret = db_plugin->wire_prepare_data_get (db_plugin->cls, +                                          session, +                                          mint_wireformat, +                                          &wire_prepare_cb, +                                          wpd); +  if (GNUNET_SYSERR == ret) +  { +    GNUNET_break (0); /* why? how to best recover? */ +    db_plugin->rollback (db_plugin->cls, +                         session); +    *global_ret = GNUNET_SYSERR; +    GNUNET_free (wpd); +    return; +  } +  if (GNUNET_NO == ret) +  { +    /* no more prepared wire transfers, go back to aggregation! */ +    db_plugin->rollback (db_plugin->cls, +                         session); +    task = GNUNET_SCHEDULER_add_now (&run_aggregation, +                                     global_ret); +    GNUNET_free (wpd); +    return; +  } +  /* otherwise, continues in #wire_prepare_cb() */  } @@ -299,7 +904,7 @@ main (int argc,      return 1;    } -  GNUNET_SCHEDULER_run (&run, &ret); +  GNUNET_SCHEDULER_run (&run_transfers, &ret);    TALER_MINTDB_plugin_unload (db_plugin);    TALER_WIRE_plugin_unload (wire_plugin); diff --git a/src/mint/taler-mint-httpd_db.c b/src/mint/taler-mint-httpd_db.c index c39cbbcf..b93ead3a 100644 --- a/src/mint/taler-mint-httpd_db.c +++ b/src/mint/taler-mint-httpd_db.c @@ -1804,7 +1804,6 @@ struct DepositWtidContext   * @param coin_contribution how much did the coin we asked about   *        contribute to the total transfer value? (deposit value including fee)   * @param coin_fee how much did the mint charge for the deposit fee - * @param total_amount how much was the total wire transfer?   * @param execution_time when was the transaction done, or   *         when we expect it to be done (if @a wtid was NULL);   *         #GNUNET_TIME_UNIT_FOREVER_ABS if the /deposit is unknown @@ -1815,7 +1814,6 @@ handle_wtid_data (void *cls,  		  const struct TALER_WireTransferIdentifierRawP *wtid,                    const struct TALER_Amount *coin_contribution,                    const struct TALER_Amount *coin_fee, -                  const struct TALER_Amount *total_amount,  		  struct GNUNET_TIME_Absolute execution_time)  {    struct DepositWtidContext *ctx = cls; @@ -1843,7 +1841,6 @@ handle_wtid_data (void *cls,                                                    &ctx->h_wire,                                                    &ctx->coin_pub,                                                    &coin_delta, -                                                  total_amount,                                                    ctx->transaction_id,                                                    wtid,                                                    execution_time); diff --git a/src/mint/taler-mint-httpd_responses.c b/src/mint/taler-mint-httpd_responses.c index 041f694b..2ebd0d33 100644 --- a/src/mint/taler-mint-httpd_responses.c +++ b/src/mint/taler-mint-httpd_responses.c @@ -1097,7 +1097,6 @@ TMH_RESPONSE_reply_deposit_pending (struct MHD_Connection *connection,   * @param coin_pub public key of the coin   * @param coin_contribution how much did the coin we asked about   *        contribute to the total transfer value? (deposit value minus fee) - * @param total_amount how much was the total wire transfer?   * @param transaction_id merchant transaction identifier   * @param wtid raw wire transfer identifier   * @param exec_time execution time of the wire transfer @@ -1109,7 +1108,6 @@ TMH_RESPONSE_reply_deposit_wtid (struct MHD_Connection *connection,                                   const struct GNUNET_HashCode *h_wire,                                   const struct TALER_CoinSpendPublicKeyP *coin_pub,                                   const struct TALER_Amount *coin_contribution, -                                 const struct TALER_Amount *total_amount,                                   uint64_t transaction_id,  				 const struct TALER_WireTransferIdentifierRawP *wtid,                                   struct GNUNET_TIME_Absolute exec_time) @@ -1128,8 +1126,6 @@ TMH_RESPONSE_reply_deposit_wtid (struct MHD_Connection *connection,    cw.execution_time = GNUNET_TIME_absolute_hton (exec_time);    TALER_amount_hton (&cw.coin_contribution,                       coin_contribution); -  TALER_amount_hton (&cw.total_amount, -                     total_amount);    TMH_KS_sign (&cw.purpose,                 &pub,                 &sig); @@ -1140,7 +1136,6 @@ TMH_RESPONSE_reply_deposit_wtid (struct MHD_Connection *connection,                                                                       sizeof (*wtid)),                                         "execution_time", TALER_json_from_abs (exec_time),                                         "coin_contribution", TALER_json_from_amount (coin_contribution), -                                       "total_amount", TALER_json_from_amount (total_amount),                                         "mint_sig", TALER_json_from_data (&sig,                                                                           sizeof (sig)),                                         "mint_pub", TALER_json_from_data (&pub, diff --git a/src/mint/taler-mint-httpd_responses.h b/src/mint/taler-mint-httpd_responses.h index caad2904..a0396c8a 100644 --- a/src/mint/taler-mint-httpd_responses.h +++ b/src/mint/taler-mint-httpd_responses.h @@ -280,6 +280,7 @@ TMH_RESPONSE_reply_deposit_pending (struct MHD_Connection *connection,   * @param h_contract hash of the contract   * @param h_wire hash of wire account details   * @param coin_pub public key of the coin + * @param coin_contribution contribution of this coin to the total amount transferred   * @param transaction_id merchant transaction identifier   * @param wtid raw wire transfer identifier   * @param exec_time execution time of the wire transfer @@ -291,7 +292,6 @@ TMH_RESPONSE_reply_deposit_wtid (struct MHD_Connection *connection,                                   const struct GNUNET_HashCode *h_wire,                                   const struct TALER_CoinSpendPublicKeyP *coin_pub,                                   const struct TALER_Amount *coin_contribution, -                                 const struct TALER_Amount *total_amount,                                   uint64_t transaction_id,  				 const struct TALER_WireTransferIdentifierRawP *wtid,                                   struct GNUNET_TIME_Absolute exec_time); diff --git a/src/mintdb/plugin_mintdb_postgres.c b/src/mintdb/plugin_mintdb_postgres.c index fc204f5e..2ab3e81a 100644 --- a/src/mintdb/plugin_mintdb_postgres.c +++ b/src/mintdb/plugin_mintdb_postgres.c @@ -466,9 +466,6 @@ postgres_create_tables (void *cls,            ",coin_fee_val INT8 NOT NULL"            ",coin_fee_frac INT4 NOT NULL"            ",coin_fee_curr VARCHAR("TALER_CURRENCY_LEN_STR") NOT NULL" -          ",transfer_total_val INT8 NOT NULL" -          ",transfer_total_frac INT4 NOT NULL" -          ",transfer_total_curr VARCHAR("TALER_CURRENCY_LEN_STR") NOT NULL"            ")");    /* Index for lookup_transactions statement on wtid */    SQLEXEC_INDEX("CREATE INDEX aggregation_tracking_wtid_index " @@ -1090,9 +1087,6 @@ postgres_prepare (PGconn *db_conn)             ",coin_fee_val"             ",coin_fee_frac"             ",coin_fee_curr" -           ",transfer_total_val" -           ",transfer_total_frac" -           ",transfer_total_curr"             " FROM aggregation_tracking"             " WHERE wtid_raw=$1",             1, NULL); @@ -1108,9 +1102,6 @@ postgres_prepare (PGconn *db_conn)             ",coin_fee_val"             ",coin_fee_frac"             ",coin_fee_curr" -           ",transfer_total_val" -           ",transfer_total_frac" -           ",transfer_total_curr"             " FROM aggregation_tracking"             " WHERE"             " coin_pub=$1 AND" @@ -1136,12 +1127,9 @@ postgres_prepare (PGconn *db_conn)             ",coin_fee_val"             ",coin_fee_frac"             ",coin_fee_curr" -           ",transfer_total_val" -           ",transfer_total_frac" -           ",transfer_total_curr"             ") VALUES " -           "($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16)", -           16, NULL); +           "($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13)", +           13, NULL);    /* Used in #postgres_wire_prepare_data_insert() to store @@ -3950,7 +3938,6 @@ postgres_wire_lookup_deposit_wtid (void *cls,            NULL,            &coin_amount,            &coin_fee, -          NULL,            exec_time);        PQclear (result);        return GNUNET_YES; @@ -3967,13 +3954,11 @@ postgres_wire_lookup_deposit_wtid (void *cls,      struct GNUNET_TIME_Absolute exec_time;      struct TALER_Amount coin_amount;      struct TALER_Amount coin_fee; -    struct TALER_Amount transaction_amount;      struct TALER_PQ_ResultSpec rs[] = {        TALER_PQ_result_spec_auto_from_type ("wtid_raw", &wtid),        TALER_PQ_result_spec_absolute_time ("execution_time", &exec_time),        TALER_PQ_result_spec_amount ("coin_amount", &coin_amount),        TALER_PQ_result_spec_amount ("coin_fee", &coin_fee), -      TALER_PQ_result_spec_amount ("transfer_total", &transaction_amount),        TALER_PQ_result_spec_end      };      if (GNUNET_OK != TALER_PQ_extract_result (result, rs, 0)) @@ -3986,7 +3971,6 @@ postgres_wire_lookup_deposit_wtid (void *cls,          &wtid,          &coin_amount,          &coin_fee, -        &transaction_amount,          exec_time);    }    PQclear (result); @@ -4007,7 +3991,6 @@ postgres_wire_lookup_deposit_wtid (void *cls,   * @param coin_pub which public key was this payment about   * @param coin_value amount contributed by this coin in total   * @param coin_fee deposit fee charged by mint for this coin - * @param transfer_value total amount of the wire transfer   * @return #GNUNET_OK on success, #GNUNET_SYSERR on DB errors   */  static int @@ -4021,8 +4004,7 @@ postgres_insert_aggregation_tracking (void *cls,                                        struct GNUNET_TIME_Absolute execution_time,                                        const struct TALER_CoinSpendPublicKeyP *coin_pub,                                        const struct TALER_Amount *coin_value, -                                      const struct TALER_Amount *coin_fee, -                                      const struct TALER_Amount *transfer_value) +                                      const struct TALER_Amount *coin_fee)  {    struct TALER_PQ_QueryParam params[] = {      TALER_PQ_query_param_auto_from_type (h_contract), @@ -4034,7 +4016,6 @@ postgres_insert_aggregation_tracking (void *cls,      TALER_PQ_query_param_absolute_time (&execution_time),      TALER_PQ_query_param_amount (coin_value),      TALER_PQ_query_param_amount (coin_fee), -    TALER_PQ_query_param_amount (transfer_value),      TALER_PQ_query_param_end    };    PGresult *result;  | 
