split reserve closing from main aggregation logic
This commit is contained in:
parent
a1db41e09a
commit
83631bc98f
1
src/exchange/.gitignore
vendored
1
src/exchange/.gitignore
vendored
@ -7,3 +7,4 @@ taler-exchange-httpd
|
||||
taler-exchange-wirewatch
|
||||
test_taler_exchange_wirewatch-postgres
|
||||
test_taler_exchange_httpd_home/.config/taler/account-1.json
|
||||
taler-exchange-closer
|
||||
|
@ -18,6 +18,7 @@ pkgcfg_DATA = \
|
||||
|
||||
bin_PROGRAMS = \
|
||||
taler-exchange-aggregator \
|
||||
taler-exchange-closer \
|
||||
taler-exchange-httpd \
|
||||
taler-exchange-wirewatch
|
||||
|
||||
@ -33,6 +34,19 @@ taler_exchange_aggregator_LDADD = \
|
||||
-lgnunetcurl \
|
||||
-lgnunetutil
|
||||
|
||||
|
||||
taler_exchange_closer_SOURCES = \
|
||||
taler-exchange-closer.c
|
||||
taler_exchange_closer_LDADD = \
|
||||
$(LIBGCRYPT_LIBS) \
|
||||
$(top_builddir)/src/json/libtalerjson.la \
|
||||
$(top_builddir)/src/util/libtalerutil.la \
|
||||
$(top_builddir)/src/bank-lib/libtalerbank.la \
|
||||
$(top_builddir)/src/exchangedb/libtalerexchangedb.la \
|
||||
-ljansson \
|
||||
-lgnunetcurl \
|
||||
-lgnunetutil
|
||||
|
||||
taler_exchange_wirewatch_SOURCES = \
|
||||
taler-exchange-wirewatch.c
|
||||
taler_exchange_wirewatch_LDADD = \
|
||||
|
@ -42,44 +42,6 @@
|
||||
#include "taler_bank_service.h"
|
||||
|
||||
|
||||
/**
|
||||
* Information we keep for each supported account of the exchange.
|
||||
*/
|
||||
struct WireAccount
|
||||
{
|
||||
/**
|
||||
* Accounts are kept in a DLL.
|
||||
*/
|
||||
struct WireAccount *next;
|
||||
|
||||
/**
|
||||
* Plugins are kept in a DLL.
|
||||
*/
|
||||
struct WireAccount *prev;
|
||||
|
||||
/**
|
||||
* Authentication data.
|
||||
*/
|
||||
struct TALER_BANK_AuthenticationData auth;
|
||||
|
||||
/**
|
||||
* Wire transfer fee structure.
|
||||
*/
|
||||
struct TALER_EXCHANGEDB_AggregateFees *af;
|
||||
|
||||
/**
|
||||
* Name of the section that configures this account.
|
||||
*/
|
||||
char *section_name;
|
||||
|
||||
/**
|
||||
* Name of the wire method underlying the account.
|
||||
*/
|
||||
char *method;
|
||||
|
||||
};
|
||||
|
||||
|
||||
/**
|
||||
* Data we keep to #run_transfers(). There is at most
|
||||
* one of these around at any given point in time.
|
||||
@ -102,7 +64,7 @@ struct WirePrepareData
|
||||
/**
|
||||
* Wire account used for this preparation.
|
||||
*/
|
||||
struct WireAccount *wa;
|
||||
struct TALER_EXCHANGEDB_WireAccount *wa;
|
||||
|
||||
/**
|
||||
* Row ID of the transfer.
|
||||
@ -170,7 +132,7 @@ struct AggregationUnit
|
||||
* Exchange wire account to be used for the preparation and
|
||||
* eventual execution of the aggregate wire transfer.
|
||||
*/
|
||||
struct WireAccount *wa;
|
||||
struct TALER_EXCHANGEDB_WireAccount *wa;
|
||||
|
||||
/**
|
||||
* Database session for all of our transactions.
|
||||
@ -206,35 +168,6 @@ struct AggregationUnit
|
||||
};
|
||||
|
||||
|
||||
/**
|
||||
* Context we use while closing a reserve.
|
||||
*/
|
||||
struct CloseTransferContext
|
||||
{
|
||||
|
||||
/**
|
||||
* Our database session.
|
||||
*/
|
||||
struct TALER_EXCHANGEDB_Session *session;
|
||||
|
||||
/**
|
||||
* Wire transfer method.
|
||||
*/
|
||||
char *method;
|
||||
|
||||
/**
|
||||
* Wire account used for closing the reserve.
|
||||
*/
|
||||
struct WireAccount *wa;
|
||||
};
|
||||
|
||||
|
||||
/**
|
||||
* Active context while processing reserve closing,
|
||||
* or NULL.
|
||||
*/
|
||||
static struct CloseTransferContext *ctc;
|
||||
|
||||
/**
|
||||
* Which currency is used by this exchange?
|
||||
*/
|
||||
@ -263,16 +196,6 @@ static const struct GNUNET_CONFIGURATION_Handle *cfg;
|
||||
*/
|
||||
static struct TALER_EXCHANGEDB_Plugin *db_plugin;
|
||||
|
||||
/**
|
||||
* Head of list of wire accounts of the exchange.
|
||||
*/
|
||||
static struct WireAccount *wa_head;
|
||||
|
||||
/**
|
||||
* Tail of list of wire accounts of the exchange.
|
||||
*/
|
||||
static struct WireAccount *wa_tail;
|
||||
|
||||
/**
|
||||
* Next task to run, if any.
|
||||
*/
|
||||
@ -310,23 +233,6 @@ static int global_ret;
|
||||
*/
|
||||
static int test_mode;
|
||||
|
||||
/**
|
||||
* Did #run_reserve_closures() have any work during its last run?
|
||||
* Used to detect when we should go to sleep for a while to avoid
|
||||
* busy waiting.
|
||||
*/
|
||||
static int reserves_idle;
|
||||
|
||||
|
||||
/**
|
||||
* Main work function that finds and triggers transfers for reserves
|
||||
* closures.
|
||||
*
|
||||
* @param cls closure
|
||||
*/
|
||||
static void
|
||||
run_reserve_closures (void *cls);
|
||||
|
||||
|
||||
/**
|
||||
* Main work function that queries the DB and aggregates transactions
|
||||
@ -348,191 +254,6 @@ static void
|
||||
run_transfers (void *cls);
|
||||
|
||||
|
||||
/**
|
||||
* Find the record valid at time @a now in the fee structure.
|
||||
*
|
||||
* @param wa wire transfer fee data structure to update
|
||||
* @param now timestamp to update fees to
|
||||
* @return fee valid at @a now, or NULL if unknown
|
||||
*/
|
||||
static struct TALER_EXCHANGEDB_AggregateFees *
|
||||
advance_fees (struct WireAccount *wa,
|
||||
struct GNUNET_TIME_Absolute now)
|
||||
{
|
||||
struct TALER_EXCHANGEDB_AggregateFees *af;
|
||||
|
||||
af = wa->af;
|
||||
while ( (NULL != af) &&
|
||||
(af->end_date.abs_value_us < now.abs_value_us) )
|
||||
af = af->next;
|
||||
return af;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Update wire transfer fee data structure in @a wa.
|
||||
*
|
||||
* @param wa wire account data structure to update
|
||||
* @param now timestamp to update fees to
|
||||
* @param session DB session to use
|
||||
* @return fee valid at @a now, or NULL if unknown
|
||||
*/
|
||||
static struct TALER_EXCHANGEDB_AggregateFees *
|
||||
update_fees (struct WireAccount *wa,
|
||||
struct GNUNET_TIME_Absolute now,
|
||||
struct TALER_EXCHANGEDB_Session *session)
|
||||
{
|
||||
enum GNUNET_DB_QueryStatus qs;
|
||||
struct TALER_EXCHANGEDB_AggregateFees *af;
|
||||
|
||||
af = advance_fees (wa,
|
||||
now);
|
||||
if (NULL != af)
|
||||
return af;
|
||||
/* Let's try to load it from disk... */
|
||||
wa->af = TALER_EXCHANGEDB_fees_read (cfg,
|
||||
wa->method);
|
||||
for (struct TALER_EXCHANGEDB_AggregateFees *p = wa->af;
|
||||
NULL != p;
|
||||
p = p->next)
|
||||
{
|
||||
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
|
||||
"Persisting fees starting at %s in database\n",
|
||||
GNUNET_STRINGS_absolute_time_to_string (p->start_date));
|
||||
qs = db_plugin->insert_wire_fee (db_plugin->cls,
|
||||
session,
|
||||
wa->method,
|
||||
p->start_date,
|
||||
p->end_date,
|
||||
&p->wire_fee,
|
||||
&p->closing_fee,
|
||||
&p->master_sig);
|
||||
if (qs < 0)
|
||||
{
|
||||
GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs);
|
||||
TALER_EXCHANGEDB_fees_free (wa->af);
|
||||
wa->af = NULL;
|
||||
return NULL;
|
||||
}
|
||||
}
|
||||
af = advance_fees (wa,
|
||||
now);
|
||||
if (NULL != af)
|
||||
return af;
|
||||
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
|
||||
"Failed to find current wire transfer fees for `%s' at %s\n",
|
||||
wa->method,
|
||||
GNUNET_STRINGS_absolute_time_to_string (now));
|
||||
return NULL;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Find the wire plugin for the given payto:// URL
|
||||
*
|
||||
* @param method wire method we need an account for
|
||||
* @return NULL on error
|
||||
*/
|
||||
static struct WireAccount *
|
||||
find_account_by_method (const char *method)
|
||||
{
|
||||
for (struct WireAccount *wa = wa_head; NULL != wa; wa = wa->next)
|
||||
if (0 == strcmp (method,
|
||||
wa->method))
|
||||
return wa;
|
||||
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
|
||||
"No wire account known for method `%s'\n",
|
||||
method);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Find the wire plugin for the given payto:// URL
|
||||
*
|
||||
* @param url wire address we need an account for
|
||||
* @return NULL on error
|
||||
*/
|
||||
static struct WireAccount *
|
||||
find_account_by_payto_uri (const char *url)
|
||||
{
|
||||
char *method;
|
||||
struct WireAccount *wa;
|
||||
|
||||
method = TALER_payto_get_method (url);
|
||||
if (NULL == method)
|
||||
{
|
||||
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
|
||||
"Invalid payto:// URL `%s'\n",
|
||||
url);
|
||||
return NULL;
|
||||
}
|
||||
wa = find_account_by_method (method);
|
||||
GNUNET_free (method);
|
||||
return wa;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Function called with information about a wire account. Adds
|
||||
* the account to our list.
|
||||
*
|
||||
* @param cls closure, NULL
|
||||
* @param ai account information
|
||||
*/
|
||||
static void
|
||||
add_account_cb (void *cls,
|
||||
const struct TALER_EXCHANGEDB_AccountInfo *ai)
|
||||
{
|
||||
struct WireAccount *wa;
|
||||
char *payto_uri;
|
||||
|
||||
(void) cls;
|
||||
if (GNUNET_YES != ai->debit_enabled)
|
||||
return; /* not enabled for us, skip */
|
||||
wa = GNUNET_new (struct WireAccount);
|
||||
if (GNUNET_OK !=
|
||||
GNUNET_CONFIGURATION_get_value_string (cfg,
|
||||
ai->section_name,
|
||||
"PAYTO_URI",
|
||||
&payto_uri))
|
||||
{
|
||||
GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
|
||||
ai->section_name,
|
||||
"PAYTO_URI");
|
||||
GNUNET_free (wa);
|
||||
return;
|
||||
}
|
||||
wa->method = TALER_payto_get_method (payto_uri);
|
||||
if (NULL == wa->method)
|
||||
{
|
||||
GNUNET_log_config_invalid (GNUNET_ERROR_TYPE_ERROR,
|
||||
ai->section_name,
|
||||
"PAYTO_URI",
|
||||
"could not obtain wire method from URI");
|
||||
GNUNET_free (wa);
|
||||
return;
|
||||
}
|
||||
GNUNET_free (payto_uri);
|
||||
if (GNUNET_OK !=
|
||||
TALER_BANK_auth_parse_cfg (cfg,
|
||||
ai->section_name,
|
||||
&wa->auth))
|
||||
{
|
||||
GNUNET_log (GNUNET_ERROR_TYPE_MESSAGE,
|
||||
"Failed to load exchange account `%s'\n",
|
||||
ai->section_name);
|
||||
GNUNET_free (wa->method);
|
||||
GNUNET_free (wa);
|
||||
return;
|
||||
}
|
||||
wa->section_name = GNUNET_strdup (ai->section_name);
|
||||
GNUNET_CONTAINER_DLL_insert (wa_head,
|
||||
wa_tail,
|
||||
wa);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Free data stored in @a au, but not @a au itself (stack allocated).
|
||||
*
|
||||
@ -589,32 +310,9 @@ shutdown_task (void *cls)
|
||||
GNUNET_free (wpd);
|
||||
wpd = NULL;
|
||||
}
|
||||
if (NULL != ctc)
|
||||
{
|
||||
db_plugin->rollback (db_plugin->cls,
|
||||
ctc->session);
|
||||
GNUNET_free (ctc->method);
|
||||
GNUNET_free (ctc);
|
||||
ctc = NULL;
|
||||
}
|
||||
TALER_EXCHANGEDB_plugin_unload (db_plugin);
|
||||
db_plugin = NULL;
|
||||
|
||||
{
|
||||
struct WireAccount *wa;
|
||||
|
||||
while (NULL != (wa = wa_head))
|
||||
{
|
||||
GNUNET_CONTAINER_DLL_remove (wa_head,
|
||||
wa_tail,
|
||||
wa);
|
||||
TALER_BANK_auth_free (&wa->auth);
|
||||
TALER_EXCHANGEDB_fees_free (wa->af);
|
||||
GNUNET_free (wa->section_name);
|
||||
GNUNET_free (wa->method);
|
||||
GNUNET_free (wa);
|
||||
}
|
||||
}
|
||||
TALER_EXCHANGEDB_unload_accounts ();
|
||||
cfg = NULL;
|
||||
}
|
||||
|
||||
@ -691,10 +389,8 @@ parse_wirewatch_config ()
|
||||
"Failed to initialize DB subsystem\n");
|
||||
return GNUNET_SYSERR;
|
||||
}
|
||||
TALER_EXCHANGEDB_find_accounts (cfg,
|
||||
&add_account_cb,
|
||||
NULL);
|
||||
if (NULL == wa_head)
|
||||
if (GNUNET_OK !=
|
||||
TALER_EXCHANGEDB_load_accounts (cfg))
|
||||
{
|
||||
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
|
||||
"No wire accounts configured for debit!\n");
|
||||
@ -836,7 +532,7 @@ deposit_cb (void *cls,
|
||||
char *url;
|
||||
|
||||
url = TALER_JSON_wire_to_payto (au->wire);
|
||||
au->wa = find_account_by_payto_uri (url);
|
||||
au->wa = TALER_EXCHANGEDB_find_account_by_payto_uri (url);
|
||||
GNUNET_free (url);
|
||||
}
|
||||
if (NULL == au->wa)
|
||||
@ -851,7 +547,9 @@ deposit_cb (void *cls,
|
||||
{
|
||||
struct TALER_EXCHANGEDB_AggregateFees *af;
|
||||
|
||||
af = update_fees (au->wa,
|
||||
af = TALER_EXCHANGEDB_update_fees (cfg,
|
||||
db_plugin,
|
||||
au->wa,
|
||||
au->execution_time,
|
||||
au->session);
|
||||
if (NULL == af)
|
||||
@ -1042,311 +740,6 @@ commit_or_warn (struct TALER_EXCHANGEDB_Session *session)
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Closure for #expired_reserve_cb().
|
||||
*/
|
||||
struct ExpiredReserveContext
|
||||
{
|
||||
|
||||
/**
|
||||
* Database session we are using.
|
||||
*/
|
||||
struct TALER_EXCHANGEDB_Session *session;
|
||||
|
||||
/**
|
||||
* Set to #GNUNET_YES if the transaction continues
|
||||
* asynchronously.
|
||||
*/
|
||||
int async_cont;
|
||||
};
|
||||
|
||||
|
||||
/**
|
||||
* Function called with details about expired reserves.
|
||||
* We trigger the reserve closure by inserting the respective
|
||||
* closing record and prewire instructions into the respective
|
||||
* tables.
|
||||
*
|
||||
* @param cls a `struct ExpiredReserveContext *`
|
||||
* @param reserve_pub public key of the reserve
|
||||
* @param left amount left in the reserve
|
||||
* @param account_payto_uri information about the bank account that initially
|
||||
* caused the reserve to be created
|
||||
* @param expiration_date when did the reserve expire
|
||||
* @return transaction status code
|
||||
*/
|
||||
static enum GNUNET_DB_QueryStatus
|
||||
expired_reserve_cb (void *cls,
|
||||
const struct TALER_ReservePublicKeyP *reserve_pub,
|
||||
const struct TALER_Amount *left,
|
||||
const char *account_payto_uri,
|
||||
struct GNUNET_TIME_Absolute expiration_date)
|
||||
{
|
||||
struct ExpiredReserveContext *erc = cls;
|
||||
struct TALER_EXCHANGEDB_Session *session = erc->session;
|
||||
struct GNUNET_TIME_Absolute now;
|
||||
struct TALER_WireTransferIdentifierRawP wtid;
|
||||
struct TALER_Amount amount_without_fee;
|
||||
const struct TALER_Amount *closing_fee;
|
||||
int ret;
|
||||
enum GNUNET_DB_QueryStatus qs;
|
||||
struct WireAccount *wa;
|
||||
void *buf;
|
||||
size_t buf_size;
|
||||
|
||||
/* NOTE: potential optimization: use custom SQL API to not
|
||||
fetch this: */
|
||||
GNUNET_assert (NULL == ctc);
|
||||
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
|
||||
"Processing reserve closure at %s\n",
|
||||
GNUNET_STRINGS_absolute_time_to_string (expiration_date));
|
||||
now = GNUNET_TIME_absolute_get ();
|
||||
(void) GNUNET_TIME_round_abs (&now);
|
||||
|
||||
/* lookup account we should use */
|
||||
wa = find_account_by_payto_uri (account_payto_uri);
|
||||
if (NULL == wa)
|
||||
{
|
||||
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
|
||||
"No wire account configured to deal with target URI `%s'\n",
|
||||
account_payto_uri);
|
||||
global_ret = GNUNET_SYSERR;
|
||||
GNUNET_SCHEDULER_shutdown ();
|
||||
return GNUNET_DB_STATUS_HARD_ERROR;
|
||||
}
|
||||
|
||||
/* lookup `closing_fee` from time of actual reserve expiration
|
||||
(we may be lagging behind!) */
|
||||
{
|
||||
struct TALER_EXCHANGEDB_AggregateFees *af;
|
||||
|
||||
af = update_fees (wa,
|
||||
expiration_date,
|
||||
session);
|
||||
if (NULL == af)
|
||||
{
|
||||
global_ret = GNUNET_SYSERR;
|
||||
GNUNET_SCHEDULER_shutdown ();
|
||||
return GNUNET_DB_STATUS_HARD_ERROR;
|
||||
}
|
||||
closing_fee = &af->closing_fee;
|
||||
}
|
||||
|
||||
/* calculate transfer amount */
|
||||
ret = TALER_amount_subtract (&amount_without_fee,
|
||||
left,
|
||||
closing_fee);
|
||||
if ( (GNUNET_SYSERR == ret) ||
|
||||
(GNUNET_NO == ret) )
|
||||
{
|
||||
/* Closing fee higher than or equal to remaining balance, close
|
||||
without wire transfer. */
|
||||
closing_fee = left;
|
||||
GNUNET_assert (GNUNET_OK ==
|
||||
TALER_amount_get_zero (left->currency,
|
||||
&amount_without_fee));
|
||||
}
|
||||
/* round down to enable transfer */
|
||||
if (GNUNET_SYSERR ==
|
||||
TALER_amount_round_down (&amount_without_fee,
|
||||
¤cy_round_unit))
|
||||
{
|
||||
GNUNET_break (0);
|
||||
global_ret = GNUNET_SYSERR;
|
||||
GNUNET_SCHEDULER_shutdown ();
|
||||
return GNUNET_DB_STATUS_HARD_ERROR;
|
||||
}
|
||||
if ( (0 == amount_without_fee.value) &&
|
||||
(0 == amount_without_fee.fraction) )
|
||||
ret = GNUNET_NO;
|
||||
|
||||
/* NOTE: sizeof (*reserve_pub) == sizeof (wtid) right now, but to
|
||||
be future-compatible, we use the memset + min construction */
|
||||
memset (&wtid,
|
||||
0,
|
||||
sizeof (wtid));
|
||||
memcpy (&wtid,
|
||||
reserve_pub,
|
||||
GNUNET_MIN (sizeof (wtid),
|
||||
sizeof (*reserve_pub)));
|
||||
if (GNUNET_SYSERR != ret)
|
||||
qs = db_plugin->insert_reserve_closed (db_plugin->cls,
|
||||
session,
|
||||
reserve_pub,
|
||||
now,
|
||||
account_payto_uri,
|
||||
&wtid,
|
||||
left,
|
||||
closing_fee);
|
||||
else
|
||||
qs = GNUNET_DB_STATUS_HARD_ERROR;
|
||||
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
|
||||
"Closing reserve %s over %s (%d, %d)\n",
|
||||
TALER_B2S (reserve_pub),
|
||||
TALER_amount2s (left),
|
||||
ret,
|
||||
qs);
|
||||
/* Check for hard failure */
|
||||
if ( (GNUNET_SYSERR == ret) ||
|
||||
(GNUNET_DB_STATUS_HARD_ERROR == qs) )
|
||||
{
|
||||
GNUNET_break (0);
|
||||
global_ret = GNUNET_SYSERR;
|
||||
GNUNET_SCHEDULER_shutdown ();
|
||||
return GNUNET_DB_STATUS_HARD_ERROR;
|
||||
}
|
||||
if ( (GNUNET_OK != ret) ||
|
||||
(GNUNET_DB_STATUS_SUCCESS_ONE_RESULT != qs) )
|
||||
{
|
||||
/* Reserve balance was almost zero OR soft error */
|
||||
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
|
||||
"Reserve was virtually empty, moving on\n");
|
||||
(void) commit_or_warn (session);
|
||||
erc->async_cont = GNUNET_YES;
|
||||
GNUNET_assert (NULL == task);
|
||||
task = GNUNET_SCHEDULER_add_now (&run_transfers,
|
||||
NULL);
|
||||
return qs;
|
||||
}
|
||||
|
||||
/* success, perform wire transfer */
|
||||
ctc = GNUNET_new (struct CloseTransferContext);
|
||||
ctc->wa = wa;
|
||||
ctc->session = session;
|
||||
ctc->method = TALER_payto_get_method (account_payto_uri);
|
||||
TALER_BANK_prepare_transfer (account_payto_uri,
|
||||
&amount_without_fee,
|
||||
exchange_base_url,
|
||||
&wtid,
|
||||
&buf,
|
||||
&buf_size);
|
||||
/* Commit our intention to execute the wire transfer! */
|
||||
qs = db_plugin->wire_prepare_data_insert (db_plugin->cls,
|
||||
ctc->session,
|
||||
ctc->method,
|
||||
buf,
|
||||
buf_size);
|
||||
GNUNET_free (buf);
|
||||
if (GNUNET_DB_STATUS_HARD_ERROR == qs)
|
||||
{
|
||||
GNUNET_break (0);
|
||||
GNUNET_free (ctc->method);
|
||||
GNUNET_free (ctc);
|
||||
ctc = NULL;
|
||||
return GNUNET_DB_STATUS_HARD_ERROR;
|
||||
}
|
||||
if (GNUNET_DB_STATUS_SOFT_ERROR == qs)
|
||||
{
|
||||
/* start again */
|
||||
GNUNET_free (ctc->method);
|
||||
GNUNET_free (ctc);
|
||||
ctc = NULL;
|
||||
return GNUNET_DB_STATUS_SUCCESS_NO_RESULTS;
|
||||
}
|
||||
erc->async_cont = GNUNET_YES;
|
||||
GNUNET_assert (NULL == task);
|
||||
task = GNUNET_SCHEDULER_add_now (&run_transfers,
|
||||
NULL);
|
||||
GNUNET_free (ctc->method);
|
||||
GNUNET_free (ctc);
|
||||
ctc = NULL;
|
||||
|
||||
return GNUNET_DB_STATUS_SUCCESS_ONE_RESULT;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Main work function that finds and triggers transfers for reserves
|
||||
* closures.
|
||||
*
|
||||
* @param cls closure
|
||||
*/
|
||||
static void
|
||||
run_reserve_closures (void *cls)
|
||||
{
|
||||
struct TALER_EXCHANGEDB_Session *session;
|
||||
enum GNUNET_DB_QueryStatus qs;
|
||||
const struct GNUNET_SCHEDULER_TaskContext *tc;
|
||||
struct ExpiredReserveContext erc;
|
||||
struct GNUNET_TIME_Absolute now;
|
||||
|
||||
(void) cls;
|
||||
task = NULL;
|
||||
reserves_idle = GNUNET_NO;
|
||||
tc = GNUNET_SCHEDULER_get_task_context ();
|
||||
if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
|
||||
return;
|
||||
if (NULL == (session = db_plugin->get_session (db_plugin->cls)))
|
||||
{
|
||||
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
|
||||
"Failed to obtain database session!\n");
|
||||
global_ret = GNUNET_SYSERR;
|
||||
GNUNET_SCHEDULER_shutdown ();
|
||||
return;
|
||||
}
|
||||
|
||||
if (GNUNET_OK !=
|
||||
db_plugin->start (db_plugin->cls,
|
||||
session,
|
||||
"aggregator reserve closures"))
|
||||
{
|
||||
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
|
||||
"Failed to start database transaction!\n");
|
||||
global_ret = GNUNET_SYSERR;
|
||||
GNUNET_SCHEDULER_shutdown ();
|
||||
return;
|
||||
}
|
||||
erc.session = session;
|
||||
erc.async_cont = GNUNET_NO;
|
||||
now = GNUNET_TIME_absolute_get ();
|
||||
(void) GNUNET_TIME_round_abs (&now);
|
||||
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
|
||||
"Checking for reserves to close by date %s\n",
|
||||
GNUNET_STRINGS_absolute_time_to_string (now));
|
||||
qs = db_plugin->get_expired_reserves (db_plugin->cls,
|
||||
session,
|
||||
now,
|
||||
&expired_reserve_cb,
|
||||
&erc);
|
||||
GNUNET_assert (1 >= qs);
|
||||
switch (qs)
|
||||
{
|
||||
case GNUNET_DB_STATUS_HARD_ERROR:
|
||||
GNUNET_break (0);
|
||||
db_plugin->rollback (db_plugin->cls,
|
||||
session);
|
||||
global_ret = GNUNET_SYSERR;
|
||||
GNUNET_SCHEDULER_shutdown ();
|
||||
return;
|
||||
case GNUNET_DB_STATUS_SOFT_ERROR:
|
||||
db_plugin->rollback (db_plugin->cls,
|
||||
session);
|
||||
GNUNET_assert (NULL == task);
|
||||
task = GNUNET_SCHEDULER_add_now (&run_reserve_closures,
|
||||
NULL);
|
||||
return;
|
||||
case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
|
||||
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
|
||||
"No more idle reserves, going back to aggregation\n");
|
||||
reserves_idle = GNUNET_YES;
|
||||
db_plugin->rollback (db_plugin->cls,
|
||||
session);
|
||||
GNUNET_assert (NULL == task);
|
||||
task = GNUNET_SCHEDULER_add_now (&run_aggregation,
|
||||
NULL);
|
||||
return;
|
||||
case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
|
||||
(void) commit_or_warn (session);
|
||||
if (GNUNET_YES == erc.async_cont)
|
||||
break;
|
||||
GNUNET_assert (NULL == task);
|
||||
task = GNUNET_SCHEDULER_add_now (&run_reserve_closures,
|
||||
NULL);
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Main work function that queries the DB and aggregates transactions
|
||||
* into larger wire transfers.
|
||||
@ -1356,7 +749,6 @@ run_reserve_closures (void *cls)
|
||||
static void
|
||||
run_aggregation (void *cls)
|
||||
{
|
||||
static unsigned int swap;
|
||||
struct AggregationUnit au_active;
|
||||
struct TALER_EXCHANGEDB_Session *session;
|
||||
enum GNUNET_DB_QueryStatus qs;
|
||||
@ -1369,13 +761,6 @@ run_aggregation (void *cls)
|
||||
tc = GNUNET_SCHEDULER_get_task_context ();
|
||||
if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
|
||||
return;
|
||||
if (0 == (++swap % 2))
|
||||
{
|
||||
GNUNET_assert (NULL == task);
|
||||
task = GNUNET_SCHEDULER_add_now (&run_reserve_closures,
|
||||
NULL);
|
||||
return;
|
||||
}
|
||||
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
|
||||
"Checking for ready deposits to aggregate\n");
|
||||
if (NULL == (session = db_plugin->get_session (db_plugin->cls)))
|
||||
@ -1420,7 +805,6 @@ run_aggregation (void *cls)
|
||||
if (GNUNET_DB_STATUS_SOFT_ERROR == qs)
|
||||
{
|
||||
/* should re-try immediately */
|
||||
swap--; /* do not count failed attempts */
|
||||
GNUNET_assert (NULL == task);
|
||||
task = GNUNET_SCHEDULER_add_now (&run_aggregation,
|
||||
NULL);
|
||||
@ -1428,23 +812,12 @@ run_aggregation (void *cls)
|
||||
}
|
||||
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
|
||||
"No more ready deposits, going to sleep\n");
|
||||
if ( (GNUNET_YES == test_mode) &&
|
||||
(swap >= 2) )
|
||||
if (GNUNET_YES == test_mode)
|
||||
{
|
||||
/* in test mode, shutdown if we end up being idle */
|
||||
GNUNET_SCHEDULER_shutdown ();
|
||||
}
|
||||
else
|
||||
{
|
||||
if ( (GNUNET_NO == reserves_idle) ||
|
||||
(GNUNET_YES == test_mode) )
|
||||
{
|
||||
/* Possibly more to on reserves, go for it immediately */
|
||||
GNUNET_assert (NULL == task);
|
||||
task = GNUNET_SCHEDULER_add_now (&run_reserve_closures,
|
||||
NULL);
|
||||
}
|
||||
else
|
||||
{
|
||||
/* nothing to do, sleep for a minute and try again */
|
||||
GNUNET_assert (NULL == task);
|
||||
@ -1452,7 +825,6 @@ run_aggregation (void *cls)
|
||||
&run_aggregation,
|
||||
NULL);
|
||||
}
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
@ -1791,14 +1163,14 @@ wire_prepare_cb (void *cls,
|
||||
const char *buf,
|
||||
size_t buf_size)
|
||||
{
|
||||
struct WireAccount *wa;
|
||||
struct TALER_EXCHANGEDB_WireAccount *wa;
|
||||
|
||||
(void) cls;
|
||||
wpd->row_id = rowid;
|
||||
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
|
||||
"Starting wire transfer %llu\n",
|
||||
(unsigned long long) rowid);
|
||||
wpd->wa = find_account_by_method (wire_method);
|
||||
wpd->wa = TALER_EXCHANGEDB_find_account_by_method (wire_method);
|
||||
if (NULL == wpd->wa)
|
||||
{
|
||||
/* Should really never happen here, as when we get
|
||||
|
601
src/exchange/taler-exchange-closer.c
Normal file
601
src/exchange/taler-exchange-closer.c
Normal file
@ -0,0 +1,601 @@
|
||||
/*
|
||||
This file is part of TALER
|
||||
Copyright (C) 2016-2020 Taler Systems SA
|
||||
|
||||
TALER is free software; you can redistribute it and/or modify it under the
|
||||
terms of the GNU Affero General Public License as published by the Free Software
|
||||
Foundation; either version 3, or (at your option) any later version.
|
||||
|
||||
TALER is distributed in the hope that it will be useful, but WITHOUT ANY
|
||||
WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
|
||||
A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU Affero General Public License along with
|
||||
TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/>
|
||||
*/
|
||||
|
||||
/**
|
||||
* @file taler-exchange-closer.c
|
||||
* @brief Process that closes expired reserves
|
||||
* @author Christian Grothoff
|
||||
*/
|
||||
#include "platform.h"
|
||||
#include <gnunet/gnunet_util_lib.h>
|
||||
#include <jansson.h>
|
||||
#include <pthread.h>
|
||||
#include "taler_exchangedb_lib.h"
|
||||
#include "taler_exchangedb_plugin.h"
|
||||
#include "taler_json_lib.h"
|
||||
#include "taler_bank_service.h"
|
||||
|
||||
|
||||
/**
|
||||
* Which currency is used by this exchange?
|
||||
*/
|
||||
static char *exchange_currency_string;
|
||||
|
||||
/**
|
||||
* What is the smallest unit we support for wire transfers?
|
||||
* We will need to round down to a multiple of this amount.
|
||||
*/
|
||||
static struct TALER_Amount currency_round_unit;
|
||||
|
||||
/**
|
||||
* What is the base URL of this exchange? Used in the
|
||||
* wire transfer subjects to that merchants and governments
|
||||
* can ask for the list of aggregated deposits.
|
||||
*/
|
||||
static char *exchange_base_url;
|
||||
|
||||
/**
|
||||
* The exchange's configuration.
|
||||
*/
|
||||
static const struct GNUNET_CONFIGURATION_Handle *cfg;
|
||||
|
||||
/**
|
||||
* Our database plugin.
|
||||
*/
|
||||
static struct TALER_EXCHANGEDB_Plugin *db_plugin;
|
||||
|
||||
/**
|
||||
* Next task to run, if any.
|
||||
*/
|
||||
static struct GNUNET_SCHEDULER_Task *task;
|
||||
|
||||
/**
|
||||
* How long should we sleep when idle before trying to find more work?
|
||||
*/
|
||||
static struct GNUNET_TIME_Relative aggregator_idle_sleep_interval;
|
||||
|
||||
/**
|
||||
* Value to return from main(). #GNUNET_OK on success, #GNUNET_SYSERR
|
||||
* on serious errors.
|
||||
*/
|
||||
static int global_ret;
|
||||
|
||||
/**
|
||||
* #GNUNET_YES if we are in test mode and should exit when idle.
|
||||
*/
|
||||
static int test_mode;
|
||||
|
||||
|
||||
/**
|
||||
* Main work function that finds and triggers transfers for reserves
|
||||
* closures.
|
||||
*
|
||||
* @param cls closure
|
||||
*/
|
||||
static void
|
||||
run_reserve_closures (void *cls);
|
||||
|
||||
|
||||
/**
|
||||
* We're being aborted with CTRL-C (or SIGTERM). Shut down.
|
||||
*
|
||||
* @param cls closure
|
||||
*/
|
||||
static void
|
||||
shutdown_task (void *cls)
|
||||
{
|
||||
(void) cls;
|
||||
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
|
||||
"Running shutdown\n");
|
||||
if (NULL != task)
|
||||
{
|
||||
GNUNET_SCHEDULER_cancel (task);
|
||||
task = NULL;
|
||||
}
|
||||
TALER_EXCHANGEDB_plugin_unload (db_plugin);
|
||||
db_plugin = NULL;
|
||||
TALER_EXCHANGEDB_unload_accounts ();
|
||||
cfg = NULL;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Parse the configuration for wirewatch.
|
||||
*
|
||||
* @return #GNUNET_OK on success
|
||||
*/
|
||||
static int
|
||||
parse_wirewatch_config ()
|
||||
{
|
||||
if (GNUNET_OK !=
|
||||
GNUNET_CONFIGURATION_get_value_string (cfg,
|
||||
"exchange",
|
||||
"BASE_URL",
|
||||
&exchange_base_url))
|
||||
{
|
||||
GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
|
||||
"exchange",
|
||||
"BASE_URL");
|
||||
return GNUNET_SYSERR;
|
||||
}
|
||||
if (GNUNET_OK !=
|
||||
GNUNET_CONFIGURATION_get_value_time (cfg,
|
||||
"exchange",
|
||||
"AGGREGATOR_IDLE_SLEEP_INTERVAL",
|
||||
&aggregator_idle_sleep_interval))
|
||||
{
|
||||
GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
|
||||
"exchange",
|
||||
"AGGREGATOR_IDLE_SLEEP_INTERVAL");
|
||||
return GNUNET_SYSERR;
|
||||
}
|
||||
if (GNUNET_OK !=
|
||||
GNUNET_CONFIGURATION_get_value_string (cfg,
|
||||
"taler",
|
||||
"CURRENCY",
|
||||
&exchange_currency_string))
|
||||
{
|
||||
GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
|
||||
"taler",
|
||||
"CURRENCY");
|
||||
return GNUNET_SYSERR;
|
||||
}
|
||||
if (strlen (exchange_currency_string) >= TALER_CURRENCY_LEN)
|
||||
{
|
||||
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
|
||||
"Currency `%s' longer than the allowed limit of %u characters.",
|
||||
exchange_currency_string,
|
||||
(unsigned int) TALER_CURRENCY_LEN);
|
||||
return GNUNET_SYSERR;
|
||||
}
|
||||
|
||||
if ( (GNUNET_OK !=
|
||||
TALER_config_get_amount (cfg,
|
||||
"taler",
|
||||
"CURRENCY_ROUND_UNIT",
|
||||
¤cy_round_unit)) ||
|
||||
(0 != strcasecmp (exchange_currency_string,
|
||||
currency_round_unit.currency)) ||
|
||||
( (0 != currency_round_unit.fraction) &&
|
||||
(0 != currency_round_unit.value) ) )
|
||||
{
|
||||
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
|
||||
"Invalid value specified in section `TALER' under `CURRENCY_ROUND_UNIT'\n");
|
||||
return GNUNET_SYSERR;
|
||||
}
|
||||
|
||||
if (NULL ==
|
||||
(db_plugin = TALER_EXCHANGEDB_plugin_load (cfg)))
|
||||
{
|
||||
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
|
||||
"Failed to initialize DB subsystem\n");
|
||||
return GNUNET_SYSERR;
|
||||
}
|
||||
if (GNUNET_OK !=
|
||||
TALER_EXCHANGEDB_load_accounts (cfg))
|
||||
{
|
||||
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
|
||||
"No wire accounts configured for debit!\n");
|
||||
TALER_EXCHANGEDB_plugin_unload (db_plugin);
|
||||
db_plugin = NULL;
|
||||
return GNUNET_SYSERR;
|
||||
}
|
||||
return GNUNET_OK;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Perform a database commit. If it fails, print a warning.
|
||||
*
|
||||
* @param session session to perform the commit for.
|
||||
* @return status of commit
|
||||
*/
|
||||
static enum GNUNET_DB_QueryStatus
|
||||
commit_or_warn (struct TALER_EXCHANGEDB_Session *session)
|
||||
{
|
||||
enum GNUNET_DB_QueryStatus qs;
|
||||
|
||||
qs = db_plugin->commit (db_plugin->cls,
|
||||
session);
|
||||
if (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS == qs)
|
||||
return qs;
|
||||
GNUNET_log ((GNUNET_DB_STATUS_SOFT_ERROR == qs)
|
||||
? GNUNET_ERROR_TYPE_INFO
|
||||
: GNUNET_ERROR_TYPE_ERROR,
|
||||
"Failed to commit database transaction!\n");
|
||||
return qs;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Closure for #expired_reserve_cb().
|
||||
*/
|
||||
struct ExpiredReserveContext
|
||||
{
|
||||
|
||||
/**
|
||||
* Database session we are using.
|
||||
*/
|
||||
struct TALER_EXCHANGEDB_Session *session;
|
||||
|
||||
/**
|
||||
* Set to #GNUNET_YES if the transaction continues
|
||||
* asynchronously.
|
||||
*/
|
||||
int async_cont;
|
||||
};
|
||||
|
||||
|
||||
/**
|
||||
* Function called with details about expired reserves.
|
||||
* We trigger the reserve closure by inserting the respective
|
||||
* closing record and prewire instructions into the respective
|
||||
* tables.
|
||||
*
|
||||
* @param cls a `struct ExpiredReserveContext *`
|
||||
* @param reserve_pub public key of the reserve
|
||||
* @param left amount left in the reserve
|
||||
* @param account_payto_uri information about the bank account that initially
|
||||
* caused the reserve to be created
|
||||
* @param expiration_date when did the reserve expire
|
||||
* @return transaction status code
|
||||
*/
|
||||
static enum GNUNET_DB_QueryStatus
|
||||
expired_reserve_cb (void *cls,
|
||||
const struct TALER_ReservePublicKeyP *reserve_pub,
|
||||
const struct TALER_Amount *left,
|
||||
const char *account_payto_uri,
|
||||
struct GNUNET_TIME_Absolute expiration_date)
|
||||
{
|
||||
struct ExpiredReserveContext *erc = cls;
|
||||
struct TALER_EXCHANGEDB_Session *session = erc->session;
|
||||
struct GNUNET_TIME_Absolute now;
|
||||
struct TALER_WireTransferIdentifierRawP wtid;
|
||||
struct TALER_Amount amount_without_fee;
|
||||
const struct TALER_Amount *closing_fee;
|
||||
int ret;
|
||||
enum GNUNET_DB_QueryStatus qs;
|
||||
struct TALER_EXCHANGEDB_WireAccount *wa;
|
||||
|
||||
/* NOTE: potential optimization: use custom SQL API to not
|
||||
fetch this: */
|
||||
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
|
||||
"Processing reserve closure at %s\n",
|
||||
GNUNET_STRINGS_absolute_time_to_string (expiration_date));
|
||||
now = GNUNET_TIME_absolute_get ();
|
||||
(void) GNUNET_TIME_round_abs (&now);
|
||||
|
||||
/* lookup account we should use */
|
||||
wa = TALER_EXCHANGEDB_find_account_by_payto_uri (account_payto_uri);
|
||||
if (NULL == wa)
|
||||
{
|
||||
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
|
||||
"No wire account configured to deal with target URI `%s'\n",
|
||||
account_payto_uri);
|
||||
global_ret = GNUNET_SYSERR;
|
||||
GNUNET_SCHEDULER_shutdown ();
|
||||
return GNUNET_DB_STATUS_HARD_ERROR;
|
||||
}
|
||||
|
||||
/* lookup `closing_fee` from time of actual reserve expiration
|
||||
(we may be lagging behind!) */
|
||||
{
|
||||
struct TALER_EXCHANGEDB_AggregateFees *af;
|
||||
|
||||
af = TALER_EXCHANGEDB_update_fees (cfg,
|
||||
db_plugin,
|
||||
wa,
|
||||
expiration_date,
|
||||
session);
|
||||
if (NULL == af)
|
||||
{
|
||||
global_ret = GNUNET_SYSERR;
|
||||
GNUNET_SCHEDULER_shutdown ();
|
||||
return GNUNET_DB_STATUS_HARD_ERROR;
|
||||
}
|
||||
closing_fee = &af->closing_fee;
|
||||
}
|
||||
|
||||
/* calculate transfer amount */
|
||||
ret = TALER_amount_subtract (&amount_without_fee,
|
||||
left,
|
||||
closing_fee);
|
||||
if ( (GNUNET_SYSERR == ret) ||
|
||||
(GNUNET_NO == ret) )
|
||||
{
|
||||
/* Closing fee higher than or equal to remaining balance, close
|
||||
without wire transfer. */
|
||||
closing_fee = left;
|
||||
GNUNET_assert (GNUNET_OK ==
|
||||
TALER_amount_get_zero (left->currency,
|
||||
&amount_without_fee));
|
||||
}
|
||||
/* round down to enable transfer */
|
||||
if (GNUNET_SYSERR ==
|
||||
TALER_amount_round_down (&amount_without_fee,
|
||||
¤cy_round_unit))
|
||||
{
|
||||
GNUNET_break (0);
|
||||
global_ret = GNUNET_SYSERR;
|
||||
GNUNET_SCHEDULER_shutdown ();
|
||||
return GNUNET_DB_STATUS_HARD_ERROR;
|
||||
}
|
||||
if ( (0 == amount_without_fee.value) &&
|
||||
(0 == amount_without_fee.fraction) )
|
||||
ret = GNUNET_NO;
|
||||
|
||||
/* NOTE: sizeof (*reserve_pub) == sizeof (wtid) right now, but to
|
||||
be future-compatible, we use the memset + min construction */
|
||||
memset (&wtid,
|
||||
0,
|
||||
sizeof (wtid));
|
||||
memcpy (&wtid,
|
||||
reserve_pub,
|
||||
GNUNET_MIN (sizeof (wtid),
|
||||
sizeof (*reserve_pub)));
|
||||
if (GNUNET_SYSERR != ret)
|
||||
qs = db_plugin->insert_reserve_closed (db_plugin->cls,
|
||||
session,
|
||||
reserve_pub,
|
||||
now,
|
||||
account_payto_uri,
|
||||
&wtid,
|
||||
left,
|
||||
closing_fee);
|
||||
else
|
||||
qs = GNUNET_DB_STATUS_HARD_ERROR;
|
||||
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
|
||||
"Closing reserve %s over %s (%d, %d)\n",
|
||||
TALER_B2S (reserve_pub),
|
||||
TALER_amount2s (left),
|
||||
ret,
|
||||
qs);
|
||||
/* Check for hard failure */
|
||||
if ( (GNUNET_SYSERR == ret) ||
|
||||
(GNUNET_DB_STATUS_HARD_ERROR == qs) )
|
||||
{
|
||||
GNUNET_break (0);
|
||||
global_ret = GNUNET_SYSERR;
|
||||
GNUNET_SCHEDULER_shutdown ();
|
||||
return GNUNET_DB_STATUS_HARD_ERROR;
|
||||
}
|
||||
if ( (GNUNET_OK != ret) ||
|
||||
(GNUNET_DB_STATUS_SUCCESS_ONE_RESULT != qs) )
|
||||
{
|
||||
/* Reserve balance was almost zero OR soft error */
|
||||
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
|
||||
"Reserve was virtually empty, moving on\n");
|
||||
(void) commit_or_warn (session);
|
||||
erc->async_cont = GNUNET_YES;
|
||||
GNUNET_assert (NULL == task);
|
||||
task = GNUNET_SCHEDULER_add_now (&run_reserve_closures,
|
||||
NULL);
|
||||
return qs;
|
||||
}
|
||||
|
||||
/* success, perform wire transfer */
|
||||
{
|
||||
char *method;
|
||||
void *buf;
|
||||
size_t buf_size;
|
||||
|
||||
method = TALER_payto_get_method (account_payto_uri);
|
||||
TALER_BANK_prepare_transfer (account_payto_uri,
|
||||
&amount_without_fee,
|
||||
exchange_base_url,
|
||||
&wtid,
|
||||
&buf,
|
||||
&buf_size);
|
||||
/* Commit our intention to execute the wire transfer! */
|
||||
qs = db_plugin->wire_prepare_data_insert (db_plugin->cls,
|
||||
session,
|
||||
method,
|
||||
buf,
|
||||
buf_size);
|
||||
GNUNET_free (buf);
|
||||
GNUNET_free (method);
|
||||
}
|
||||
if (GNUNET_DB_STATUS_HARD_ERROR == qs)
|
||||
{
|
||||
GNUNET_break (0);
|
||||
return GNUNET_DB_STATUS_HARD_ERROR;
|
||||
}
|
||||
if (GNUNET_DB_STATUS_SOFT_ERROR == qs)
|
||||
{
|
||||
/* start again */
|
||||
return GNUNET_DB_STATUS_SUCCESS_NO_RESULTS;
|
||||
}
|
||||
erc->async_cont = GNUNET_YES;
|
||||
GNUNET_assert (NULL == task);
|
||||
task = GNUNET_SCHEDULER_add_now (&run_reserve_closures,
|
||||
NULL);
|
||||
return GNUNET_DB_STATUS_SUCCESS_ONE_RESULT;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Main work function that finds and triggers transfers for reserves
|
||||
* closures.
|
||||
*
|
||||
* @param cls closure
|
||||
*/
|
||||
static void
|
||||
run_reserve_closures (void *cls)
|
||||
{
|
||||
struct TALER_EXCHANGEDB_Session *session;
|
||||
enum GNUNET_DB_QueryStatus qs;
|
||||
const struct GNUNET_SCHEDULER_TaskContext *tc;
|
||||
struct ExpiredReserveContext erc;
|
||||
struct GNUNET_TIME_Absolute now;
|
||||
|
||||
(void) cls;
|
||||
task = NULL;
|
||||
tc = GNUNET_SCHEDULER_get_task_context ();
|
||||
if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
|
||||
return;
|
||||
if (NULL == (session = db_plugin->get_session (db_plugin->cls)))
|
||||
{
|
||||
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
|
||||
"Failed to obtain database session!\n");
|
||||
global_ret = GNUNET_SYSERR;
|
||||
GNUNET_SCHEDULER_shutdown ();
|
||||
return;
|
||||
}
|
||||
|
||||
if (GNUNET_OK !=
|
||||
db_plugin->start (db_plugin->cls,
|
||||
session,
|
||||
"aggregator reserve closures"))
|
||||
{
|
||||
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
|
||||
"Failed to start database transaction!\n");
|
||||
global_ret = GNUNET_SYSERR;
|
||||
GNUNET_SCHEDULER_shutdown ();
|
||||
return;
|
||||
}
|
||||
erc.session = session;
|
||||
erc.async_cont = GNUNET_NO;
|
||||
now = GNUNET_TIME_absolute_get ();
|
||||
(void) GNUNET_TIME_round_abs (&now);
|
||||
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
|
||||
"Checking for reserves to close by date %s\n",
|
||||
GNUNET_STRINGS_absolute_time_to_string (now));
|
||||
qs = db_plugin->get_expired_reserves (db_plugin->cls,
|
||||
session,
|
||||
now,
|
||||
&expired_reserve_cb,
|
||||
&erc);
|
||||
GNUNET_assert (1 >= qs);
|
||||
switch (qs)
|
||||
{
|
||||
case GNUNET_DB_STATUS_HARD_ERROR:
|
||||
GNUNET_break (0);
|
||||
db_plugin->rollback (db_plugin->cls,
|
||||
session);
|
||||
global_ret = GNUNET_SYSERR;
|
||||
GNUNET_SCHEDULER_shutdown ();
|
||||
return;
|
||||
case GNUNET_DB_STATUS_SOFT_ERROR:
|
||||
db_plugin->rollback (db_plugin->cls,
|
||||
session);
|
||||
GNUNET_assert (NULL == task);
|
||||
task = GNUNET_SCHEDULER_add_now (&run_reserve_closures,
|
||||
NULL);
|
||||
return;
|
||||
case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
|
||||
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
|
||||
"No more idle reserves, going back to aggregation\n");
|
||||
db_plugin->rollback (db_plugin->cls,
|
||||
session);
|
||||
GNUNET_assert (NULL == task);
|
||||
if (GNUNET_YES == test_mode)
|
||||
{
|
||||
GNUNET_SCHEDULER_shutdown ();
|
||||
}
|
||||
else
|
||||
{
|
||||
task = GNUNET_SCHEDULER_add_delayed (aggregator_idle_sleep_interval,
|
||||
&run_reserve_closures,
|
||||
NULL);
|
||||
}
|
||||
return;
|
||||
case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
|
||||
(void) commit_or_warn (session);
|
||||
if (GNUNET_YES == erc.async_cont)
|
||||
break;
|
||||
GNUNET_assert (NULL == task);
|
||||
task = GNUNET_SCHEDULER_add_now (&run_reserve_closures,
|
||||
NULL);
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* First task.
|
||||
*
|
||||
* @param cls closure, NULL
|
||||
* @param args remaining command-line arguments
|
||||
* @param cfgfile name of the configuration file used (for saving, can be NULL!)
|
||||
* @param c configuration
|
||||
*/
|
||||
static void
|
||||
run (void *cls,
|
||||
char *const *args,
|
||||
const char *cfgfile,
|
||||
const struct GNUNET_CONFIGURATION_Handle *c)
|
||||
{
|
||||
(void) cls;
|
||||
(void) args;
|
||||
(void) cfgfile;
|
||||
|
||||
cfg = c;
|
||||
if (GNUNET_OK != parse_wirewatch_config ())
|
||||
{
|
||||
cfg = NULL;
|
||||
global_ret = 1;
|
||||
return;
|
||||
}
|
||||
GNUNET_assert (NULL == task);
|
||||
task = GNUNET_SCHEDULER_add_now (&run_reserve_closures,
|
||||
NULL);
|
||||
GNUNET_SCHEDULER_add_shutdown (&shutdown_task,
|
||||
cls);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* The main function of the taler-exchange-closer.
|
||||
*
|
||||
* @param argc number of arguments from the command line
|
||||
* @param argv command line arguments
|
||||
* @return 0 ok, 1 on error
|
||||
*/
|
||||
int
|
||||
main (int argc,
|
||||
char *const *argv)
|
||||
{
|
||||
struct GNUNET_GETOPT_CommandLineOption options[] = {
|
||||
GNUNET_GETOPT_option_timetravel ('T',
|
||||
"timetravel"),
|
||||
GNUNET_GETOPT_option_flag ('t',
|
||||
"test",
|
||||
"run in test mode and exit when idle",
|
||||
&test_mode),
|
||||
GNUNET_GETOPT_option_version (VERSION "-" VCS_VERSION),
|
||||
GNUNET_GETOPT_OPTION_END
|
||||
};
|
||||
|
||||
if (GNUNET_OK != GNUNET_STRINGS_get_utf8_args (argc, argv,
|
||||
&argc, &argv))
|
||||
return 2;
|
||||
if (GNUNET_OK !=
|
||||
GNUNET_PROGRAM_run (argc, argv,
|
||||
"taler-exchange-closer",
|
||||
gettext_noop (
|
||||
"background process that closes expired reserves"),
|
||||
options,
|
||||
&run, NULL))
|
||||
{
|
||||
GNUNET_free ((void *) argv);
|
||||
return 1;
|
||||
}
|
||||
GNUNET_free ((void *) argv);
|
||||
return global_ret;
|
||||
}
|
||||
|
||||
|
||||
/* end of taler-exchange-closer.c */
|
@ -56,11 +56,10 @@ libtalerexchangedb_la_SOURCES = \
|
||||
exchangedb_plugin.c \
|
||||
exchangedb_signkeys.c \
|
||||
exchangedb_transactions.c
|
||||
|
||||
libtalerexchangedb_la_LIBADD = \
|
||||
$(top_builddir)/src/bank-lib/libtalerbank.la \
|
||||
$(top_builddir)/src/util/libtalerutil.la \
|
||||
-lgnunetutil $(XLIB)
|
||||
|
||||
libtalerexchangedb_la_LDFLAGS = \
|
||||
$(POSTGRESQL_LDFLAGS) \
|
||||
-version-info 1:0:0 \
|
||||
|
@ -22,6 +22,17 @@
|
||||
#include "taler_exchangedb_lib.h"
|
||||
|
||||
|
||||
/**
|
||||
* Head of list of wire accounts of the exchange.
|
||||
*/
|
||||
static struct TALER_EXCHANGEDB_WireAccount *wa_head;
|
||||
|
||||
/**
|
||||
* Tail of list of wire accounts of the exchange.
|
||||
*/
|
||||
static struct TALER_EXCHANGEDB_WireAccount *wa_tail;
|
||||
|
||||
|
||||
/**
|
||||
* Closure of #check_for_account.
|
||||
*/
|
||||
@ -141,4 +152,154 @@ TALER_EXCHANGEDB_find_accounts (const struct GNUNET_CONFIGURATION_Handle *cfg,
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Find the wire plugin for the given payto:// URL
|
||||
*
|
||||
* @param method wire method we need an account for
|
||||
* @return NULL on error
|
||||
*/
|
||||
struct TALER_EXCHANGEDB_WireAccount *
|
||||
TALER_EXCHANGEDB_find_account_by_method (const char *method)
|
||||
{
|
||||
for (struct TALER_EXCHANGEDB_WireAccount *wa = wa_head; NULL != wa; wa =
|
||||
wa->next)
|
||||
if (0 == strcmp (method,
|
||||
wa->method))
|
||||
return wa;
|
||||
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
|
||||
"No wire account known for method `%s'\n",
|
||||
method);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Find the wire plugin for the given payto:// URL
|
||||
*
|
||||
* @param url wire address we need an account for
|
||||
* @return NULL on error
|
||||
*/
|
||||
struct TALER_EXCHANGEDB_WireAccount *
|
||||
TALER_EXCHANGEDB_find_account_by_payto_uri (const char *url)
|
||||
{
|
||||
char *method;
|
||||
struct TALER_EXCHANGEDB_WireAccount *wa;
|
||||
|
||||
method = TALER_payto_get_method (url);
|
||||
if (NULL == method)
|
||||
{
|
||||
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
|
||||
"Invalid payto:// URL `%s'\n",
|
||||
url);
|
||||
return NULL;
|
||||
}
|
||||
wa = TALER_EXCHANGEDB_find_account_by_method (method);
|
||||
GNUNET_free (method);
|
||||
return wa;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Function called with information about a wire account. Adds
|
||||
* the account to our list.
|
||||
*
|
||||
* @param cls closure, a `struct GNUNET_CONFIGURATION_Handle`
|
||||
* @param ai account information
|
||||
*/
|
||||
static void
|
||||
add_account_cb (void *cls,
|
||||
const struct TALER_EXCHANGEDB_AccountInfo *ai)
|
||||
{
|
||||
const struct GNUNET_CONFIGURATION_Handle *cfg = cls;
|
||||
struct TALER_EXCHANGEDB_WireAccount *wa;
|
||||
char *payto_uri;
|
||||
|
||||
(void) cls;
|
||||
if (GNUNET_YES != ai->debit_enabled)
|
||||
return; /* not enabled for us, skip */
|
||||
wa = GNUNET_new (struct TALER_EXCHANGEDB_WireAccount);
|
||||
if (GNUNET_OK !=
|
||||
GNUNET_CONFIGURATION_get_value_string (cfg,
|
||||
ai->section_name,
|
||||
"PAYTO_URI",
|
||||
&payto_uri))
|
||||
{
|
||||
GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
|
||||
ai->section_name,
|
||||
"PAYTO_URI");
|
||||
GNUNET_free (wa);
|
||||
return;
|
||||
}
|
||||
wa->method = TALER_payto_get_method (payto_uri);
|
||||
if (NULL == wa->method)
|
||||
{
|
||||
GNUNET_log_config_invalid (GNUNET_ERROR_TYPE_ERROR,
|
||||
ai->section_name,
|
||||
"PAYTO_URI",
|
||||
"could not obtain wire method from URI");
|
||||
GNUNET_free (wa);
|
||||
return;
|
||||
}
|
||||
GNUNET_free (payto_uri);
|
||||
if (GNUNET_OK !=
|
||||
TALER_BANK_auth_parse_cfg (cfg,
|
||||
ai->section_name,
|
||||
&wa->auth))
|
||||
{
|
||||
GNUNET_log (GNUNET_ERROR_TYPE_MESSAGE,
|
||||
"Failed to load exchange account `%s'\n",
|
||||
ai->section_name);
|
||||
GNUNET_free (wa->method);
|
||||
GNUNET_free (wa);
|
||||
return;
|
||||
}
|
||||
wa->section_name = GNUNET_strdup (ai->section_name);
|
||||
GNUNET_CONTAINER_DLL_insert (wa_head,
|
||||
wa_tail,
|
||||
wa);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Load account information opf the exchange from
|
||||
* @a cfg.
|
||||
*
|
||||
* @param cfg configuration to load from
|
||||
* @return #GNUNET_OK on success, #GNUNET_NO if no accounts are configured
|
||||
*/
|
||||
int
|
||||
TALER_EXCHANGEDB_load_accounts (const struct GNUNET_CONFIGURATION_Handle *cfg)
|
||||
{
|
||||
TALER_EXCHANGEDB_find_accounts (cfg,
|
||||
&add_account_cb,
|
||||
(void *) cfg);
|
||||
if (NULL == wa_head)
|
||||
return GNUNET_NO;
|
||||
return GNUNET_OK;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Free resources allocated by
|
||||
* #TALER_EXCHANGEDB_load_accounts().
|
||||
*/
|
||||
void
|
||||
TALER_EXCHANGEDB_unload_accounts (void)
|
||||
{
|
||||
struct TALER_EXCHANGEDB_WireAccount *wa;
|
||||
|
||||
while (NULL != (wa = wa_head))
|
||||
{
|
||||
GNUNET_CONTAINER_DLL_remove (wa_head,
|
||||
wa_tail,
|
||||
wa);
|
||||
TALER_BANK_auth_free (&wa->auth);
|
||||
TALER_EXCHANGEDB_fees_free (wa->af);
|
||||
GNUNET_free (wa->section_name);
|
||||
GNUNET_free (wa->method);
|
||||
GNUNET_free (wa);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/* end of exchangedb_accounts.c */
|
||||
|
@ -326,4 +326,87 @@ TALER_EXCHANGEDB_fees_free (struct TALER_EXCHANGEDB_AggregateFees *af)
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Find the record valid at time @a now in the fee structure.
|
||||
*
|
||||
* @param wa wire transfer fee data structure to update
|
||||
* @param now timestamp to update fees to
|
||||
* @return fee valid at @a now, or NULL if unknown
|
||||
*/
|
||||
static struct TALER_EXCHANGEDB_AggregateFees *
|
||||
advance_fees (struct TALER_EXCHANGEDB_WireAccount *wa,
|
||||
struct GNUNET_TIME_Absolute now)
|
||||
{
|
||||
struct TALER_EXCHANGEDB_AggregateFees *af;
|
||||
|
||||
af = wa->af;
|
||||
while ( (NULL != af) &&
|
||||
(af->end_date.abs_value_us < now.abs_value_us) )
|
||||
af = af->next;
|
||||
return af;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Update wire transfer fee data structure in @a wa.
|
||||
*
|
||||
* @param cfg configuration to use
|
||||
* @param db_plugin database plugin to use
|
||||
* @param wa wire account data structure to update
|
||||
* @param now timestamp to update fees to
|
||||
* @param session DB session to use
|
||||
* @return fee valid at @a now, or NULL if unknown
|
||||
*/
|
||||
struct TALER_EXCHANGEDB_AggregateFees *
|
||||
TALER_EXCHANGEDB_update_fees (const struct GNUNET_CONFIGURATION_Handle *cfg,
|
||||
struct TALER_EXCHANGEDB_Plugin *db_plugin,
|
||||
struct TALER_EXCHANGEDB_WireAccount *wa,
|
||||
struct GNUNET_TIME_Absolute now,
|
||||
struct TALER_EXCHANGEDB_Session *session)
|
||||
{
|
||||
enum GNUNET_DB_QueryStatus qs;
|
||||
struct TALER_EXCHANGEDB_AggregateFees *af;
|
||||
|
||||
af = advance_fees (wa,
|
||||
now);
|
||||
if (NULL != af)
|
||||
return af;
|
||||
/* Let's try to load it from disk... */
|
||||
wa->af = TALER_EXCHANGEDB_fees_read (cfg,
|
||||
wa->method);
|
||||
for (struct TALER_EXCHANGEDB_AggregateFees *p = wa->af;
|
||||
NULL != p;
|
||||
p = p->next)
|
||||
{
|
||||
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
|
||||
"Persisting fees starting at %s in database\n",
|
||||
GNUNET_STRINGS_absolute_time_to_string (p->start_date));
|
||||
qs = db_plugin->insert_wire_fee (db_plugin->cls,
|
||||
session,
|
||||
wa->method,
|
||||
p->start_date,
|
||||
p->end_date,
|
||||
&p->wire_fee,
|
||||
&p->closing_fee,
|
||||
&p->master_sig);
|
||||
if (qs < 0)
|
||||
{
|
||||
GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs);
|
||||
TALER_EXCHANGEDB_fees_free (wa->af);
|
||||
wa->af = NULL;
|
||||
return NULL;
|
||||
}
|
||||
}
|
||||
af = advance_fees (wa,
|
||||
now);
|
||||
if (NULL != af)
|
||||
return af;
|
||||
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
|
||||
"Failed to find current wire transfer fees for `%s' at %s\n",
|
||||
wa->method,
|
||||
GNUNET_STRINGS_absolute_time_to_string (now));
|
||||
return NULL;
|
||||
}
|
||||
|
||||
|
||||
/* end of exchangedb_fees.c */
|
||||
|
@ -25,6 +25,7 @@
|
||||
|
||||
#include "taler_signatures.h"
|
||||
#include "taler_exchangedb_plugin.h"
|
||||
#include "taler_bank_service.h"
|
||||
|
||||
/**
|
||||
* Subdirectroy under the exchange's base directory which contains
|
||||
@ -460,4 +461,104 @@ TALER_EXCHANGEDB_calculate_transaction_list_totals (
|
||||
struct TALER_Amount *ret);
|
||||
|
||||
|
||||
/* ***************** convenience functions ******** */
|
||||
|
||||
/**
|
||||
* Information we keep for each supported account of the exchange.
|
||||
*/
|
||||
struct TALER_EXCHANGEDB_WireAccount
|
||||
{
|
||||
/**
|
||||
* Accounts are kept in a DLL.
|
||||
*/
|
||||
struct TALER_EXCHANGEDB_WireAccount *next;
|
||||
|
||||
/**
|
||||
* Plugins are kept in a DLL.
|
||||
*/
|
||||
struct TALER_EXCHANGEDB_WireAccount *prev;
|
||||
|
||||
/**
|
||||
* Authentication data.
|
||||
*/
|
||||
struct TALER_BANK_AuthenticationData auth;
|
||||
|
||||
/**
|
||||
* Wire transfer fee structure.
|
||||
*/
|
||||
struct TALER_EXCHANGEDB_AggregateFees *af;
|
||||
|
||||
/**
|
||||
* Name of the section that configures this account.
|
||||
*/
|
||||
char *section_name;
|
||||
|
||||
/**
|
||||
* Name of the wire method underlying the account.
|
||||
*/
|
||||
char *method;
|
||||
|
||||
};
|
||||
|
||||
|
||||
/**
|
||||
* Update wire transfer fee data structure in @a wa.
|
||||
*
|
||||
* @param cfg configuration to use
|
||||
* @param db_plugin database plugin to use
|
||||
* @param wa wire account data structure to update
|
||||
* @param now timestamp to update fees to
|
||||
* @param session DB session to use
|
||||
* @return fee valid at @a now, or NULL if unknown
|
||||
*/
|
||||
struct TALER_EXCHANGEDB_AggregateFees *
|
||||
TALER_EXCHANGEDB_update_fees (const struct GNUNET_CONFIGURATION_Handle *cfg,
|
||||
struct TALER_EXCHANGEDB_Plugin *db_plugin,
|
||||
struct TALER_EXCHANGEDB_WireAccount *wa,
|
||||
struct GNUNET_TIME_Absolute now,
|
||||
struct TALER_EXCHANGEDB_Session *session);
|
||||
|
||||
|
||||
/**
|
||||
* Find the wire plugin for the given payto:// URL.
|
||||
* Only useful after the accounts have been loaded
|
||||
* using #TALER_EXCHANGEDB_load_accounts().
|
||||
*
|
||||
* @param method wire method we need an account for
|
||||
* @return NULL on error
|
||||
*/
|
||||
struct TALER_EXCHANGEDB_WireAccount *
|
||||
TALER_EXCHANGEDB_find_account_by_method (const char *method);
|
||||
|
||||
|
||||
/**
|
||||
* Find the wire plugin for the given payto:// URL
|
||||
* Only useful after the accounts have been loaded
|
||||
* using #TALER_EXCHANGEDB_load_accounts().
|
||||
*
|
||||
* @param url wire address we need an account for
|
||||
* @return NULL on error
|
||||
*/
|
||||
struct TALER_EXCHANGEDB_WireAccount *
|
||||
TALER_EXCHANGEDB_find_account_by_payto_uri (const char *url);
|
||||
|
||||
|
||||
/**
|
||||
* Load account information opf the exchange from
|
||||
* @a cfg.
|
||||
*
|
||||
* @param cfg configuration to load from
|
||||
* @return #GNUNET_OK on success, #GNUNET_NO if no accounts are configured
|
||||
*/
|
||||
int
|
||||
TALER_EXCHANGEDB_load_accounts (const struct GNUNET_CONFIGURATION_Handle *cfg);
|
||||
|
||||
|
||||
/**
|
||||
* Free resources allocated by
|
||||
* #TALER_EXCHANGEDB_load_accounts().
|
||||
*/
|
||||
void
|
||||
TALER_EXCHANGEDB_unload_accounts (void);
|
||||
|
||||
#endif
|
||||
|
Loading…
Reference in New Issue
Block a user