taler-auditor-sync WiP

This commit is contained in:
Christian Grothoff 2021-01-11 23:02:22 +01:00
parent e3156e88a7
commit 2518da8f45
No known key found for this signature in database
GPG Key ID: 939E6BE1E29FC3CC
2 changed files with 177 additions and 28 deletions

View File

@ -52,6 +52,182 @@ static unsigned int transaction_size = 512;
*/
static unsigned int actual_size;
static struct Table
{
enum TALER_EXCHANGEDB_ReplicatedTable rt;
uint64_t start_serial;
uint64_t end_serial;
bool end;
} tables[] = {
{ .rt = TALER_EXCHANGEDB_RT_DENOMINATIONS},
{ .rt = TALER_EXCHANGEDB_RT_DENOMINATION_REVOCATIONS},
{ .rt = TALER_EXCHANGEDB_RT_RESERVES},
{ .rt = TALER_EXCHANGEDB_RT_RESERVES_IN},
{ .rt = TALER_EXCHANGEDB_RT_RESERVES_CLOSE},
{ .rt = TALER_EXCHANGEDB_RT_RESERVES_OUT},
{ .rt = TALER_EXCHANGEDB_RT_AUDITORS},
{ .rt = TALER_EXCHANGEDB_RT_AUDITOR_DENOM_SIGS},
{ .rt = TALER_EXCHANGEDB_RT_EXCHANGE_SIGN_KEYS},
{ .rt = TALER_EXCHANGEDB_RT_SIGNKEY_REVOCATIONS},
{ .rt = TALER_EXCHANGEDB_RT_KNOWN_COINS},
{ .rt = TALER_EXCHANGEDB_RT_REFRESH_COMMITMENTS},
{ .rt = TALER_EXCHANGEDB_RT_REFRESH_REVEALED_COINS},
{ .rt = TALER_EXCHANGEDB_RT_REFRESH_TRANSFER_KEYS},
{ .rt = TALER_EXCHANGEDB_RT_DEPOSITS},
{ .rt = TALER_EXCHANGEDB_RT_REFUNDS},
{ .rt = TALER_EXCHANGEDB_RT_WIRE_OUT},
{ .rt = TALER_EXCHANGEDB_RT_AGGREGATION_TRACKING},
{ .rt = TALER_EXCHANGEDB_RT_WIRE_FEE},
{ .rt = TALER_EXCHANGEDB_RT_RECOUP},
{ .rt = TALER_EXCHANGEDB_RT_RECOUP_REFRESH },
{ .end = true }
};
/**
* Function called on data to replicate in the auditor's database.
*
* @param cls closure
* @param td record from an exchange table
* @return #GNUNET_OK to continue to iterate,
* #GNUNET_SYSERR to fail with an error
*/
static int
do_insert (void *cls,
const struct TALER_EXCHANGEDB_TableData *td)
{
// FIXME ...
}
/**
* Run one replication transaction.
*
* @return #GNUNET_OK on success, #GNUNET_SYSERR to rollback
*/
static int
transact (struct TALER_EXCHANGEDB_Session *ss,
struct TALER_EXCHANGEDB_Session *ds)
{
if (GNUNET_OK !=
src->start (src->cls,
ss,
"lookup src serials"))
return GNUNET_SYSERR;
for (unsigned int i = 0; ! tables[i].end; i++)
src->lookup_serial_by_table (src->cls,
ss,
tables[i].rt,
&tables[i].end_serial);
if (GNUNET_OK !=
src->commit (src->cls,
ss))
return GNUNET_SYSERR;
if (GNUNET_OK !=
dst->start (src->cls,
ds,
"lookup dst serials"))
return GNUNET_SYSERR;
for (unsigned int i = 0; ! tables[i].end; i++)
dst->lookup_serial_by_table (dst->cls,
ds,
tables[i].rt,
&tables[i].start_serial);
if (GNUNET_OK !=
dst->commit (dst->cls,
ds))
return GNUNET_SYSERR;
for (unsigned int i = 0; ! tables[i].end; i++)
{
printf ("%d ", i);
fflush (stdout);
while (tables[i].start_serial < tables[i].end_serial)
{
enum GNUNET_DB_QueryStatus qs;
if (GNUNET_OK !=
src->start (src->cls,
ss,
"copy table (src)"))
return GNUNET_SYSERR;
if (GNUNET_OK !=
dst->start (dst->cls,
ds,
"copy table (dst)"))
return GNUNET_SYSERR;
qs = src->lookup_records_by_table (src->cls,
ss,
tables[i].rt,
tables[i].start_serial,
&do_insert,
ds);
if (GNUNET_DB_STATUS_HARD_ERROR == qs)
{
global_ret = 3;
return GNUNET_SYSERR;
}
if (GNUNET_DB_STATUS_SOFT_ERROR == qs)
return GNUNET_SYSERR; /* will retry */
if (0 == qs)
{
GNUNET_break (0); /* should be impossible */
global_ret = 4;
return GNUNET_SYSERR;
}
}
}
/* we do not care about conflicting UPDATEs to src table, so safe to just rollback */
src->rollback (src->cls,
ss);
if (GNUNET_OK !=
dst->commit (dst->cls,
ds))
return GNUNET_SYSERR;
printf ("\n");
return GNUNET_OK;
}
/**
* Task to do the actual synchronization work.
*
* @param cls NULL, unused
*/
static void
do_sync (void *cls)
{
struct GNUNET_TIME_Relative delay;
struct TALER_EXCHANGEDB_Session *ss;
struct TALER_EXCHANGEDB_Session *ds;
sync_task = NULL;
actual_size = 0;
ss = src->get_session (src->cls);
ds = dst->get_session (dst->cls);
if (GNUNET_OK !=
transact (ss,
ds))
{
src->rollback (src->cls,
ss);
dst->rollback (dst->cls,
ds);
}
if (0 != global_ret)
return;
if (actual_size < transaction_size / 2)
{
delay = GNUNET_TIME_STD_BACKOFF (delay);
}
else if (actual_size >= transaction_size)
{
delay = GNUNET_TIME_UNIT_ZERO;
}
sync_task = GNUNET_SCHEDULER_add_delayed (delay,
&do_sync,
NULL);
}
/**
* Set an option of type 'char *' from the command line with
@ -150,33 +326,6 @@ load_config (const char *cfgfile)
}
/**
* Task to do the actual synchronization work.
*
* @param cls NULL, unused
*/
static void
do_sync (void *cls)
{
struct GNUNET_TIME_Relative delay;
sync_task = NULL;
actual_size = 0;
// FIXME: do real work here!
if (actual_size < transaction_size / 2)
{
delay = GNUNET_TIME_STD_BACKOFF (delay);
}
else if (actual_size >= transaction_size)
{
delay = GNUNET_TIME_UNIT_ZERO;
}
sync_task = GNUNET_SCHEDULER_add_delayed (delay,
&do_sync,
NULL);
}
/**
* Shutdown task.
*

View File

@ -3773,7 +3773,7 @@ struct TALER_EXCHANGEDB_Plugin
* @param session a session
* @param table table for which we should return the serial
* @param[out] latest serial number in use
* @return transaction status code, GNUNET_DB_STATUS_HARD_ERROR if
* @return transaction status code, #GNUNET_DB_STATUS_HARD_ERROR if
* @a table does not have a serial number
*/
enum GNUNET_DB_QueryStatus