enable multiple wire methods to be used with aggregator, add command to run aggregator in testcase

This commit is contained in:
Christian Grothoff 2016-05-03 07:57:49 +02:00
parent f2b7e36269
commit 79c316f0d5
6 changed files with 261 additions and 118 deletions

View File

@ -16,9 +16,6 @@ taler\-exchange\-aggregator \- Aggregate and execute exchange transactions
.IP "\-d DIRNAME, \-\-exchange-dir=DIRNAME" .IP "\-d DIRNAME, \-\-exchange-dir=DIRNAME"
Use the configuration and other resources for the exchange to operate from DIRNAME. Use the configuration and other resources for the exchange to operate from DIRNAME.
.B .B
.IP "\-f WIREFORMAT, \-\-format=WIREFORMAT"
Overrides WIREFORMAT option from the configuation file.
.B
.IP "\-h, \-\-help" .IP "\-h, \-\-help"
Print short help on options. Print short help on options.
.B .B

View File

@ -122,7 +122,12 @@ enum OpCode
/** /**
* Verify exchange's /deposit/wtid method. * Verify exchange's /deposit/wtid method.
*/ */
OC_DEPOSIT_WTID OC_DEPOSIT_WTID,
/**
* Run the aggregator to execute deposits.
*/
OC_RUN_AGGREGATOR
}; };
@ -535,6 +540,15 @@ struct Command
} deposit_wtid; } deposit_wtid;
struct {
/**
* Process for the aggregator.
*/
struct GNUNET_OS_Process *aggregator_proc;
} run_aggregator;
} details; } details;
}; };
@ -623,6 +637,20 @@ static void
interpreter_run (void *cls); interpreter_run (void *cls);
/**
* Run the next command with the interpreter.
*
* @param is current interpeter state.
*/
static void
next_command (struct InterpreterState *is)
{
is->ip++;
is->task = GNUNET_SCHEDULER_add_now (&interpreter_run,
is);
}
/** /**
* Function called upon completion of our /admin/add/incoming request. * Function called upon completion of our /admin/add/incoming request.
* *
@ -646,9 +674,7 @@ add_incoming_cb (void *cls,
fail (is); fail (is);
return; return;
} }
is->ip++; next_command (is);
is->task = GNUNET_SCHEDULER_add_now (&interpreter_run,
is);
} }
@ -833,9 +859,7 @@ reserve_status_cb (void *cls,
GNUNET_break (0); GNUNET_break (0);
break; break;
} }
is->ip++; next_command (is);
is->task = GNUNET_SCHEDULER_add_now (&interpreter_run,
is);
} }
@ -889,9 +913,7 @@ reserve_withdraw_cb (void *cls,
GNUNET_break (0); GNUNET_break (0);
break; break;
} }
is->ip++; next_command (is);
is->task = GNUNET_SCHEDULER_add_now (&interpreter_run,
is);
} }
@ -923,9 +945,7 @@ deposit_cb (void *cls,
fail (is); fail (is);
return; return;
} }
is->ip++; next_command (is);
is->task = GNUNET_SCHEDULER_add_now (&interpreter_run,
is);
} }
@ -960,9 +980,7 @@ melt_cb (void *cls,
return; return;
} }
cmd->details.refresh_melt.noreveal_index = noreveal_index; cmd->details.refresh_melt.noreveal_index = noreveal_index;
is->ip++; next_command (is);
is->task = GNUNET_SCHEDULER_add_now (&interpreter_run,
is);
} }
@ -1024,10 +1042,7 @@ reveal_cb (void *cls,
default: default:
break; break;
} }
next_command (is);
is->ip++;
is->task = GNUNET_SCHEDULER_add_now (&interpreter_run,
is);
} }
@ -1124,9 +1139,7 @@ link_cb (void *cls,
default: default:
break; break;
} }
is->ip++; next_command (is);
is->task = GNUNET_SCHEDULER_add_now (&interpreter_run,
is);
} }
@ -1236,9 +1249,7 @@ wire_cb (void *cls,
default: default:
break; break;
} }
is->ip++; next_command (is);
is->task = GNUNET_SCHEDULER_add_now (&interpreter_run,
is);
} }
@ -1325,11 +1336,7 @@ wire_deposits_cb (void *cls,
default: default:
break; break;
} }
next_command (is);
/* move to next command */
is->ip++;
is->task = GNUNET_SCHEDULER_add_now (&interpreter_run,
is);
} }
@ -1377,11 +1384,7 @@ deposit_wtid_cb (void *cls,
default: default:
break; break;
} }
next_command (is);
/* move to next command */
is->ip++;
is->task = GNUNET_SCHEDULER_add_now (&interpreter_run,
is);
} }
@ -1932,15 +1935,38 @@ interpreter_run (void *cls)
json_decref (contract); json_decref (contract);
cmd->details.deposit_wtid.dwh cmd->details.deposit_wtid.dwh
= TALER_EXCHANGE_deposit_wtid (exchange, = TALER_EXCHANGE_deposit_wtid (exchange,
&ref->details.deposit.merchant_priv, &ref->details.deposit.merchant_priv,
&h_wire, &h_wire,
&h_contract, &h_contract,
&coin_pub, &coin_pub,
ref->details.deposit.transaction_id, ref->details.deposit.transaction_id,
&deposit_wtid_cb, &deposit_wtid_cb,
is); is);
} }
return; return;
case OC_RUN_AGGREGATOR:
{
cmd->details.run_aggregator.aggregator_proc
= GNUNET_OS_start_process (GNUNET_NO,
GNUNET_OS_INHERIT_STD_ALL,
NULL, NULL, NULL,
"taler-exchange-aggregator",
"taler-exchange-aggregator",
"-c", "test_exchange_api.conf",
"-t", /* exit when done */
NULL);
if (NULL == cmd->details.run_aggregator.aggregator_proc)
{
GNUNET_break (0);
fail (is);
return;
}
GNUNET_OS_process_wait (cmd->details.run_aggregator.aggregator_proc);
GNUNET_OS_process_destroy (cmd->details.run_aggregator.aggregator_proc);
cmd->details.run_aggregator.aggregator_proc = NULL;
next_command (is);
return;
}
default: default:
GNUNET_log (GNUNET_ERROR_TYPE_ERROR, GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
"Unknown instruction %d at %u (%s)\n", "Unknown instruction %d at %u (%s)\n",
@ -2122,6 +2148,17 @@ do_shutdown (void *cls)
cmd->details.deposit_wtid.dwh = NULL; cmd->details.deposit_wtid.dwh = NULL;
} }
break; break;
case OC_RUN_AGGREGATOR:
if (NULL != cmd->details.run_aggregator.aggregator_proc)
{
GNUNET_break (0 ==
GNUNET_OS_process_kill (cmd->details.run_aggregator.aggregator_proc,
SIGKILL));
GNUNET_OS_process_wait (cmd->details.run_aggregator.aggregator_proc);
GNUNET_OS_process_destroy (cmd->details.run_aggregator.aggregator_proc);
cmd->details.run_aggregator.aggregator_proc = NULL;
}
break;
default: default:
GNUNET_log (GNUNET_ERROR_TYPE_ERROR, GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
"Unknown instruction %d at %u (%s)\n", "Unknown instruction %d at %u (%s)\n",
@ -2425,6 +2462,9 @@ run (void *cls)
.label = "wire-deposit-failing", .label = "wire-deposit-failing",
.expected_response_code = MHD_HTTP_NOT_FOUND }, .expected_response_code = MHD_HTTP_NOT_FOUND },
{ .oc = OC_RUN_AGGREGATOR,
.label = "run-aggregator" },
/* TODO: trigger aggregation logic and then check the /* TODO: trigger aggregation logic and then check the
cases where tracking succeeds! */ cases where tracking succeeds! */

