also store wire position in auditordb

This commit is contained in:
Christian Grothoff 2017-09-30 20:47:52 +02:00
parent 96e04d33e1
commit 6a4f6b1836
No known key found for this signature in database
GPG Key ID: 939E6BE1E29FC3CC
3 changed files with 133 additions and 23 deletions

View File

@ -42,6 +42,11 @@ static int global_ret;
*/ */
static int restart; static int restart;
/**
* Name of the wire plugin to load to access the exchange's bank account.
*/
static char *wire_plugin;
/** /**
* Handle to access the exchange's database. * Handle to access the exchange's database.
*/ */
@ -77,6 +82,11 @@ static struct TALER_AUDITORDB_Session *asession;
*/ */
static struct TALER_MasterPublicKeyP master_pub; static struct TALER_MasterPublicKeyP master_pub;
/**
* Handle to the wire plugin for wire operations.
*/
static struct TALER_WIRE_Plugin *wp;
/** /**
* Last reserve_in / reserve_out serial IDs seen. * Last reserve_in / reserve_out serial IDs seen.
*/ */
@ -159,6 +169,16 @@ analyze_reserves_in (void *cls)
static enum GNUNET_DB_QueryStatus static enum GNUNET_DB_QueryStatus
analyze_reserves_out (void *cls) analyze_reserves_out (void *cls)
{ {
#if 0
// FIXME: start_off != rowid!
hh = wp->get_history (wp->cls,
TALER_BANK_DIRECTION_CREDIT,
&start_off,
sizeof (start_off),
INT64_MAX,
&history_cb,
NULL);
#endif
/* FIXME: #4958 */ /* FIXME: #4958 */
return GNUNET_DB_STATUS_SUCCESS_ONE_RESULT; return GNUNET_DB_STATUS_SUCCESS_ONE_RESULT;
} }
@ -191,11 +211,17 @@ incremental_processing (Analysis analysis,
{ {
enum GNUNET_DB_QueryStatus qs; enum GNUNET_DB_QueryStatus qs;
enum GNUNET_DB_QueryStatus qsx; enum GNUNET_DB_QueryStatus qsx;
void *in_wire_off;
void *out_wire_off;
size_t wire_off_size;
qsx = adb->get_wire_auditor_progress (adb->cls, qsx = adb->get_wire_auditor_progress (adb->cls,
asession, asession,
&master_pub, &master_pub,
&pp); &pp,
&in_wire_off,
&out_wire_off,
&wire_off_size);
if (0 > qsx) if (0 > qsx)
{ {
GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qsx); GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qsx);
@ -214,6 +240,7 @@ incremental_processing (Analysis analysis,
(unsigned long long) pp.last_reserve_out_serial_id); (unsigned long long) pp.last_reserve_out_serial_id);
} }
qs = analysis (analysis_cls); qs = analysis (analysis_cls);
// FIXME: wire plugin does NOT support synchronous activity!
if (0 > qs) if (0 > qs)
{ {
if (GNUNET_DB_STATUS_SOFT_ERROR == qs) if (GNUNET_DB_STATUS_SOFT_ERROR == qs)
@ -228,12 +255,19 @@ incremental_processing (Analysis analysis,
qs = adb->update_wire_auditor_progress (adb->cls, qs = adb->update_wire_auditor_progress (adb->cls,
asession, asession,
&master_pub, &master_pub,
&pp); &pp,
in_wire_off,
out_wire_off,
wire_off_size);
else else
qs = adb->insert_wire_auditor_progress (adb->cls, qs = adb->insert_wire_auditor_progress (adb->cls,
asession, asession,
&master_pub, &master_pub,
&pp); &pp,
in_wire_off,
out_wire_off,
wire_off_size);
if (0 >= qs) if (0 >= qs)
{ {
GNUNET_log (GNUNET_ERROR_TYPE_INFO, GNUNET_log (GNUNET_ERROR_TYPE_INFO,
@ -341,7 +375,17 @@ setup_sessions_and_run ()
global_ret = 1; global_ret = 1;
return; return;
} }
wp = TALER_WIRE_plugin_load (cfg,
wire_plugin);
if (NULL == wp)
{
fprintf (stderr,
"Failed to load wire plugin `%s'\n",
wire_plugin);
global_ret = 1;
return;
}
// FIXME: wire plugin does NOT support synchronous activity!
transact (&analyze_reserves_in, transact (&analyze_reserves_in,
NULL); NULL);
transact (&analyze_reserves_out, transact (&analyze_reserves_out,
@ -420,7 +464,11 @@ run (void *cls,
setup_sessions_and_run (); setup_sessions_and_run ();
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Audit complete\n"); "Audit complete\n");
if (NULL != wp)
TALER_WIRE_plugin_unload (wp);
if (NULL != adb)
TALER_AUDITORDB_plugin_unload (adb); TALER_AUDITORDB_plugin_unload (adb);
if (NULL != edb)
TALER_EXCHANGEDB_plugin_unload (edb); TALER_EXCHANGEDB_plugin_unload (edb);
} }
@ -448,6 +496,11 @@ main (int argc,
"restart", "restart",
"restart audit from the beginning (required on first run)", "restart audit from the beginning (required on first run)",
&restart), &restart),
GNUNET_GETOPT_option_string ('w',
"wire",
"PLUGINNAME",
"name of the wire plugin to use",
&wire_plugin),
GNUNET_GETOPT_OPTION_END GNUNET_GETOPT_OPTION_END
}; };

View File

@ -227,8 +227,13 @@ postgres_create_tables (void *cls)
",last_melt_serial_id INT8 NOT NULL DEFAULT 0" ",last_melt_serial_id INT8 NOT NULL DEFAULT 0"
",last_refund_serial_id INT8 NOT NULL DEFAULT 0" ",last_refund_serial_id INT8 NOT NULL DEFAULT 0"
",last_wire_out_serial_id INT8 NOT NULL DEFAULT 0" ",last_wire_out_serial_id INT8 NOT NULL DEFAULT 0"
")"),
GNUNET_PQ_make_execute ("CREATE TABLE IF NOT EXISTS wire_auditor_progress"
"(master_pub BYTEA PRIMARY KEY CHECK (LENGTH(master_pub)=32)"
",last_wire_reserve_in_serial_id INT8 NOT NULL DEFAULT 0" ",last_wire_reserve_in_serial_id INT8 NOT NULL DEFAULT 0"
",last_wire_reserve_out_serial_id INT8 NOT NULL DEFAULT 0" ",last_wire_reserve_out_serial_id INT8 NOT NULL DEFAULT 0"
",wire_in_off BLOB"
",wire_out_off BLOB"
")"), ")"),
/* Table with all of the customer reserves and their respective /* Table with all of the customer reserves and their respective
balances that the auditor is aware of. balances that the auditor is aware of.
@ -512,25 +517,31 @@ postgres_prepare (PGconn *db_conn)
1), 1),
/* Used in #postgres_insert_wire_auditor_progress() */ /* Used in #postgres_insert_wire_auditor_progress() */
GNUNET_PQ_make_prepare ("wire_auditor_progress_insert", GNUNET_PQ_make_prepare ("wire_auditor_progress_insert",
"INSERT INTO auditor_progress " "INSERT INTO wire_auditor_progress "
"(master_pub" "(master_pub"
",last_wire_reserve_in_serial_id" ",last_wire_reserve_in_serial_id"
",last_wire_reserve_out_serial_id" ",last_wire_reserve_out_serial_id"
") VALUES ($1,$2,$3);", ",wire_in_off"
3), ",wire_out_off"
") VALUES ($1,$2,$3,$4,$5);",
5),
/* Used in #postgres_update_wire_auditor_progress() */ /* Used in #postgres_update_wire_auditor_progress() */
GNUNET_PQ_make_prepare ("wire_auditor_progress_update", GNUNET_PQ_make_prepare ("wire_auditor_progress_update",
"UPDATE auditor_progress SET " "UPDATE wire_auditor_progress SET "
" last_wire_reserve_in_serial_id=$1" " last_wire_reserve_in_serial_id=$1"
",last_wire_reserve_out_serial_id=$2" ",last_wire_reserve_out_serial_id=$2"
" WHERE master_pub=$3", ",wire_in_off=$3"
3), ",wire_out_off=$4"
" WHERE master_pub=$5",
5),
/* Used in #postgres_get_wire_auditor_progress() */ /* Used in #postgres_get_wire_auditor_progress() */
GNUNET_PQ_make_prepare ("wire_auditor_progress_select", GNUNET_PQ_make_prepare ("wire_auditor_progress_select",
"SELECT" "SELECT"
" last_wire_reserve_in_serial_id" " last_wire_reserve_in_serial_id"
",last_wire_reserve_out_serial_id" ",last_wire_reserve_out_serial_id"
" FROM auditor_progress" ",wire_in_off"
",wire_out_off"
" FROM wire_auditor_progress"
" WHERE master_pub=$1;", " WHERE master_pub=$1;",
1), 1),
/* Used in #postgres_insert_reserve_info() */ /* Used in #postgres_insert_reserve_info() */
@ -1333,12 +1344,19 @@ static enum GNUNET_DB_QueryStatus
postgres_insert_wire_auditor_progress (void *cls, postgres_insert_wire_auditor_progress (void *cls,
struct TALER_AUDITORDB_Session *session, struct TALER_AUDITORDB_Session *session,
const struct TALER_MasterPublicKeyP *master_pub, const struct TALER_MasterPublicKeyP *master_pub,
const struct TALER_AUDITORDB_WireProgressPoint *pp) const struct TALER_AUDITORDB_WireProgressPoint *pp,
const void *in_wire_off,
const void *out_wire_off,
size_t wire_off_size)
{ {
struct GNUNET_PQ_QueryParam params[] = { struct GNUNET_PQ_QueryParam params[] = {
GNUNET_PQ_query_param_auto_from_type (master_pub), GNUNET_PQ_query_param_auto_from_type (master_pub),
GNUNET_PQ_query_param_uint64 (&pp->last_reserve_in_serial_id), GNUNET_PQ_query_param_uint64 (&pp->last_reserve_in_serial_id),
GNUNET_PQ_query_param_uint64 (&pp->last_reserve_out_serial_id), GNUNET_PQ_query_param_uint64 (&pp->last_reserve_out_serial_id),
GNUNET_PQ_query_param_fixed_size (in_wire_off,
wire_off_size),
GNUNET_PQ_query_param_fixed_size (out_wire_off,
wire_off_size),
GNUNET_PQ_query_param_end GNUNET_PQ_query_param_end
}; };
@ -1362,12 +1380,19 @@ static enum GNUNET_DB_QueryStatus
postgres_update_wire_auditor_progress (void *cls, postgres_update_wire_auditor_progress (void *cls,
struct TALER_AUDITORDB_Session *session, struct TALER_AUDITORDB_Session *session,
const struct TALER_MasterPublicKeyP *master_pub, const struct TALER_MasterPublicKeyP *master_pub,
const struct TALER_AUDITORDB_WireProgressPoint *pp) const struct TALER_AUDITORDB_WireProgressPoint *pp,
const void *in_wire_off,
const void *out_wire_off,
size_t wire_off_size)
{ {
struct GNUNET_PQ_QueryParam params[] = { struct GNUNET_PQ_QueryParam params[] = {
GNUNET_PQ_query_param_uint64 (&pp->last_reserve_in_serial_id), GNUNET_PQ_query_param_uint64 (&pp->last_reserve_in_serial_id),
GNUNET_PQ_query_param_uint64 (&pp->last_reserve_out_serial_id), GNUNET_PQ_query_param_uint64 (&pp->last_reserve_out_serial_id),
GNUNET_PQ_query_param_auto_from_type (master_pub), GNUNET_PQ_query_param_auto_from_type (master_pub),
GNUNET_PQ_query_param_fixed_size (in_wire_off,
wire_off_size),
GNUNET_PQ_query_param_fixed_size (out_wire_off,
wire_off_size),
GNUNET_PQ_query_param_end GNUNET_PQ_query_param_end
}; };
@ -1390,8 +1415,13 @@ static enum GNUNET_DB_QueryStatus
postgres_get_wire_auditor_progress (void *cls, postgres_get_wire_auditor_progress (void *cls,
struct TALER_AUDITORDB_Session *session, struct TALER_AUDITORDB_Session *session,
const struct TALER_MasterPublicKeyP *master_pub, const struct TALER_MasterPublicKeyP *master_pub,
struct TALER_AUDITORDB_WireProgressPoint *pp) struct TALER_AUDITORDB_WireProgressPoint *pp,
void **in_wire_off,
void **out_wire_off,
size_t *wire_off_size)
{ {
size_t xsize;
enum GNUNET_DB_QueryStatus qs;
struct GNUNET_PQ_QueryParam params[] = { struct GNUNET_PQ_QueryParam params[] = {
GNUNET_PQ_query_param_auto_from_type (master_pub), GNUNET_PQ_query_param_auto_from_type (master_pub),
GNUNET_PQ_query_param_end GNUNET_PQ_query_param_end
@ -1401,13 +1431,21 @@ postgres_get_wire_auditor_progress (void *cls,
&pp->last_reserve_in_serial_id), &pp->last_reserve_in_serial_id),
GNUNET_PQ_result_spec_uint64 ("last_reserve_out_serial_id", GNUNET_PQ_result_spec_uint64 ("last_reserve_out_serial_id",
&pp->last_reserve_out_serial_id), &pp->last_reserve_out_serial_id),
GNUNET_PQ_result_spec_variable_size ("wire_in_off",
in_wire_off,
wire_off_size),
GNUNET_PQ_result_spec_variable_size ("wire_out_off",
out_wire_off,
&xsize),
GNUNET_PQ_result_spec_end GNUNET_PQ_result_spec_end
}; };
return GNUNET_PQ_eval_prepared_singleton_select (session->conn, qs = GNUNET_PQ_eval_prepared_singleton_select (session->conn,
"wire_auditor_progress_select", "wire_auditor_progress_select",
params, params,
rs); rs);
GNUNET_assert (xsize == *wire_off_size);
return qs;
} }

View File

@ -377,13 +377,19 @@ struct TALER_AUDITORDB_Plugin
* @param session connection to use * @param session connection to use
* @param master_pub master key of the exchange * @param master_pub master key of the exchange
* @param pp where is the auditor in processing * @param pp where is the auditor in processing
* @param in_wire_off how far are we in the incoming wire transaction history
* @param out_wire_off how far are we in the outgoing wire transaction history
* @param wire_off_size how many bytes do @a in_wire_off and @a out_wire_off take?
* @return transaction status code * @return transaction status code
*/ */
enum GNUNET_DB_QueryStatus enum GNUNET_DB_QueryStatus
(*insert_wire_auditor_progress)(void *cls, (*insert_wire_auditor_progress)(void *cls,
struct TALER_AUDITORDB_Session *session, struct TALER_AUDITORDB_Session *session,
const struct TALER_MasterPublicKeyP *master_pub, const struct TALER_MasterPublicKeyP *master_pub,
const struct TALER_AUDITORDB_WireProgressPoint *pp); const struct TALER_AUDITORDB_WireProgressPoint *pp,
const void *in_wire_off,
const void *out_wire_off,
size_t wire_off_size);
/** /**
@ -394,13 +400,20 @@ struct TALER_AUDITORDB_Plugin
* @param session connection to use * @param session connection to use
* @param master_pub master key of the exchange * @param master_pub master key of the exchange
* @param pp where is the auditor in processing * @param pp where is the auditor in processing
* @param in_wire_off how far are we in the incoming wire transaction history
* @param out_wire_off how far are we in the outgoing wire transaction history
* @param wire_off_size how many bytes do @a in_wire_off and @a out_wire_off take?
* @return transaction status code * @return transaction status code
*/ */
enum GNUNET_DB_QueryStatus enum GNUNET_DB_QueryStatus
(*update_wire_auditor_progress)(void *cls, (*update_wire_auditor_progress)(void *cls,
struct TALER_AUDITORDB_Session *session, struct TALER_AUDITORDB_Session *session,
const struct TALER_MasterPublicKeyP *master_pub, const struct TALER_MasterPublicKeyP *master_pub,
const struct TALER_AUDITORDB_WireProgressPoint *pp); const struct TALER_AUDITORDB_WireProgressPoint *pp,
const void *in_wire_off,
const void *out_wire_off,
size_t wire_off_size);
/** /**
@ -410,13 +423,19 @@ struct TALER_AUDITORDB_Plugin
* @param session connection to use * @param session connection to use
* @param master_pub master key of the exchange * @param master_pub master key of the exchange
* @param[out] pp set to where the auditor is in processing * @param[out] pp set to where the auditor is in processing
* @param[out] in_wire_off how far are we in the incoming wire transaction history
* @param[out] out_wire_off how far are we in the outgoing wire transaction history
* @param[out] wire_off_size how many bytes do @a in_wire_off and @a out_wire_off take?
* @return transaction status code * @return transaction status code
*/ */
enum GNUNET_DB_QueryStatus enum GNUNET_DB_QueryStatus
(*get_wire_auditor_progress)(void *cls, (*get_wire_auditor_progress)(void *cls,
struct TALER_AUDITORDB_Session *session, struct TALER_AUDITORDB_Session *session,
const struct TALER_MasterPublicKeyP *master_pub, const struct TALER_MasterPublicKeyP *master_pub,
struct TALER_AUDITORDB_WireProgressPoint *pp); struct TALER_AUDITORDB_WireProgressPoint *pp,
void **in_wire_off,
void **out_wire_off,
size_t *wire_off_size);
/** /**