fix #5010 for taler-exchange-aggregator

This commit is contained in:
Christian Grothoff 2017-06-24 12:15:11 +02:00
parent 0599b3b35b
commit 2d662e3f8e
No known key found for this signature in database
GPG Key ID: 939E6BE1E29FC3CC
4 changed files with 469 additions and 601 deletions

View File

@ -346,18 +346,19 @@ advance_fees (struct WirePlugin *wp,
* @param wp wire transfer fee data structure to update * @param wp wire transfer fee data structure to update
* @param now timestamp to update fees to * @param now timestamp to update fees to
* @param session DB session to use * @param session DB session to use
* @return #GNUNET_OK on success, #GNUNET_SYSERR if we * @return transaction status
* lack current fee information (and need to exit)
*/ */
static int static enum GNUNET_DB_QueryStatus
update_fees (struct WirePlugin *wp, update_fees (struct WirePlugin *wp,
struct GNUNET_TIME_Absolute now, struct GNUNET_TIME_Absolute now,
struct TALER_EXCHANGEDB_Session *session) struct TALER_EXCHANGEDB_Session *session)
{ {
enum GNUNET_DB_QueryStatus qs;
advance_fees (wp, advance_fees (wp,
now); now);
if (NULL != wp->af) if (NULL != wp->af)
return GNUNET_OK; return GNUNET_DB_STATUS_SUCCESS_ONE_RESULT;
/* Let's try to load it from disk... */ /* Let's try to load it from disk... */
wp->af = TALER_EXCHANGEDB_fees_read (cfg, wp->af = TALER_EXCHANGEDB_fees_read (cfg,
wp->type); wp->type);
@ -367,26 +368,26 @@ update_fees (struct WirePlugin *wp,
NULL != p; NULL != p;
p = p->next) p = p->next)
{ {
if (GNUNET_SYSERR == qs = db_plugin->insert_wire_fee (db_plugin->cls,
db_plugin->insert_wire_fee (db_plugin->cls,
session, session,
wp->type, wp->type,
p->start_date, p->start_date,
p->end_date, p->end_date,
&p->wire_fee, &p->wire_fee,
&p->master_sig)) &p->master_sig);
if (qs < 0)
{ {
TALER_EXCHANGEDB_fees_free (wp->af); TALER_EXCHANGEDB_fees_free (wp->af);
wp->af = NULL; wp->af = NULL;
return GNUNET_SYSERR; return qs;
} }
} }
if (NULL != wp->af) if (NULL != wp->af)
return GNUNET_OK; return GNUNET_DB_STATUS_SUCCESS_ONE_RESULT;
GNUNET_log (GNUNET_ERROR_TYPE_ERROR, GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
"Failed to find current wire transfer fees for `%s'\n", "Failed to find current wire transfer fees for `%s'\n",
wp->type); wp->type);
return GNUNET_SYSERR; return GNUNET_DB_STATUS_SUCCESS_NO_RESULTS;
} }
@ -425,6 +426,26 @@ find_plugin (const char *type)
return wp; return wp;
} }
/**
* Free data stored in #au.
*/
static void
cleanup_au (void)
{
if (NULL == au)
return;
GNUNET_free_non_null (au->additional_rows);
if (NULL != au->wire)
{
json_decref (au->wire);
au->wire = NULL;
}
GNUNET_free (au);
au = NULL;
}
/** /**
* We're being aborted with CTRL-C (or SIGTERM). Shut down. * We're being aborted with CTRL-C (or SIGTERM). Shut down.
* *
@ -463,11 +484,7 @@ shutdown_task (void *cls)
} }
db_plugin->rollback (db_plugin->cls, db_plugin->rollback (db_plugin->cls,
au->session); au->session);
GNUNET_free_non_null (au->additional_rows); cleanup_au ();
if (NULL != au->wire)
json_decref (au->wire);
au = NULL;
GNUNET_free (au);
} }
if (NULL != ctc) if (NULL != ctc)
{ {
@ -564,9 +581,9 @@ exchange_serve_process_config ()
* @param wire_deadline by which the merchant adviced that he would like the * @param wire_deadline by which the merchant adviced that he would like the
* wire transfer to be executed * wire transfer to be executed
* @param wire wire details for the merchant * @param wire wire details for the merchant
* @return #GNUNET_OK to continue to iterate, #GNUNET_SYSERR to stop * @return transaction status code, #GNUNET_DB_STATUS_SUCCESS_ONE_RESULT to continue to iterate
*/ */
static int static enum GNUNET_DB_QueryStatus
deposit_cb (void *cls, deposit_cb (void *cls,
uint64_t row_id, uint64_t row_id,
const struct TALER_MerchantPublicKeyP *merchant_pub, const struct TALER_MerchantPublicKeyP *merchant_pub,
@ -588,7 +605,7 @@ deposit_cb (void *cls,
GNUNET_log (GNUNET_ERROR_TYPE_ERROR, GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
"Fatally malformed record at row %llu\n", "Fatally malformed record at row %llu\n",
(unsigned long long) row_id); (unsigned long long) row_id);
return GNUNET_SYSERR; return GNUNET_DB_STATUS_HARD_ERROR;
} }
au->row_id = row_id; au->row_id = row_id;
GNUNET_assert (NULL == au->wire); GNUNET_assert (NULL == au->wire);
@ -604,38 +621,41 @@ deposit_cb (void *cls,
au->wp = find_plugin (extract_type (au->wire)); au->wp = find_plugin (extract_type (au->wire));
if (NULL == au->wp) if (NULL == au->wp)
return GNUNET_SYSERR; return GNUNET_DB_STATUS_HARD_ERROR;
/* make sure we have current fees */ /* make sure we have current fees */
au->execution_time = GNUNET_TIME_absolute_get (); au->execution_time = GNUNET_TIME_absolute_get ();
(void) GNUNET_TIME_round_abs (&au->execution_time); (void) GNUNET_TIME_round_abs (&au->execution_time);
if (GNUNET_OK != qs = update_fees (au->wp,
update_fees (au->wp,
au->execution_time, au->execution_time,
au->session)) au->session);
return GNUNET_SYSERR; if (qs <= 0)
{
if (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS == qs)
qs = GNUNET_DB_STATUS_HARD_ERROR;
GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs);
return qs;
}
au->wire_fee = au->wp->af->wire_fee; au->wire_fee = au->wp->af->wire_fee;
if (GNUNET_OK != qs = db_plugin->insert_aggregation_tracking (db_plugin->cls,
db_plugin->insert_aggregation_tracking (db_plugin->cls,
au->session, au->session,
&au->wtid, &au->wtid,
row_id)) row_id);
if (qs <= 0)
{ {
GNUNET_break (0); GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs);
return GNUNET_SYSERR; return qs;
} }
qs = db_plugin->mark_deposit_done (db_plugin->cls, qs = db_plugin->mark_deposit_done (db_plugin->cls,
au->session, au->session,
row_id); row_id);
if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT != qs) if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT != qs)
{ {
/* FIXME #5010 */ GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs);
GNUNET_break (0); return qs;
au->failed = GNUNET_YES;
return GNUNET_SYSERR;
} }
return GNUNET_OK; return qs;
} }
@ -653,9 +673,9 @@ deposit_cb (void *cls,
* @param wire_deadline by which the merchant adviced that he would like the * @param wire_deadline by which the merchant adviced that he would like the
* wire transfer to be executed * wire transfer to be executed
* @param wire wire details for the merchant * @param wire wire details for the merchant
* @return #GNUNET_OK to continue to iterate, #GNUNET_SYSERR to stop * @return transaction status code
*/ */
static int static enum GNUNET_DB_QueryStatus
aggregate_cb (void *cls, aggregate_cb (void *cls,
uint64_t row_id, uint64_t row_id,
const struct TALER_MerchantPublicKeyP *merchant_pub, const struct TALER_MerchantPublicKeyP *merchant_pub,
@ -682,9 +702,12 @@ aggregate_cb (void *cls,
GNUNET_log (GNUNET_ERROR_TYPE_ERROR, GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
"Fatally malformed record at %llu\n", "Fatally malformed record at %llu\n",
(unsigned long long) row_id); (unsigned long long) row_id);
return GNUNET_SYSERR; return GNUNET_DB_STATUS_HARD_ERROR;
} }
/* add to total */ /* add to total */
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Adding transaction amount %s to aggregation\n",
TALER_amount2s (&delta));
if (GNUNET_OK != if (GNUNET_OK !=
TALER_amount_add (&au->total_amount, TALER_amount_add (&au->total_amount,
&au->total_amount, &au->total_amount,
@ -694,14 +717,14 @@ aggregate_cb (void *cls,
"Overflow or currency incompatibility during aggregation at %llu\n", "Overflow or currency incompatibility during aggregation at %llu\n",
(unsigned long long) row_id); (unsigned long long) row_id);
/* Skip this one, but keep going! */ /* Skip this one, but keep going! */
return GNUNET_OK; return GNUNET_DB_STATUS_SUCCESS_ONE_RESULT;
} }
if (au->rows_offset >= aggregation_limit) if (au->rows_offset >= aggregation_limit)
{ {
/* Bug: we asked for at most #aggregation_limit results! */ /* Bug: we asked for at most #aggregation_limit results! */
GNUNET_break (0); GNUNET_break (0);
/* Skip this one, but keep going. */ /* Skip this one, but keep going. */
return GNUNET_OK; return GNUNET_DB_STATUS_SUCCESS_ONE_RESULT;
} }
if (NULL == au->additional_rows) if (NULL == au->additional_rows)
au->additional_rows = GNUNET_new_array (aggregation_limit, au->additional_rows = GNUNET_new_array (aggregation_limit,
@ -709,26 +732,27 @@ aggregate_cb (void *cls,
/* "append" to our list of rows */ /* "append" to our list of rows */
au->additional_rows[au->rows_offset++] = row_id; au->additional_rows[au->rows_offset++] = row_id;
/* insert into aggregation tracking table */ /* insert into aggregation tracking table */
if (GNUNET_OK != qs = db_plugin->insert_aggregation_tracking (db_plugin->cls,
db_plugin->insert_aggregation_tracking (db_plugin->cls,
au->session, au->session,
&au->wtid, &au->wtid,
row_id)) row_id);
if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT != qs)
{ {
GNUNET_break (0); GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs);
return GNUNET_SYSERR; return qs;
} }
qs = db_plugin->mark_deposit_done (db_plugin->cls, qs = db_plugin->mark_deposit_done (db_plugin->cls,
au->session, au->session,
row_id); row_id);
if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT != qs) if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT != qs)
{ {
/* FIXME: #5010 */ GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs);
GNUNET_break (0); return qs;
au->failed = GNUNET_YES;
return GNUNET_SYSERR;
} }
return GNUNET_OK; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Added row %llu to aggregation\n",
(unsigned long long) row_id);
return GNUNET_DB_STATUS_SUCCESS_ONE_RESULT;
} }
@ -948,15 +972,18 @@ expired_reserve_cb (void *cls,
} }
/* lookup `closing_fee` */ /* lookup `closing_fee` */
if (GNUNET_OK != qs = update_fees (wp,
update_fees (wp,
now, now,
session)) session);
if (qs <= 0)
{ {
GNUNET_break (0); if (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS == qs)
qs = GNUNET_DB_STATUS_HARD_ERROR;
GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs);
global_ret = GNUNET_SYSERR; global_ret = GNUNET_SYSERR;
if (GNUNET_DB_STATUS_HARD_ERROR == qs)
GNUNET_SCHEDULER_shutdown (); GNUNET_SCHEDULER_shutdown ();
return GNUNET_DB_STATUS_HARD_ERROR; return qs;
} }
closing_fee = &wp->af->closing_fee; closing_fee = &wp->af->closing_fee;
@ -1144,7 +1171,6 @@ run_aggregation (void *cls)
static int swap; static int swap;
struct TALER_EXCHANGEDB_Session *session; struct TALER_EXCHANGEDB_Session *session;
enum GNUNET_DB_QueryStatus qs; enum GNUNET_DB_QueryStatus qs;
int ret;
const struct GNUNET_SCHEDULER_TaskContext *tc; const struct GNUNET_SCHEDULER_TaskContext *tc;
task = NULL; task = NULL;
@ -1179,19 +1205,16 @@ run_aggregation (void *cls)
} }
au = GNUNET_new (struct AggregationUnit); au = GNUNET_new (struct AggregationUnit);
au->session = session; au->session = session;
ret = db_plugin->get_ready_deposit (db_plugin->cls, qs = db_plugin->get_ready_deposit (db_plugin->cls,
session, session,
&deposit_cb, &deposit_cb,
au); au);
if (0 >= ret) if (0 >= qs)
{ {
if (NULL != au->wire) cleanup_au ();
json_decref (au->wire);
GNUNET_free (au);
au = NULL;
db_plugin->rollback (db_plugin->cls, db_plugin->rollback (db_plugin->cls,
session); session);
if (GNUNET_SYSERR == ret) if (GNUNET_DB_STATUS_HARD_ERROR == qs)
{ {
GNUNET_log (GNUNET_ERROR_TYPE_ERROR, GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
"Failed to execute deposit iteration!\n"); "Failed to execute deposit iteration!\n");
@ -1199,6 +1222,14 @@ run_aggregation (void *cls)
GNUNET_SCHEDULER_shutdown (); GNUNET_SCHEDULER_shutdown ();
return; return;
} }
if (GNUNET_DB_STATUS_SOFT_ERROR == qs)
{
/* should re-try immediately */
swap--; /* do not count failed attempts */
task = GNUNET_SCHEDULER_add_now (&run_aggregation,
NULL);
return;
}
GNUNET_log (GNUNET_ERROR_TYPE_INFO, GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"No more ready deposits, going to sleep\n"); "No more ready deposits, going to sleep\n");
if ( (GNUNET_YES == test_mode) && if ( (GNUNET_YES == test_mode) &&
@ -1209,12 +1240,13 @@ run_aggregation (void *cls)
} }
else else
{ {
/* nothing to do, sleep for a minute and try again */
if ( (GNUNET_NO == reserves_idle) || if ( (GNUNET_NO == reserves_idle) ||
(GNUNET_YES == test_mode) ) (GNUNET_YES == test_mode) )
/* Possibly more to on reserves, go for it immediately */
task = GNUNET_SCHEDULER_add_now (&run_reserve_closures, task = GNUNET_SCHEDULER_add_now (&run_reserve_closures,
NULL); NULL);
else else
/* nothing to do, sleep for a minute and try again */
task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES, task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MINUTES,
&run_aggregation, &run_aggregation,
NULL); NULL);
@ -1226,28 +1258,36 @@ run_aggregation (void *cls)
GNUNET_log (GNUNET_ERROR_TYPE_INFO, GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"Found ready deposit for %s, aggregating\n", "Found ready deposit for %s, aggregating\n",
TALER_B2S (&au->merchant_pub)); TALER_B2S (&au->merchant_pub));
ret = db_plugin->iterate_matching_deposits (db_plugin->cls, qs = db_plugin->iterate_matching_deposits (db_plugin->cls,
session, session,
&au->h_wire, &au->h_wire,
&au->merchant_pub, &au->merchant_pub,
&aggregate_cb, &aggregate_cb,
au, au,
aggregation_limit); aggregation_limit);
if ( (GNUNET_SYSERR == ret) || if ( (GNUNET_DB_STATUS_HARD_ERROR == qs) ||
(GNUNET_YES == au->failed) ) (GNUNET_YES == au->failed) )
{ {
GNUNET_log (GNUNET_ERROR_TYPE_ERROR, GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
"Failed to execute deposit iteration!\n"); "Failed to execute deposit iteration!\n");
GNUNET_free_non_null (au->additional_rows); cleanup_au ();
json_decref (au->wire);
GNUNET_free (au);
au = NULL;
db_plugin->rollback (db_plugin->cls, db_plugin->rollback (db_plugin->cls,
session); session);
global_ret = GNUNET_SYSERR; global_ret = GNUNET_SYSERR;
GNUNET_SCHEDULER_shutdown (); GNUNET_SCHEDULER_shutdown ();
return; return;
} }
if (GNUNET_DB_STATUS_SOFT_ERROR == qs)
{
/* serializiability issue, try again */
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Serialization issue, trying again later!\n");
db_plugin->rollback (db_plugin->cls,
session);
task = GNUNET_SCHEDULER_add_now (&run_aggregation,
NULL);
return;
}
/* Subtract wire transfer fee and round to the unit supported by the /* Subtract wire transfer fee and round to the unit supported by the
wire transfer method; Check if after rounding down, we still have wire transfer method; Check if after rounding down, we still have
@ -1263,13 +1303,16 @@ run_aggregation (void *cls)
(0 == au->final_amount.fraction) ) ) (0 == au->final_amount.fraction) ) )
{ {
GNUNET_log (GNUNET_ERROR_TYPE_INFO, GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"Aggregate value too low for transfer\n"); "Aggregate value too low for transfer (%d/%s)\n",
qs,
TALER_amount2s (&au->final_amount));
/* Rollback ongoing transaction, as we will not use the respective /* Rollback ongoing transaction, as we will not use the respective
WTID and thus need to remove the tracking data */ WTID and thus need to remove the tracking data */
db_plugin->rollback (db_plugin->cls, db_plugin->rollback (db_plugin->cls,
session); session);
/* Start another transaction to mark all* of the selected deposits
*as minor! */ /* There were results, just the value was too low. Start another
transaction to mark all* of the selected deposits as minor! */
if (GNUNET_OK != if (GNUNET_OK !=
db_plugin->start (db_plugin->cls, db_plugin->start (db_plugin->cls,
session)) session))
@ -1277,16 +1320,11 @@ run_aggregation (void *cls)
GNUNET_log (GNUNET_ERROR_TYPE_ERROR, GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
"Failed to start database transaction!\n"); "Failed to start database transaction!\n");
global_ret = GNUNET_SYSERR; global_ret = GNUNET_SYSERR;
cleanup_au ();
GNUNET_SCHEDULER_shutdown (); GNUNET_SCHEDULER_shutdown ();
GNUNET_free_non_null (au->additional_rows);
if (NULL != au->wire)
json_decref (au->wire);
GNUNET_free (au);
au = NULL;
return; return;
} }
/* Mark transactions by row_id as minor */ /* Mark transactions by row_id as minor */
ret = GNUNET_OK;
qs = db_plugin->mark_deposit_tiny (db_plugin->cls, qs = db_plugin->mark_deposit_tiny (db_plugin->cls,
session, session,
au->row_id); au->row_id);
@ -1303,13 +1341,11 @@ run_aggregation (void *cls)
} }
if (GNUNET_DB_STATUS_SOFT_ERROR == qs) if (GNUNET_DB_STATUS_SOFT_ERROR == qs)
{ {
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Serialization issue, trying again later!\n");
db_plugin->rollback (db_plugin->cls, db_plugin->rollback (db_plugin->cls,
session); session);
GNUNET_free_non_null (au->additional_rows); cleanup_au ();
if (NULL != au->wire)
json_decref (au->wire);
GNUNET_free (au);
au = NULL;
/* start again */ /* start again */
task = GNUNET_SCHEDULER_add_now (&run_aggregation, task = GNUNET_SCHEDULER_add_now (&run_aggregation,
NULL); NULL);
@ -1319,21 +1355,13 @@ run_aggregation (void *cls)
{ {
db_plugin->rollback (db_plugin->cls, db_plugin->rollback (db_plugin->cls,
session); session);
GNUNET_free_non_null (au->additional_rows); cleanup_au ();
if (NULL != au->wire)
json_decref (au->wire);
GNUNET_free (au);
au = NULL;
GNUNET_SCHEDULER_shutdown (); GNUNET_SCHEDULER_shutdown ();
return; return;
} }
/* commit */ /* commit */
(void) commit_or_warn (session); (void) commit_or_warn (session);
GNUNET_free_non_null (au->additional_rows); cleanup_au ();
if (NULL != au->wire)
json_decref (au->wire);
GNUNET_free (au);
au = NULL;
/* start again */ /* start again */
task = GNUNET_SCHEDULER_add_now (&run_aggregation, task = GNUNET_SCHEDULER_add_now (&run_aggregation,
NULL); NULL);
@ -1361,11 +1389,7 @@ run_aggregation (void *cls)
GNUNET_break (0); /* why? how to best recover? */ GNUNET_break (0); /* why? how to best recover? */
db_plugin->rollback (db_plugin->cls, db_plugin->rollback (db_plugin->cls,
session); session);
GNUNET_free_non_null (au->additional_rows); cleanup_au ();
if (NULL != au->wire)
json_decref (au->wire);
GNUNET_free (au);
au = NULL;
/* start again */ /* start again */
task = GNUNET_SCHEDULER_add_now (&run_aggregation, task = GNUNET_SCHEDULER_add_now (&run_aggregation,
NULL); NULL);
@ -1388,8 +1412,10 @@ prepare_cb (void *cls,
size_t buf_size) size_t buf_size)
{ {
struct TALER_EXCHANGEDB_Session *session = au->session; struct TALER_EXCHANGEDB_Session *session = au->session;
enum GNUNET_DB_QueryStatus qs;
GNUNET_free_non_null (au->additional_rows); GNUNET_free_non_null (au->additional_rows);
au->additional_rows = NULL;
if (NULL == buf) if (NULL == buf)
{ {
GNUNET_break (0); /* why? how to best recover? */ GNUNET_break (0); /* why? how to best recover? */
@ -1398,74 +1424,53 @@ prepare_cb (void *cls,
/* start again */ /* start again */
task = GNUNET_SCHEDULER_add_now (&run_aggregation, task = GNUNET_SCHEDULER_add_now (&run_aggregation,
NULL); NULL);
if (NULL != au->wire) cleanup_au ();
{
json_decref (au->wire);
au->wire = NULL;
}
GNUNET_free (au);
au = NULL;
return; return;
} }
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Storing %u bytes of wire prepare data\n",
(unsigned int) buf_size);
/* Commit our intention to execute the wire transfer! */ /* Commit our intention to execute the wire transfer! */
if (GNUNET_OK != qs = db_plugin->wire_prepare_data_insert (db_plugin->cls,
db_plugin->wire_prepare_data_insert (db_plugin->cls,
session, session,
au->wp->type, au->wp->type,
buf, buf,
buf_size)) buf_size);
{
GNUNET_break (0); /* why? how to best recover? */
db_plugin->rollback (db_plugin->cls,
session);
/* start again */
task = GNUNET_SCHEDULER_add_now (&run_aggregation,
NULL);
if (NULL != au->wire)
{
json_decref (au->wire);
au->wire = NULL;
}
GNUNET_free (au);
au = NULL;
return;
}
/* Commit the WTID data to 'wire_out' to finally satisfy aggregation /* Commit the WTID data to 'wire_out' to finally satisfy aggregation
table constraints */ table constraints */
if (GNUNET_OK != if (qs >= 0)
db_plugin->store_wire_transfer_out (db_plugin->cls, qs = db_plugin->store_wire_transfer_out (db_plugin->cls,
session, session,
au->execution_time, au->execution_time,
&au->wtid, &au->wtid,
au->wire, au->wire,
&au->final_amount)) &au->final_amount);
cleanup_au ();
if (GNUNET_DB_STATUS_SOFT_ERROR == qs)
{ {
GNUNET_break (0); /* why? how to best recover? */ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Serialization issue for prepared wire data; trying again later!\n");
db_plugin->rollback (db_plugin->cls, db_plugin->rollback (db_plugin->cls,
session); session);
/* start again */ /* start again */
task = GNUNET_SCHEDULER_add_now (&run_aggregation, task = GNUNET_SCHEDULER_add_now (&run_aggregation,
NULL); NULL);
if (NULL != au->wire)
{
json_decref (au->wire);
au->wire = NULL;
}
GNUNET_free (au);
au = NULL;
return; return;
} }
if (GNUNET_DB_STATUS_HARD_ERROR == qs)
{
GNUNET_break (0);
db_plugin->rollback (db_plugin->cls,
session);
/* die hard */
global_ret = GNUNET_SYSERR;
GNUNET_SCHEDULER_shutdown ();
return;
}
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Stored wire transfer out instructions\n"); "Stored wire transfer out instructions\n");
if (NULL != au->wire)
{
json_decref (au->wire);
au->wire = NULL;
}
GNUNET_free (au);
au = NULL;
/* Now we can finally commit the overall transaction, as we are /* Now we can finally commit the overall transaction, as we are
again consistent if all of this passes. */ again consistent if all of this passes. */
@ -1473,6 +1478,8 @@ prepare_cb (void *cls,
{ {
case GNUNET_DB_STATUS_SOFT_ERROR: case GNUNET_DB_STATUS_SOFT_ERROR:
/* try again */ /* try again */
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Commit issue for prepared wire data; trying again later!\n");
task = GNUNET_SCHEDULER_add_now (&run_aggregation, task = GNUNET_SCHEDULER_add_now (&run_aggregation,
NULL); NULL);
return; return;
@ -1512,6 +1519,7 @@ wire_confirm_cb (void *cls,
const char *emsg) const char *emsg)
{ {
struct TALER_EXCHANGEDB_Session *session = wpd->session; struct TALER_EXCHANGEDB_Session *session = wpd->session;
enum GNUNET_DB_QueryStatus qs;
wpd->eh = NULL; wpd->eh = NULL;
if (GNUNET_SYSERR == success) if (GNUNET_SYSERR == success)
@ -1527,16 +1535,25 @@ wire_confirm_cb (void *cls,
wpd = NULL; wpd = NULL;
return; return;
} }
if (GNUNET_OK != qs = db_plugin->wire_prepare_data_mark_finished (db_plugin->cls,
db_plugin->wire_prepare_data_mark_finished (db_plugin->cls,
session, session,
wpd->row_id)) wpd->row_id);
if (0 >= qs)
{ {
GNUNET_break (0); /* why!? */ GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs);
db_plugin->rollback (db_plugin->cls, db_plugin->rollback (db_plugin->cls,
session); session);
if (GNUNET_DB_STATUS_SOFT_ERROR == qs)
{
/* try again */
task = GNUNET_SCHEDULER_add_now (&run_aggregation,
NULL);
}
else
{
global_ret = GNUNET_SYSERR; global_ret = GNUNET_SYSERR;
GNUNET_SCHEDULER_shutdown (); GNUNET_SCHEDULER_shutdown ();
}
GNUNET_free (wpd); GNUNET_free (wpd);
wpd = NULL; wpd = NULL;
return; return;
@ -1621,7 +1638,7 @@ wire_prepare_cb (void *cls,
static void static void
run_transfers (void *cls) run_transfers (void *cls)
{ {
int ret; enum GNUNET_DB_QueryStatus qs;
struct TALER_EXCHANGEDB_Session *session; struct TALER_EXCHANGEDB_Session *session;
const struct GNUNET_SCHEDULER_TaskContext *tc; const struct GNUNET_SCHEDULER_TaskContext *tc;
@ -1651,35 +1668,39 @@ run_transfers (void *cls)
} }
wpd = GNUNET_new (struct WirePrepareData); wpd = GNUNET_new (struct WirePrepareData);
wpd->session = session; wpd->session = session;
ret = db_plugin->wire_prepare_data_get (db_plugin->cls, qs = db_plugin->wire_prepare_data_get (db_plugin->cls,
session, session,
&wire_prepare_cb, &wire_prepare_cb,
NULL); NULL);
if (GNUNET_SYSERR == ret) if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT == qs)
{ return; /* continues in #wire_prepare_cb() */
GNUNET_break (0); /* why? how to best recover? */
db_plugin->rollback (db_plugin->cls, db_plugin->rollback (db_plugin->cls,
session); session);
global_ret = GNUNET_SYSERR;
GNUNET_SCHEDULER_shutdown ();
GNUNET_free (wpd); GNUNET_free (wpd);
wpd = NULL; wpd = NULL;
return; switch (qs)
}
if (GNUNET_NO == ret)
{ {
case GNUNET_DB_STATUS_HARD_ERROR:
GNUNET_break (0);
global_ret = GNUNET_SYSERR;
GNUNET_SCHEDULER_shutdown ();
return;
case GNUNET_DB_STATUS_SOFT_ERROR:
/* try again */
task = GNUNET_SCHEDULER_add_now (&run_transfers,
NULL);
return;
case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
/* no more prepared wire transfers, go back to aggregation! */ /* no more prepared wire transfers, go back to aggregation! */
GNUNET_log (GNUNET_ERROR_TYPE_INFO, GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"No more pending wire transfers, starting aggregation\n"); "No more pending wire transfers, starting aggregation\n");
db_plugin->rollback (db_plugin->cls,
session);
task = GNUNET_SCHEDULER_add_now (&run_aggregation, task = GNUNET_SCHEDULER_add_now (&run_aggregation,
NULL); NULL);
GNUNET_free (wpd);
wpd = NULL;
return; return;
case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
/* should be impossible */
GNUNET_assert (0);
} }
/* otherwise, continues in #wire_prepare_cb() */
} }

View File

@ -91,24 +91,6 @@ struct TALER_EXCHANGEDB_Session
*/ */
PGconn *conn; PGconn *conn;
/**
* Transaction state. Set to #GNUNET_OK by #postgres_start().
* Set to #GNUNET_NO if any part of the transaction failed in a
* transient way (i.e. #PG_DIAG_SQLSTATE_DEADLOCK or
* #PG_DIAG_SQLSTATE_SERIALIZATION_FAILURE). Set to
* #GNUNET_SYSERR if any part of the transaction failed in a
* hard way or if we are not within a transaction scope.
*
* If #GNUNET_NO, #postgres_commit() will always just do a
* rollback and return #GNUNET_NO as well (to retry).
*
* If #GNUNET_SYSERR, #postgres_commit() will always just do a
* rollback and return #GNUNET_SYSERR as well.
*
* If #GNUNET_OK, #postgres_commit() will try to commit and
* return the result from the commit operation.
*/
int state;
}; };
@ -1554,7 +1536,6 @@ postgres_get_session (void *cls)
return NULL; return NULL;
} }
session = GNUNET_new (struct TALER_EXCHANGEDB_Session); session = GNUNET_new (struct TALER_EXCHANGEDB_Session);
session->state = GNUNET_SYSERR;
session->conn = db_conn; session->conn = db_conn;
if (0 != pthread_setspecific (pc->db_conn_threadlocal, if (0 != pthread_setspecific (pc->db_conn_threadlocal,
session)) session))
@ -1592,11 +1573,9 @@ postgres_start (void *cls,
PQerrorMessage (session->conn)); PQerrorMessage (session->conn));
GNUNET_break (0); GNUNET_break (0);
PQclear (result); PQclear (result);
session->state = GNUNET_SYSERR;
return GNUNET_SYSERR; return GNUNET_SYSERR;
} }
PQclear (result); PQclear (result);
session->state = GNUNET_OK;
return GNUNET_OK; return GNUNET_OK;
} }
@ -1619,51 +1598,6 @@ postgres_rollback (void *cls,
GNUNET_break (PGRES_COMMAND_OK == GNUNET_break (PGRES_COMMAND_OK ==
PQresultStatus (result)); PQresultStatus (result));
PQclear (result); PQclear (result);
session->state = GNUNET_SYSERR;
}
/**
* Check the @a result's error code to see what happened.
* Also logs errors.
*
* @param session session used
* @param result result to check
* @return #GNUNET_OK if the request/transaction succeeded
* #GNUNET_NO if it failed but could succeed if retried
* #GNUNET_SYSERR on hard errors
*/
static int
evaluate_pq_result (struct TALER_EXCHANGEDB_Session *session,
PGresult *result)
{
if (PGRES_COMMAND_OK !=
PQresultStatus (result))
{
const char *sqlstate;
sqlstate = PQresultErrorField (result,
PG_DIAG_SQLSTATE);
if (NULL == sqlstate)
{
/* very unexpected... */
GNUNET_break (0);
return GNUNET_SYSERR;
}
if ( (0 == strcmp (sqlstate,
PQ_DIAG_SQLSTATE_DEADLOCK)) ||
(0 == strcmp (sqlstate,
PQ_DIAG_SQLSTATE_SERIALIZATION_FAILURE)) )
{
/* These two can be retried and have a fair chance of working
the next time */
QUERY_ERR (result, session->conn);
return GNUNET_NO;
}
BREAK_DB_ERR(result, session->conn);
return GNUNET_SYSERR;
}
return GNUNET_OK;
} }
@ -1688,87 +1622,6 @@ postgres_commit (void *cls,
} }
/**
* Update the @a session state based on the latest @a result from
* the database. Checks the status code of @a result and possibly
* sets the state to failed (#GNUNET_SYSERR) or transiently failed
* (#GNUNET_NO).
*
* @param session the session in which the transaction is running
* @param statement name of the statement we were executing (for logging)
* @param result the result we got from Postgres
* @return current session state, i.e.
* #GNUNET_OK on success
* #GNUNET_NO if the transaction had a transient failure
* #GNUNET_SYSERR if the transaction had a hard failure
*/
static int
update_session_from_result (struct TALER_EXCHANGEDB_Session *session,
const char *statement,
PGresult *result)
{
int ret;
if (GNUNET_OK != session->state)
{
GNUNET_break (0);
return GNUNET_SYSERR; /* we already failed, why do we keep going? */
}
ret = evaluate_pq_result (session,
result);
if (GNUNET_OK == ret)
return ret;
GNUNET_log ((GNUNET_NO == ret)
? GNUNET_ERROR_TYPE_INFO
: GNUNET_ERROR_TYPE_ERROR,
"Statement `%s' failed: %s/%s/%s/%s/%s",
statement,
PQresultErrorField (result, PG_DIAG_MESSAGE_PRIMARY),
PQresultErrorField (result, PG_DIAG_MESSAGE_DETAIL),
PQresultErrorMessage (result),
PQresStatus (PQresultStatus (result)),
PQerrorMessage (session->conn));
session->state = ret;
return ret;
}
/**
* Execute a named prepared @a statement that is NOT a SELECT statement
* in @a session using the given @a params. Returns the resulting session
* state.
*
* @param session session to execute the statement in
* @param statement name of the statement
* @param params parameters to give to the statement (#GNUNET_PQ_query_param_end-terminated)
* @return #GNUNET_OK on success
* #GNUNET_NO if the transaction had a transient failure
* #GNUNET_SYSERR if the transaction had a hard failure
*/
static int
execute_prepared_non_select (struct TALER_EXCHANGEDB_Session *session,
const char *statement,
const struct GNUNET_PQ_QueryParam *params)
{
PGresult *result;
int ret;
if (GNUNET_OK != session->state)
{
GNUNET_break (0);
return GNUNET_SYSERR; /* we already failed, why keep going? */
}
result = GNUNET_PQ_exec_prepared (session->conn,
statement,
params);
ret = update_session_from_result (session,
statement,
result);
PQclear (result);
return ret;
}
/** /**
* Insert a denomination key's public information into the database for * Insert a denomination key's public information into the database for
* reference by auditors and other consistency checks. * reference by auditors and other consistency checks.
@ -2787,10 +2640,9 @@ postgres_mark_deposit_done (void *cls,
* @param session connection to the database * @param session connection to the database
* @param deposit_cb function to call for ONE such deposit * @param deposit_cb function to call for ONE such deposit
* @param deposit_cb_cls closure for @a deposit_cb * @param deposit_cb_cls closure for @a deposit_cb
* @return number of rows processed, 0 if none exist, * @return transaction status code
* #GNUNET_SYSERR on error
*/ */
static int static enum GNUNET_DB_QueryStatus
postgres_get_ready_deposit (void *cls, postgres_get_ready_deposit (void *cls,
struct TALER_EXCHANGEDB_Session *session, struct TALER_EXCHANGEDB_Session *session,
TALER_EXCHANGEDB_DepositIterator deposit_cb, TALER_EXCHANGEDB_DepositIterator deposit_cb,
@ -2801,27 +2653,6 @@ postgres_get_ready_deposit (void *cls,
GNUNET_PQ_query_param_absolute_time (&now), GNUNET_PQ_query_param_absolute_time (&now),
GNUNET_PQ_query_param_end GNUNET_PQ_query_param_end
}; };
PGresult *result;
unsigned int n;
int ret;
result = GNUNET_PQ_exec_prepared (session->conn,
"deposits_get_ready",
params);
if (PGRES_TUPLES_OK !=
PQresultStatus (result))
{
BREAK_DB_ERR (result, session->conn);
PQclear (result);
return GNUNET_SYSERR;
}
if (0 == (n = PQntuples (result)))
{
PQclear (result);
return 0;
}
GNUNET_break (1 == n);
{
struct TALER_Amount amount_with_fee; struct TALER_Amount amount_with_fee;
struct TALER_Amount deposit_fee; struct TALER_Amount deposit_fee;
struct GNUNET_TIME_Absolute wire_deadline; struct GNUNET_TIME_Absolute wire_deadline;
@ -2849,17 +2680,15 @@ postgres_get_ready_deposit (void *cls,
&wire), &wire),
GNUNET_PQ_result_spec_end GNUNET_PQ_result_spec_end
}; };
enum GNUNET_DB_QueryStatus qs;
if (GNUNET_OK != qs = GNUNET_PQ_eval_prepared_singleton_select (session->conn,
GNUNET_PQ_extract_result (result, "deposits_get_ready",
rs, params,
0)) rs);
{ if (qs <= 0)
GNUNET_break (0); return qs;
PQclear (result); qs = deposit_cb (deposit_cb_cls,
return GNUNET_SYSERR;
}
ret = deposit_cb (deposit_cb_cls,
serial_id, serial_id,
&merchant_pub, &merchant_pub,
&coin_pub, &coin_pub,
@ -2869,9 +2698,116 @@ postgres_get_ready_deposit (void *cls,
wire_deadline, wire_deadline,
wire); wire);
GNUNET_PQ_cleanup_result (rs); GNUNET_PQ_cleanup_result (rs);
PQclear (result); return qs;
}
/**
* Closure for #match_deposit_cb().
*/
struct MatchingDepositContext
{
/**
* Function to call for each result
*/
TALER_EXCHANGEDB_DepositIterator deposit_cb;
/**
* Closure for @e deposit_cb.
*/
void *deposit_cb_cls;
/**
* Public key of the merchant against which we are matching.
*/
const struct TALER_MerchantPublicKeyP *merchant_pub;
/**
* Maximum number of results to return.
*/
uint32_t limit;
/**
* Loop counter, actual number of results returned.
*/
unsigned int i;
/**
* Set to #GNUNET_SYSERR on hard errors.
*/
int status;
};
/**
* Helper function for #postgres_iterate_matching_deposits().
* To be called with the results of a SELECT statement
* that has returned @a num_results results.
*
* @param cls closure of type `struct MatchingDepositContext *`
* @param result the postgres result
* @param num_result the number of results in @a result
*/
static void
match_deposit_cb (void *cls,
PGresult *result,
unsigned int num_results)
{
struct MatchingDepositContext *mdc = cls;
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Found %u/%u matching deposits\n",
num_results,
mdc->limit);
num_results = GNUNET_MIN (num_results,
mdc->limit);
for (mdc->i=0;mdc->i<num_results;mdc->i++)
{
struct TALER_Amount amount_with_fee;
struct TALER_Amount deposit_fee;
struct GNUNET_TIME_Absolute wire_deadline;
struct GNUNET_HashCode h_contract_terms;
struct TALER_CoinSpendPublicKeyP coin_pub;
uint64_t serial_id;
enum GNUNET_DB_QueryStatus qs;
struct GNUNET_PQ_ResultSpec rs[] = {
GNUNET_PQ_result_spec_uint64 ("deposit_serial_id",
&serial_id),
TALER_PQ_result_spec_amount ("amount_with_fee",
&amount_with_fee),
TALER_PQ_result_spec_amount ("fee_deposit",
&deposit_fee),
GNUNET_PQ_result_spec_absolute_time ("wire_deadline",
&wire_deadline),
GNUNET_PQ_result_spec_auto_from_type ("h_contract_terms",
&h_contract_terms),
GNUNET_PQ_result_spec_auto_from_type ("coin_pub",
&coin_pub),
GNUNET_PQ_result_spec_end
};
if (GNUNET_OK !=
GNUNET_PQ_extract_result (result,
rs,
mdc->i))
{
GNUNET_break (0);
mdc->status = GNUNET_SYSERR;
return;
}
qs = mdc->deposit_cb (mdc->deposit_cb_cls,
serial_id,
mdc->merchant_pub,
&coin_pub,
&amount_with_fee,
&deposit_fee,
&h_contract_terms,
wire_deadline,
NULL);
GNUNET_PQ_cleanup_result (rs);
if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT != qs)
break;
} }
return (GNUNET_OK == ret) ? 1 : 0;
} }
@ -2889,7 +2825,7 @@ postgres_get_ready_deposit (void *cls,
* @return transaction status code, if positive: * @return transaction status code, if positive:
* number of rows processed, 0 if none exist * number of rows processed, 0 if none exist
*/ */
static int // FIXME: enum GNUNET_DB_QueryStatus static enum GNUNET_DB_QueryStatus
postgres_iterate_matching_deposits (void *cls, postgres_iterate_matching_deposits (void *cls,
struct TALER_EXCHANGEDB_Session *session, struct TALER_EXCHANGEDB_Session *session,
const struct GNUNET_HashCode *h_wire, const struct GNUNET_HashCode *h_wire,
@ -2903,75 +2839,27 @@ postgres_iterate_matching_deposits (void *cls,
GNUNET_PQ_query_param_auto_from_type (h_wire), GNUNET_PQ_query_param_auto_from_type (h_wire),
GNUNET_PQ_query_param_end GNUNET_PQ_query_param_end
}; };
PGresult *result; struct MatchingDepositContext mdc;
unsigned int i; enum GNUNET_DB_QueryStatus qs;
unsigned int n;
result = GNUNET_PQ_exec_prepared (session->conn, mdc.deposit_cb = deposit_cb;
mdc.deposit_cb_cls = deposit_cb_cls;
mdc.merchant_pub = merchant_pub;
mdc.limit = limit;
mdc.status = GNUNET_OK;
qs = GNUNET_PQ_eval_prepared_multi_select (session->conn,
"deposits_iterate_matching", "deposits_iterate_matching",
params); params,
if (PGRES_TUPLES_OK != &match_deposit_cb,
PQresultStatus (result)) &mdc);
{ if (GNUNET_OK != mdc.status)
BREAK_DB_ERR (result, session->conn);
PQclear (result);
return GNUNET_SYSERR;
}
if (0 == (n = PQntuples (result)))
{
PQclear (result);
return 0;
}
if (n > limit)
n = limit;
for (i=0;i<n;i++)
{
struct TALER_Amount amount_with_fee;
struct TALER_Amount deposit_fee;
struct GNUNET_TIME_Absolute wire_deadline;
struct GNUNET_HashCode h_contract_terms;
struct TALER_CoinSpendPublicKeyP coin_pub;
uint64_t serial_id;
int ret;
struct GNUNET_PQ_ResultSpec rs[] = {
GNUNET_PQ_result_spec_uint64 ("deposit_serial_id",
&serial_id),
TALER_PQ_result_spec_amount ("amount_with_fee",
&amount_with_fee),
TALER_PQ_result_spec_amount ("fee_deposit",
&deposit_fee),
GNUNET_PQ_result_spec_absolute_time ("wire_deadline",
&wire_deadline),
GNUNET_PQ_result_spec_auto_from_type ("h_contract_terms",
&h_contract_terms),
GNUNET_PQ_result_spec_auto_from_type ("coin_pub",
&coin_pub),
GNUNET_PQ_result_spec_end
};
if (GNUNET_OK !=
GNUNET_PQ_extract_result (result,
rs,
i))
{ {
GNUNET_break (0); GNUNET_break (0);
PQclear (result); return GNUNET_DB_STATUS_HARD_ERROR;
return GNUNET_SYSERR;
} }
ret = deposit_cb (deposit_cb_cls, if (qs >= 0)
serial_id, return mdc.i;
merchant_pub, return qs;
&coin_pub,
&amount_with_fee,
&deposit_fee,
&h_contract_terms,
wire_deadline,
NULL);
GNUNET_PQ_cleanup_result (rs);
if (GNUNET_OK != ret)
break;
}
PQclear (result);
return i;
} }
@ -4493,11 +4381,9 @@ postgres_wire_lookup_deposit_wtid (void *cls,
* @param session database connection * @param session database connection
* @param wtid the raw wire transfer identifier we used * @param wtid the raw wire transfer identifier we used
* @param deposit_serial_id row in the deposits table for which this is aggregation data * @param deposit_serial_id row in the deposits table for which this is aggregation data
* @return #GNUNET_OK on success, * @return transaction status code
* #GNUNET_NO on transient errors
* #GNUNET_SYSERR on DB errors
*/ */
static int static enum GNUNET_DB_QueryStatus
postgres_insert_aggregation_tracking (void *cls, postgres_insert_aggregation_tracking (void *cls,
struct TALER_EXCHANGEDB_Session *session, struct TALER_EXCHANGEDB_Session *session,
const struct TALER_WireTransferIdentifierRawP *wtid, const struct TALER_WireTransferIdentifierRawP *wtid,
@ -4510,7 +4396,7 @@ postgres_insert_aggregation_tracking (void *cls,
GNUNET_PQ_query_param_end GNUNET_PQ_query_param_end
}; };
return execute_prepared_non_select (session, return GNUNET_PQ_eval_prepared_non_select (session->conn,
"insert_aggregation_tracking", "insert_aggregation_tracking",
params); params);
} }
@ -4569,11 +4455,9 @@ postgres_get_wire_fee (void *cls,
* @param end_date when does the fee end being valid * @param end_date when does the fee end being valid
* @param wire_fee how high is the wire transfer fee * @param wire_fee how high is the wire transfer fee
* @param master_sig signature over the above by the exchange master key * @param master_sig signature over the above by the exchange master key
* @return #GNUNET_OK on success or if the record exists, * @return transaction status code
* #GNUNET_NO on transient errors
* #GNUNET_SYSERR on failure
*/ */
static int static enum GNUNET_DB_QueryStatus
postgres_insert_wire_fee (void *cls, postgres_insert_wire_fee (void *cls,
struct TALER_EXCHANGEDB_Session *session, struct TALER_EXCHANGEDB_Session *session,
const char *type, const char *type,
@ -4594,41 +4478,44 @@ postgres_insert_wire_fee (void *cls,
struct TALER_MasterSignatureP sig; struct TALER_MasterSignatureP sig;
struct GNUNET_TIME_Absolute sd; struct GNUNET_TIME_Absolute sd;
struct GNUNET_TIME_Absolute ed; struct GNUNET_TIME_Absolute ed;
enum GNUNET_DB_QueryStatus qs;
if (GNUNET_OK == qs = postgres_get_wire_fee (cls,
postgres_get_wire_fee (cls,
session, session,
type, type,
start_date, start_date,
&sd, &sd,
&ed, &ed,
&wf, &wf,
&sig)) &sig);
if (qs < 0)
return qs;
if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT == qs)
{ {
if (0 != memcmp (&sig, if (0 != memcmp (&sig,
master_sig, master_sig,
sizeof (sig))) sizeof (sig)))
{ {
GNUNET_break (0); GNUNET_break (0);
return GNUNET_SYSERR; return GNUNET_DB_STATUS_HARD_ERROR;
} }
if (0 != TALER_amount_cmp (wire_fee, if (0 != TALER_amount_cmp (wire_fee,
&wf)) &wf))
{ {
GNUNET_break (0); GNUNET_break (0);
return GNUNET_SYSERR; return GNUNET_DB_STATUS_HARD_ERROR;
} }
if ( (sd.abs_value_us != start_date.abs_value_us) || if ( (sd.abs_value_us != start_date.abs_value_us) ||
(ed.abs_value_us != end_date.abs_value_us) ) (ed.abs_value_us != end_date.abs_value_us) )
{ {
GNUNET_break (0); GNUNET_break (0);
return GNUNET_SYSERR; return GNUNET_DB_STATUS_HARD_ERROR;
} }
/* equal record already exists */ /* equal record already exists */
return GNUNET_OK; return GNUNET_DB_STATUS_SUCCESS_NO_RESULTS;
} }
return execute_prepared_non_select (session, return GNUNET_PQ_eval_prepared_non_select (session->conn,
"insert_wire_fee", "insert_wire_fee",
params); params);
} }
@ -4862,9 +4749,9 @@ postgres_wire_prepare_data_insert (void *cls,
* @param cls closure * @param cls closure
* @param session database connection * @param session database connection
* @param rowid which entry to mark as finished * @param rowid which entry to mark as finished
* @return #GNUNET_OK on success, #GNUNET_SYSERR on DB errors * @return transaction status code
*/ */
static int static enum GNUNET_DB_QueryStatus
postgres_wire_prepare_data_mark_finished (void *cls, postgres_wire_prepare_data_mark_finished (void *cls,
struct TALER_EXCHANGEDB_Session *session, struct TALER_EXCHANGEDB_Session *session,
uint64_t rowid) uint64_t rowid)
@ -4874,7 +4761,7 @@ postgres_wire_prepare_data_mark_finished (void *cls,
GNUNET_PQ_query_param_end GNUNET_PQ_query_param_end
}; };
return execute_prepared_non_select (session, return GNUNET_PQ_eval_prepared_non_select (session->conn,
"wire_prepare_data_mark_done", "wire_prepare_data_mark_done",
params); params);
} }
@ -4888,43 +4775,18 @@ postgres_wire_prepare_data_mark_finished (void *cls,
* @param session database connection * @param session database connection
* @param cb function to call for ONE unfinished item * @param cb function to call for ONE unfinished item
* @param cb_cls closure for @a cb * @param cb_cls closure for @a cb
* @return #GNUNET_OK on success, * @return transaction status code
* #GNUNET_NO if there are no entries,
* #GNUNET_SYSERR on DB errors
*/ */
static int static enum GNUNET_DB_QueryStatus
postgres_wire_prepare_data_get (void *cls, postgres_wire_prepare_data_get (void *cls,
struct TALER_EXCHANGEDB_Session *session, struct TALER_EXCHANGEDB_Session *session,
TALER_EXCHANGEDB_WirePreparationIterator cb, TALER_EXCHANGEDB_WirePreparationIterator cb,
void *cb_cls) void *cb_cls)
{ {
PGresult *result; enum GNUNET_DB_QueryStatus qs;
struct GNUNET_PQ_QueryParam params[] = { struct GNUNET_PQ_QueryParam params[] = {
GNUNET_PQ_query_param_end GNUNET_PQ_query_param_end
}; };
result = GNUNET_PQ_exec_prepared (session->conn,
"wire_prepare_data_get",
params);
if (PGRES_TUPLES_OK != PQresultStatus (result))
{
QUERY_ERR (result, session->conn);
PQclear (result);
return GNUNET_SYSERR;
}
if (0 == PQntuples (result))
{
PQclear (result);
return GNUNET_NO;
}
if (1 != PQntuples (result))
{
GNUNET_break (0);
PQclear (result);
return GNUNET_SYSERR;
}
{
uint64_t prewire_uuid; uint64_t prewire_uuid;
char *type; char *type;
void *buf = NULL; void *buf = NULL;
@ -4940,24 +4802,19 @@ postgres_wire_prepare_data_get (void *cls,
GNUNET_PQ_result_spec_end GNUNET_PQ_result_spec_end
}; };
if (GNUNET_OK != qs = GNUNET_PQ_eval_prepared_singleton_select (session->conn,
GNUNET_PQ_extract_result (result, "wire_prepare_data_get",
rs, params,
0)) rs);
{ if (0 >= qs)
GNUNET_break (0); return qs;
PQclear (result);
return GNUNET_SYSERR;
}
cb (cb_cls, cb (cb_cls,
prewire_uuid, prewire_uuid,
type, type,
buf, buf,
buf_size); buf_size);
GNUNET_PQ_cleanup_result (rs); GNUNET_PQ_cleanup_result (rs);
} return qs;
PQclear (result);
return GNUNET_OK;
} }
@ -5003,7 +4860,6 @@ postgres_start_deferred_wire_out (void *cls,
return GNUNET_SYSERR; return GNUNET_SYSERR;
} }
PQclear (result); PQclear (result);
session->state = GNUNET_OK;
return GNUNET_OK; return GNUNET_OK;
} }
@ -5017,10 +4873,9 @@ postgres_start_deferred_wire_out (void *cls,
* @param wtid subject of the wire transfer * @param wtid subject of the wire transfer
* @param wire_account details about the receiver account of the wire transfer * @param wire_account details about the receiver account of the wire transfer
* @param amount amount that was transmitted * @param amount amount that was transmitted
* @return #GNUNET_OK on success * @return transaction status code
* #GNUNET_SYSERR on DB errors
*/ */
static int static enum GNUNET_DB_QueryStatus
postgres_store_wire_transfer_out (void *cls, postgres_store_wire_transfer_out (void *cls,
struct TALER_EXCHANGEDB_Session *session, struct TALER_EXCHANGEDB_Session *session,
struct GNUNET_TIME_Absolute date, struct GNUNET_TIME_Absolute date,
@ -5036,7 +4891,7 @@ postgres_store_wire_transfer_out (void *cls,
GNUNET_PQ_query_param_end GNUNET_PQ_query_param_end
}; };
return execute_prepared_non_select (session, return GNUNET_PQ_eval_prepared_non_select (session->conn,
"insert_wire_out", "insert_wire_out",
params); params);
} }

