From 2518da8f4581d868dd8eafabc54e6b2ddcc998d4 Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Mon, 11 Jan 2021 23:02:22 +0100 Subject: [PATCH] taler-auditor-sync WiP --- src/auditor/taler-auditor-sync.c | 203 ++++++++++++++++++++++---- src/include/taler_exchangedb_plugin.h | 2 +- 2 files changed, 177 insertions(+), 28 deletions(-) diff --git a/src/auditor/taler-auditor-sync.c b/src/auditor/taler-auditor-sync.c index a76c9a0be..fae3d2188 100644 --- a/src/auditor/taler-auditor-sync.c +++ b/src/auditor/taler-auditor-sync.c @@ -52,6 +52,182 @@ static unsigned int transaction_size = 512; */ static unsigned int actual_size; +static struct Table +{ + enum TALER_EXCHANGEDB_ReplicatedTable rt; + uint64_t start_serial; + uint64_t end_serial; + bool end; +} tables[] = { + { .rt = TALER_EXCHANGEDB_RT_DENOMINATIONS}, + { .rt = TALER_EXCHANGEDB_RT_DENOMINATION_REVOCATIONS}, + { .rt = TALER_EXCHANGEDB_RT_RESERVES}, + { .rt = TALER_EXCHANGEDB_RT_RESERVES_IN}, + { .rt = TALER_EXCHANGEDB_RT_RESERVES_CLOSE}, + { .rt = TALER_EXCHANGEDB_RT_RESERVES_OUT}, + { .rt = TALER_EXCHANGEDB_RT_AUDITORS}, + { .rt = TALER_EXCHANGEDB_RT_AUDITOR_DENOM_SIGS}, + { .rt = TALER_EXCHANGEDB_RT_EXCHANGE_SIGN_KEYS}, + { .rt = TALER_EXCHANGEDB_RT_SIGNKEY_REVOCATIONS}, + { .rt = TALER_EXCHANGEDB_RT_KNOWN_COINS}, + { .rt = TALER_EXCHANGEDB_RT_REFRESH_COMMITMENTS}, + { .rt = TALER_EXCHANGEDB_RT_REFRESH_REVEALED_COINS}, + { .rt = TALER_EXCHANGEDB_RT_REFRESH_TRANSFER_KEYS}, + { .rt = TALER_EXCHANGEDB_RT_DEPOSITS}, + { .rt = TALER_EXCHANGEDB_RT_REFUNDS}, + { .rt = TALER_EXCHANGEDB_RT_WIRE_OUT}, + { .rt = TALER_EXCHANGEDB_RT_AGGREGATION_TRACKING}, + { .rt = TALER_EXCHANGEDB_RT_WIRE_FEE}, + { .rt = TALER_EXCHANGEDB_RT_RECOUP}, + { .rt = TALER_EXCHANGEDB_RT_RECOUP_REFRESH }, + { .end = true } +}; + + +/** + * Function called on data to replicate in the auditor's database. + * + * @param cls closure + * @param td record from an exchange table + * @return #GNUNET_OK to continue to iterate, + * #GNUNET_SYSERR to fail with an error + */ +static int +do_insert (void *cls, + const struct TALER_EXCHANGEDB_TableData *td) +{ + // FIXME ... +} + + +/** + * Run one replication transaction. + * + * @return #GNUNET_OK on success, #GNUNET_SYSERR to rollback + */ +static int +transact (struct TALER_EXCHANGEDB_Session *ss, + struct TALER_EXCHANGEDB_Session *ds) +{ + if (GNUNET_OK != + src->start (src->cls, + ss, + "lookup src serials")) + return GNUNET_SYSERR; + for (unsigned int i = 0; ! tables[i].end; i++) + src->lookup_serial_by_table (src->cls, + ss, + tables[i].rt, + &tables[i].end_serial); + if (GNUNET_OK != + src->commit (src->cls, + ss)) + return GNUNET_SYSERR; + if (GNUNET_OK != + dst->start (src->cls, + ds, + "lookup dst serials")) + return GNUNET_SYSERR; + for (unsigned int i = 0; ! tables[i].end; i++) + dst->lookup_serial_by_table (dst->cls, + ds, + tables[i].rt, + &tables[i].start_serial); + if (GNUNET_OK != + dst->commit (dst->cls, + ds)) + return GNUNET_SYSERR; + for (unsigned int i = 0; ! tables[i].end; i++) + { + printf ("%d ", i); + fflush (stdout); + while (tables[i].start_serial < tables[i].end_serial) + { + enum GNUNET_DB_QueryStatus qs; + + if (GNUNET_OK != + src->start (src->cls, + ss, + "copy table (src)")) + return GNUNET_SYSERR; + if (GNUNET_OK != + dst->start (dst->cls, + ds, + "copy table (dst)")) + return GNUNET_SYSERR; + qs = src->lookup_records_by_table (src->cls, + ss, + tables[i].rt, + tables[i].start_serial, + &do_insert, + ds); + if (GNUNET_DB_STATUS_HARD_ERROR == qs) + { + global_ret = 3; + return GNUNET_SYSERR; + } + if (GNUNET_DB_STATUS_SOFT_ERROR == qs) + return GNUNET_SYSERR; /* will retry */ + if (0 == qs) + { + GNUNET_break (0); /* should be impossible */ + global_ret = 4; + return GNUNET_SYSERR; + } + } + } + /* we do not care about conflicting UPDATEs to src table, so safe to just rollback */ + src->rollback (src->cls, + ss); + if (GNUNET_OK != + dst->commit (dst->cls, + ds)) + return GNUNET_SYSERR; + printf ("\n"); + return GNUNET_OK; +} + + +/** + * Task to do the actual synchronization work. + * + * @param cls NULL, unused + */ +static void +do_sync (void *cls) +{ + struct GNUNET_TIME_Relative delay; + struct TALER_EXCHANGEDB_Session *ss; + struct TALER_EXCHANGEDB_Session *ds; + + sync_task = NULL; + actual_size = 0; + ss = src->get_session (src->cls); + ds = dst->get_session (dst->cls); + if (GNUNET_OK != + transact (ss, + ds)) + { + src->rollback (src->cls, + ss); + dst->rollback (dst->cls, + ds); + } + if (0 != global_ret) + return; + if (actual_size < transaction_size / 2) + { + delay = GNUNET_TIME_STD_BACKOFF (delay); + } + else if (actual_size >= transaction_size) + { + delay = GNUNET_TIME_UNIT_ZERO; + } + sync_task = GNUNET_SCHEDULER_add_delayed (delay, + &do_sync, + NULL); +} + /** * Set an option of type 'char *' from the command line with @@ -150,33 +326,6 @@ load_config (const char *cfgfile) } -/** - * Task to do the actual synchronization work. - * - * @param cls NULL, unused - */ -static void -do_sync (void *cls) -{ - struct GNUNET_TIME_Relative delay; - - sync_task = NULL; - actual_size = 0; - // FIXME: do real work here! - if (actual_size < transaction_size / 2) - { - delay = GNUNET_TIME_STD_BACKOFF (delay); - } - else if (actual_size >= transaction_size) - { - delay = GNUNET_TIME_UNIT_ZERO; - } - sync_task = GNUNET_SCHEDULER_add_delayed (delay, - &do_sync, - NULL); -} - - /** * Shutdown task. * diff --git a/src/include/taler_exchangedb_plugin.h b/src/include/taler_exchangedb_plugin.h index 8286260ca..92163bb54 100644 --- a/src/include/taler_exchangedb_plugin.h +++ b/src/include/taler_exchangedb_plugin.h @@ -3773,7 +3773,7 @@ struct TALER_EXCHANGEDB_Plugin * @param session a session * @param table table for which we should return the serial * @param[out] latest serial number in use - * @return transaction status code, GNUNET_DB_STATUS_HARD_ERROR if + * @return transaction status code, #GNUNET_DB_STATUS_HARD_ERROR if * @a table does not have a serial number */ enum GNUNET_DB_QueryStatus