-bugfix, preparations for sharding

This commit is contained in:
Christian Grothoff 2021-06-19 18:20:19 +02:00
parent 03e2aa71bc
commit 0271e84813
No known key found for this signature in database
GPG Key ID: 939E6BE1E29FC3CC
2 changed files with 50 additions and 60 deletions

View File

@ -210,24 +210,6 @@ handle_credit_history_finished (void *cls,
} }
/**
* Request the credit history of the exchange's bank account.
*
* @param ctx curl context for the event loop
* @param auth authentication data to use
* @param start_row from which row on do we want to get results,
* use UINT64_MAX for the latest; exclusive
* @param num_results how many results do we want;
* negative numbers to go into the past, positive numbers
* to go into the future starting at @a start_row;
* must not be zero.
* @param hres_cb the callback to call with the transaction
* history
* @param hres_cb_cls closure for the above callback
* @return NULL if the inputs are invalid (i.e. zero value for
* @e num_results). In this case, the callback is not
* called.
*/
struct TALER_BANK_CreditHistoryHandle * struct TALER_BANK_CreditHistoryHandle *
TALER_BANK_credit_history (struct GNUNET_CURL_Context *ctx, TALER_BANK_credit_history (struct GNUNET_CURL_Context *ctx,
const struct TALER_BANK_AuthenticationData *auth, const struct TALER_BANK_AuthenticationData *auth,
@ -300,13 +282,6 @@ TALER_BANK_credit_history (struct GNUNET_CURL_Context *ctx,
} }
/**
* Cancel a history request. This function cannot be
* used on a request handle if a response is already
* served for it.
*
* @param hh the history request handle
*/
void void
TALER_BANK_credit_history_cancel (struct TALER_BANK_CreditHistoryHandle *hh) TALER_BANK_credit_history_cancel (struct TALER_BANK_CreditHistoryHandle *hh)
{ {

View File

@ -1,6 +1,6 @@
/* /*
This file is part of TALER This file is part of TALER
Copyright (C) 2016--2020 Taler Systems SA Copyright (C) 2016--2021 Taler Systems SA
TALER is free software; you can redistribute it and/or modify it under the TALER is free software; you can redistribute it and/or modify it under the
terms of the GNU Affero General Public License as published by the Free Software terms of the GNU Affero General Public License as published by the Free Software
@ -89,6 +89,11 @@ struct WireAccount
*/ */
uint64_t latest_row_off; uint64_t latest_row_off;
/**
* Offset where our current shard ends.
*/
uint64_t shard_end;
/** /**
* How many transactions do we retrieve per batch? * How many transactions do we retrieve per batch?
*/ */
@ -103,19 +108,14 @@ struct WireAccount
* Are we running from scratch and should re-process all transactions * Are we running from scratch and should re-process all transactions
* for this account? * for this account?
*/ */
int reset_mode; bool reset_mode;
/** /**
* Should we delay the next request to the wire plugin a bit? Set to * Should we delay the next request to the wire plugin a bit? Set to
* #GNUNET_NO if we actually did some work. * false if we actually did some work.
*/ */
int delay; bool delay;
/**
* Did we experience a soft failure during the current
* transaction?
*/
bool soft_fail;
}; };
@ -160,6 +160,11 @@ static struct TALER_EXCHANGEDB_Plugin *db_plugin;
*/ */
static struct GNUNET_TIME_Relative wirewatch_idle_sleep_interval; static struct GNUNET_TIME_Relative wirewatch_idle_sleep_interval;
/**
* Modulus to apply to group shards.
*/
static unsigned int shard_size = 1024;
/** /**
* Value to return from main(). 0 on success, non-zero on * Value to return from main(). 0 on success, non-zero on
* on serious errors. * on serious errors.
@ -363,20 +368,10 @@ history_cb (void *cls,
(unsigned int) ec, (unsigned int) ec,
http_status); http_status);
} }
if (wa->soft_fail) GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
{ "End of list. Committing progress!\n");
/* no point to commit, transaction was already rolled qs = db_plugin->commit (db_plugin->cls,
back after we encountered a soft failure */ session);
wa->soft_fail = false;
qs = GNUNET_DB_STATUS_SOFT_ERROR;
}
else
{
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"End of list. Committing progress!\n");
qs = db_plugin->commit (db_plugin->cls,
session);
}
if (GNUNET_DB_STATUS_HARD_ERROR == qs) if (GNUNET_DB_STATUS_HARD_ERROR == qs)
{ {
GNUNET_SCHEDULER_shutdown (); GNUNET_SCHEDULER_shutdown ();
@ -410,7 +405,7 @@ history_cb (void *cls,
"Increasing batch size to %llu\n", "Increasing batch size to %llu\n",
(unsigned long long) wa->batch_size); (unsigned long long) wa->batch_size);
} }
if ( (GNUNET_YES == wa->delay) && if ( (wa->delay) &&
(test_mode) && (test_mode) &&
(NULL == wa->next) ) (NULL == wa->next) )
{ {
@ -419,7 +414,7 @@ history_cb (void *cls,
GNUNET_SCHEDULER_shutdown (); GNUNET_SCHEDULER_shutdown ();
return GNUNET_OK; return GNUNET_OK;
} }
if (GNUNET_YES == wa->delay) if (wa->delay)
{ {
wa->delayed_until wa->delayed_until
= GNUNET_TIME_relative_to_absolute (wirewatch_idle_sleep_interval); = GNUNET_TIME_relative_to_absolute (wirewatch_idle_sleep_interval);
@ -477,6 +472,7 @@ history_cb (void *cls,
db_plugin->rollback (db_plugin->cls, db_plugin->rollback (db_plugin->cls,
session); session);
GNUNET_SCHEDULER_shutdown (); GNUNET_SCHEDULER_shutdown ();
wa->hh = NULL;
return GNUNET_SYSERR; return GNUNET_SYSERR;
} }
if (GNUNET_DB_STATUS_SOFT_ERROR == qs) if (GNUNET_DB_STATUS_SOFT_ERROR == qs)
@ -485,10 +481,13 @@ history_cb (void *cls,
"Got DB soft error for reserves_in_insert. Rolling back.\n"); "Got DB soft error for reserves_in_insert. Rolling back.\n");
db_plugin->rollback (db_plugin->cls, db_plugin->rollback (db_plugin->cls,
session); session);
wa->soft_fail = true; wa->hh = NULL;
GNUNET_assert (NULL == task);
task = GNUNET_SCHEDULER_add_now (&find_transfers,
NULL);
return GNUNET_SYSERR; return GNUNET_SYSERR;
} }
wa->delay = GNUNET_NO; wa->delay = false;
wa->latest_row_off = serial_id; wa->latest_row_off = serial_id;
return GNUNET_OK; return GNUNET_OK;
} }
@ -504,6 +503,7 @@ find_transfers (void *cls)
{ {
struct TALER_EXCHANGEDB_Session *session; struct TALER_EXCHANGEDB_Session *session;
enum GNUNET_DB_QueryStatus qs; enum GNUNET_DB_QueryStatus qs;
unsigned int limit;
(void) cls; (void) cls;
task = NULL; task = NULL;
@ -555,13 +555,21 @@ find_transfers (void *cls)
} }
wa_pos->reset_mode = GNUNET_NO; wa_pos->reset_mode = GNUNET_NO;
} }
wa_pos->delay = GNUNET_YES; wa_pos->delay = true;
wa_pos->current_batch_size = 0; /* reset counter */ wa_pos->current_batch_size = 0; /* reset counter */
wa_pos->session = session; wa_pos->session = session;
if (wa_pos->shard_end == wa_pos->last_row_off)
{
/* advance to next shard */
wa_pos->shard_end += shard_size;
}
limit = GNUNET_MIN (wa_pos->batch_size,
wa_pos->shard_end - wa_pos->last_row_off);
GNUNET_assert (NULL == wa_pos->hh);
wa_pos->hh = TALER_BANK_credit_history (ctx, wa_pos->hh = TALER_BANK_credit_history (ctx,
&wa_pos->auth, &wa_pos->auth,
wa_pos->last_row_off, wa_pos->last_row_off,
wa_pos->batch_size, limit,
&history_cb, &history_cb,
wa_pos); wa_pos);
if (NULL == wa_pos->hh) if (NULL == wa_pos->hh)
@ -594,6 +602,7 @@ run (void *cls,
(void) cls; (void) cls;
(void) args; (void) args;
(void) cfgfile; (void) cfgfile;
cfg = c; cfg = c;
if (GNUNET_OK != if (GNUNET_OK !=
exchange_serve_process_config ()) exchange_serve_process_config ())
@ -603,8 +612,6 @@ run (void *cls,
} }
wa_pos = wa_head; wa_pos = wa_head;
GNUNET_assert (NULL != wa_pos); GNUNET_assert (NULL != wa_pos);
task = GNUNET_SCHEDULER_add_now (&find_transfers,
NULL);
GNUNET_SCHEDULER_add_shutdown (&shutdown_task, GNUNET_SCHEDULER_add_shutdown (&shutdown_task,
cls); cls);
ctx = GNUNET_CURL_init (&GNUNET_CURL_gnunet_scheduler_reschedule, ctx = GNUNET_CURL_init (&GNUNET_CURL_gnunet_scheduler_reschedule,
@ -615,6 +622,9 @@ run (void *cls,
GNUNET_break (0); GNUNET_break (0);
return; return;
} }
task = GNUNET_SCHEDULER_add_now (&find_transfers,
NULL);
} }
@ -630,16 +640,21 @@ main (int argc,
char *const *argv) char *const *argv)
{ {
struct GNUNET_GETOPT_CommandLineOption options[] = { struct GNUNET_GETOPT_CommandLineOption options[] = {
GNUNET_GETOPT_option_flag ('r',
"reset",
"start fresh with all transactions in the history",
&reset_mode),
GNUNET_GETOPT_option_uint ('S',
"size",
"SIZE",
"Size to process per shard (default: 1024)",
&shard_size),
GNUNET_GETOPT_option_timetravel ('T', GNUNET_GETOPT_option_timetravel ('T',
"timetravel"), "timetravel"),
GNUNET_GETOPT_option_flag ('t', GNUNET_GETOPT_option_flag ('t',
"test", "test",
"run in test mode and exit when idle", "run in test mode and exit when idle",
&test_mode), &test_mode),
GNUNET_GETOPT_option_flag ('r',
"reset",
"start fresh with all transactions in the history",
&reset_mode),
GNUNET_GETOPT_OPTION_END GNUNET_GETOPT_OPTION_END
}; };
enum GNUNET_GenericReturnValue ret; enum GNUNET_GenericReturnValue ret;