complete first pass of taler-wre-auditor's wire-out audit logic

This commit is contained in:
Christian Grothoff 2017-10-12 20:46:42 +02:00
parent 600d1684e3
commit cb13afaf54
No known key found for this signature in database
GPG Key ID: 939E6BE1E29FC3CC
4 changed files with 298 additions and 29 deletions

View File

@ -21,7 +21,7 @@
* - First, this auditor verifies that 'reserves_in' actually matches
* the incoming wire transfers from the bank.
* - Second, we check that the outgoing wire transfers match those
* given in the 'wire_out' table (TODO!)
* given in the 'wire_out' table
*/
#include "platform.h"
#include <gnunet/gnunet_util_lib.h>
@ -68,6 +68,12 @@ static const struct GNUNET_CONFIGURATION_Handle *cfg;
*/
static struct GNUNET_CONTAINER_MultiHashMap *in_map;
/**
* Map with information about outgoing wire transfers.
* Maps hashes of the wire offsets to `struct ReserveOutInfo`s.
*/
static struct GNUNET_CONTAINER_MultiHashMap *out_map;
/**
* Our session with the #edb.
*/
@ -104,7 +110,7 @@ static struct TALER_WIRE_HistoryHandle *hh;
static enum GNUNET_DB_QueryStatus qsx;
/**
* Last reserve_in / reserve_out serial IDs seen.
* Last reserve_in / wire_out serial IDs seen.
*/
static struct TALER_AUDITORDB_WireProgressPoint pp;
@ -128,7 +134,7 @@ static size_t wire_off_size;
/**
* Entry in map with wire information we expect to obtain from the
* #edb later.
* bank later.
*/
struct ReserveInInfo
{
@ -156,6 +162,26 @@ struct ReserveInInfo
};
/**
* Entry in map with wire information we expect to obtain from the
* #edb later.
*/
struct ReserveOutInfo
{
/**
* Hash of the wire transfer subject.
*/
struct GNUNET_HashCode subject_hash;
/**
* Expected details about the wire transfer.
*/
struct TALER_WIRE_TransferDetails details;
};
/**
* Free entry in #in_map.
*
@ -181,6 +207,31 @@ free_rii (void *cls,
}
/**
* Free entry in #out_map.
*
* @param cls NULL
* @param key unused key
* @param value the `struct ReserveOutInfo` to free
* @return #GNUNET_OK
*/
static int
free_roi (void *cls,
const struct GNUNET_HashCode *key,
void *value)
{
struct ReserveOutInfo *roi = value;
GNUNET_assert (GNUNET_YES ==
GNUNET_CONTAINER_multihashmap_remove (out_map,
key,
roi));
json_decref (roi->details.account_details);
GNUNET_free (roi);
return GNUNET_OK;
}
/**
* Task run on shutdown.
*
@ -203,6 +254,14 @@ do_shutdown (void *cls)
GNUNET_CONTAINER_multihashmap_destroy (in_map);
in_map = NULL;
}
if (NULL != out_map)
{
GNUNET_CONTAINER_multihashmap_iterate (out_map,
&free_roi,
NULL);
GNUNET_CONTAINER_multihashmap_destroy (out_map);
out_map = NULL;
}
if (NULL != wp)
{
TALER_WIRE_plugin_unload (wp);
@ -223,6 +282,7 @@ do_shutdown (void *cls)
/* ***************************** Report logic **************************** */
/**
* Report a (serious) inconsistency in the exchange's database.
*
@ -319,7 +379,7 @@ commit (enum GNUNET_DB_QueryStatus qs)
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
_("Concluded audit step at %llu/%llu\n"),
(unsigned long long) pp.last_reserve_in_serial_id,
(unsigned long long) pp.last_reserve_out_serial_id);
(unsigned long long) pp.last_wire_out_serial_id);
if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT == qs)
{
@ -362,12 +422,132 @@ commit (enum GNUNET_DB_QueryStatus qs)
/**
* Main functin for processing reserves_out data.
* Function called with details about outgoing wire transfers
* as claimed by the exchange DB.
*
* @param cls NULL
* @param rowid unique serial ID for the refresh session in our DB
* @param date timestamp of the wire transfer (roughly)
* @param wtid wire transfer subject
* @param wire wire transfer details of the receiver
* @param amount amount that was wired
* @return #GNUNET_OK to continue to iterate, #GNUNET_SYSERR to stop
*/
static int
wire_out_cb (void *cls,
uint64_t rowid,
struct GNUNET_TIME_Absolute date,
const struct TALER_WireTransferIdentifierRawP *wtid,
const json_t *wire,
const struct TALER_Amount *amount)
{
struct GNUNET_HashCode key;
struct ReserveOutInfo *roi;
GNUNET_CRYPTO_hash (wtid,
sizeof (struct TALER_WireTransferIdentifierRawP),
&key);
roi = GNUNET_CONTAINER_multihashmap_get (in_map,
&key);
if (NULL == roi)
{
/* FIXME: do proper logging! */
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
"Failed to find wire transfer `%s' over %s at `%s' in exchange database!\n",
TALER_B2S (wtid),
TALER_amount2s (amount),
GNUNET_STRINGS_absolute_time_to_string (date));
return GNUNET_OK;
}
if (0 != TALER_amount_cmp (&roi->details.amount,
amount))
{
report_row_inconsistency ("reserves_out",
rowid,
"wire amount missmatch");
return GNUNET_OK;
}
if (roi->details.execution_date.abs_value_us !=
date.abs_value_us)
{
report_row_minor_inconsistency ("reserves_out",
rowid,
"execution date missmatch");
}
if (! json_equal ((json_t *) wire,
roi->details.account_details))
{
report_row_inconsistency ("reserves_out",
rowid,
"receiver account missmatch");
return GNUNET_OK;
}
GNUNET_assert (GNUNET_OK ==
GNUNET_CONTAINER_multihashmap_remove (out_map,
&key,
roi));
GNUNET_assert (GNUNET_OK ==
free_roi (NULL,
&key,
roi));
return GNUNET_OK;
}
/**
* Complain that we failed to match an entry from #out_map.
*
* @param cls NULL
* @param key unused key
* @param value the `struct ReserveOutInfo` to free
* @return #GNUNET_OK
*/
static int
complain_out_not_found (void *cls,
const struct GNUNET_HashCode *key,
void *value)
{
struct ReserveOutInfo *roi = value;
/* FIXME: log more precisely which wire transfer (and amount)
is bogus. */
report_row_inconsistency ("reserves_out",
UINT64_MAX,
"matching wire transfer not found");
return GNUNET_OK;
}
/**
* Go over the "wire_out" table of the exchange and
* verify that all wire outs are in that table.
*/
static void
process_debits ()
check_exchange_wire_out ()
{
/* TODO: also check DEBITs! */
enum GNUNET_DB_QueryStatus qs;
qs = edb->select_wire_out_above_serial_id (edb->cls,
esession,
pp.last_wire_out_serial_id,
&wire_out_cb,
NULL);
if (0 > qs)
{
GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs);
global_ret = 1;
GNUNET_SCHEDULER_shutdown ();
return;
}
GNUNET_CONTAINER_multihashmap_iterate (out_map,
&complain_out_not_found,
NULL);
/* clean up (technically redundant, but nicer) */
GNUNET_CONTAINER_multihashmap_iterate (out_map,
&free_roi,
NULL);
GNUNET_CONTAINER_multihashmap_destroy (out_map);
out_map = NULL;
/* conclude with: */
commit (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT);
@ -375,6 +555,90 @@ process_debits ()
}
/**
* This function is called for all transactions that
* are credited to the exchange's account (incoming
* transactions).
*
* @param cls closure
* @param dir direction of the transfer
* @param row_off identification of the position at which we are querying
* @param row_off_size number of bytes in @a row_off
* @param details details about the wire transfer
* @return #GNUNET_OK to continue, #GNUNET_SYSERR to abort iteration
*/
static int
history_debit_cb (void *cls,
enum TALER_BANK_Direction dir,
const void *row_off,
size_t row_off_size,
const struct TALER_WIRE_TransferDetails *details)
{
struct ReserveOutInfo *roi;
if (TALER_BANK_DIRECTION_NONE == dir)
{
/* end of iteration, now check wire_out to see
if it matches #out_map */
hh = NULL;
check_exchange_wire_out ();
return GNUNET_OK;
}
roi = GNUNET_new (struct ReserveOutInfo);
GNUNET_CRYPTO_hash (&details->reserve_pub, /* FIXME: missnomer */
sizeof (details->reserve_pub),
&roi->subject_hash);
roi->details.amount = details->amount;
roi->details.execution_date = details->execution_date;
roi->details.reserve_pub = details->reserve_pub; /* FIXME: missnomer & redundant */
roi->details.account_details = json_incref ((json_t *) details->account_details);
if (GNUNET_OK !=
GNUNET_CONTAINER_multihashmap_put (out_map,
&roi->subject_hash,
roi,
GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY))
{
GNUNET_break_op (0); /* duplicate wire offset is not allowed! */
report_row_inconsistency ("bank wire log",
UINT64_MAX,
"duplicate wire offset");
return GNUNET_SYSERR;
}
return GNUNET_OK;
}
/**
* Main functin for processing 'reserves_out' data.
* We start by going over the DEBIT transactions this
* time, and then verify that all of them are justified
* by 'reserves_out'.
*/
static void
process_debits ()
{
GNUNET_assert (NULL == hh);
out_map = GNUNET_CONTAINER_multihashmap_create (1024,
GNUNET_YES);
hh = wp->get_history (wp->cls,
TALER_BANK_DIRECTION_DEBIT,
out_wire_off,
wire_off_size,
INT64_MAX,
&history_debit_cb,
NULL);
if (NULL == hh)
{
fprintf (stderr,
"Failed to obtain bank transaction history\n");
commit (GNUNET_DB_STATUS_HARD_ERROR);
global_ret = 1;
GNUNET_SCHEDULER_shutdown ();
return;
}
}
/* ***************************** Analyze reserves_in ************************ */
@ -441,9 +705,9 @@ reserve_in_cb (void *cls,
* @return #GNUNET_OK
*/
static int
complain_not_found (void *cls,
const struct GNUNET_HashCode *key,
void *value)
complain_in_not_found (void *cls,
const struct GNUNET_HashCode *key,
void *value)
{
struct ReserveInInfo *rii = value;
@ -455,8 +719,9 @@ complain_not_found (void *cls,
/**
* Callbacks of this type are used to serve the result of asking
* the bank for the transaction history.
* This function is called for all transactions that
* are credited to the exchange's account (incoming
* transactions).
*
* @param cls closure
* @param dir direction of the transfer
@ -480,7 +745,7 @@ history_credit_cb (void *cls,
/* end of operation */
hh = NULL;
GNUNET_CONTAINER_multihashmap_iterate (in_map,
&complain_not_found,
&complain_in_not_found,
NULL);
/* clean up before 2nd phase */
GNUNET_CONTAINER_multihashmap_iterate (in_map,
@ -719,7 +984,7 @@ run (void *cls,
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
_("Resuming audit at %llu/%llu\n"),
(unsigned long long) pp.last_reserve_in_serial_id,
(unsigned long long) pp.last_reserve_out_serial_id);
(unsigned long long) pp.last_wire_out_serial_id);
}
in_map = GNUNET_CONTAINER_multihashmap_create (1024,
@ -731,7 +996,7 @@ run (void *cls,
NULL);
if (0 > qs)
{
GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qsx);
GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs);
global_ret = 1;
GNUNET_SCHEDULER_shutdown ();
return;

View File

@ -231,7 +231,7 @@ postgres_create_tables (void *cls)
GNUNET_PQ_make_execute ("CREATE TABLE IF NOT EXISTS wire_auditor_progress"
"(master_pub BYTEA PRIMARY KEY CHECK (LENGTH(master_pub)=32)"
",last_wire_reserve_in_serial_id INT8 NOT NULL DEFAULT 0"
",last_wire_reserve_out_serial_id INT8 NOT NULL DEFAULT 0"
",last_wire_wire_out_serial_id INT8 NOT NULL DEFAULT 0"
",wire_in_off BYTEA"
",wire_out_off BYTEA"
")"),
@ -520,7 +520,7 @@ postgres_prepare (PGconn *db_conn)
"INSERT INTO wire_auditor_progress "
"(master_pub"
",last_wire_reserve_in_serial_id"
",last_wire_reserve_out_serial_id"
",last_wire_wire_out_serial_id"
",wire_in_off"
",wire_out_off"
") VALUES ($1,$2,$3,$4,$5);",
@ -529,7 +529,7 @@ postgres_prepare (PGconn *db_conn)
GNUNET_PQ_make_prepare ("wire_auditor_progress_update",
"UPDATE wire_auditor_progress SET "
" last_wire_reserve_in_serial_id=$1"
",last_wire_reserve_out_serial_id=$2"
",last_wire_wire_out_serial_id=$2"
",wire_in_off=$3"
",wire_out_off=$4"
" WHERE master_pub=$5",
@ -538,7 +538,7 @@ postgres_prepare (PGconn *db_conn)
GNUNET_PQ_make_prepare ("wire_auditor_progress_select",
"SELECT"
" last_wire_reserve_in_serial_id"
",last_wire_reserve_out_serial_id"
",last_wire_wire_out_serial_id"
",wire_in_off"
",wire_out_off"
" FROM wire_auditor_progress"
@ -1352,7 +1352,7 @@ postgres_insert_wire_auditor_progress (void *cls,
struct GNUNET_PQ_QueryParam params[] = {
GNUNET_PQ_query_param_auto_from_type (master_pub),
GNUNET_PQ_query_param_uint64 (&pp->last_reserve_in_serial_id),
GNUNET_PQ_query_param_uint64 (&pp->last_reserve_out_serial_id),
GNUNET_PQ_query_param_uint64 (&pp->last_wire_out_serial_id),
GNUNET_PQ_query_param_fixed_size (in_wire_off,
wire_off_size),
GNUNET_PQ_query_param_fixed_size (out_wire_off,
@ -1387,7 +1387,7 @@ postgres_update_wire_auditor_progress (void *cls,
{
struct GNUNET_PQ_QueryParam params[] = {
GNUNET_PQ_query_param_uint64 (&pp->last_reserve_in_serial_id),
GNUNET_PQ_query_param_uint64 (&pp->last_reserve_out_serial_id),
GNUNET_PQ_query_param_uint64 (&pp->last_wire_out_serial_id),
GNUNET_PQ_query_param_auto_from_type (master_pub),
GNUNET_PQ_query_param_fixed_size (in_wire_off,
wire_off_size),
@ -1429,8 +1429,8 @@ postgres_get_wire_auditor_progress (void *cls,
struct GNUNET_PQ_ResultSpec rs[] = {
GNUNET_PQ_result_spec_uint64 ("last_reserve_in_serial_id",
&pp->last_reserve_in_serial_id),
GNUNET_PQ_result_spec_uint64 ("last_reserve_out_serial_id",
&pp->last_reserve_out_serial_id),
GNUNET_PQ_result_spec_uint64 ("last_wire_out_serial_id",
&pp->last_wire_out_serial_id),
GNUNET_PQ_result_spec_variable_size ("wire_in_off",
in_wire_off,
wire_off_size),

View File

@ -118,9 +118,9 @@ struct TALER_AUDITORDB_WireProgressPoint
uint64_t last_reserve_in_serial_id;
/**
* last_reserve_out_serial_id serial ID of the last reserve_out the wire auditor processed
* last_wire_out_serial_id serial ID of the last wire_out the wire auditor processed
*/
uint64_t last_reserve_out_serial_id;
uint64_t last_wire_out_serial_id;
};

View File

@ -59,7 +59,11 @@ struct TALER_WIRE_TransferDetails
struct GNUNET_TIME_Absolute execution_date;
/**
* Reserve public key that was encoded in the wire transfer subject
* Reserve public key that was encoded in the wire transfer subject.
* FIXME: this is incorrect for *outgoing* wire transfers.
* Maybe use `struct TALER_WireTransferIdentifierRawP` here instead?
* OTOH, we might want to make this even more generic in case of
* invalid transfers, so that we can capture those as well!
*/
struct TALER_ReservePublicKeyP reserve_pub;