aggregator clean up

This commit is contained in:
Christian Grothoff 2020-03-12 06:11:48 +01:00
parent 1896c1dfb5
commit a1db41e09a
No known key found for this signature in database
GPG Key ID: 939E6BE1E29FC3CC

View File

@ -18,6 +18,19 @@
* @file taler-exchange-aggregator.c * @file taler-exchange-aggregator.c
* @brief Process that aggregates outgoing transactions and executes them * @brief Process that aggregates outgoing transactions and executes them
* @author Christian Grothoff * @author Christian Grothoff
*
* Note:
* It might be simpler and theoretically more performant to split up
* this process into three:
* - one that runs the 'pending' wire transfers
* - one that performs aggregation
* - one that closes (expired) reserves
*
* They would have some (minor) code duplication to load the database and wire
* plugins and account data, and this would also slightly complicate
* operations by having to launch three processes. OTOH, those processes could
* then fail independently, which might also be a good thing. In any case,
* doing this is not expected to be complicated.
*/ */
#include "platform.h" #include "platform.h"
#include <gnunet/gnunet_util_lib.h> #include <gnunet/gnunet_util_lib.h>
@ -30,7 +43,7 @@
/** /**
* Information we keep for each supported account. * Information we keep for each supported account of the exchange.
*/ */
struct WireAccount struct WireAccount
{ {
@ -70,6 +83,8 @@ struct WireAccount
/** /**
* Data we keep to #run_transfers(). There is at most * Data we keep to #run_transfers(). There is at most
* one of these around at any given point in time. * one of these around at any given point in time.
* Note that this limits parallelism, and we might want
* to revise this decision at a later point.
*/ */
struct WirePrepareData struct WirePrepareData
{ {
@ -100,6 +115,8 @@ struct WirePrepareData
/** /**
* Information about one aggregation process to be executed. There is * Information about one aggregation process to be executed. There is
* at most one of these around at any given point in time. * at most one of these around at any given point in time.
* Note that this limits parallelism, and we might want
* to revise this decision at a later point.
*/ */
struct AggregationUnit struct AggregationUnit
{ {
@ -139,7 +156,8 @@ struct AggregationUnit
unsigned long long row_id; unsigned long long row_id;
/** /**
* The current time. * The current time (which triggered the aggregation and
* defines the wire fee).
*/ */
struct GNUNET_TIME_Absolute execution_time; struct GNUNET_TIME_Absolute execution_time;
@ -149,7 +167,8 @@ struct AggregationUnit
json_t *wire; json_t *wire;
/** /**
* Wire account to be used for the preparation. * Exchange wire account to be used for the preparation and
* eventual execution of the aggregate wire transfer.
*/ */
struct WireAccount *wa; struct WireAccount *wa;
@ -164,13 +183,13 @@ struct AggregationUnit
struct TALER_BANK_PrepareHandle *ph; struct TALER_BANK_PrepareHandle *ph;
/** /**
* Array of #aggregation_limit row_ids from the * Array of #TALER_EXCHANGEDB_MATCHING_DEPOSITS_LIMIT row_ids from the
* aggregation. * aggregation.
*/ */
unsigned long long *additional_rows; unsigned long long *additional_rows;
/** /**
* Offset specifying how many #additional_rows are in use. * Offset specifying how many @e additional_rows are in use.
*/ */
unsigned int rows_offset; unsigned int rows_offset;
@ -222,32 +241,35 @@ static struct CloseTransferContext *ctc;
static char *exchange_currency_string; static char *exchange_currency_string;
/** /**
* How many fractional digits does the currency use? * What is the smallest unit we support for wire transfers?
* We will need to round down to a multiple of this amount.
*/ */
static struct TALER_Amount currency_round_unit; static struct TALER_Amount currency_round_unit;
/** /**
* What is the base URL of this exchange? * What is the base URL of this exchange? Used in the
* wire transfer subjects to that merchants and governments
* can ask for the list of aggregated deposits.
*/ */
static char *exchange_base_url; static char *exchange_base_url;
/** /**
* The exchange's configuration (global) * The exchange's configuration.
*/ */
static struct GNUNET_CONFIGURATION_Handle *cfg; static const struct GNUNET_CONFIGURATION_Handle *cfg;
/** /**
* Our DB plugin. * Our database plugin.
*/ */
static struct TALER_EXCHANGEDB_Plugin *db_plugin; static struct TALER_EXCHANGEDB_Plugin *db_plugin;
/** /**
* Head of list wire accounts of the exchange. * Head of list of wire accounts of the exchange.
*/ */
static struct WireAccount *wa_head; static struct WireAccount *wa_head;
/** /**
* Head of list wire accounts of the exchange. * Tail of list of wire accounts of the exchange.
*/ */
static struct WireAccount *wa_tail; static struct WireAccount *wa_tail;
@ -263,13 +285,7 @@ static struct GNUNET_SCHEDULER_Task *task;
static struct WirePrepareData *wpd; static struct WirePrepareData *wpd;
/** /**
* If we are currently aggregating transactions, information about the * Handle to the context for interacting with the bank / wire gateway.
* active aggregation is here. Otherwise, this variable is NULL.
*/
static struct AggregationUnit *au;
/**
* Handle to the context for interacting with the bank.
*/ */
static struct GNUNET_CURL_Context *ctx; static struct GNUNET_CURL_Context *ctx;
@ -296,21 +312,11 @@ static int test_mode;
/** /**
* Did #run_reserve_closures() have any work during its last run? * Did #run_reserve_closures() have any work during its last run?
* Used to detect when we should go to sleep for a while to avoid
* busy waiting.
*/ */
static int reserves_idle; static int reserves_idle;
/**
* Limit on the number of transactions we aggregate at once. Note
* that the limit must be big enough to ensure that when transactions
* of the smallest possible unit are aggregated, they do surpass the
* "tiny" threshold beyond which we never trigger a wire transaction!
*
* Note: do not change here, Postgres requires us to hard-code the
* LIMIT in the prepared statement.
*/
static unsigned int aggregation_limit =
TALER_EXCHANGEDB_MATCHING_DEPOSITS_LIMIT;
/** /**
* Main work function that finds and triggers transfers for reserves * Main work function that finds and triggers transfers for reserves
@ -336,15 +342,14 @@ run_aggregation (void *cls);
* Execute the wire transfers that we have committed to * Execute the wire transfers that we have committed to
* do. * do.
* *
* @param cls pointer to an `int` which we will return from main() * @param cls NULL
*/ */
static void static void
run_transfers (void *cls); run_transfers (void *cls);
/** /**
* Find the record valid at time @a now in the fee * Find the record valid at time @a now in the fee structure.
* structure.
* *
* @param wa wire transfer fee data structure to update * @param wa wire transfer fee data structure to update
* @param now timestamp to update fees to * @param now timestamp to update fees to
@ -356,7 +361,6 @@ advance_fees (struct WireAccount *wa,
{ {
struct TALER_EXCHANGEDB_AggregateFees *af; struct TALER_EXCHANGEDB_AggregateFees *af;
/* First, try to see if we have current fee information in memory */
af = wa->af; af = wa->af;
while ( (NULL != af) && while ( (NULL != af) &&
(af->end_date.abs_value_us < now.abs_value_us) ) (af->end_date.abs_value_us < now.abs_value_us) )
@ -416,8 +420,9 @@ update_fees (struct WireAccount *wa,
if (NULL != af) if (NULL != af)
return af; return af;
GNUNET_log (GNUNET_ERROR_TYPE_ERROR, GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
"Failed to find current wire transfer fees for `%s'\n", "Failed to find current wire transfer fees for `%s' at %s\n",
wa->method); wa->method,
GNUNET_STRINGS_absolute_time_to_string (now));
return NULL; return NULL;
} }
@ -435,6 +440,9 @@ find_account_by_method (const char *method)
if (0 == strcmp (method, if (0 == strcmp (method,
wa->method)) wa->method))
return wa; return wa;
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
"No wire account known for method `%s'\n",
method);
return NULL; return NULL;
} }
@ -454,9 +462,9 @@ find_account_by_payto_uri (const char *url)
method = TALER_payto_get_method (url); method = TALER_payto_get_method (url);
if (NULL == method) if (NULL == method)
{ {
fprintf (stderr, GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
"Invalid payto:// URL `%s'\n", "Invalid payto:// URL `%s'\n",
url); url);
return NULL; return NULL;
} }
wa = find_account_by_method (method); wa = find_account_by_method (method);
@ -496,6 +504,15 @@ add_account_cb (void *cls,
return; return;
} }
wa->method = TALER_payto_get_method (payto_uri); wa->method = TALER_payto_get_method (payto_uri);
if (NULL == wa->method)
{
GNUNET_log_config_invalid (GNUNET_ERROR_TYPE_ERROR,
ai->section_name,
"PAYTO_URI",
"could not obtain wire method from URI");
GNUNET_free (wa);
return;
}
GNUNET_free (payto_uri); GNUNET_free (payto_uri);
if (GNUNET_OK != if (GNUNET_OK !=
TALER_BANK_auth_parse_cfg (cfg, TALER_BANK_auth_parse_cfg (cfg,
@ -517,21 +534,20 @@ add_account_cb (void *cls,
/** /**
* Free data stored in #au. * Free data stored in @a au, but not @a au itself (stack allocated).
*
* @param au aggreation unit to clean up
*/ */
static void static void
cleanup_au (void) cleanup_au (struct AggregationUnit *au)
{ {
if (NULL == au) GNUNET_assert (NULL != au);
return;
GNUNET_free_non_null (au->additional_rows); GNUNET_free_non_null (au->additional_rows);
if (NULL != au->wire) if (NULL != au->wire)
{
json_decref (au->wire); json_decref (au->wire);
au->wire = NULL; memset (au,
} 0,
GNUNET_free (au); sizeof (*au));
au = NULL;
} }
@ -573,12 +589,6 @@ shutdown_task (void *cls)
GNUNET_free (wpd); GNUNET_free (wpd);
wpd = NULL; wpd = NULL;
} }
if (NULL != au)
{
db_plugin->rollback (db_plugin->cls,
au->session);
cleanup_au ();
}
if (NULL != ctc) if (NULL != ctc)
{ {
db_plugin->rollback (db_plugin->cls, db_plugin->rollback (db_plugin->cls,
@ -605,7 +615,6 @@ shutdown_task (void *cls)
GNUNET_free (wa); GNUNET_free (wa);
} }
} }
GNUNET_CONFIGURATION_destroy (cfg);
cfg = NULL; cfg = NULL;
} }
@ -643,20 +652,20 @@ parse_wirewatch_config ()
if (GNUNET_OK != if (GNUNET_OK !=
GNUNET_CONFIGURATION_get_value_string (cfg, GNUNET_CONFIGURATION_get_value_string (cfg,
"taler", "taler",
"currency", "CURRENCY",
&exchange_currency_string)) &exchange_currency_string))
{ {
GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR, GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
"taler", "taler",
"currency"); "CURRENCY");
return GNUNET_SYSERR; return GNUNET_SYSERR;
} }
if (strlen (exchange_currency_string) >= TALER_CURRENCY_LEN) if (strlen (exchange_currency_string) >= TALER_CURRENCY_LEN)
{ {
fprintf (stderr, GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
"Currency `%s' longer than the allowed limit of %u characters.", "Currency `%s' longer than the allowed limit of %u characters.",
exchange_currency_string, exchange_currency_string,
(unsigned int) TALER_CURRENCY_LEN); (unsigned int) TALER_CURRENCY_LEN);
return GNUNET_SYSERR; return GNUNET_SYSERR;
} }
@ -678,8 +687,8 @@ parse_wirewatch_config ()
if (NULL == if (NULL ==
(db_plugin = TALER_EXCHANGEDB_plugin_load (cfg))) (db_plugin = TALER_EXCHANGEDB_plugin_load (cfg)))
{ {
fprintf (stderr, GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
"Failed to initialize DB subsystem\n"); "Failed to initialize DB subsystem\n");
return GNUNET_SYSERR; return GNUNET_SYSERR;
} }
TALER_EXCHANGEDB_find_accounts (cfg, TALER_EXCHANGEDB_find_accounts (cfg,
@ -687,8 +696,8 @@ parse_wirewatch_config ()
NULL); NULL);
if (NULL == wa_head) if (NULL == wa_head)
{ {
fprintf (stderr, GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
"No wire accounts configured for debit!\n"); "No wire accounts configured for debit!\n");
TALER_EXCHANGEDB_plugin_unload (db_plugin); TALER_EXCHANGEDB_plugin_unload (db_plugin);
db_plugin = NULL; db_plugin = NULL;
return GNUNET_SYSERR; return GNUNET_SYSERR;
@ -732,7 +741,7 @@ refund_by_coin_cb (void *cls,
* Function called with details about deposits that have been made, * Function called with details about deposits that have been made,
* with the goal of executing the corresponding wire transaction. * with the goal of executing the corresponding wire transaction.
* *
* @param cls NULL * @param cls a `struct AggregationUnit`
* @param row_id identifies database entry * @param row_id identifies database entry
* @param merchant_pub public key of the merchant * @param merchant_pub public key of the merchant
* @param coin_pub public key of the coin * @param coin_pub public key of the coin
@ -755,6 +764,7 @@ deposit_cb (void *cls,
struct GNUNET_TIME_Absolute wire_deadline, struct GNUNET_TIME_Absolute wire_deadline,
const json_t *wire) const json_t *wire)
{ {
struct AggregationUnit *au = cls;
enum GNUNET_DB_QueryStatus qs; enum GNUNET_DB_QueryStatus qs;
(void) cls; (void) cls;
@ -886,7 +896,7 @@ deposit_cb (void *cls,
* Function called with details about another deposit we * Function called with details about another deposit we
* can aggregate into an existing aggregation unit. * can aggregate into an existing aggregation unit.
* *
* @param cls NULL * @param cls a `struct AggregationUnit`
* @param row_id identifies database entry * @param row_id identifies database entry
* @param merchant_pub public key of the merchant * @param merchant_pub public key of the merchant
* @param coin_pub public key of the coin * @param coin_pub public key of the coin
@ -909,16 +919,25 @@ aggregate_cb (void *cls,
struct GNUNET_TIME_Absolute wire_deadline, struct GNUNET_TIME_Absolute wire_deadline,
const json_t *wire) const json_t *wire)
{ {
struct AggregationUnit *au = cls;
struct TALER_Amount delta; struct TALER_Amount delta;
enum GNUNET_DB_QueryStatus qs; enum GNUNET_DB_QueryStatus qs;
(void) cls;
/* NOTE: potential optimization: use custom SQL API to not /* NOTE: potential optimization: use custom SQL API to not
fetch these: */ fetch these: */
(void) wire_deadline; /* checked by SQL */ (void) wire_deadline; /* checked by SQL */
(void) wire; /* must match */ (void) wire; /* must match */
GNUNET_break (0 == GNUNET_memcmp (&au->merchant_pub, GNUNET_break (0 == GNUNET_memcmp (&au->merchant_pub,
merchant_pub)); merchant_pub));
if (au->rows_offset >= TALER_EXCHANGEDB_MATCHING_DEPOSITS_LIMIT)
{
/* Bug: we asked for at most #TALER_EXCHANGEDB_MATCHING_DEPOSITS_LIMIT results! */
GNUNET_break (0);
/* Skip this one, but keep going with the overall transaction */
return GNUNET_DB_STATUS_SUCCESS_ONE_RESULT;
}
/* compute contribution of this coin after fees */ /* compute contribution of this coin after fees */
/* add to total */ /* add to total */
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
@ -968,16 +987,10 @@ aggregate_cb (void *cls,
au->total_amount = delta; au->total_amount = delta;
} }
if (au->rows_offset >= aggregation_limit)
{
/* Bug: we asked for at most #aggregation_limit results! */
GNUNET_break (0);
/* Skip this one, but keep going. */
return GNUNET_DB_STATUS_SUCCESS_ONE_RESULT;
}
if (NULL == au->additional_rows) if (NULL == au->additional_rows)
au->additional_rows = GNUNET_new_array (aggregation_limit, au->additional_rows = GNUNET_new_array (
unsigned long long); TALER_EXCHANGEDB_MATCHING_DEPOSITS_LIMIT,
unsigned long long);
/* "append" to our list of rows */ /* "append" to our list of rows */
au->additional_rows[au->rows_offset++] = row_id; au->additional_rows[au->rows_offset++] = row_id;
/* insert into aggregation tracking table */ /* insert into aggregation tracking table */
@ -990,9 +1003,6 @@ aggregate_cb (void *cls,
GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs); GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs);
return qs; return qs;
} }
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Aggregator marks aggregated deposit %llu as DONE\n",
(unsigned long long) row_id);
qs = db_plugin->mark_deposit_done (db_plugin->cls, qs = db_plugin->mark_deposit_done (db_plugin->cls,
au->session, au->session,
row_id); row_id);
@ -1002,7 +1012,7 @@ aggregate_cb (void *cls,
return qs; return qs;
} }
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Added row %llu with %s to aggregation\n", "Aggregator marked deposit %llu over %s as DONE\n",
(unsigned long long) row_id, (unsigned long long) row_id,
TALER_amount2s (&delta)); TALER_amount2s (&delta));
return GNUNET_DB_STATUS_SUCCESS_ONE_RESULT; return GNUNET_DB_STATUS_SUCCESS_ONE_RESULT;
@ -1097,7 +1107,9 @@ expired_reserve_cb (void *cls,
wa = find_account_by_payto_uri (account_payto_uri); wa = find_account_by_payto_uri (account_payto_uri);
if (NULL == wa) if (NULL == wa)
{ {
GNUNET_break (0); GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
"No wire account configured to deal with target URI `%s'\n",
account_payto_uri);
global_ret = GNUNET_SYSERR; global_ret = GNUNET_SYSERR;
GNUNET_SCHEDULER_shutdown (); GNUNET_SCHEDULER_shutdown ();
return GNUNET_DB_STATUS_HARD_ERROR; return GNUNET_DB_STATUS_HARD_ERROR;
@ -1127,7 +1139,7 @@ expired_reserve_cb (void *cls,
if ( (GNUNET_SYSERR == ret) || if ( (GNUNET_SYSERR == ret) ||
(GNUNET_NO == ret) ) (GNUNET_NO == ret) )
{ {
/* Closing fee higher than remaining balance, close /* Closing fee higher than or equal to remaining balance, close
without wire transfer. */ without wire transfer. */
closing_fee = left; closing_fee = left;
GNUNET_assert (GNUNET_OK == GNUNET_assert (GNUNET_OK ==
@ -1345,6 +1357,7 @@ static void
run_aggregation (void *cls) run_aggregation (void *cls)
{ {
static unsigned int swap; static unsigned int swap;
struct AggregationUnit au_active;
struct TALER_EXCHANGEDB_Session *session; struct TALER_EXCHANGEDB_Session *session;
enum GNUNET_DB_QueryStatus qs; enum GNUNET_DB_QueryStatus qs;
const struct GNUNET_SCHEDULER_TaskContext *tc; const struct GNUNET_SCHEDULER_TaskContext *tc;
@ -1383,15 +1396,17 @@ run_aggregation (void *cls)
GNUNET_SCHEDULER_shutdown (); GNUNET_SCHEDULER_shutdown ();
return; return;
} }
au = GNUNET_new (struct AggregationUnit); memset (&au_active,
au->session = session; 0,
sizeof (au_active));
au_active.session = session;
qs = db_plugin->get_ready_deposit (db_plugin->cls, qs = db_plugin->get_ready_deposit (db_plugin->cls,
session, session,
&deposit_cb, &deposit_cb,
au); &au_active);
if (0 >= qs) if (0 >= qs)
{ {
cleanup_au (); cleanup_au (&au_active);
db_plugin->rollback (db_plugin->cls, db_plugin->rollback (db_plugin->cls,
session); session);
if (GNUNET_DB_STATUS_HARD_ERROR == qs) if (GNUNET_DB_STATUS_HARD_ERROR == qs)
@ -1444,20 +1459,20 @@ run_aggregation (void *cls)
/* Now try to find other deposits to aggregate */ /* Now try to find other deposits to aggregate */
GNUNET_log (GNUNET_ERROR_TYPE_INFO, GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"Found ready deposit for %s, aggregating\n", "Found ready deposit for %s, aggregating\n",
TALER_B2S (&au->merchant_pub)); TALER_B2S (&au_active.merchant_pub));
qs = db_plugin->iterate_matching_deposits (db_plugin->cls, qs = db_plugin->iterate_matching_deposits (db_plugin->cls,
session, session,
&au->h_wire, &au_active.h_wire,
&au->merchant_pub, &au_active.merchant_pub,
&aggregate_cb, &aggregate_cb,
au, &au_active,
aggregation_limit); TALER_EXCHANGEDB_MATCHING_DEPOSITS_LIMIT);
if ( (GNUNET_DB_STATUS_HARD_ERROR == qs) || if ( (GNUNET_DB_STATUS_HARD_ERROR == qs) ||
(GNUNET_YES == au->failed) ) (GNUNET_YES == au_active.failed) )
{ {
GNUNET_log (GNUNET_ERROR_TYPE_ERROR, GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
"Failed to execute deposit iteration!\n"); "Failed to execute deposit iteration!\n");
cleanup_au (); cleanup_au (&au_active);
db_plugin->rollback (db_plugin->cls, db_plugin->rollback (db_plugin->cls,
session); session);
global_ret = GNUNET_SYSERR; global_ret = GNUNET_SYSERR;
@ -1471,6 +1486,7 @@ run_aggregation (void *cls)
"Serialization issue, trying again later!\n"); "Serialization issue, trying again later!\n");
db_plugin->rollback (db_plugin->cls, db_plugin->rollback (db_plugin->cls,
session); session);
cleanup_au (&au_active);
GNUNET_assert (NULL == task); GNUNET_assert (NULL == task);
task = GNUNET_SCHEDULER_add_now (&run_aggregation, task = GNUNET_SCHEDULER_add_now (&run_aggregation,
NULL); NULL);
@ -1481,19 +1497,19 @@ run_aggregation (void *cls)
wire transfer method; Check if after rounding down, we still have wire transfer method; Check if after rounding down, we still have
an amount to transfer, and if not mark as 'tiny'. */ an amount to transfer, and if not mark as 'tiny'. */
if ( (GNUNET_OK != if ( (GNUNET_OK !=
TALER_amount_subtract (&au->final_amount, TALER_amount_subtract (&au_active.final_amount,
&au->total_amount, &au_active.total_amount,
&au->wire_fee)) || &au_active.wire_fee)) ||
(GNUNET_SYSERR == (GNUNET_SYSERR ==
TALER_amount_round_down (&au->final_amount, TALER_amount_round_down (&au_active.final_amount,
&currency_round_unit)) || &currency_round_unit)) ||
( (0 == au->final_amount.value) && ( (0 == au_active.final_amount.value) &&
(0 == au->final_amount.fraction) ) ) (0 == au_active.final_amount.fraction) ) )
{ {
GNUNET_log (GNUNET_ERROR_TYPE_INFO, GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"Aggregate value too low for transfer (%d/%s)\n", "Aggregate value too low for transfer (%d/%s)\n",
qs, qs,
TALER_amount2s (&au->final_amount)); TALER_amount2s (&au_active.final_amount));
/* Rollback ongoing transaction, as we will not use the respective /* Rollback ongoing transaction, as we will not use the respective
WTID and thus need to remove the tracking data */ WTID and thus need to remove the tracking data */
db_plugin->rollback (db_plugin->cls, db_plugin->rollback (db_plugin->cls,
@ -1509,21 +1525,21 @@ run_aggregation (void *cls)
GNUNET_log (GNUNET_ERROR_TYPE_ERROR, GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
"Failed to start database transaction!\n"); "Failed to start database transaction!\n");
global_ret = GNUNET_SYSERR; global_ret = GNUNET_SYSERR;
cleanup_au (); cleanup_au (&au_active);
GNUNET_SCHEDULER_shutdown (); GNUNET_SCHEDULER_shutdown ();
return; return;
} }
/* Mark transactions by row_id as minor */ /* Mark transactions by row_id as minor */
qs = db_plugin->mark_deposit_tiny (db_plugin->cls, qs = db_plugin->mark_deposit_tiny (db_plugin->cls,
session, session,
au->row_id); au_active.row_id);
if (0 <= qs) if (0 <= qs)
{ {
for (unsigned int i = 0; i<au->rows_offset; i++) for (unsigned int i = 0; i<au_active.rows_offset; i++)
{ {
qs = db_plugin->mark_deposit_tiny (db_plugin->cls, qs = db_plugin->mark_deposit_tiny (db_plugin->cls,
session, session,
au->additional_rows[i]); au_active.additional_rows[i]);
if (0 > qs) if (0 > qs)
break; break;
} }
@ -1534,7 +1550,7 @@ run_aggregation (void *cls)
"Serialization issue, trying again later!\n"); "Serialization issue, trying again later!\n");
db_plugin->rollback (db_plugin->cls, db_plugin->rollback (db_plugin->cls,
session); session);
cleanup_au (); cleanup_au (&au_active);
/* start again */ /* start again */
GNUNET_assert (NULL == task); GNUNET_assert (NULL == task);
task = GNUNET_SCHEDULER_add_now (&run_aggregation, task = GNUNET_SCHEDULER_add_now (&run_aggregation,
@ -1545,13 +1561,14 @@ run_aggregation (void *cls)
{ {
db_plugin->rollback (db_plugin->cls, db_plugin->rollback (db_plugin->cls,
session); session);
cleanup_au (); cleanup_au (&au_active);
GNUNET_SCHEDULER_shutdown (); GNUNET_SCHEDULER_shutdown ();
return; return;
} }
/* commit */ /* commit */
(void) commit_or_warn (session); (void) commit_or_warn (session);
cleanup_au (); cleanup_au (&au_active);
/* start again */ /* start again */
GNUNET_assert (NULL == task); GNUNET_assert (NULL == task);
task = GNUNET_SCHEDULER_add_now (&run_aggregation, task = GNUNET_SCHEDULER_add_now (&run_aggregation,
@ -1561,34 +1578,34 @@ run_aggregation (void *cls)
{ {
char *amount_s; char *amount_s;
amount_s = TALER_amount_to_string (&au->final_amount); amount_s = TALER_amount_to_string (&au_active.final_amount);
GNUNET_log (GNUNET_ERROR_TYPE_INFO, GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"Preparing wire transfer of %s to %s\n", "Preparing wire transfer of %s to %s\n",
amount_s, amount_s,
TALER_B2S (&au->merchant_pub)); TALER_B2S (&au_active.merchant_pub));
GNUNET_free (amount_s); GNUNET_free (amount_s);
} }
{ {
char *url; char *url;
url = TALER_JSON_wire_to_payto (au->wire); url = TALER_JSON_wire_to_payto (au_active.wire);
TALER_BANK_prepare_transfer (url, TALER_BANK_prepare_transfer (url,
&au->final_amount, &au_active.final_amount,
exchange_base_url, exchange_base_url,
&au->wtid, &au_active.wtid,
&buf, &buf,
&buf_size); &buf_size);
GNUNET_free (url); GNUNET_free (url);
} }
GNUNET_free_non_null (au->additional_rows); GNUNET_free_non_null (au_active.additional_rows);
au->additional_rows = NULL; au_active.additional_rows = NULL;
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Storing %u bytes of wire prepare data\n", "Storing %u bytes of wire prepare data\n",
(unsigned int) buf_size); (unsigned int) buf_size);
/* Commit our intention to execute the wire transfer! */ /* Commit our intention to execute the wire transfer! */
qs = db_plugin->wire_prepare_data_insert (db_plugin->cls, qs = db_plugin->wire_prepare_data_insert (db_plugin->cls,
session, session,
au->wa->method, au_active.wa->method,
buf, buf,
buf_size); buf_size);
GNUNET_free (buf); GNUNET_free (buf);
@ -1597,12 +1614,13 @@ run_aggregation (void *cls)
if (qs >= 0) if (qs >= 0)
qs = db_plugin->store_wire_transfer_out (db_plugin->cls, qs = db_plugin->store_wire_transfer_out (db_plugin->cls,
session, session,
au->execution_time, au_active.execution_time,
&au->wtid, &au_active.wtid,
au->wire, au_active.wire,
au->wa->section_name, au_active.wa->section_name,
&au->final_amount); &au_active.final_amount);
cleanup_au (); cleanup_au (&au_active);
if (GNUNET_DB_STATUS_SOFT_ERROR == qs) if (GNUNET_DB_STATUS_SOFT_ERROR == qs)
{ {
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
@ -1912,10 +1930,9 @@ run (void *cls,
(void) args; (void) args;
(void) cfgfile; (void) cfgfile;
cfg = GNUNET_CONFIGURATION_dup (c); cfg = c;
if (GNUNET_OK != parse_wirewatch_config ()) if (GNUNET_OK != parse_wirewatch_config ())
{ {
GNUNET_CONFIGURATION_destroy (cfg);
cfg = NULL; cfg = NULL;
global_ret = 1; global_ret = 1;
return; return;
@ -1966,7 +1983,7 @@ main (int argc,
GNUNET_PROGRAM_run (argc, argv, GNUNET_PROGRAM_run (argc, argv,
"taler-exchange-aggregator", "taler-exchange-aggregator",
gettext_noop ( gettext_noop (
"background process that aggregates and executes wire transfers to merchants"), "background process that aggregates and executes wire transfers"),
options, options,
&run, NULL)) &run, NULL))
{ {