View File

@ -29,6 +29,35 @@
#include "taler_wire_lib.h" #include "taler_wire_lib.h"
/**
* Information we keep for each loaded wire plugin.
*/
struct WirePlugin
{
/**
* Plugins are kept in a DLL.
*/
struct WirePlugin *next;
/**
* Plugins are kept in a DLL.
*/
struct WirePlugin *prev;
/**
* Handle to the plugin.
*/
struct TALER_WIRE_Plugin *wire_plugin;
/**
* Name of the plugin.
*/
char *type;
};
/** /**
* Data we keep to #run_transfers(). There is at most * Data we keep to #run_transfers(). There is at most
* one of these around at any given point in time. * one of these around at any given point in time.
@ -46,6 +75,11 @@ struct WirePrepareData
*/ */
struct TALER_WIRE_ExecuteHandle *eh; struct TALER_WIRE_ExecuteHandle *eh;
/**
* Wire plugin used for this preparation.
*/
struct WirePlugin *wp;
/** /**
* Row ID of the transfer. * Row ID of the transfer.
*/ */
@ -95,6 +129,11 @@ struct AggregationUnit
*/ */
json_t *wire; json_t *wire;
/**
* Wire plugin to be used for the preparation.
*/
struct WirePlugin *wp;
/** /**
* Database session for all of our transactions. * Database session for all of our transactions.
*/ */
@ -129,11 +168,6 @@ struct AggregationUnit
*/ */
static char *exchange_currency_string; static char *exchange_currency_string;
/**
* Which wireformat should be supported by this aggregator?
*/
static char *exchange_wireformat;
/** /**
* The exchange's configuration (global) * The exchange's configuration (global)
*/ */
@ -145,9 +179,14 @@ static struct GNUNET_CONFIGURATION_Handle *cfg;
static struct TALER_EXCHANGEDB_Plugin *db_plugin; static struct TALER_EXCHANGEDB_Plugin *db_plugin;
/** /**
* Our wire plugin. * Head of list of loaded wire plugins.
*/ */
static struct TALER_WIRE_Plugin *wire_plugin; static struct WirePlugin *wp_head;
/**
* Tail of list of loaded wire plugins.
*/
static struct WirePlugin *wp_tail;
/** /**
* Next task to run, if any. * Next task to run, if any.
@ -189,6 +228,69 @@ static int test_mode;
static unsigned int aggregation_limit = TALER_EXCHANGEDB_MATCHING_DEPOSITS_LIMIT; static unsigned int aggregation_limit = TALER_EXCHANGEDB_MATCHING_DEPOSITS_LIMIT;
/**
* Extract wire plugin type from @a wire address
*
* @param wire a wire address
* @return NULL if @a wire is ill-formed
*/
const char *
extract_type (const json_t *wire)
{
const char *type;
json_t *t;
t = json_object_get (wire, "type");
if (NULL == t)
{
GNUNET_break (0);
return NULL;
}
type = json_string_value (t);
if (NULL == type)
{
GNUNET_break (0);
return NULL;
}
return type;
}
/**
* Find the wire plugin for the given wire address.
*
* @param type wire plugin type we need a plugin for
* @return NULL on error
*/
static struct WirePlugin *
find_plugin (const char *type)
{
struct WirePlugin *wp;
if (NULL == type)
return NULL;
for (wp = wp_head; NULL != wp; wp = wp->next)
if (0 == strcmp (type,
wp->type))
return wp;
wp = GNUNET_new (struct WirePlugin);
wp->wire_plugin = TALER_WIRE_plugin_load (cfg,
type);
if (NULL == wp->wire_plugin)
{
fprintf (stderr,
"Failed to load wire plugin for `%s'\n",
type);
GNUNET_free (wp);
return NULL;
}
wp->type = GNUNET_strdup (type);
GNUNET_CONTAINER_DLL_insert (wp_head,
wp_tail,
wp);
return wp;
}
/** /**
* We're being aborted with CTRL-C (or SIGTERM). Shut down. * We're being aborted with CTRL-C (or SIGTERM). Shut down.
* *
@ -197,6 +299,8 @@ static unsigned int aggregation_limit = TALER_EXCHANGEDB_MATCHING_DEPOSITS_LIMIT
static void static void
shutdown_task (void *cls) shutdown_task (void *cls)
{ {
struct WirePlugin *wp;
if (NULL != task) if (NULL != task)
{ {
GNUNET_SCHEDULER_cancel (task); GNUNET_SCHEDULER_cancel (task);
@ -206,8 +310,8 @@ shutdown_task (void *cls)
{ {
if (NULL != wpd->eh) if (NULL != wpd->eh)
{ {
wire_plugin->execute_wire_transfer_cancel (wire_plugin->cls, wpd->wp->wire_plugin->execute_wire_transfer_cancel (wpd->wp->wire_plugin->cls,
wpd->eh); wpd->eh);
wpd->eh = NULL; wpd->eh = NULL;
} }
db_plugin->rollback (db_plugin->cls, db_plugin->rollback (db_plugin->cls,
@ -219,8 +323,8 @@ shutdown_task (void *cls)
{ {
if (NULL != au->ph) if (NULL != au->ph)
{ {
wire_plugin->prepare_wire_transfer_cancel (wire_plugin->cls, au->wp->wire_plugin->prepare_wire_transfer_cancel (au->wp->wire_plugin->cls,
au->ph); au->ph);
au->ph = NULL; au->ph = NULL;
} }
db_plugin->rollback (db_plugin->cls, db_plugin->rollback (db_plugin->cls,
@ -232,7 +336,15 @@ shutdown_task (void *cls)
GNUNET_free (au); GNUNET_free (au);
} }
TALER_EXCHANGEDB_plugin_unload (db_plugin); TALER_EXCHANGEDB_plugin_unload (db_plugin);
TALER_WIRE_plugin_unload (wire_plugin); while (NULL != (wp = wp_head))
{
GNUNET_CONTAINER_DLL_remove (wp_head,
wp_tail,
wp);
TALER_WIRE_plugin_unload (wp->wire_plugin);
GNUNET_free (wp->type);
GNUNET_free (wp);
}
GNUNET_CONFIGURATION_destroy (cfg); GNUNET_CONFIGURATION_destroy (cfg);
cfg = NULL; cfg = NULL;
} }
@ -266,22 +378,6 @@ exchange_serve_process_config ()
(unsigned int) TALER_CURRENCY_LEN); (unsigned int) TALER_CURRENCY_LEN);
return GNUNET_SYSERR; return GNUNET_SYSERR;
} }
if (NULL != exchange_wireformat)
GNUNET_CONFIGURATION_set_value_string (cfg,
"exchange",
"wireformat",
exchange_wireformat);
if (GNUNET_OK !=
GNUNET_CONFIGURATION_get_value_string (cfg,
"exchange",
"wireformat",
&exchange_wireformat))
{
GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
"exchange",
"wireformat");
return GNUNET_SYSERR;
}
if (NULL == if (NULL ==
(db_plugin = TALER_EXCHANGEDB_plugin_load (cfg))) (db_plugin = TALER_EXCHANGEDB_plugin_load (cfg)))
@ -291,15 +387,6 @@ exchange_serve_process_config ()
return GNUNET_SYSERR; return GNUNET_SYSERR;
} }
if (NULL ==
(wire_plugin = TALER_WIRE_plugin_load (cfg,
exchange_wireformat)))
{
fprintf (stderr,
"Failed to load wire plugin for `%s'\n",
exchange_wireformat);
return GNUNET_SYSERR;
}
return GNUNET_OK; return GNUNET_OK;
} }
@ -511,6 +598,7 @@ run_aggregation (void *cls)
unsigned int i; unsigned int i;
int ret; int ret;
const struct GNUNET_SCHEDULER_TaskContext *tc; const struct GNUNET_SCHEDULER_TaskContext *tc;
struct WirePlugin *wp;
task = NULL; task = NULL;
tc = GNUNET_SCHEDULER_get_task_context (); tc = GNUNET_SCHEDULER_get_task_context ();
@ -571,6 +659,19 @@ run_aggregation (void *cls)
} }
return; return;
} }
wp = find_plugin (extract_type (au->wire));
if (NULL == wp)
{
json_decref (au->wire);
GNUNET_free (au);
au = NULL;
db_plugin->rollback (db_plugin->cls,
session);
GNUNET_SCHEDULER_shutdown ();
return;
}
/* Now try to find other deposits to aggregate */ /* Now try to find other deposits to aggregate */
GNUNET_log (GNUNET_ERROR_TYPE_INFO, GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"Found ready deposit for %s, aggregating\n", "Found ready deposit for %s, aggregating\n",
@ -588,8 +689,7 @@ run_aggregation (void *cls)
GNUNET_log (GNUNET_ERROR_TYPE_ERROR, GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
"Failed to execute deposit iteration!\n"); "Failed to execute deposit iteration!\n");
GNUNET_free_non_null (au->additional_rows); GNUNET_free_non_null (au->additional_rows);
if (NULL != au->wire) json_decref (au->wire);
json_decref (au->wire);
GNUNET_free (au); GNUNET_free (au);
au = NULL; au = NULL;
db_plugin->rollback (db_plugin->cls, db_plugin->rollback (db_plugin->cls,
@ -597,10 +697,11 @@ run_aggregation (void *cls)
global_ret = GNUNET_SYSERR; global_ret = GNUNET_SYSERR;
return; return;
} }
/* Round to the unit supported by the wire transfer method */ /* Round to the unit supported by the wire transfer method */
GNUNET_assert (GNUNET_SYSERR != GNUNET_assert (GNUNET_SYSERR !=
wire_plugin->amount_round (wire_plugin->cls, wp->wire_plugin->amount_round (wp->wire_plugin->cls,
&au->total_amount)); &au->total_amount));
/* Check if after rounding down, we still have an amount to transfer */ /* Check if after rounding down, we still have an amount to transfer */
if ( (0 == au->total_amount.value) && if ( (0 == au->total_amount.value) &&
(0 == au->total_amount.fraction) ) (0 == au->total_amount.fraction) )
@ -668,12 +769,13 @@ run_aggregation (void *cls)
amount_s, amount_s,
TALER_B2S (&au->merchant_pub)); TALER_B2S (&au->merchant_pub));
} }
au->ph = wire_plugin->prepare_wire_transfer (wire_plugin->cls, au->wp = wp;
au->wire, au->ph = wp->wire_plugin->prepare_wire_transfer (wp->wire_plugin->cls,
&au->total_amount, au->wire,
&au->wtid, &au->total_amount,
&prepare_cb, &au->wtid,
au); &prepare_cb,
au);
if (NULL == au->ph) if (NULL == au->ph)
{ {
GNUNET_break (0); /* why? how to best recover? */ GNUNET_break (0); /* why? how to best recover? */
@ -717,11 +819,9 @@ prepare_cb (void *cls,
{ {
struct TALER_EXCHANGEDB_Session *session = au->session; struct TALER_EXCHANGEDB_Session *session = au->session;
GNUNET_free_non_null (au->additional_rows);
if (NULL != au->wire) if (NULL != au->wire)
json_decref (au->wire); json_decref (au->wire);
GNUNET_free (au); GNUNET_free_non_null (au->additional_rows);
au = NULL;
if (NULL == buf) if (NULL == buf)
{ {
GNUNET_break (0); /* why? how to best recover? */ GNUNET_break (0); /* why? how to best recover? */
@ -730,6 +830,8 @@ prepare_cb (void *cls,
/* start again */ /* start again */
task = GNUNET_SCHEDULER_add_now (&run_aggregation, task = GNUNET_SCHEDULER_add_now (&run_aggregation,
NULL); NULL);
GNUNET_free (au);
au = NULL;
return; return;
} }
@ -737,7 +839,7 @@ prepare_cb (void *cls,
if (GNUNET_OK != if (GNUNET_OK !=
db_plugin->wire_prepare_data_insert (db_plugin->cls, db_plugin->wire_prepare_data_insert (db_plugin->cls,
session, session,
exchange_wireformat, au->wp->type,
buf, buf,
buf_size)) buf_size))
{ {
@ -747,8 +849,12 @@ prepare_cb (void *cls,
/* start again */ /* start again */
task = GNUNET_SCHEDULER_add_now (&run_aggregation, task = GNUNET_SCHEDULER_add_now (&run_aggregation,
NULL); NULL);
GNUNET_free (au);
au = NULL;
return; return;
} }
GNUNET_free (au);
au = NULL;
/* Now we can finally commit the overall transaction, as we are /* Now we can finally commit the overall transaction, as we are
again consistent if all of this passes. */ again consistent if all of this passes. */
@ -839,12 +945,14 @@ wire_confirm_cb (void *cls,
* *
* @param cls NULL * @param cls NULL
* @param rowid row identifier used to mark prepared transaction as done * @param rowid row identifier used to mark prepared transaction as done
* @param wire_method wire method the preparation was done for
* @param buf transaction data that was persisted, NULL on error * @param buf transaction data that was persisted, NULL on error
* @param buf_size number of bytes in @a buf, 0 on error * @param buf_size number of bytes in @a buf, 0 on error
*/ */
static void static void
wire_prepare_cb (void *cls, wire_prepare_cb (void *cls,
unsigned long long rowid, unsigned long long rowid,
const char *wire_method,
const char *buf, const char *buf,
size_t buf_size) size_t buf_size)
{ {
@ -852,11 +960,12 @@ wire_prepare_cb (void *cls,
GNUNET_log (GNUNET_ERROR_TYPE_INFO, GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"Starting wire transfer %llu\n", "Starting wire transfer %llu\n",
rowid); rowid);
wpd->eh = wire_plugin->execute_wire_transfer (wire_plugin->cls, wpd->wp = find_plugin (wire_method);
buf, wpd->eh = wpd->wp->wire_plugin->execute_wire_transfer (wpd->wp->wire_plugin->cls,
buf_size, buf,
&wire_confirm_cb, buf_size,
NULL); &wire_confirm_cb,
NULL);
if (NULL == wpd->eh) if (NULL == wpd->eh)
{ {
GNUNET_break (0); /* why? how to best recover? */ GNUNET_break (0); /* why? how to best recover? */
@ -910,7 +1019,6 @@ run_transfers (void *cls)
wpd->session = session; wpd->session = session;
ret = db_plugin->wire_prepare_data_get (db_plugin->cls, ret = db_plugin->wire_prepare_data_get (db_plugin->cls,
session, session,
exchange_wireformat,
&wire_prepare_cb, &wire_prepare_cb,
NULL); NULL);
if (GNUNET_SYSERR == ret) if (GNUNET_SYSERR == ret)
@ -981,9 +1089,6 @@ main (int argc,
char *const *argv) char *const *argv)
{ {
static const struct GNUNET_GETOPT_CommandLineOption options[] = { static const struct GNUNET_GETOPT_CommandLineOption options[] = {
{'f', "format", "WIREFORMAT",
"wireformat to use, overrides WIREFORMAT option in [exchange] section", 1,
&GNUNET_GETOPT_set_filename, &exchange_wireformat},
{'t', "test", NULL, {'t', "test", NULL,
"run in test mode and exit when idle", 0, "run in test mode and exit when idle", 0,
&GNUNET_GETOPT_set_one, &test_mode}, &GNUNET_GETOPT_set_one, &test_mode},

View File

@ -1156,14 +1156,14 @@ postgres_prepare (PGconn *db_conn)
PREPARE ("wire_prepare_data_get", PREPARE ("wire_prepare_data_get",
"SELECT" "SELECT"
" serial_id" " serial_id"
",type"
",buf" ",buf"
" FROM prewire" " FROM prewire"
" WHERE" " WHERE"
" type=$1 AND"
" finished=false" " finished=false"
" ORDER BY serial_id ASC" " ORDER BY serial_id ASC"
" LIMIT 1", " LIMIT 1",
1, NULL); 0, NULL);
return GNUNET_OK; return GNUNET_OK;
#undef PREPARE #undef PREPARE
@ -4115,7 +4115,6 @@ postgres_wire_prepare_data_mark_finished (void *cls,
* *
* @param cls closure * @param cls closure
* @param session database connection * @param session database connection
* @param type type fo the wire transfer (i.e. "sepa")
* @param cb function to call for ONE unfinished item * @param cb function to call for ONE unfinished item
* @param cb_cls closure for @a cb * @param cb_cls closure for @a cb
* @return #GNUNET_OK on success, * @return #GNUNET_OK on success,
@ -4125,13 +4124,11 @@ postgres_wire_prepare_data_mark_finished (void *cls,
static int static int
postgres_wire_prepare_data_get (void *cls, postgres_wire_prepare_data_get (void *cls,
struct TALER_EXCHANGEDB_Session *session, struct TALER_EXCHANGEDB_Session *session,
const char *type,
TALER_EXCHANGEDB_WirePreparationCallback cb, TALER_EXCHANGEDB_WirePreparationCallback cb,
void *cb_cls) void *cb_cls)
{ {
PGresult *result; PGresult *result;
struct GNUNET_PQ_QueryParam params[] = { struct GNUNET_PQ_QueryParam params[] = {
GNUNET_PQ_query_param_string (type),
GNUNET_PQ_query_param_end GNUNET_PQ_query_param_end
}; };
@ -4158,14 +4155,17 @@ postgres_wire_prepare_data_get (void *cls,
{ {
uint64_t serial_id; uint64_t serial_id;
char *type;
void *buf = NULL; void *buf = NULL;
size_t buf_size; size_t buf_size;
struct GNUNET_PQ_ResultSpec rs[] = { struct GNUNET_PQ_ResultSpec rs[] = {
GNUNET_PQ_result_spec_uint64 ("serial_id", GNUNET_PQ_result_spec_uint64 ("serial_id",
&serial_id), &serial_id),
GNUNET_PQ_result_spec_string ("type",
&type),
GNUNET_PQ_result_spec_variable_size ("buf", GNUNET_PQ_result_spec_variable_size ("buf",
&buf, &buf,
&buf_size), &buf_size),
GNUNET_PQ_result_spec_end GNUNET_PQ_result_spec_end
}; };
@ -4180,6 +4180,7 @@ postgres_wire_prepare_data_get (void *cls,
} }
cb (cb_cls, cb (cb_cls,
serial_id, serial_id,
type,
buf, buf,
buf_size); buf_size);
GNUNET_PQ_cleanup_result (rs); GNUNET_PQ_cleanup_result (rs);

View File

@ -684,12 +684,14 @@ typedef void
* *
* @param cls closure * @param cls closure
* @param rowid row identifier used to mark prepared transaction as done * @param rowid row identifier used to mark prepared transaction as done
* @param wire_method which wire method is this preparation data for
* @param buf transaction data that was persisted, NULL on error * @param buf transaction data that was persisted, NULL on error
* @param buf_size number of bytes in @a buf, 0 on error * @param buf_size number of bytes in @a buf, 0 on error
*/ */
typedef void typedef void
(*TALER_EXCHANGEDB_WirePreparationCallback) (void *cls, (*TALER_EXCHANGEDB_WirePreparationCallback) (void *cls,
unsigned long long rowid, unsigned long long rowid,
const char *wire_method,
const char *buf, const char *buf,
size_t buf_size); size_t buf_size);
@ -1479,7 +1481,6 @@ struct TALER_EXCHANGEDB_Plugin
* *
* @param cls closure * @param cls closure
* @param session database connection * @param session database connection
* @param type type fo the wire transfer (i.e. "sepa")
* @param cb function to call for ONE unfinished item * @param cb function to call for ONE unfinished item
* @param cb_cls closure for @a cb * @param cb_cls closure for @a cb
* @return #GNUNET_OK on success, * @return #GNUNET_OK on success,
@ -1489,7 +1490,6 @@ struct TALER_EXCHANGEDB_Plugin
int int
(*wire_prepare_data_get)(void *cls, (*wire_prepare_data_get)(void *cls,
struct TALER_EXCHANGEDB_Session *session, struct TALER_EXCHANGEDB_Session *session,
const char *type,
TALER_EXCHANGEDB_WirePreparationCallback cb, TALER_EXCHANGEDB_WirePreparationCallback cb,
void *cb_cls); void *cb_cls);

View File

@ -418,7 +418,7 @@ struct TALER_RefundRequestPS
/** /**
* The coin's public key. This is the value that must have been * The coin's public key. This is the value that must have been
* signed (blindly) by the Exchange. * signed (blindly) by the Exchange.
*/ */
struct TALER_CoinSpendPublicKeyP coin_pub; struct TALER_CoinSpendPublicKeyP coin_pub;
@ -912,7 +912,7 @@ struct TALER_DepositTrackPS
/** /**
* @brief Format internally used for packing the detailed information * @brief Format internally used for packing the detailed information
* to generate the signature for /wire/deposit signatures. * to generate the signature for /wire/deposits signatures.
*/ */
struct TALER_WireDepositDetailP struct TALER_WireDepositDetailP
{ {