View File

@ -914,10 +914,9 @@ static uint64_t deposit_rowid;
* @param wire_deadline by which the merchant adviced that he would like the * @param wire_deadline by which the merchant adviced that he would like the
* wire transfer to be executed * wire transfer to be executed
* @param wire wire details for the merchant, NULL from iterate_matching_deposits() * @param wire wire details for the merchant, NULL from iterate_matching_deposits()
* @return #GNUNET_OK to continue to iterate, #GNUNET_SYSERR if deposit does * @return transaction status code, #GNUNET_DB_STATUS_SUCCESS_ONE_RESULT to continue to iterate
* not match our expectations
*/ */
static int static enum GNUNET_DB_QueryStatus
deposit_cb (void *cls, deposit_cb (void *cls,
uint64_t rowid, uint64_t rowid,
const struct TALER_MerchantPublicKeyP *merchant_pub, const struct TALER_MerchantPublicKeyP *merchant_pub,
@ -953,10 +952,10 @@ deposit_cb (void *cls,
sizeof (struct GNUNET_HashCode))) ) ) sizeof (struct GNUNET_HashCode))) ) )
{ {
GNUNET_break (0); GNUNET_break (0);
return GNUNET_SYSERR; return GNUNET_DB_STATUS_HARD_ERROR;
} }
return GNUNET_OK; return GNUNET_DB_STATUS_SUCCESS_ONE_RESULT;
} }
@ -1164,7 +1163,7 @@ test_wire_fees (struct TALER_EXCHANGEDB_Session *session)
GNUNET_CRYPTO_random_block (GNUNET_CRYPTO_QUALITY_WEAK, GNUNET_CRYPTO_random_block (GNUNET_CRYPTO_QUALITY_WEAK,
&master_sig, &master_sig,
sizeof (master_sig)); sizeof (master_sig));
if (GNUNET_OK != if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT !=
plugin->insert_wire_fee (plugin->cls, plugin->insert_wire_fee (plugin->cls,
session, session,
"wire-method", "wire-method",
@ -1176,7 +1175,7 @@ test_wire_fees (struct TALER_EXCHANGEDB_Session *session)
GNUNET_break (0); GNUNET_break (0);
return GNUNET_SYSERR; return GNUNET_SYSERR;
} }
if (GNUNET_OK != if (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS !=
plugin->insert_wire_fee (plugin->cls, plugin->insert_wire_fee (plugin->cls,
session, session,
"wire-method", "wire-method",
@ -1800,7 +1799,7 @@ run (void *cls)
NULL)); NULL));
FAILIF (1 != auditor_row_cnt); FAILIF (1 != auditor_row_cnt);
result = 9; result = 9;
FAILIF (1 != FAILIF (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT !=
plugin->iterate_matching_deposits (plugin->cls, plugin->iterate_matching_deposits (plugin->cls,
session, session,
&deposit.h_wire, &deposit.h_wire,
@ -1808,7 +1807,7 @@ run (void *cls)
&deposit_cb, &deposit, &deposit_cb, &deposit,
2)); 2));
FAILIF (1 != FAILIF (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT !=
plugin->get_ready_deposit (plugin->cls, plugin->get_ready_deposit (plugin->cls,
session, session,
&deposit_cb, &deposit_cb,
@ -1838,18 +1837,18 @@ run (void *cls)
FAILIF (GNUNET_OK != FAILIF (GNUNET_OK !=
plugin->start (plugin->cls, plugin->start (plugin->cls,
session)); session));
FAILIF (GNUNET_NO != FAILIF (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS !=
plugin->test_deposit_done (plugin->cls, plugin->test_deposit_done (plugin->cls,
session, session,
&deposit)); &deposit));
FAILIF (GNUNET_OK != FAILIF (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT !=
plugin->mark_deposit_done (plugin->cls, plugin->mark_deposit_done (plugin->cls,
session, session,
deposit_rowid)); deposit_rowid));
FAILIF (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS != FAILIF (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS !=
plugin->commit (plugin->cls, plugin->commit (plugin->cls,
session)); session));
FAILIF (GNUNET_YES != FAILIF (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT !=
plugin->test_deposit_done (plugin->cls, plugin->test_deposit_done (plugin->cls,
session, session,
&deposit)); &deposit));
@ -1857,17 +1856,18 @@ run (void *cls)
result = 10; result = 10;
deposit2 = deposit; deposit2 = deposit;
RND_BLK (&deposit2.merchant_pub); /* should fail if merchant is different */ RND_BLK (&deposit2.merchant_pub); /* should fail if merchant is different */
FAILIF (0 != FAILIF (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS !=
plugin->have_deposit (plugin->cls, plugin->have_deposit (plugin->cls,
session, session,
&deposit2)); &deposit2));
deposit2.merchant_pub = deposit.merchant_pub; deposit2.merchant_pub = deposit.merchant_pub;
RND_BLK (&deposit2.coin.coin_pub); /* should fail if coin is different */ RND_BLK (&deposit2.coin.coin_pub); /* should fail if coin is different */
FAILIF (0 != FAILIF (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS !=
plugin->have_deposit (plugin->cls, plugin->have_deposit (plugin->cls,
session, session,
&deposit2)); &deposit2));
FAILIF (GNUNET_OK != test_melting (session)); FAILIF (GNUNET_OK !=
test_melting (session));
/* test insert_refund! */ /* test insert_refund! */
@ -1886,7 +1886,7 @@ run (void *cls)
/* test payback / revocation */ /* test payback / revocation */
RND_BLK (&master_sig); RND_BLK (&master_sig);
FAILIF (GNUNET_OK != FAILIF (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT !=
plugin->insert_denomination_revocation (plugin->cls, plugin->insert_denomination_revocation (plugin->cls,
session, session,
&dkp_pub_hash, &dkp_pub_hash,
@ -1897,7 +1897,7 @@ run (void *cls)
FAILIF (GNUNET_OK != FAILIF (GNUNET_OK !=
plugin->start (plugin->cls, plugin->start (plugin->cls,
session)); session));
FAILIF (GNUNET_NO != FAILIF (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS !=
plugin->insert_denomination_revocation (plugin->cls, plugin->insert_denomination_revocation (plugin->cls,
session, session,
&dkp_pub_hash, &dkp_pub_hash,

View File

@ -674,9 +674,9 @@ struct TALER_EXCHANGEDB_Session;
* @param wire_deadline by which the merchant adviced that he would like the * @param wire_deadline by which the merchant adviced that he would like the
* wire transfer to be executed * wire transfer to be executed
* @param receiver_wire_account wire details for the merchant, NULL from iterate_matching_deposits() * @param receiver_wire_account wire details for the merchant, NULL from iterate_matching_deposits()
* @return #GNUNET_OK to continue to iterate, #GNUNET_SYSERR to stop * @return transaction status code, #GNUNET_DB_STATUS_SUCCESS_ONE_RESULT to continue to iterate
*/ */
typedef int typedef enum GNUNET_DB_QueryStatus
(*TALER_EXCHANGEDB_DepositIterator)(void *cls, (*TALER_EXCHANGEDB_DepositIterator)(void *cls,
uint64_t rowid, uint64_t rowid,
const struct TALER_MerchantPublicKeyP *merchant_pub, const struct TALER_MerchantPublicKeyP *merchant_pub,
@ -1383,10 +1383,9 @@ struct TALER_EXCHANGEDB_Plugin
* @param session connection to the database * @param session connection to the database
* @param deposit_cb function to call for ONE such deposit * @param deposit_cb function to call for ONE such deposit
* @param deposit_cb_cls closure for @a deposit_cb * @param deposit_cb_cls closure for @a deposit_cb
* @return number of rows processed, 0 if none exist, * @return transaction status code
* #GNUNET_SYSERR on error
*/ */
int enum GNUNET_DB_QueryStatus
(*get_ready_deposit) (void *cls, (*get_ready_deposit) (void *cls,
struct TALER_EXCHANGEDB_Session *session, struct TALER_EXCHANGEDB_Session *session,
TALER_EXCHANGEDB_DepositIterator deposit_cb, TALER_EXCHANGEDB_DepositIterator deposit_cb,
@ -1418,9 +1417,9 @@ struct TALER_EXCHANGEDB_Plugin
* be #TALER_EXCHANGEDB_MATCHING_DEPOSITS_LIMIT, larger values * be #TALER_EXCHANGEDB_MATCHING_DEPOSITS_LIMIT, larger values
* are not supported, smaller values would be inefficient. * are not supported, smaller values would be inefficient.
* @return number of rows processed, 0 if none exist, * @return number of rows processed, 0 if none exist,
* #GNUNET_SYSERR on error * transaction status code on error
*/ */
int enum GNUNET_DB_QueryStatus
(*iterate_matching_deposits) (void *cls, (*iterate_matching_deposits) (void *cls,
struct TALER_EXCHANGEDB_Session *session, struct TALER_EXCHANGEDB_Session *session,
const struct GNUNET_HashCode *h_wire, const struct GNUNET_HashCode *h_wire,
@ -1753,11 +1752,9 @@ struct TALER_EXCHANGEDB_Plugin
* @param session database connection * @param session database connection
* @param wtid the raw wire transfer identifier we used * @param wtid the raw wire transfer identifier we used
* @param deposit_serial_id row in the deposits table for which this is aggregation data * @param deposit_serial_id row in the deposits table for which this is aggregation data
* @return #GNUNET_OK on success * @return transaction status code
* #GNUNET_NO on transient errors
* #GNUNET_SYSERR on DB errors
*/ */
int enum GNUNET_DB_QueryStatus
(*insert_aggregation_tracking)(void *cls, (*insert_aggregation_tracking)(void *cls,
struct TALER_EXCHANGEDB_Session *session, struct TALER_EXCHANGEDB_Session *session,
const struct TALER_WireTransferIdentifierRawP *wtid, const struct TALER_WireTransferIdentifierRawP *wtid,
@ -1774,11 +1771,9 @@ struct TALER_EXCHANGEDB_Plugin
* @param end_date when does the fee end being valid * @param end_date when does the fee end being valid
* @param wire_fee how high is the wire transfer fee * @param wire_fee how high is the wire transfer fee
* @param master_sig signature over the above by the exchange master key * @param master_sig signature over the above by the exchange master key
* @return #GNUNET_OK on success or if the record exists, * @return transaction status code
* #GNUNET_NO on transient errors,
* #GNUNET_SYSERR on failure
*/ */
int enum GNUNET_DB_QueryStatus
(*insert_wire_fee)(void *cls, (*insert_wire_fee)(void *cls,
struct TALER_EXCHANGEDB_Session *session, struct TALER_EXCHANGEDB_Session *session,
const char *wire_method, const char *wire_method,
@ -1879,9 +1874,9 @@ struct TALER_EXCHANGEDB_Plugin
* @param cls closure * @param cls closure
* @param session database connection * @param session database connection
* @param rowid which entry to mark as finished * @param rowid which entry to mark as finished
* @return #GNUNET_OK on success, #GNUNET_SYSERR on DB errors * @return transaction status code
*/ */
int enum GNUNET_DB_QueryStatus
(*wire_prepare_data_mark_finished)(void *cls, (*wire_prepare_data_mark_finished)(void *cls,
struct TALER_EXCHANGEDB_Session *session, struct TALER_EXCHANGEDB_Session *session,
uint64_t rowid); uint64_t rowid);
@ -1895,11 +1890,9 @@ struct TALER_EXCHANGEDB_Plugin
* @param session database connection * @param session database connection
* @param cb function to call for ONE unfinished item * @param cb function to call for ONE unfinished item
* @param cb_cls closure for @a cb * @param cb_cls closure for @a cb
* @return #GNUNET_OK on success, * @return transaction status code
* #GNUNET_NO if there are no entries,
* #GNUNET_SYSERR on DB errors
*/ */
int enum GNUNET_DB_QueryStatus
(*wire_prepare_data_get)(void *cls, (*wire_prepare_data_get)(void *cls,
struct TALER_EXCHANGEDB_Session *session, struct TALER_EXCHANGEDB_Session *session,
TALER_EXCHANGEDB_WirePreparationIterator cb, TALER_EXCHANGEDB_WirePreparationIterator cb,
@ -1930,10 +1923,9 @@ struct TALER_EXCHANGEDB_Plugin
* @param wtid subject of the wire transfer * @param wtid subject of the wire transfer
* @param wire_account details about the receiver account of the wire transfer * @param wire_account details about the receiver account of the wire transfer
* @param amount amount that was transmitted * @param amount amount that was transmitted
* @return #GNUNET_OK on success * @return transaction status code
* #GNUNET_SYSERR on DB errors
*/ */
int enum GNUNET_DB_QueryStatus
(*store_wire_transfer_out)(void *cls, (*store_wire_transfer_out)(void *cls,
struct TALER_EXCHANGEDB_Session *session, struct TALER_EXCHANGEDB_Session *session,
struct GNUNET_TIME_Absolute date, struct GNUNET_TIME_Absolute date,