Merge branch 'master' of git+ssh://taler.net/var/git/exchange
This commit is contained in:
commit
148dda09d4
2
.gitignore
vendored
2
.gitignore
vendored
@ -38,7 +38,7 @@ src/exchange-tools/taler-exchange-dbinit
|
|||||||
src/exchange-tools/taler-exchange-keycheck
|
src/exchange-tools/taler-exchange-keycheck
|
||||||
src/exchange-tools/taler-exchange-keyup
|
src/exchange-tools/taler-exchange-keyup
|
||||||
src/exchange-tools/taler-exchange-reservemod
|
src/exchange-tools/taler-exchange-reservemod
|
||||||
src/exchange-tools/taler-exchange-sepa
|
src/exchange-tools/taler-exchange-wire
|
||||||
src/exchangedb/perf-exchangedb
|
src/exchangedb/perf-exchangedb
|
||||||
src/json/test_json
|
src/json/test_json
|
||||||
src/wire/test_sepa_wireformat
|
src/wire/test_sepa_wireformat
|
||||||
|
@ -18,10 +18,6 @@
|
|||||||
* @file taler-exchange-aggregator.c
|
* @file taler-exchange-aggregator.c
|
||||||
* @brief Process that aggregates outgoing transactions and executes them
|
* @brief Process that aggregates outgoing transactions and executes them
|
||||||
* @author Christian Grothoff
|
* @author Christian Grothoff
|
||||||
*
|
|
||||||
* TODO:
|
|
||||||
* - simplify global_ret: make it a global!
|
|
||||||
* - handle shutdown more nicely (call 'cancel' method on wire transfers)
|
|
||||||
*/
|
*/
|
||||||
#include "platform.h"
|
#include "platform.h"
|
||||||
#include <gnunet/gnunet_util_lib.h>
|
#include <gnunet/gnunet_util_lib.h>
|
||||||
@ -32,6 +28,102 @@
|
|||||||
#include "taler_json_lib.h"
|
#include "taler_json_lib.h"
|
||||||
#include "taler_wire_lib.h"
|
#include "taler_wire_lib.h"
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Data we keep to #run_transfers(). There is at most
|
||||||
|
* one of these around at any given point in time.
|
||||||
|
*/
|
||||||
|
struct WirePrepareData
|
||||||
|
{
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Database session for all of our transactions.
|
||||||
|
*/
|
||||||
|
struct TALER_EXCHANGEDB_Session *session;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Wire execution handle.
|
||||||
|
*/
|
||||||
|
struct TALER_WIRE_ExecuteHandle *eh;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Row ID of the transfer.
|
||||||
|
*/
|
||||||
|
unsigned long long row_id;
|
||||||
|
|
||||||
|
};
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Information about one aggregation process to be executed. There is
|
||||||
|
* at most one of these around at any given point in time.
|
||||||
|
*/
|
||||||
|
struct AggregationUnit
|
||||||
|
{
|
||||||
|
/**
|
||||||
|
* Public key of the merchant.
|
||||||
|
*/
|
||||||
|
struct TALER_MerchantPublicKeyP merchant_pub;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Total amount to be transferred.
|
||||||
|
*/
|
||||||
|
struct TALER_Amount total_amount;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Hash of @e wire.
|
||||||
|
*/
|
||||||
|
struct GNUNET_HashCode h_wire;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Wire transfer identifier we use.
|
||||||
|
*/
|
||||||
|
struct TALER_WireTransferIdentifierRawP wtid;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Row ID of the transaction that started it all.
|
||||||
|
*/
|
||||||
|
unsigned long long row_id;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The current time.
|
||||||
|
*/
|
||||||
|
struct GNUNET_TIME_Absolute execution_time;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Wire details of the merchant.
|
||||||
|
*/
|
||||||
|
json_t *wire;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Database session for all of our transactions.
|
||||||
|
*/
|
||||||
|
struct TALER_EXCHANGEDB_Session *session;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Wire preparation handle.
|
||||||
|
*/
|
||||||
|
struct TALER_WIRE_PrepareHandle *ph;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Array of #aggregation_limit row_ids from the
|
||||||
|
* aggregation.
|
||||||
|
*/
|
||||||
|
unsigned long long *additional_rows;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Offset specifying how many #additional_rows are in use.
|
||||||
|
*/
|
||||||
|
unsigned int rows_offset;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Set to #GNUNET_YES if we have to abort due to failure.
|
||||||
|
*/
|
||||||
|
int failed;
|
||||||
|
|
||||||
|
};
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Which currency is used by this exchange?
|
* Which currency is used by this exchange?
|
||||||
*/
|
*/
|
||||||
@ -63,10 +155,28 @@ static struct TALER_EXCHANGEDB_Plugin *db_plugin;
|
|||||||
static struct TALER_WIRE_Plugin *wire_plugin;
|
static struct TALER_WIRE_Plugin *wire_plugin;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Task for the main #run() function.
|
* Next task to run, if any.
|
||||||
*/
|
*/
|
||||||
static struct GNUNET_SCHEDULER_Task *task;
|
static struct GNUNET_SCHEDULER_Task *task;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* If we are currently executing a transfer, information about
|
||||||
|
* the active transfer is here. Otherwise, this variable is NULL.
|
||||||
|
*/
|
||||||
|
static struct WirePrepareData *wpd;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* If we are currently aggregating transactions, information about the
|
||||||
|
* active aggregation is here. Otherwise, this variable is NULL.
|
||||||
|
*/
|
||||||
|
static struct AggregationUnit *au;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Value to return from main(). #GNUNET_OK on success, #GNUNET_SYSERR
|
||||||
|
* on serious errors.
|
||||||
|
*/
|
||||||
|
static int global_ret;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* #GNUNET_YES if we are in test mode and are using temporary tables.
|
* #GNUNET_YES if we are in test mode and are using temporary tables.
|
||||||
*/
|
*/
|
||||||
@ -78,9 +188,59 @@ static int test_mode;
|
|||||||
* of the smallest possible unit are aggregated, they do surpass the
|
* of the smallest possible unit are aggregated, they do surpass the
|
||||||
* "tiny" threshold beyond which we never trigger a wire transaction!
|
* "tiny" threshold beyond which we never trigger a wire transaction!
|
||||||
*
|
*
|
||||||
* TODO: make configurable (via config file or command line option)
|
* Note: do not change here, Postgres requires us to hard-code the
|
||||||
|
* LIMIT in the prepared statement.
|
||||||
*/
|
*/
|
||||||
static unsigned int aggregation_limit = 10000;
|
static unsigned int aggregation_limit = TALER_EXCHANGEDB_MATCHING_DEPOSITS_LIMIT;
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* We're being aborted with CTRL-C (or SIGTERM). Shut down.
|
||||||
|
*
|
||||||
|
* @param cls closure
|
||||||
|
* @param tc scheduler context
|
||||||
|
*/
|
||||||
|
static void
|
||||||
|
shutdown_task (void *cls,
|
||||||
|
const struct GNUNET_SCHEDULER_TaskContext *tc)
|
||||||
|
{
|
||||||
|
if (NULL != task)
|
||||||
|
{
|
||||||
|
GNUNET_SCHEDULER_cancel (task);
|
||||||
|
task = NULL;
|
||||||
|
}
|
||||||
|
if (NULL != wpd)
|
||||||
|
{
|
||||||
|
if (NULL != wpd->eh)
|
||||||
|
{
|
||||||
|
wire_plugin->execute_wire_transfer_cancel (wire_plugin->cls,
|
||||||
|
wpd->eh);
|
||||||
|
wpd->eh = NULL;
|
||||||
|
}
|
||||||
|
db_plugin->rollback (db_plugin->cls,
|
||||||
|
wpd->session);
|
||||||
|
GNUNET_free (wpd);
|
||||||
|
wpd = NULL;
|
||||||
|
}
|
||||||
|
if (NULL != au)
|
||||||
|
{
|
||||||
|
if (NULL != au->ph)
|
||||||
|
{
|
||||||
|
wire_plugin->prepare_wire_transfer_cancel (wire_plugin->cls,
|
||||||
|
au->ph);
|
||||||
|
au->ph = NULL;
|
||||||
|
}
|
||||||
|
db_plugin->rollback (db_plugin->cls,
|
||||||
|
au->session);
|
||||||
|
GNUNET_free_non_null (au->additional_rows);
|
||||||
|
if (NULL != au->wire)
|
||||||
|
json_decref (au->wire);
|
||||||
|
au = NULL;
|
||||||
|
GNUNET_free (au);
|
||||||
|
}
|
||||||
|
TALER_EXCHANGEDB_plugin_unload (db_plugin);
|
||||||
|
TALER_WIRE_plugin_unload (wire_plugin);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -158,86 +318,11 @@ exchange_serve_process_config (const char *exchange_directory)
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Information about one aggregation process to
|
|
||||||
* be executed.
|
|
||||||
*/
|
|
||||||
struct AggregationUnit
|
|
||||||
{
|
|
||||||
/**
|
|
||||||
* Public key of the merchant.
|
|
||||||
*/
|
|
||||||
struct TALER_MerchantPublicKeyP merchant_pub;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Total amount to be transferred.
|
|
||||||
*/
|
|
||||||
struct TALER_Amount total_amount;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Hash of @e wire.
|
|
||||||
*/
|
|
||||||
struct GNUNET_HashCode h_wire;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Wire transfer identifier we use.
|
|
||||||
*/
|
|
||||||
struct TALER_WireTransferIdentifierRawP wtid;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Row ID of the transaction that started it all.
|
|
||||||
*/
|
|
||||||
unsigned long long row_id;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* The current time.
|
|
||||||
*/
|
|
||||||
struct GNUNET_TIME_Absolute execution_time;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Wire details of the merchant.
|
|
||||||
*/
|
|
||||||
json_t *wire;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Database session for all of our transactions.
|
|
||||||
*/
|
|
||||||
struct TALER_EXCHANGEDB_Session *session;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Wire preparation handle.
|
|
||||||
*/
|
|
||||||
struct TALER_WIRE_PrepareHandle *ph;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Array of #aggregation_limit row_ids from the
|
|
||||||
* aggregation.
|
|
||||||
*/
|
|
||||||
unsigned long long *additional_rows;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Pointer to global return value. Closure for #run().
|
|
||||||
*/
|
|
||||||
int *global_ret;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Offset specifying how many #additional_rows are in use.
|
|
||||||
*/
|
|
||||||
unsigned int rows_offset;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Set to #GNUNET_YES if we have to abort due to failure.
|
|
||||||
*/
|
|
||||||
int failed;
|
|
||||||
|
|
||||||
};
|
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Function called with details about deposits that have been made,
|
* Function called with details about deposits that have been made,
|
||||||
* with the goal of executing the corresponding wire transaction.
|
* with the goal of executing the corresponding wire transaction.
|
||||||
*
|
*
|
||||||
* @param cls closure with the `struct AggregationUnit`
|
* @param cls NULL
|
||||||
* @param row_id identifies database entry
|
* @param row_id identifies database entry
|
||||||
* @param merchant_pub public key of the merchant
|
* @param merchant_pub public key of the merchant
|
||||||
* @param coin_pub public key of the coin
|
* @param coin_pub public key of the coin
|
||||||
@ -262,8 +347,6 @@ deposit_cb (void *cls,
|
|||||||
struct GNUNET_TIME_Absolute wire_deadline,
|
struct GNUNET_TIME_Absolute wire_deadline,
|
||||||
const json_t *wire)
|
const json_t *wire)
|
||||||
{
|
{
|
||||||
struct AggregationUnit *au = cls;
|
|
||||||
|
|
||||||
au->merchant_pub = *merchant_pub;
|
au->merchant_pub = *merchant_pub;
|
||||||
if (GNUNET_OK !=
|
if (GNUNET_OK !=
|
||||||
TALER_amount_subtract (&au->total_amount,
|
TALER_amount_subtract (&au->total_amount,
|
||||||
@ -284,6 +367,9 @@ deposit_cb (void *cls,
|
|||||||
GNUNET_CRYPTO_random_block (GNUNET_CRYPTO_QUALITY_NONCE,
|
GNUNET_CRYPTO_random_block (GNUNET_CRYPTO_QUALITY_NONCE,
|
||||||
&au->wtid,
|
&au->wtid,
|
||||||
sizeof (au->wtid));
|
sizeof (au->wtid));
|
||||||
|
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
|
||||||
|
"Starting aggregation under WTID %s\n",
|
||||||
|
TALER_B2S (&au->wtid));
|
||||||
if (GNUNET_OK !=
|
if (GNUNET_OK !=
|
||||||
db_plugin->insert_aggregation_tracking (db_plugin->cls,
|
db_plugin->insert_aggregation_tracking (db_plugin->cls,
|
||||||
au->session,
|
au->session,
|
||||||
@ -313,12 +399,11 @@ deposit_cb (void *cls,
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Function called with details about another deposit we
|
* Function called with details about another deposit we
|
||||||
* can aggregate into an existing aggregation unit.
|
* can aggregate into an existing aggregation unit.
|
||||||
*
|
*
|
||||||
* @param cls closure with the `struct AggregationUnit`
|
* @param cls NULL
|
||||||
* @param row_id identifies database entry
|
* @param row_id identifies database entry
|
||||||
* @param merchant_pub public key of the merchant
|
* @param merchant_pub public key of the merchant
|
||||||
* @param coin_pub public key of the coin
|
* @param coin_pub public key of the coin
|
||||||
@ -343,7 +428,6 @@ aggregate_cb (void *cls,
|
|||||||
struct GNUNET_TIME_Absolute wire_deadline,
|
struct GNUNET_TIME_Absolute wire_deadline,
|
||||||
const json_t *wire)
|
const json_t *wire)
|
||||||
{
|
{
|
||||||
struct AggregationUnit *au = cls;
|
|
||||||
struct TALER_Amount delta;
|
struct TALER_Amount delta;
|
||||||
|
|
||||||
GNUNET_break (0 ==
|
GNUNET_break (0 ==
|
||||||
@ -432,27 +516,28 @@ prepare_cb (void *cls,
|
|||||||
* Main work function that queries the DB and aggregates transactions
|
* Main work function that queries the DB and aggregates transactions
|
||||||
* into larger wire transfers.
|
* into larger wire transfers.
|
||||||
*
|
*
|
||||||
* @param cls pointer to an `int` which we will return from main()
|
* @param cls NULL
|
||||||
* @param tc scheduler context
|
* @param tc scheduler context
|
||||||
*/
|
*/
|
||||||
static void
|
static void
|
||||||
run_aggregation (void *cls,
|
run_aggregation (void *cls,
|
||||||
const struct GNUNET_SCHEDULER_TaskContext *tc)
|
const struct GNUNET_SCHEDULER_TaskContext *tc)
|
||||||
{
|
{
|
||||||
int *global_ret = cls;
|
|
||||||
struct TALER_EXCHANGEDB_Session *session;
|
struct TALER_EXCHANGEDB_Session *session;
|
||||||
struct AggregationUnit *au;
|
|
||||||
unsigned int i;
|
unsigned int i;
|
||||||
int ret;
|
int ret;
|
||||||
|
|
||||||
|
task = NULL;
|
||||||
if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
|
if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
|
||||||
return;
|
return;
|
||||||
|
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
|
||||||
|
"Checking for ready deposits to aggregate\n");
|
||||||
if (NULL == (session = db_plugin->get_session (db_plugin->cls,
|
if (NULL == (session = db_plugin->get_session (db_plugin->cls,
|
||||||
test_mode)))
|
test_mode)))
|
||||||
{
|
{
|
||||||
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
|
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
|
||||||
"Failed to obtain database session!\n");
|
"Failed to obtain database session!\n");
|
||||||
*global_ret = GNUNET_SYSERR;
|
global_ret = GNUNET_SYSERR;
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
if (GNUNET_OK !=
|
if (GNUNET_OK !=
|
||||||
@ -461,7 +546,7 @@ run_aggregation (void *cls,
|
|||||||
{
|
{
|
||||||
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
|
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
|
||||||
"Failed to start database transaction!\n");
|
"Failed to start database transaction!\n");
|
||||||
*global_ret = GNUNET_SYSERR;
|
global_ret = GNUNET_SYSERR;
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
au = GNUNET_new (struct AggregationUnit);
|
au = GNUNET_new (struct AggregationUnit);
|
||||||
@ -475,15 +560,18 @@ run_aggregation (void *cls,
|
|||||||
if (NULL != au->wire)
|
if (NULL != au->wire)
|
||||||
json_decref (au->wire);
|
json_decref (au->wire);
|
||||||
GNUNET_free (au);
|
GNUNET_free (au);
|
||||||
|
au = NULL;
|
||||||
db_plugin->rollback (db_plugin->cls,
|
db_plugin->rollback (db_plugin->cls,
|
||||||
session);
|
session);
|
||||||
if (0 != ret)
|
if (0 != ret)
|
||||||
{
|
{
|
||||||
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
|
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
|
||||||
"Failed to execute deposit iteration!\n");
|
"Failed to execute deposit iteration!\n");
|
||||||
*global_ret = GNUNET_SYSERR;
|
global_ret = GNUNET_SYSERR;
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
|
||||||
|
"No more ready deposits, going to sleep\n");
|
||||||
if (GNUNET_YES == test_mode)
|
if (GNUNET_YES == test_mode)
|
||||||
{
|
{
|
||||||
/* in test mode, shutdown if we end up being idle */
|
/* in test mode, shutdown if we end up being idle */
|
||||||
@ -494,11 +582,14 @@ run_aggregation (void *cls,
|
|||||||
/* nothing to do, sleep for a minute and try again */
|
/* nothing to do, sleep for a minute and try again */
|
||||||
task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES,
|
task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES,
|
||||||
&run_aggregation,
|
&run_aggregation,
|
||||||
global_ret);
|
NULL);
|
||||||
}
|
}
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
/* Now try to find other deposits to aggregate */
|
/* Now try to find other deposits to aggregate */
|
||||||
|
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
|
||||||
|
"Found ready deposit for %s, aggregating\n",
|
||||||
|
TALER_B2S (&au->merchant_pub));
|
||||||
ret = db_plugin->iterate_matching_deposits (db_plugin->cls,
|
ret = db_plugin->iterate_matching_deposits (db_plugin->cls,
|
||||||
session,
|
session,
|
||||||
&au->h_wire,
|
&au->h_wire,
|
||||||
@ -515,9 +606,10 @@ run_aggregation (void *cls,
|
|||||||
if (NULL != au->wire)
|
if (NULL != au->wire)
|
||||||
json_decref (au->wire);
|
json_decref (au->wire);
|
||||||
GNUNET_free (au);
|
GNUNET_free (au);
|
||||||
|
au = NULL;
|
||||||
db_plugin->rollback (db_plugin->cls,
|
db_plugin->rollback (db_plugin->cls,
|
||||||
session);
|
session);
|
||||||
*global_ret = GNUNET_SYSERR;
|
global_ret = GNUNET_SYSERR;
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
/* Round to the unit supported by the wire transfer method */
|
/* Round to the unit supported by the wire transfer method */
|
||||||
@ -528,6 +620,8 @@ run_aggregation (void *cls,
|
|||||||
if ( (0 == au->total_amount.value) &&
|
if ( (0 == au->total_amount.value) &&
|
||||||
(0 == au->total_amount.fraction) )
|
(0 == au->total_amount.fraction) )
|
||||||
{
|
{
|
||||||
|
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
|
||||||
|
"Aggregate value too low for transfer\n");
|
||||||
/* Rollback ongoing transaction, as we will not use the respective
|
/* Rollback ongoing transaction, as we will not use the respective
|
||||||
WTID and thus need to remove the tracking data */
|
WTID and thus need to remove the tracking data */
|
||||||
db_plugin->rollback (db_plugin->cls,
|
db_plugin->rollback (db_plugin->cls,
|
||||||
@ -540,11 +634,12 @@ run_aggregation (void *cls,
|
|||||||
{
|
{
|
||||||
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
|
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
|
||||||
"Failed to start database transaction!\n");
|
"Failed to start database transaction!\n");
|
||||||
*global_ret = GNUNET_SYSERR;
|
global_ret = GNUNET_SYSERR;
|
||||||
GNUNET_free_non_null (au->additional_rows);
|
GNUNET_free_non_null (au->additional_rows);
|
||||||
if (NULL != au->wire)
|
if (NULL != au->wire)
|
||||||
json_decref (au->wire);
|
json_decref (au->wire);
|
||||||
GNUNET_free (au);
|
GNUNET_free (au);
|
||||||
|
au = NULL;
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
/* Mark transactions by row_id as minor */
|
/* Mark transactions by row_id as minor */
|
||||||
@ -573,21 +668,27 @@ run_aggregation (void *cls,
|
|||||||
if (NULL != au->wire)
|
if (NULL != au->wire)
|
||||||
json_decref (au->wire);
|
json_decref (au->wire);
|
||||||
GNUNET_free (au);
|
GNUNET_free (au);
|
||||||
|
au = NULL;
|
||||||
/* start again */
|
/* start again */
|
||||||
task = GNUNET_SCHEDULER_add_now (&run_aggregation,
|
task = GNUNET_SCHEDULER_add_now (&run_aggregation,
|
||||||
global_ret);
|
NULL);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
au->global_ret = global_ret;
|
{
|
||||||
|
char *amount_s;
|
||||||
|
|
||||||
|
amount_s = TALER_amount_to_string (&au->total_amount);
|
||||||
|
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
|
||||||
|
"Preparing wire transfer of %s to %s\n",
|
||||||
|
amount_s,
|
||||||
|
TALER_B2S (&au->merchant_pub));
|
||||||
|
}
|
||||||
au->ph = wire_plugin->prepare_wire_transfer (wire_plugin->cls,
|
au->ph = wire_plugin->prepare_wire_transfer (wire_plugin->cls,
|
||||||
au->wire,
|
au->wire,
|
||||||
&au->total_amount,
|
&au->total_amount,
|
||||||
&au->wtid,
|
&au->wtid,
|
||||||
&prepare_cb,
|
&prepare_cb,
|
||||||
au);
|
au);
|
||||||
/* FIXME: currently we have no clean-up plan on
|
|
||||||
shutdown to call prepare_wire_transfer_cancel!
|
|
||||||
Maybe make 'au' global? */
|
|
||||||
if (NULL == au->ph)
|
if (NULL == au->ph)
|
||||||
{
|
{
|
||||||
GNUNET_break (0); /* why? how to best recover? */
|
GNUNET_break (0); /* why? how to best recover? */
|
||||||
@ -596,10 +697,11 @@ run_aggregation (void *cls,
|
|||||||
GNUNET_free_non_null (au->additional_rows);
|
GNUNET_free_non_null (au->additional_rows);
|
||||||
if (NULL != au->wire)
|
if (NULL != au->wire)
|
||||||
json_decref (au->wire);
|
json_decref (au->wire);
|
||||||
|
au = NULL;
|
||||||
GNUNET_free (au);
|
GNUNET_free (au);
|
||||||
/* start again */
|
/* start again */
|
||||||
task = GNUNET_SCHEDULER_add_now (&run_aggregation,
|
task = GNUNET_SCHEDULER_add_now (&run_aggregation,
|
||||||
global_ret);
|
NULL);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
/* otherwise we continue with #prepare_cb(), see below */
|
/* otherwise we continue with #prepare_cb(), see below */
|
||||||
@ -621,7 +723,7 @@ run_transfers (void *cls,
|
|||||||
/**
|
/**
|
||||||
* Function to be called with the prepared transfer data.
|
* Function to be called with the prepared transfer data.
|
||||||
*
|
*
|
||||||
* @param cls closure with the `struct AggregationUnit`
|
* @param cls NULL
|
||||||
* @param buf transaction data to persist, NULL on error
|
* @param buf transaction data to persist, NULL on error
|
||||||
* @param buf_size number of bytes in @a buf, 0 on error
|
* @param buf_size number of bytes in @a buf, 0 on error
|
||||||
*/
|
*/
|
||||||
@ -630,14 +732,13 @@ prepare_cb (void *cls,
|
|||||||
const char *buf,
|
const char *buf,
|
||||||
size_t buf_size)
|
size_t buf_size)
|
||||||
{
|
{
|
||||||
struct AggregationUnit *au = cls;
|
|
||||||
int *global_ret = au->global_ret;
|
|
||||||
struct TALER_EXCHANGEDB_Session *session = au->session;
|
struct TALER_EXCHANGEDB_Session *session = au->session;
|
||||||
|
|
||||||
GNUNET_free_non_null (au->additional_rows);
|
GNUNET_free_non_null (au->additional_rows);
|
||||||
if (NULL != au->wire)
|
if (NULL != au->wire)
|
||||||
json_decref (au->wire);
|
json_decref (au->wire);
|
||||||
GNUNET_free (au);
|
GNUNET_free (au);
|
||||||
|
au = NULL;
|
||||||
if (NULL == buf)
|
if (NULL == buf)
|
||||||
{
|
{
|
||||||
GNUNET_break (0); /* why? how to best recover? */
|
GNUNET_break (0); /* why? how to best recover? */
|
||||||
@ -645,7 +746,7 @@ prepare_cb (void *cls,
|
|||||||
session);
|
session);
|
||||||
/* start again */
|
/* start again */
|
||||||
task = GNUNET_SCHEDULER_add_now (&run_aggregation,
|
task = GNUNET_SCHEDULER_add_now (&run_aggregation,
|
||||||
global_ret);
|
NULL);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -662,7 +763,7 @@ prepare_cb (void *cls,
|
|||||||
session);
|
session);
|
||||||
/* start again */
|
/* start again */
|
||||||
task = GNUNET_SCHEDULER_add_now (&run_aggregation,
|
task = GNUNET_SCHEDULER_add_now (&run_aggregation,
|
||||||
global_ret);
|
NULL);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -676,50 +777,21 @@ prepare_cb (void *cls,
|
|||||||
"Failed to commit database transaction!\n");
|
"Failed to commit database transaction!\n");
|
||||||
/* try again */
|
/* try again */
|
||||||
task = GNUNET_SCHEDULER_add_now (&run_aggregation,
|
task = GNUNET_SCHEDULER_add_now (&run_aggregation,
|
||||||
global_ret);
|
NULL);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
|
||||||
|
"Preparation complete, switching to transfer mode\n");
|
||||||
/* run alternative task: actually do wire transfer! */
|
/* run alternative task: actually do wire transfer! */
|
||||||
task = GNUNET_SCHEDULER_add_now (&run_transfers,
|
task = GNUNET_SCHEDULER_add_now (&run_transfers,
|
||||||
&global_ret);
|
NULL);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Data we keep to #run_transfers().
|
|
||||||
*/
|
|
||||||
struct WirePrepareData
|
|
||||||
{
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Database session for all of our transactions.
|
|
||||||
*/
|
|
||||||
struct TALER_EXCHANGEDB_Session *session;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Wire execution handle.
|
|
||||||
*/
|
|
||||||
struct TALER_WIRE_ExecuteHandle *eh;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Pointer to global return value. Closure for #run().
|
|
||||||
*/
|
|
||||||
int *global_ret;
|
|
||||||
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Row ID of the transfer.
|
|
||||||
*/
|
|
||||||
unsigned long long row_id;
|
|
||||||
|
|
||||||
};
|
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Function called with the result from the execute step.
|
* Function called with the result from the execute step.
|
||||||
*
|
*
|
||||||
* @param cls closure with the `struct WirePrepareData`
|
* @param cls NULL
|
||||||
* @param success #GNUNET_OK on success, #GNUNET_SYSERR on failure
|
* @param success #GNUNET_OK on success, #GNUNET_SYSERR on failure
|
||||||
* @param emsg NULL on success, otherwise an error message
|
* @param emsg NULL on success, otherwise an error message
|
||||||
*/
|
*/
|
||||||
@ -728,8 +800,6 @@ wire_confirm_cb (void *cls,
|
|||||||
int success,
|
int success,
|
||||||
const char *emsg)
|
const char *emsg)
|
||||||
{
|
{
|
||||||
struct WirePrepareData *wpd = cls;
|
|
||||||
int *global_ret = wpd->global_ret;
|
|
||||||
struct TALER_EXCHANGEDB_Session *session = wpd->session;
|
struct TALER_EXCHANGEDB_Session *session = wpd->session;
|
||||||
|
|
||||||
wpd->eh = NULL;
|
wpd->eh = NULL;
|
||||||
@ -740,8 +810,9 @@ wire_confirm_cb (void *cls,
|
|||||||
emsg);
|
emsg);
|
||||||
db_plugin->rollback (db_plugin->cls,
|
db_plugin->rollback (db_plugin->cls,
|
||||||
session);
|
session);
|
||||||
*global_ret = GNUNET_SYSERR;
|
global_ret = GNUNET_SYSERR;
|
||||||
GNUNET_free (wpd);
|
GNUNET_free (wpd);
|
||||||
|
wpd = NULL;
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
if (GNUNET_OK !=
|
if (GNUNET_OK !=
|
||||||
@ -752,11 +823,13 @@ wire_confirm_cb (void *cls,
|
|||||||
GNUNET_break (0); /* why!? */
|
GNUNET_break (0); /* why!? */
|
||||||
db_plugin->rollback (db_plugin->cls,
|
db_plugin->rollback (db_plugin->cls,
|
||||||
session);
|
session);
|
||||||
*global_ret = GNUNET_SYSERR;
|
global_ret = GNUNET_SYSERR;
|
||||||
GNUNET_free (wpd);
|
GNUNET_free (wpd);
|
||||||
|
wpd = NULL;
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
GNUNET_free (wpd);
|
GNUNET_free (wpd);
|
||||||
|
wpd = NULL;
|
||||||
if (GNUNET_OK !=
|
if (GNUNET_OK !=
|
||||||
db_plugin->commit (db_plugin->cls,
|
db_plugin->commit (db_plugin->cls,
|
||||||
session))
|
session))
|
||||||
@ -765,13 +838,15 @@ wire_confirm_cb (void *cls,
|
|||||||
"Failed to commit database transaction!\n");
|
"Failed to commit database transaction!\n");
|
||||||
/* try again */
|
/* try again */
|
||||||
task = GNUNET_SCHEDULER_add_now (&run_aggregation,
|
task = GNUNET_SCHEDULER_add_now (&run_aggregation,
|
||||||
global_ret);
|
NULL);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
|
||||||
|
"Wire transfer complete\n");
|
||||||
/* continue with #run_transfers(), just to guard
|
/* continue with #run_transfers(), just to guard
|
||||||
against the unlikely case that there are more. */
|
against the unlikely case that there are more. */
|
||||||
task = GNUNET_SCHEDULER_add_now (&run_transfers,
|
task = GNUNET_SCHEDULER_add_now (&run_transfers,
|
||||||
&global_ret);
|
NULL);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -779,7 +854,7 @@ wire_confirm_cb (void *cls,
|
|||||||
/**
|
/**
|
||||||
* Callback with data about a prepared transaction.
|
* Callback with data about a prepared transaction.
|
||||||
*
|
*
|
||||||
* @param cls closure with the `struct WirePrepareData`
|
* @param cls NULL
|
||||||
* @param rowid row identifier used to mark prepared transaction as done
|
* @param rowid row identifier used to mark prepared transaction as done
|
||||||
* @param buf transaction data that was persisted, NULL on error
|
* @param buf transaction data that was persisted, NULL on error
|
||||||
* @param buf_size number of bytes in @a buf, 0 on error
|
* @param buf_size number of bytes in @a buf, 0 on error
|
||||||
@ -790,25 +865,23 @@ wire_prepare_cb (void *cls,
|
|||||||
const char *buf,
|
const char *buf,
|
||||||
size_t buf_size)
|
size_t buf_size)
|
||||||
{
|
{
|
||||||
struct WirePrepareData *wpd = cls;
|
|
||||||
int *global_ret = wpd->global_ret;
|
|
||||||
|
|
||||||
wpd->row_id = rowid;
|
wpd->row_id = rowid;
|
||||||
|
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
|
||||||
|
"Starting wire transfer %llu\n",
|
||||||
|
rowid);
|
||||||
wpd->eh = wire_plugin->execute_wire_transfer (wire_plugin->cls,
|
wpd->eh = wire_plugin->execute_wire_transfer (wire_plugin->cls,
|
||||||
buf,
|
buf,
|
||||||
buf_size,
|
buf_size,
|
||||||
&wire_confirm_cb,
|
&wire_confirm_cb,
|
||||||
wpd);
|
NULL);
|
||||||
/* FIXME: currently we have no clean-up plan on
|
|
||||||
shutdown to call execute_wire_transfer_cancel!
|
|
||||||
Maybe make 'wpd' global? */
|
|
||||||
if (NULL == wpd->eh)
|
if (NULL == wpd->eh)
|
||||||
{
|
{
|
||||||
GNUNET_break (0); /* why? how to best recover? */
|
GNUNET_break (0); /* why? how to best recover? */
|
||||||
db_plugin->rollback (db_plugin->cls,
|
db_plugin->rollback (db_plugin->cls,
|
||||||
wpd->session);
|
wpd->session);
|
||||||
*global_ret = GNUNET_SYSERR;
|
global_ret = GNUNET_SYSERR;
|
||||||
GNUNET_free (wpd);
|
GNUNET_free (wpd);
|
||||||
|
wpd = NULL;
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -818,18 +891,19 @@ wire_prepare_cb (void *cls,
|
|||||||
* Execute the wire transfers that we have committed to
|
* Execute the wire transfers that we have committed to
|
||||||
* do.
|
* do.
|
||||||
*
|
*
|
||||||
* @param cls pointer to an `int` which we will return from main()
|
* @param cls NULL
|
||||||
* @param tc scheduler context
|
* @param tc scheduler context
|
||||||
*/
|
*/
|
||||||
static void
|
static void
|
||||||
run_transfers (void *cls,
|
run_transfers (void *cls,
|
||||||
const struct GNUNET_SCHEDULER_TaskContext *tc)
|
const struct GNUNET_SCHEDULER_TaskContext *tc)
|
||||||
{
|
{
|
||||||
int *global_ret = cls;
|
|
||||||
int ret;
|
int ret;
|
||||||
struct WirePrepareData *wpd;
|
|
||||||
struct TALER_EXCHANGEDB_Session *session;
|
struct TALER_EXCHANGEDB_Session *session;
|
||||||
|
|
||||||
|
task = NULL;
|
||||||
|
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
|
||||||
|
"Checking for pending wire transfers\n");
|
||||||
if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
|
if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
|
||||||
return;
|
return;
|
||||||
if (NULL == (session = db_plugin->get_session (db_plugin->cls,
|
if (NULL == (session = db_plugin->get_session (db_plugin->cls,
|
||||||
@ -837,7 +911,7 @@ run_transfers (void *cls,
|
|||||||
{
|
{
|
||||||
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
|
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
|
||||||
"Failed to obtain database session!\n");
|
"Failed to obtain database session!\n");
|
||||||
*global_ret = GNUNET_SYSERR;
|
global_ret = GNUNET_SYSERR;
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
if (GNUNET_OK !=
|
if (GNUNET_OK !=
|
||||||
@ -846,40 +920,61 @@ run_transfers (void *cls,
|
|||||||
{
|
{
|
||||||
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
|
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
|
||||||
"Failed to start database transaction!\n");
|
"Failed to start database transaction!\n");
|
||||||
*global_ret = GNUNET_SYSERR;
|
global_ret = GNUNET_SYSERR;
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
wpd = GNUNET_new (struct WirePrepareData);
|
wpd = GNUNET_new (struct WirePrepareData);
|
||||||
wpd->session = session;
|
wpd->session = session;
|
||||||
wpd->global_ret = global_ret;
|
|
||||||
ret = db_plugin->wire_prepare_data_get (db_plugin->cls,
|
ret = db_plugin->wire_prepare_data_get (db_plugin->cls,
|
||||||
session,
|
session,
|
||||||
exchange_wireformat,
|
exchange_wireformat,
|
||||||
&wire_prepare_cb,
|
&wire_prepare_cb,
|
||||||
wpd);
|
NULL);
|
||||||
if (GNUNET_SYSERR == ret)
|
if (GNUNET_SYSERR == ret)
|
||||||
{
|
{
|
||||||
GNUNET_break (0); /* why? how to best recover? */
|
GNUNET_break (0); /* why? how to best recover? */
|
||||||
db_plugin->rollback (db_plugin->cls,
|
db_plugin->rollback (db_plugin->cls,
|
||||||
session);
|
session);
|
||||||
*global_ret = GNUNET_SYSERR;
|
global_ret = GNUNET_SYSERR;
|
||||||
GNUNET_free (wpd);
|
GNUNET_free (wpd);
|
||||||
|
wpd = NULL;
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
if (GNUNET_NO == ret)
|
if (GNUNET_NO == ret)
|
||||||
{
|
{
|
||||||
/* no more prepared wire transfers, go back to aggregation! */
|
/* no more prepared wire transfers, go back to aggregation! */
|
||||||
|
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
|
||||||
|
"No more pending wire transfers, starting aggregation\n");
|
||||||
db_plugin->rollback (db_plugin->cls,
|
db_plugin->rollback (db_plugin->cls,
|
||||||
session);
|
session);
|
||||||
task = GNUNET_SCHEDULER_add_now (&run_aggregation,
|
task = GNUNET_SCHEDULER_add_now (&run_aggregation,
|
||||||
global_ret);
|
NULL);
|
||||||
GNUNET_free (wpd);
|
GNUNET_free (wpd);
|
||||||
|
wpd = NULL;
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
/* otherwise, continues in #wire_prepare_cb() */
|
/* otherwise, continues in #wire_prepare_cb() */
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* First task.
|
||||||
|
*
|
||||||
|
* @param cls closure, NULL
|
||||||
|
* @param tc scheduler context
|
||||||
|
*/
|
||||||
|
static void
|
||||||
|
run (void *cls,
|
||||||
|
const struct GNUNET_SCHEDULER_TaskContext *tc)
|
||||||
|
{
|
||||||
|
task = GNUNET_SCHEDULER_add_now (&run_transfers,
|
||||||
|
NULL);
|
||||||
|
GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL,
|
||||||
|
&shutdown_task,
|
||||||
|
cls);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The main function of the taler-exchange-httpd server ("the exchange").
|
* The main function of the taler-exchange-httpd server ("the exchange").
|
||||||
*
|
*
|
||||||
@ -906,7 +1001,6 @@ main (int argc,
|
|||||||
GNUNET_GETOPT_OPTION_VERSION (VERSION "-" VCS_VERSION),
|
GNUNET_GETOPT_OPTION_VERSION (VERSION "-" VCS_VERSION),
|
||||||
GNUNET_GETOPT_OPTION_END
|
GNUNET_GETOPT_OPTION_END
|
||||||
};
|
};
|
||||||
int ret = GNUNET_OK;
|
|
||||||
|
|
||||||
GNUNET_assert (GNUNET_OK ==
|
GNUNET_assert (GNUNET_OK ==
|
||||||
GNUNET_log_setup ("taler-exchange-aggregator",
|
GNUNET_log_setup ("taler-exchange-aggregator",
|
||||||
@ -928,12 +1022,10 @@ main (int argc,
|
|||||||
{
|
{
|
||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
|
global_ret = GNUNET_OK;
|
||||||
|
GNUNET_SCHEDULER_run (&run, NULL);
|
||||||
|
|
||||||
GNUNET_SCHEDULER_run (&run_transfers, &ret);
|
return (GNUNET_SYSERR == global_ret) ? 1 : 0;
|
||||||
|
|
||||||
TALER_EXCHANGEDB_plugin_unload (db_plugin);
|
|
||||||
TALER_WIRE_plugin_unload (wire_plugin);
|
|
||||||
return (GNUNET_SYSERR == ret) ? 1 : 0;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/* end of taler-exchange-aggregator.c */
|
/* end of taler-exchange-aggregator.c */
|
||||||
|
@ -668,25 +668,19 @@ run_test ()
|
|||||||
.label = "run-aggregator-deposit-1"
|
.label = "run-aggregator-deposit-1"
|
||||||
},
|
},
|
||||||
|
|
||||||
/* The above step is already known to fail (with an error message)
|
|
||||||
right now, so we skip the rest of the test. */
|
|
||||||
{
|
|
||||||
.opcode = OPCODE_TERMINATE_SKIP,
|
|
||||||
.label = "testcase-incomplete-terminating-with-skip"
|
|
||||||
},
|
|
||||||
|
|
||||||
|
|
||||||
{
|
{
|
||||||
.opcode = OPCODE_EXPECT_TRANSACTION,
|
.opcode = OPCODE_EXPECT_TRANSACTION,
|
||||||
.label = "expect-deposit-1",
|
.label = "expect-deposit-1",
|
||||||
.details.expect_transaction.debit_account = 1,
|
.details.expect_transaction.debit_account = 3,
|
||||||
.details.expect_transaction.credit_account = 4,
|
.details.expect_transaction.credit_account = 4,
|
||||||
.details.expect_transaction.amount = "EUR:1"
|
.details.expect_transaction.amount = "EUR:1"
|
||||||
},
|
},
|
||||||
|
|
||||||
{
|
{
|
||||||
.opcode = OPCODE_EXPECT_TRANSACTIONS_EMPTY,
|
.opcode = OPCODE_EXPECT_TRANSACTIONS_EMPTY,
|
||||||
.label = "expect-empty-transactions-on-start"
|
.label = "expect-empty-transactions-on-start"
|
||||||
},
|
},
|
||||||
|
|
||||||
/* test idempotency: run again on transactions already done */
|
/* test idempotency: run again on transactions already done */
|
||||||
{
|
{
|
||||||
.opcode = OPCODE_DATABASE_DEPOSIT,
|
.opcode = OPCODE_DATABASE_DEPOSIT,
|
||||||
@ -703,6 +697,11 @@ run_test ()
|
|||||||
.label = "expect-empty-transactions-on-start"
|
.label = "expect-empty-transactions-on-start"
|
||||||
},
|
},
|
||||||
|
|
||||||
|
{
|
||||||
|
.opcode = OPCODE_TERMINATE_SUCCESS,
|
||||||
|
.label = "testcase-incomplete-terminating-with-skip"
|
||||||
|
},
|
||||||
|
|
||||||
{
|
{
|
||||||
.opcode = OPCODE_TERMINATE_SKIP,
|
.opcode = OPCODE_TERMINATE_SKIP,
|
||||||
.label = "testcase-incomplete-terminating-with-skip"
|
.label = "testcase-incomplete-terminating-with-skip"
|
||||||
@ -804,7 +803,6 @@ handle_mhd_request (void *cls,
|
|||||||
GNUNET_break_op (0);
|
GNUNET_break_op (0);
|
||||||
return MHD_NO;
|
return MHD_NO;
|
||||||
}
|
}
|
||||||
/* FIXME: to be implemented! */
|
|
||||||
pr = GNUNET_JSON_post_parser (REQUEST_BUFFER_MAX,
|
pr = GNUNET_JSON_post_parser (REQUEST_BUFFER_MAX,
|
||||||
con_cls,
|
con_cls,
|
||||||
upload_data,
|
upload_data,
|
||||||
@ -848,6 +846,10 @@ handle_mhd_request (void *cls,
|
|||||||
transactions_tail,
|
transactions_tail,
|
||||||
t);
|
t);
|
||||||
}
|
}
|
||||||
|
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
|
||||||
|
"Receiving incoming wire transfer: %llu->%llu\n",
|
||||||
|
(unsigned long long) t->debit_account,
|
||||||
|
(unsigned long long) t->credit_account);
|
||||||
json_decref (json);
|
json_decref (json);
|
||||||
resp = MHD_create_response_from_buffer (0, "", MHD_RESPMEM_PERSISTENT);
|
resp = MHD_create_response_from_buffer (0, "", MHD_RESPMEM_PERSISTENT);
|
||||||
ret = MHD_queue_response (connection,
|
ret = MHD_queue_response (connection,
|
||||||
|
@ -80,6 +80,7 @@ test_exchangedb_postgres_SOURCES = \
|
|||||||
test_exchangedb.c
|
test_exchangedb.c
|
||||||
test_exchangedb_postgres_LDADD = \
|
test_exchangedb_postgres_LDADD = \
|
||||||
libtalerexchangedb.la \
|
libtalerexchangedb.la \
|
||||||
|
$(top_builddir)/src/json/libtalerjson.la \
|
||||||
$(top_srcdir)/src/util/libtalerutil.la \
|
$(top_srcdir)/src/util/libtalerutil.la \
|
||||||
$(top_srcdir)/src/pq/libtalerpq.la \
|
$(top_srcdir)/src/pq/libtalerpq.la \
|
||||||
-lgnunetutil -ljansson
|
-lgnunetutil -ljansson
|
||||||
|
@ -952,7 +952,7 @@ postgres_prepare (PGconn *db_conn)
|
|||||||
" tiny=false AND"
|
" tiny=false AND"
|
||||||
" done=false"
|
" done=false"
|
||||||
" ORDER BY wire_deadline ASC"
|
" ORDER BY wire_deadline ASC"
|
||||||
" LIMIT 1;",
|
" LIMIT 1",
|
||||||
0, NULL);
|
0, NULL);
|
||||||
|
|
||||||
/* Used in #postgres_iterate_matching_deposits() */
|
/* Used in #postgres_iterate_matching_deposits() */
|
||||||
@ -975,8 +975,8 @@ postgres_prepare (PGconn *db_conn)
|
|||||||
" h_wire=$2 AND"
|
" h_wire=$2 AND"
|
||||||
" done=false"
|
" done=false"
|
||||||
" ORDER BY wire_deadline ASC"
|
" ORDER BY wire_deadline ASC"
|
||||||
" LIMIT $3",
|
" LIMIT " TALER_EXCHANGEDB_MATCHING_DEPOSITS_LIMIT_STR,
|
||||||
3, NULL);
|
2, NULL);
|
||||||
|
|
||||||
/* Used in #postgres_mark_deposit_tiny() */
|
/* Used in #postgres_mark_deposit_tiny() */
|
||||||
PREPARE ("mark_deposit_tiny",
|
PREPARE ("mark_deposit_tiny",
|
||||||
@ -2336,7 +2336,6 @@ postgres_iterate_matching_deposits (void *cls,
|
|||||||
struct GNUNET_PQ_QueryParam params[] = {
|
struct GNUNET_PQ_QueryParam params[] = {
|
||||||
GNUNET_PQ_query_param_auto_from_type (merchant_pub),
|
GNUNET_PQ_query_param_auto_from_type (merchant_pub),
|
||||||
GNUNET_PQ_query_param_auto_from_type (h_wire),
|
GNUNET_PQ_query_param_auto_from_type (h_wire),
|
||||||
GNUNET_PQ_query_param_uint32 (&limit),
|
|
||||||
GNUNET_PQ_query_param_end
|
GNUNET_PQ_query_param_end
|
||||||
};
|
};
|
||||||
PGresult *result;
|
PGresult *result;
|
||||||
@ -2366,7 +2365,6 @@ postgres_iterate_matching_deposits (void *cls,
|
|||||||
struct TALER_Amount deposit_fee;
|
struct TALER_Amount deposit_fee;
|
||||||
struct GNUNET_TIME_Absolute wire_deadline;
|
struct GNUNET_TIME_Absolute wire_deadline;
|
||||||
struct GNUNET_HashCode h_contract;
|
struct GNUNET_HashCode h_contract;
|
||||||
struct TALER_MerchantPublicKeyP merchant_pub;
|
|
||||||
struct TALER_CoinSpendPublicKeyP coin_pub;
|
struct TALER_CoinSpendPublicKeyP coin_pub;
|
||||||
uint64_t transaction_id;
|
uint64_t transaction_id;
|
||||||
uint64_t serial_id;
|
uint64_t serial_id;
|
||||||
@ -2384,8 +2382,6 @@ postgres_iterate_matching_deposits (void *cls,
|
|||||||
&wire_deadline),
|
&wire_deadline),
|
||||||
GNUNET_PQ_result_spec_auto_from_type ("h_contract",
|
GNUNET_PQ_result_spec_auto_from_type ("h_contract",
|
||||||
&h_contract),
|
&h_contract),
|
||||||
GNUNET_PQ_result_spec_auto_from_type ("merchant_pub",
|
|
||||||
&merchant_pub),
|
|
||||||
GNUNET_PQ_result_spec_auto_from_type ("coin_pub",
|
GNUNET_PQ_result_spec_auto_from_type ("coin_pub",
|
||||||
&coin_pub),
|
&coin_pub),
|
||||||
GNUNET_PQ_result_spec_end
|
GNUNET_PQ_result_spec_end
|
||||||
@ -2399,7 +2395,7 @@ postgres_iterate_matching_deposits (void *cls,
|
|||||||
}
|
}
|
||||||
ret = deposit_cb (deposit_cb_cls,
|
ret = deposit_cb (deposit_cb_cls,
|
||||||
serial_id,
|
serial_id,
|
||||||
&merchant_pub,
|
merchant_pub,
|
||||||
&coin_pub,
|
&coin_pub,
|
||||||
&amount_with_fee,
|
&amount_with_fee,
|
||||||
&deposit_fee,
|
&deposit_fee,
|
||||||
|
@ -20,6 +20,7 @@
|
|||||||
*/
|
*/
|
||||||
#include "platform.h"
|
#include "platform.h"
|
||||||
#include "taler_exchangedb_lib.h"
|
#include "taler_exchangedb_lib.h"
|
||||||
|
#include "taler_json_lib.h"
|
||||||
#include "taler_exchangedb_plugin.h"
|
#include "taler_exchangedb_plugin.h"
|
||||||
|
|
||||||
static int result;
|
static int result;
|
||||||
@ -546,6 +547,70 @@ cb_wtid_check (void *cls,
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Function called with details about deposits that
|
||||||
|
* have been made. Called in the test on the
|
||||||
|
* deposit given in @a cls.
|
||||||
|
*
|
||||||
|
* @param cls closure a `struct TALER_EXCHANGEDB_Deposit *`
|
||||||
|
* @param rowid unique ID for the deposit in our DB, used for marking
|
||||||
|
* it as 'tiny' or 'done'
|
||||||
|
* @param merchant_pub public key of the merchant
|
||||||
|
* @param coin_pub public key of the coin
|
||||||
|
* @param amount_with_fee amount that was deposited including fee
|
||||||
|
* @param deposit_fee amount the exchange gets to keep as transaction fees
|
||||||
|
* @param transaction_id unique transaction ID chosen by the merchant
|
||||||
|
* @param h_contract hash of the contract between merchant and customer
|
||||||
|
* @param wire_deadline by which the merchant adviced that he would like the
|
||||||
|
* wire transfer to be executed
|
||||||
|
* @param wire wire details for the merchant, NULL from iterate_matching_deposits()
|
||||||
|
* @return #GNUNET_OK to continue to iterate, #GNUNET_SYSERR if deposit does
|
||||||
|
* not match our expectations
|
||||||
|
*/
|
||||||
|
static int
|
||||||
|
deposit_cb (void *cls,
|
||||||
|
unsigned long long rowid,
|
||||||
|
const struct TALER_MerchantPublicKeyP *merchant_pub,
|
||||||
|
const struct TALER_CoinSpendPublicKeyP *coin_pub,
|
||||||
|
const struct TALER_Amount *amount_with_fee,
|
||||||
|
const struct TALER_Amount *deposit_fee,
|
||||||
|
uint64_t transaction_id,
|
||||||
|
const struct GNUNET_HashCode *h_contract,
|
||||||
|
struct GNUNET_TIME_Absolute wire_deadline,
|
||||||
|
const json_t *wire)
|
||||||
|
{
|
||||||
|
struct TALER_EXCHANGEDB_Deposit *deposit = cls;
|
||||||
|
struct GNUNET_HashCode h_wire;
|
||||||
|
|
||||||
|
if (NULL != wire)
|
||||||
|
TALER_JSON_hash (wire, &h_wire);
|
||||||
|
if ( (0 != memcmp (merchant_pub,
|
||||||
|
&deposit->merchant_pub,
|
||||||
|
sizeof (struct TALER_MerchantPublicKeyP))) ||
|
||||||
|
(0 != TALER_amount_cmp (amount_with_fee,
|
||||||
|
&deposit->amount_with_fee)) ||
|
||||||
|
(0 != TALER_amount_cmp (deposit_fee,
|
||||||
|
&deposit->deposit_fee)) ||
|
||||||
|
(0 != memcmp (h_contract,
|
||||||
|
&deposit->h_contract,
|
||||||
|
sizeof (struct GNUNET_HashCode))) ||
|
||||||
|
(0 != memcmp (coin_pub,
|
||||||
|
&deposit->coin.coin_pub,
|
||||||
|
sizeof (struct TALER_CoinSpendPublicKeyP))) ||
|
||||||
|
(transaction_id != deposit->transaction_id) ||
|
||||||
|
( (NULL != wire) &&
|
||||||
|
(0 != memcmp (&h_wire,
|
||||||
|
&deposit->h_wire,
|
||||||
|
sizeof (struct GNUNET_HashCode))) ) )
|
||||||
|
{
|
||||||
|
GNUNET_break (0);
|
||||||
|
return GNUNET_SYSERR;
|
||||||
|
}
|
||||||
|
|
||||||
|
return GNUNET_OK;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Main function that will be run by the scheduler.
|
* Main function that will be run by the scheduler.
|
||||||
*
|
*
|
||||||
@ -739,14 +804,16 @@ run (void *cls,
|
|||||||
RND_BLK (&deposit.csig);
|
RND_BLK (&deposit.csig);
|
||||||
RND_BLK (&deposit.merchant_pub);
|
RND_BLK (&deposit.merchant_pub);
|
||||||
RND_BLK (&deposit.h_contract);
|
RND_BLK (&deposit.h_contract);
|
||||||
RND_BLK (&deposit.h_wire);
|
|
||||||
wire = json_loads (json_wire_str, 0, NULL);
|
wire = json_loads (json_wire_str, 0, NULL);
|
||||||
|
TALER_JSON_hash (wire,
|
||||||
|
&deposit.h_wire);
|
||||||
deposit.wire = wire;
|
deposit.wire = wire;
|
||||||
deposit.transaction_id =
|
deposit.transaction_id =
|
||||||
GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK, UINT64_MAX);
|
GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK, UINT64_MAX);
|
||||||
deposit.amount_with_fee = value;
|
deposit.amount_with_fee = value;
|
||||||
GNUNET_assert (GNUNET_OK ==
|
GNUNET_assert (GNUNET_OK ==
|
||||||
TALER_amount_get_zero (CURRENCY, &deposit.deposit_fee));
|
TALER_amount_get_zero (CURRENCY, &deposit.deposit_fee));
|
||||||
|
result = 8;
|
||||||
FAILIF (GNUNET_OK !=
|
FAILIF (GNUNET_OK !=
|
||||||
plugin->insert_deposit (plugin->cls,
|
plugin->insert_deposit (plugin->cls,
|
||||||
session, &deposit));
|
session, &deposit));
|
||||||
@ -754,6 +821,15 @@ run (void *cls,
|
|||||||
plugin->have_deposit (plugin->cls,
|
plugin->have_deposit (plugin->cls,
|
||||||
session,
|
session,
|
||||||
&deposit));
|
&deposit));
|
||||||
|
result = 9;
|
||||||
|
FAILIF (1 !=
|
||||||
|
plugin->iterate_matching_deposits (plugin->cls,
|
||||||
|
session,
|
||||||
|
&deposit.h_wire,
|
||||||
|
&deposit.merchant_pub,
|
||||||
|
&deposit_cb, &deposit,
|
||||||
|
2));
|
||||||
|
result = 10;
|
||||||
deposit2 = deposit;
|
deposit2 = deposit;
|
||||||
deposit2.transaction_id++; /* should fail if transaction id is different */
|
deposit2.transaction_id++; /* should fail if transaction id is different */
|
||||||
FAILIF (GNUNET_NO !=
|
FAILIF (GNUNET_NO !=
|
||||||
@ -880,6 +956,9 @@ main (int argc,
|
|||||||
GNUNET_break (0);
|
GNUNET_break (0);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
GNUNET_log_setup (argv[0],
|
||||||
|
"WARNING",
|
||||||
|
NULL);
|
||||||
plugin_name++;
|
plugin_name++;
|
||||||
(void) GNUNET_asprintf (&testname,
|
(void) GNUNET_asprintf (&testname,
|
||||||
"test-exchange-db-%s", plugin_name);
|
"test-exchange-db-%s", plugin_name);
|
||||||
|
@ -947,6 +947,17 @@ struct TALER_EXCHANGEDB_Plugin
|
|||||||
void *deposit_cb_cls);
|
void *deposit_cb_cls);
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Maximum number of results we return from iterate_matching_deposits().
|
||||||
|
*
|
||||||
|
* Limit on the number of transactions we aggregate at once. Note
|
||||||
|
* that the limit must be big enough to ensure that when transactions
|
||||||
|
* of the smallest possible unit are aggregated, they do surpass the
|
||||||
|
* "tiny" threshold beyond which we never trigger a wire transaction!
|
||||||
|
*/
|
||||||
|
#define TALER_EXCHANGEDB_MATCHING_DEPOSITS_LIMIT 10000
|
||||||
|
#define TALER_EXCHANGEDB_MATCHING_DEPOSITS_LIMIT_STR "10000"
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Obtain information about other pending deposits for the same
|
* Obtain information about other pending deposits for the same
|
||||||
* destination. Those deposits must not already be "done".
|
* destination. Those deposits must not already be "done".
|
||||||
@ -957,7 +968,9 @@ struct TALER_EXCHANGEDB_Plugin
|
|||||||
* @param merchant_pub public key of the merchant
|
* @param merchant_pub public key of the merchant
|
||||||
* @param deposit_cb function to call for each deposit
|
* @param deposit_cb function to call for each deposit
|
||||||
* @param deposit_cb_cls closure for @a deposit_cb
|
* @param deposit_cb_cls closure for @a deposit_cb
|
||||||
* @param limit maximum number of matching deposits to return
|
* @param limit maximum number of matching deposits to return; should
|
||||||
|
* be #TALER_EXCHANGEDB_MATCHING_DEPOSITS_LIMIT, larger values
|
||||||
|
* are not supported, smaller values would be inefficient.
|
||||||
* @return number of rows processed, 0 if none exist,
|
* @return number of rows processed, 0 if none exist,
|
||||||
* #GNUNET_SYSERR on error
|
* #GNUNET_SYSERR on error
|
||||||
*/
|
*/
|
||||||
|
@ -89,7 +89,7 @@ TALER_JSON_spec_denomination_signature (const char *field,
|
|||||||
* @return #GNUNET_OK on success, #GNUNET_SYSERR on error
|
* @return #GNUNET_OK on success, #GNUNET_SYSERR on error
|
||||||
*/
|
*/
|
||||||
int
|
int
|
||||||
TALER_JSON_hash (json_t *json,
|
TALER_JSON_hash (const json_t *json,
|
||||||
struct GNUNET_HashCode *hc);
|
struct GNUNET_HashCode *hc);
|
||||||
|
|
||||||
#endif /* TALER_JSON_LIB_H_ */
|
#endif /* TALER_JSON_LIB_H_ */
|
||||||
|
@ -32,7 +32,7 @@
|
|||||||
* @return #GNUNET_OK on success, #GNUNET_SYSERR on error
|
* @return #GNUNET_OK on success, #GNUNET_SYSERR on error
|
||||||
*/
|
*/
|
||||||
int
|
int
|
||||||
TALER_JSON_hash (json_t *json,
|
TALER_JSON_hash (const json_t *json,
|
||||||
struct GNUNET_HashCode *hc)
|
struct GNUNET_HashCode *hc)
|
||||||
{
|
{
|
||||||
char *wire_enc;
|
char *wire_enc;
|
||||||
|
@ -178,7 +178,7 @@ context_task (void *cls,
|
|||||||
rs,
|
rs,
|
||||||
ws,
|
ws,
|
||||||
&context_task,
|
&context_task,
|
||||||
cls);
|
tc);
|
||||||
GNUNET_NETWORK_fdset_destroy (rs);
|
GNUNET_NETWORK_fdset_destroy (rs);
|
||||||
GNUNET_NETWORK_fdset_destroy (ws);
|
GNUNET_NETWORK_fdset_destroy (ws);
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user