diff --git a/src/auditor/Makefile.am b/src/auditor/Makefile.am index 6432b61d1..8d5b7cf1f 100644 --- a/src/auditor/Makefile.am +++ b/src/auditor/Makefile.am @@ -190,7 +190,8 @@ taler_auditor_dbinit_CPPFLAGS = \ check_SCRIPTS = \ test-auditor.sh \ - test-revocation.sh + test-revocation.sh \ + test-sync.sh .NOTPARALLEL: TESTS = $(check_SCRIPTS) @@ -200,6 +201,8 @@ EXTRA_DIST = \ taler-helper-auditor-render.py \ auditor.conf \ test-auditor.conf \ + test-sync-in.conf \ + test-sync-out.conf \ generate-auditor-basedb.sh \ generate-revoke-basedb.sh \ generate-auditor-basedb.conf \ diff --git a/src/auditor/taler-auditor-sync.c b/src/auditor/taler-auditor-sync.c index 3a57c37ba..84562c5b2 100644 --- a/src/auditor/taler-auditor-sync.c +++ b/src/auditor/taler-auditor-sync.c @@ -50,15 +50,45 @@ static unsigned int transaction_size = 512; /** * Number of records copied in this transaction. */ -static unsigned int actual_size; +static unsigned long long actual_size; -static struct Table +/** + * Terminate once synchronization is achieved. + */ +static int exit_if_synced; + + +/** + * Information we track per replicated table. + */ +struct Table { + /** + * Which table is this record about? + */ enum TALER_EXCHANGEDB_ReplicatedTable rt; + + /** + * Up to which record is the destination table synchronized. + */ uint64_t start_serial; + + /** + * Highest serial in the source table. + */ uint64_t end_serial; + + /** + * Marker for the end of the list of #tables. + */ bool end; -} tables[] = { +}; + + +/** + * Information about replicated tables. + */ +static struct Table tables[] = { { .rt = TALER_EXCHANGEDB_RT_DENOMINATIONS}, { .rt = TALER_EXCHANGEDB_RT_DENOMINATION_REVOCATIONS}, { .rt = TALER_EXCHANGEDB_RT_RESERVES}, @@ -94,6 +124,11 @@ struct InsertContext */ struct TALER_EXCHANGEDB_Session *ds; + /** + * Table we are replicating. + */ + struct Table *table; + /** * Set to error if insertion created an error. */ @@ -123,10 +158,32 @@ do_insert (void *cls, td); if (0 >= qs) { + switch (qs) + { + case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT: + GNUNET_assert (0); + break; + case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "Failed to insert record into table %d: no change\n", + td->table); + break; + case GNUNET_DB_STATUS_SOFT_ERROR: + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "Serialization error inserting record into table %d (will retry)\n", + td->table); + break; + case GNUNET_DB_STATUS_HARD_ERROR: + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Failed to insert record into table %d: hard error\n", + td->table); + break; + } ctx->qs = qs; return GNUNET_SYSERR; } actual_size++; + ctx->table->start_serial = td->serial; return GNUNET_OK; } @@ -175,9 +232,17 @@ transact (struct TALER_EXCHANGEDB_Session *ss, return GNUNET_SYSERR; for (unsigned int i = 0; ! tables[i].end; i++) { - printf ("%d ", i); - fflush (stdout); - while (tables[i].start_serial < tables[i].end_serial) + struct Table *table = &tables[i]; + + if (table->start_serial == table->end_serial) + continue; + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Replicating table %d from %llu to %llu\n", + i, + (unsigned long long) table->start_serial, + (unsigned long long) table->end_serial); + ctx.table = table; + while (table->start_serial < table->end_serial) { enum GNUNET_DB_QueryStatus qs; @@ -193,21 +258,32 @@ transact (struct TALER_EXCHANGEDB_Session *ss, return GNUNET_SYSERR; qs = src->lookup_records_by_table (src->cls, ss, - tables[i].rt, - tables[i].start_serial, + table->rt, + table->start_serial, &do_insert, &ctx); if (ctx.qs < 0) qs = ctx.qs; if (GNUNET_DB_STATUS_HARD_ERROR == qs) { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Failed to lookup records from table %d: hard error\n", + i); global_ret = 3; return GNUNET_SYSERR; } if (GNUNET_DB_STATUS_SOFT_ERROR == qs) + { + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "Serialization error looking up records from table %d (will retry)\n", + i); return GNUNET_SYSERR; /* will retry */ + } if (0 == qs) { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Failed to lookup records from table %d: no results\n", + i); GNUNET_break (0); /* should be impossible */ global_ret = 4; return GNUNET_SYSERR; @@ -219,16 +295,26 @@ transact (struct TALER_EXCHANGEDB_Session *ss, qs = dst->commit (dst->cls, ds); if (GNUNET_DB_STATUS_SOFT_ERROR == qs) + { + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "Serialization error committing transaction on table %d (will retry)\n", + i); continue; + } if (GNUNET_DB_STATUS_HARD_ERROR == qs) { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Hard error committing transaction on table %d\n", + i); global_ret = 5; return GNUNET_SYSERR; } } } /* we do not care about conflicting UPDATEs to src table, so safe to just rollback */ - printf ("\n"); + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Sync pass completed successfully with %llu updates\n", + actual_size); return GNUNET_OK; } @@ -248,18 +334,43 @@ do_sync (void *cls) sync_task = NULL; actual_size = 0; ss = src->get_session (src->cls); + if (NULL == ss) + { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Failed to begin transaction with data source. Exiting\n"); + return; + } ds = dst->get_session (dst->cls); + if (NULL == ds) + { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Failed to begin transaction with data destination. Exiting\n"); + return; + } if (GNUNET_OK != transact (ss, ds)) { + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Transaction failed, rolling back\n"); src->rollback (src->cls, ss); dst->rollback (dst->cls, ds); } if (0 != global_ret) + { + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Transaction failed permanently, exiting\n"); return; + } + if ( (0 == actual_size) && + (exit_if_synced) ) + { + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Databases are synchronized. Exiting\n"); + return; + } if (actual_size < transaction_size / 2) { delay = GNUNET_TIME_STD_BACKOFF (delay); @@ -268,6 +379,10 @@ do_sync (void *cls) { delay = GNUNET_TIME_UNIT_ZERO; } + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Next sync pass in %s\n", + GNUNET_STRINGS_relative_time_to_string (delay, + GNUNET_YES)); sync_task = GNUNET_SCHEDULER_add_delayed (delay, &do_sync, NULL); @@ -450,6 +565,7 @@ main (int argc, { char *src_cfgfile = NULL; char *dst_cfgfile = NULL; + char *level = GNUNET_strdup ("WARNING"); struct GNUNET_CONFIGURATION_Handle *src_cfg; struct GNUNET_CONFIGURATION_Handle *dst_cfg; const struct GNUNET_GETOPT_CommandLineOption options[] = { @@ -466,15 +582,18 @@ main (int argc, gettext_noop ( "target SIZE for a the number of records to copy in one transaction"), &transaction_size), + GNUNET_GETOPT_option_flag ( + 't', + "terminate-when-synchronized", + gettext_noop ( + "terminate as soon as the databases are synchronized"), + &exit_if_synced), GNUNET_GETOPT_option_version (VERSION "-" VCS_VERSION), + GNUNET_GETOPT_option_loglevel (&level), GNUNET_GETOPT_OPTION_END }; TALER_gcrypt_init (); /* must trigger initialization manually at this point! */ - GNUNET_assert (GNUNET_OK == - GNUNET_log_setup ("taler-auditor-sync", - "WARNING", - NULL)); { int ret; @@ -486,6 +605,11 @@ main (int argc, if (GNUNET_SYSERR == ret) return 1; } + GNUNET_assert (GNUNET_OK == + GNUNET_log_setup ("taler-auditor-sync", + level, + NULL)); + GNUNET_free (level); if (0 == strcmp (src_cfgfile, dst_cfgfile)) { diff --git a/src/auditor/test-sync-in.conf b/src/auditor/test-sync-in.conf new file mode 100644 index 000000000..ef79cf90d --- /dev/null +++ b/src/auditor/test-sync-in.conf @@ -0,0 +1,29 @@ +[exchange] +#The DB plugin to use +DB = postgres + +[exchangedb-postgres] + +#The connection string the plugin has to use for connecting to the database +CONFIG = postgres:///talercheck-in + +# Where are the SQL files to setup our tables? +SQL_DIR = $DATADIR/sql/exchange/ + + +[taler] +CURRENCY = EUR + +[exchangedb] + +# After how long do we close idle reserves? The exchange +# and the auditor must agree on this value. We currently +# expect it to be globally defined for the whole system, +# as there is no way for wallets to query this value. Thus, +# it is only configurable for testing, and should be treated +# as constant in production. +IDLE_RESERVE_EXPIRATION_TIME = 4 weeks + +# After how long do we forget about reserves? Should be above +# the legal expiration timeframe of withdrawn coins. +LEGAL_RESERVE_EXPIRATION_TIME = 7 years diff --git a/src/auditor/test-sync-out.conf b/src/auditor/test-sync-out.conf new file mode 100644 index 000000000..32fb46b37 --- /dev/null +++ b/src/auditor/test-sync-out.conf @@ -0,0 +1,29 @@ +[exchange] +#The DB plugin to use +DB = postgres + +[exchangedb-postgres] + +#The connection string the plugin has to use for connecting to the database +CONFIG = postgres:///talercheck-out + +# Where are the SQL files to setup our tables? +SQL_DIR = $DATADIR/sql/exchange/ + +[taler] +CURRENCY = EUR + + +[exchangedb] + +# After how long do we close idle reserves? The exchange +# and the auditor must agree on this value. We currently +# expect it to be globally defined for the whole system, +# as there is no way for wallets to query this value. Thus, +# it is only configurable for testing, and should be treated +# as constant in production. +IDLE_RESERVE_EXPIRATION_TIME = 4 weeks + +# After how long do we forget about reserves? Should be above +# the legal expiration timeframe of withdrawn coins. +LEGAL_RESERVE_EXPIRATION_TIME = 7 years diff --git a/src/auditor/test-sync.sh b/src/auditor/test-sync.sh new file mode 100755 index 000000000..156df9cc1 --- /dev/null +++ b/src/auditor/test-sync.sh @@ -0,0 +1,42 @@ +#!/bin/sh + +set -eu + +echo -n "Testing synchronization logic ..." + +dropdb talercheck-in 2> /dev/null || true +dropdb talercheck-out 2> /dev/null || true + +createdb talercheck-in || exit 77 +createdb talercheck-out || exit 77 +echo -n "." + +taler-exchange-dbinit -c test-sync-out.conf +echo -n "." +psql talercheck-in < auditor-basedb.sql >/dev/null 2> /dev/null + +echo -n "." +taler-auditor-sync -s test-sync-in.conf -d test-sync-out.conf -t + +for table in denominations denomination_revocations reserves reserves_in reserves_close reserves_out auditors auditor_denom_sigs exchange_sign_keys signkey_revocations known_coins refresh_commitments refresh_revealed_coins refresh_transfer_keys deposits refunds wire_out aggregation_tracking wire_fee recoup recoup_refresh +do + echo -n "." + CIN=`echo "SELECT COUNT(*) FROM $table" | psql talercheck-in -Aqt` + COUT=`echo "SELECT COUNT(*) FROM $table" | psql talercheck-out -Aqt` + + if test ${CIN} != ${COUT} + then + dropdb talercheck-in + dropdb talercheck-out + echo "FAIL" + echo "Record count missmatch: $CIN / $COUT in table $table" + exit 1 + fi +done + +echo -n ". " +dropdb talercheck-in +dropdb talercheck-out + +echo "PASS" +exit 0 diff --git a/src/exchangedb/irbt_callbacks.c b/src/exchangedb/irbt_callbacks.c index 127ac6d51..5f6cf6d48 100644 --- a/src/exchangedb/irbt_callbacks.c +++ b/src/exchangedb/irbt_callbacks.c @@ -437,11 +437,11 @@ irbt_cb_table_refresh_revealed_coins (struct PostgresClosure *pg, &td->details.refresh_revealed_coins.freshcoin_index), GNUNET_PQ_query_param_auto_from_type ( &td->details.refresh_revealed_coins.link_sig), - GNUNET_PQ_query_param_auto_from_type (&h_coin_ev), GNUNET_PQ_query_param_fixed_size ( td->details.refresh_revealed_coins.coin_ev, td->details.refresh_revealed_coins. coin_ev_size), + GNUNET_PQ_query_param_auto_from_type (&h_coin_ev), GNUNET_PQ_query_param_rsa_signature ( td->details.refresh_revealed_coins.ev_sig.rsa_signature), GNUNET_PQ_query_param_uint64 ( diff --git a/src/exchangedb/lrbt_callbacks.c b/src/exchangedb/lrbt_callbacks.c index 35918d7f0..8acc99cdd 100644 --- a/src/exchangedb/lrbt_callbacks.c +++ b/src/exchangedb/lrbt_callbacks.c @@ -436,14 +436,18 @@ lrbt_cb_table_auditor_denom_sigs (void *cls, for (unsigned int i = 0; i $1" " ORDER BY auditor_denom_serial ASC;", @@ -2187,16 +2060,14 @@ postgres_get_session (void *cls) "select_above_serial_by_table_refresh_revealed_coins", "SELECT" " rrc_serial AS serial" - ",rc" ",freshcoin_index" ",link_sig" ",coin_ev" ",h_coin_ev" ",ev_sig" - ",rrc_serial" + ",melt_serial_id" ",denominations_serial" " FROM refresh_revealed_coins" - " JOIN refresh_commitments USING (melt_serial_id)" " WHERE rrc_serial > $1" " ORDER BY rrc_serial ASC;", 1), @@ -2204,11 +2075,10 @@ postgres_get_session (void *cls) "select_above_serial_by_table_refresh_transfer_keys", "SELECT" " rtc_serial AS serial" - ",rc" ",transfer_pub" ",transfer_privs" + ",melt_serial_id" " FROM refresh_transfer_keys" - " JOIN refresh_commitments USING (melt_serial_id)" " WHERE rtc_serial > $1" " ORDER BY rtc_serial ASC;", 1), @@ -2236,15 +2106,12 @@ postgres_get_session (void *cls) GNUNET_PQ_make_prepare ("select_above_serial_by_table_refunds", "SELECT" " refund_serial_id AS serial" - ",merchant_pub" ",merchant_sig" - ",h_contract_terms" ",rtransaction_id" - ",refunds.amount_with_fee_val" - ",refunds.amount_with_fee_frac" - ",known_coin_id" + ",amount_with_fee_val" + ",amount_with_fee_frac" + ",deposit_serial_id" " FROM refunds" - " JOIN deposits USING (deposit_serial_id)" " WHERE refund_serial_id > $1" " ORDER BY refund_serial_id ASC;", 1), diff --git a/src/include/taler_exchangedb_plugin.h b/src/include/taler_exchangedb_plugin.h index 92163bb54..fc17f70c7 100644 --- a/src/include/taler_exchangedb_plugin.h +++ b/src/include/taler_exchangedb_plugin.h @@ -3811,7 +3811,7 @@ struct TALER_EXCHANGEDB_Plugin * @param cls closure * @param session a session * @param tb table data to insert - * @return transaction status code, GNUNET_DB_STATUS_HARD_ERROR if + * @return transaction status code, #GNUNET_DB_STATUS_HARD_ERROR if * @a table does not have a serial number */ enum GNUNET_DB_QueryStatus