address 'shutdown' TODO in taler-exchange-aggregator

This commit is contained in:
Christian Grothoff 2016-04-06 11:24:33 +02:00
parent 4977a3eb8f
commit 07541d319c

View File

@ -18,9 +18,6 @@
* @file taler-exchange-aggregator.c * @file taler-exchange-aggregator.c
* @brief Process that aggregates outgoing transactions and executes them * @brief Process that aggregates outgoing transactions and executes them
* @author Christian Grothoff * @author Christian Grothoff
*
* TODO:
* - handle shutdown more nicely (call 'cancel' method on wire transfers)
*/ */
#include "platform.h" #include "platform.h"
#include <gnunet/gnunet_util_lib.h> #include <gnunet/gnunet_util_lib.h>
@ -31,6 +28,102 @@
#include "taler_json_lib.h" #include "taler_json_lib.h"
#include "taler_wire_lib.h" #include "taler_wire_lib.h"
/**
* Data we keep to #run_transfers(). There is at most
* one of these around at any given point in time.
*/
struct WirePrepareData
{
/**
* Database session for all of our transactions.
*/
struct TALER_EXCHANGEDB_Session *session;
/**
* Wire execution handle.
*/
struct TALER_WIRE_ExecuteHandle *eh;
/**
* Row ID of the transfer.
*/
unsigned long long row_id;
};
/**
* Information about one aggregation process to be executed. There is
* at most one of these around at any given point in time.
*/
struct AggregationUnit
{
/**
* Public key of the merchant.
*/
struct TALER_MerchantPublicKeyP merchant_pub;
/**
* Total amount to be transferred.
*/
struct TALER_Amount total_amount;
/**
* Hash of @e wire.
*/
struct GNUNET_HashCode h_wire;
/**
* Wire transfer identifier we use.
*/
struct TALER_WireTransferIdentifierRawP wtid;
/**
* Row ID of the transaction that started it all.
*/
unsigned long long row_id;
/**
* The current time.
*/
struct GNUNET_TIME_Absolute execution_time;
/**
* Wire details of the merchant.
*/
json_t *wire;
/**
* Database session for all of our transactions.
*/
struct TALER_EXCHANGEDB_Session *session;
/**
* Wire preparation handle.
*/
struct TALER_WIRE_PrepareHandle *ph;
/**
* Array of #aggregation_limit row_ids from the
* aggregation.
*/
unsigned long long *additional_rows;
/**
* Offset specifying how many #additional_rows are in use.
*/
unsigned int rows_offset;
/**
* Set to #GNUNET_YES if we have to abort due to failure.
*/
int failed;
};
/** /**
* Which currency is used by this exchange? * Which currency is used by this exchange?
*/ */
@ -62,10 +155,22 @@ static struct TALER_EXCHANGEDB_Plugin *db_plugin;
static struct TALER_WIRE_Plugin *wire_plugin; static struct TALER_WIRE_Plugin *wire_plugin;
/** /**
* Task for the main #run() function. * Next task to run, if any.
*/ */
static struct GNUNET_SCHEDULER_Task *task; static struct GNUNET_SCHEDULER_Task *task;
/**
* If we are currently executing a transfer, information about
* the active transfer is here. Otherwise, this variable is NULL.
*/
static struct WirePrepareData *wpd;
/**
* If we are currently aggregating transactions, information about the
* active aggregation is here. Otherwise, this variable is NULL.
*/
static struct AggregationUnit *au;
/** /**
* Value to return from main(). #GNUNET_OK on success, #GNUNET_SYSERR * Value to return from main(). #GNUNET_OK on success, #GNUNET_SYSERR
* on serious errors. * on serious errors.
@ -104,7 +209,36 @@ shutdown_task (void *cls,
GNUNET_SCHEDULER_cancel (task); GNUNET_SCHEDULER_cancel (task);
task = NULL; task = NULL;
} }
/* FIXME: other shutdown stuff here! */ if (NULL != wpd)
{
if (NULL != wpd->eh)
{
wire_plugin->execute_wire_transfer_cancel (wire_plugin->cls,
wpd->eh);
wpd->eh = NULL;
}
db_plugin->rollback (db_plugin->cls,
wpd->session);
GNUNET_free (wpd);
wpd = NULL;
}
if (NULL != au)
{
if (NULL != au->ph)
{
wire_plugin->prepare_wire_transfer_cancel (wire_plugin->cls,
au->ph);
au->ph = NULL;
}
db_plugin->rollback (db_plugin->cls,
au->session);
GNUNET_free_non_null (au->additional_rows);
if (NULL != au->wire)
json_decref (au->wire);
au = NULL;
GNUNET_free (au);
}
} }
@ -183,81 +317,11 @@ exchange_serve_process_config (const char *exchange_directory)
} }
/**
* Information about one aggregation process to
* be executed.
*/
struct AggregationUnit
{
/**
* Public key of the merchant.
*/
struct TALER_MerchantPublicKeyP merchant_pub;
/**
* Total amount to be transferred.
*/
struct TALER_Amount total_amount;
/**
* Hash of @e wire.
*/
struct GNUNET_HashCode h_wire;
/**
* Wire transfer identifier we use.
*/
struct TALER_WireTransferIdentifierRawP wtid;
/**
* Row ID of the transaction that started it all.
*/
unsigned long long row_id;
/**
* The current time.
*/
struct GNUNET_TIME_Absolute execution_time;
/**
* Wire details of the merchant.
*/
json_t *wire;
/**
* Database session for all of our transactions.
*/
struct TALER_EXCHANGEDB_Session *session;
/**
* Wire preparation handle.
*/
struct TALER_WIRE_PrepareHandle *ph;
/**
* Array of #aggregation_limit row_ids from the
* aggregation.
*/
unsigned long long *additional_rows;
/**
* Offset specifying how many #additional_rows are in use.
*/
unsigned int rows_offset;
/**
* Set to #GNUNET_YES if we have to abort due to failure.
*/
int failed;
};
/** /**
* Function called with details about deposits that have been made, * Function called with details about deposits that have been made,
* with the goal of executing the corresponding wire transaction. * with the goal of executing the corresponding wire transaction.
* *
* @param cls closure with the `struct AggregationUnit` * @param cls NULL
* @param row_id identifies database entry * @param row_id identifies database entry
* @param merchant_pub public key of the merchant * @param merchant_pub public key of the merchant
* @param coin_pub public key of the coin * @param coin_pub public key of the coin
@ -282,8 +346,6 @@ deposit_cb (void *cls,
struct GNUNET_TIME_Absolute wire_deadline, struct GNUNET_TIME_Absolute wire_deadline,
const json_t *wire) const json_t *wire)
{ {
struct AggregationUnit *au = cls;
au->merchant_pub = *merchant_pub; au->merchant_pub = *merchant_pub;
if (GNUNET_OK != if (GNUNET_OK !=
TALER_amount_subtract (&au->total_amount, TALER_amount_subtract (&au->total_amount,
@ -337,7 +399,7 @@ deposit_cb (void *cls,
* Function called with details about another deposit we * Function called with details about another deposit we
* can aggregate into an existing aggregation unit. * can aggregate into an existing aggregation unit.
* *
* @param cls closure with the `struct AggregationUnit` * @param cls NULL
* @param row_id identifies database entry * @param row_id identifies database entry
* @param merchant_pub public key of the merchant * @param merchant_pub public key of the merchant
* @param coin_pub public key of the coin * @param coin_pub public key of the coin
@ -362,7 +424,6 @@ aggregate_cb (void *cls,
struct GNUNET_TIME_Absolute wire_deadline, struct GNUNET_TIME_Absolute wire_deadline,
const json_t *wire) const json_t *wire)
{ {
struct AggregationUnit *au = cls;
struct TALER_Amount delta; struct TALER_Amount delta;
GNUNET_break (0 == GNUNET_break (0 ==
@ -459,7 +520,6 @@ run_aggregation (void *cls,
const struct GNUNET_SCHEDULER_TaskContext *tc) const struct GNUNET_SCHEDULER_TaskContext *tc)
{ {
struct TALER_EXCHANGEDB_Session *session; struct TALER_EXCHANGEDB_Session *session;
struct AggregationUnit *au;
unsigned int i; unsigned int i;
int ret; int ret;
@ -493,6 +553,7 @@ run_aggregation (void *cls,
if (NULL != au->wire) if (NULL != au->wire)
json_decref (au->wire); json_decref (au->wire);
GNUNET_free (au); GNUNET_free (au);
au = NULL;
db_plugin->rollback (db_plugin->cls, db_plugin->rollback (db_plugin->cls,
session); session);
if (0 != ret) if (0 != ret)
@ -533,6 +594,7 @@ run_aggregation (void *cls,
if (NULL != au->wire) if (NULL != au->wire)
json_decref (au->wire); json_decref (au->wire);
GNUNET_free (au); GNUNET_free (au);
au = NULL;
db_plugin->rollback (db_plugin->cls, db_plugin->rollback (db_plugin->cls,
session); session);
global_ret = GNUNET_SYSERR; global_ret = GNUNET_SYSERR;
@ -563,6 +625,7 @@ run_aggregation (void *cls,
if (NULL != au->wire) if (NULL != au->wire)
json_decref (au->wire); json_decref (au->wire);
GNUNET_free (au); GNUNET_free (au);
au = NULL;
return; return;
} }
/* Mark transactions by row_id as minor */ /* Mark transactions by row_id as minor */
@ -591,6 +654,7 @@ run_aggregation (void *cls,
if (NULL != au->wire) if (NULL != au->wire)
json_decref (au->wire); json_decref (au->wire);
GNUNET_free (au); GNUNET_free (au);
au = NULL;
/* start again */ /* start again */
task = GNUNET_SCHEDULER_add_now (&run_aggregation, task = GNUNET_SCHEDULER_add_now (&run_aggregation,
NULL); NULL);
@ -602,9 +666,6 @@ run_aggregation (void *cls,
&au->wtid, &au->wtid,
&prepare_cb, &prepare_cb,
au); au);
/* FIXME: currently we have no clean-up plan on
shutdown to call prepare_wire_transfer_cancel!
Maybe make 'au' global? */
if (NULL == au->ph) if (NULL == au->ph)
{ {
GNUNET_break (0); /* why? how to best recover? */ GNUNET_break (0); /* why? how to best recover? */
@ -613,6 +674,7 @@ run_aggregation (void *cls,
GNUNET_free_non_null (au->additional_rows); GNUNET_free_non_null (au->additional_rows);
if (NULL != au->wire) if (NULL != au->wire)
json_decref (au->wire); json_decref (au->wire);
au = NULL;
GNUNET_free (au); GNUNET_free (au);
/* start again */ /* start again */
task = GNUNET_SCHEDULER_add_now (&run_aggregation, task = GNUNET_SCHEDULER_add_now (&run_aggregation,
@ -638,7 +700,7 @@ run_transfers (void *cls,
/** /**
* Function to be called with the prepared transfer data. * Function to be called with the prepared transfer data.
* *
* @param cls closure with the `struct AggregationUnit` * @param cls NULL
* @param buf transaction data to persist, NULL on error * @param buf transaction data to persist, NULL on error
* @param buf_size number of bytes in @a buf, 0 on error * @param buf_size number of bytes in @a buf, 0 on error
*/ */
@ -647,13 +709,13 @@ prepare_cb (void *cls,
const char *buf, const char *buf,
size_t buf_size) size_t buf_size)
{ {
struct AggregationUnit *au = cls;
struct TALER_EXCHANGEDB_Session *session = au->session; struct TALER_EXCHANGEDB_Session *session = au->session;
GNUNET_free_non_null (au->additional_rows); GNUNET_free_non_null (au->additional_rows);
if (NULL != au->wire) if (NULL != au->wire)
json_decref (au->wire); json_decref (au->wire);
GNUNET_free (au); GNUNET_free (au);
au = NULL;
if (NULL == buf) if (NULL == buf)
{ {
GNUNET_break (0); /* why? how to best recover? */ GNUNET_break (0); /* why? how to best recover? */
@ -702,34 +764,10 @@ prepare_cb (void *cls,
} }
/**
* Data we keep to #run_transfers().
*/
struct WirePrepareData
{
/**
* Database session for all of our transactions.
*/
struct TALER_EXCHANGEDB_Session *session;
/**
* Wire execution handle.
*/
struct TALER_WIRE_ExecuteHandle *eh;
/**
* Row ID of the transfer.
*/
unsigned long long row_id;
};
/** /**
* Function called with the result from the execute step. * Function called with the result from the execute step.
* *
* @param cls closure with the `struct WirePrepareData` * @param cls NULL
* @param success #GNUNET_OK on success, #GNUNET_SYSERR on failure * @param success #GNUNET_OK on success, #GNUNET_SYSERR on failure
* @param emsg NULL on success, otherwise an error message * @param emsg NULL on success, otherwise an error message
*/ */
@ -738,7 +776,6 @@ wire_confirm_cb (void *cls,
int success, int success,
const char *emsg) const char *emsg)
{ {
struct WirePrepareData *wpd = cls;
struct TALER_EXCHANGEDB_Session *session = wpd->session; struct TALER_EXCHANGEDB_Session *session = wpd->session;
wpd->eh = NULL; wpd->eh = NULL;
@ -751,6 +788,7 @@ wire_confirm_cb (void *cls,
session); session);
global_ret = GNUNET_SYSERR; global_ret = GNUNET_SYSERR;
GNUNET_free (wpd); GNUNET_free (wpd);
wpd = NULL;
return; return;
} }
if (GNUNET_OK != if (GNUNET_OK !=
@ -763,9 +801,11 @@ wire_confirm_cb (void *cls,
session); session);
global_ret = GNUNET_SYSERR; global_ret = GNUNET_SYSERR;
GNUNET_free (wpd); GNUNET_free (wpd);
wpd = NULL;
return; return;
} }
GNUNET_free (wpd); GNUNET_free (wpd);
wpd = NULL;
if (GNUNET_OK != if (GNUNET_OK !=
db_plugin->commit (db_plugin->cls, db_plugin->commit (db_plugin->cls,
session)) session))
@ -788,7 +828,7 @@ wire_confirm_cb (void *cls,
/** /**
* Callback with data about a prepared transaction. * Callback with data about a prepared transaction.
* *
* @param cls closure with the `struct WirePrepareData` * @param cls NULL
* @param rowid row identifier used to mark prepared transaction as done * @param rowid row identifier used to mark prepared transaction as done
* @param buf transaction data that was persisted, NULL on error * @param buf transaction data that was persisted, NULL on error
* @param buf_size number of bytes in @a buf, 0 on error * @param buf_size number of bytes in @a buf, 0 on error
@ -799,17 +839,12 @@ wire_prepare_cb (void *cls,
const char *buf, const char *buf,
size_t buf_size) size_t buf_size)
{ {
struct WirePrepareData *wpd = cls;
wpd->row_id = rowid; wpd->row_id = rowid;
wpd->eh = wire_plugin->execute_wire_transfer (wire_plugin->cls, wpd->eh = wire_plugin->execute_wire_transfer (wire_plugin->cls,
buf, buf,
buf_size, buf_size,
&wire_confirm_cb, &wire_confirm_cb,
wpd); NULL);
/* FIXME: currently we have no clean-up plan on
shutdown to call execute_wire_transfer_cancel!
Maybe make 'wpd' global? */
if (NULL == wpd->eh) if (NULL == wpd->eh)
{ {
GNUNET_break (0); /* why? how to best recover? */ GNUNET_break (0); /* why? how to best recover? */
@ -817,6 +852,7 @@ wire_prepare_cb (void *cls,
wpd->session); wpd->session);
global_ret = GNUNET_SYSERR; global_ret = GNUNET_SYSERR;
GNUNET_free (wpd); GNUNET_free (wpd);
wpd = NULL;
return; return;
} }
} }
@ -834,7 +870,6 @@ run_transfers (void *cls,
const struct GNUNET_SCHEDULER_TaskContext *tc) const struct GNUNET_SCHEDULER_TaskContext *tc)
{ {
int ret; int ret;
struct WirePrepareData *wpd;
struct TALER_EXCHANGEDB_Session *session; struct TALER_EXCHANGEDB_Session *session;
if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN)) if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
@ -862,7 +897,7 @@ run_transfers (void *cls,
session, session,
exchange_wireformat, exchange_wireformat,
&wire_prepare_cb, &wire_prepare_cb,
wpd); NULL);
if (GNUNET_SYSERR == ret) if (GNUNET_SYSERR == ret)
{ {
GNUNET_break (0); /* why? how to best recover? */ GNUNET_break (0); /* why? how to best recover? */
@ -870,6 +905,7 @@ run_transfers (void *cls,
session); session);
global_ret = GNUNET_SYSERR; global_ret = GNUNET_SYSERR;
GNUNET_free (wpd); GNUNET_free (wpd);
wpd = NULL;
return; return;
} }
if (GNUNET_NO == ret) if (GNUNET_NO == ret)
@ -880,6 +916,7 @@ run_transfers (void *cls,
task = GNUNET_SCHEDULER_add_now (&run_aggregation, task = GNUNET_SCHEDULER_add_now (&run_aggregation,
NULL); NULL);
GNUNET_free (wpd); GNUNET_free (wpd);
wpd = NULL;
return; return;
} }
/* otherwise, continues in #wire_prepare_cb() */ /* otherwise, continues in #wire_prepare_cb() */