-fix up wirewatch logic

This commit is contained in:
Christian Grothoff 2021-06-22 13:15:50 +02:00
parent c9a928fe35
commit 0caf3ac2b7
No known key found for this signature in database
GPG Key ID: 939E6BE1E29FC3CC
2 changed files with 113 additions and 18 deletions

View File

@ -116,7 +116,7 @@ struct WireAccount
/** /**
* How much do we incremnt @e batch_size on success? * How much do we incremnt @e batch_size on success?
*/ */
unsigned int batch_increment; unsigned int batch_thresh;
/** /**
* How many transactions did we see in the current batch? * How many transactions did we see in the current batch?
@ -375,8 +375,8 @@ handle_soft_error (struct WireAccount *wa)
wa->session); wa->session);
if (1 < wa->batch_size) if (1 < wa->batch_size)
{ {
wa->batch_thresh = wa->batch_size;
wa->batch_size /= 2; wa->batch_size /= 2;
wa->batch_increment = 0;
GNUNET_log (GNUNET_ERROR_TYPE_INFO, GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"Reduced batch size to %llu due to serialization issue\n", "Reduced batch size to %llu due to serialization issue\n",
(unsigned long long) wa->batch_size); (unsigned long long) wa->batch_size);
@ -451,9 +451,13 @@ do_commit (struct WireAccount *wa)
wa->session = NULL; /* should not be needed */ wa->session = NULL; /* should not be needed */
if (wa->batch_size < MAXIMUM_BATCH_SIZE) if (wa->batch_size < MAXIMUM_BATCH_SIZE)
{ {
wa->batch_increment++; int delta;
delta = ((int) wa->batch_thresh - (int) wa->batch_size) / 4;
if (delta < 0)
delta = -delta;
wa->batch_size = GNUNET_MIN (MAXIMUM_BATCH_SIZE, wa->batch_size = GNUNET_MIN (MAXIMUM_BATCH_SIZE,
wa->batch_size + wa->batch_increment); wa->batch_size + delta + 1);
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Increasing batch size to %llu\n", "Increasing batch size to %llu\n",
(unsigned long long) wa->batch_size); (unsigned long long) wa->batch_size);
@ -669,7 +673,7 @@ find_transfers (void *cls)
} }
} }
if (GNUNET_OK != if (GNUNET_OK !=
db_plugin->start (db_plugin->cls, db_plugin->start_read_committed (db_plugin->cls,
session, session,
"wirewatch check for incoming wire transfers")) "wirewatch check for incoming wire transfers"))
{ {
@ -679,6 +683,7 @@ find_transfers (void *cls)
GNUNET_SCHEDULER_shutdown (); GNUNET_SCHEDULER_shutdown ();
return; return;
} }
limit = GNUNET_MIN (wa_pos->batch_size, limit = GNUNET_MIN (wa_pos->batch_size,
wa_pos->shard_end - wa_pos->batch_start); wa_pos->shard_end - wa_pos->batch_start);
GNUNET_assert (NULL == wa_pos->hh); GNUNET_assert (NULL == wa_pos->hh);

View File

@ -426,7 +426,6 @@ postgres_get_session (void *cls)
" WHERE reserve_pub=$1" " WHERE reserve_pub=$1"
" LIMIT 1;", " LIMIT 1;",
1), 1),
/* Used in #postgres_reserves_in_insert() when the reserve is new */
GNUNET_PQ_make_prepare ("reserve_create", GNUNET_PQ_make_prepare ("reserve_create",
"INSERT INTO reserves " "INSERT INTO reserves "
"(reserve_pub" "(reserve_pub"
@ -2570,6 +2569,47 @@ postgres_start (void *cls,
} }
/**
* Start a READ COMMITTED transaction.
*
* @param cls the `struct PostgresClosure` with the plugin-specific state
* @param session the database connection
* @param name unique name identifying the transaction (for debugging)
* must point to a constant
* @return #GNUNET_OK on success
*/
static int
postgres_start_read_committed (void *cls,
struct TALER_EXCHANGEDB_Session *session,
const char *name)
{
struct GNUNET_PQ_ExecuteStatement es[] = {
GNUNET_PQ_make_execute ("START TRANSACTION ISOLATION LEVEL READ COMMITTED"),
GNUNET_PQ_EXECUTE_STATEMENT_END
};
(void) cls;
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Starting transaction named: %s\n",
name);
postgres_preflight (cls,
session);
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Starting transaction on %p\n",
session->conn);
if (GNUNET_OK !=
GNUNET_PQ_exec_statements (session->conn,
es))
{
TALER_LOG_ERROR ("Failed to start transaction\n");
GNUNET_break (0);
return GNUNET_SYSERR;
}
session->transaction_name = name;
return GNUNET_OK;
}
/** /**
* Roll back the current transaction of a database connection. * Roll back the current transaction of a database connection.
* *
@ -3446,11 +3486,18 @@ postgres_reserves_in_insert (void *cls,
enum GNUNET_DB_QueryStatus qs1; enum GNUNET_DB_QueryStatus qs1;
struct TALER_EXCHANGEDB_Reserve reserve; struct TALER_EXCHANGEDB_Reserve reserve;
struct GNUNET_TIME_Absolute expiry; struct GNUNET_TIME_Absolute expiry;
struct GNUNET_TIME_Absolute gc;
struct GNUNET_TIME_Absolute now;
now = GNUNET_TIME_absolute_get ();
(void) GNUNET_TIME_round_abs (&now);
reserve.pub = *reserve_pub; reserve.pub = *reserve_pub;
expiry = GNUNET_TIME_absolute_add (execution_time, expiry = GNUNET_TIME_absolute_add (execution_time,
pg->idle_reserve_expiration_time); pg->idle_reserve_expiration_time);
(void) GNUNET_TIME_round_abs (&expiry); (void) GNUNET_TIME_round_abs (&expiry);
gc = GNUNET_TIME_absolute_add (now,
pg->legal_reserve_expiration_time);
(void) GNUNET_TIME_round_abs (&gc);
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Creating reserve %s with expiration in %s\n", "Creating reserve %s with expiration in %s\n",
TALER_B2S (reserve_pub), TALER_B2S (reserve_pub),
@ -3467,7 +3514,7 @@ postgres_reserves_in_insert (void *cls,
GNUNET_PQ_query_param_string (sender_account_details), GNUNET_PQ_query_param_string (sender_account_details),
TALER_PQ_query_param_amount (balance), TALER_PQ_query_param_amount (balance),
TALER_PQ_query_param_absolute_time (&expiry), TALER_PQ_query_param_absolute_time (&expiry),
TALER_PQ_query_param_absolute_time (&expiry), TALER_PQ_query_param_absolute_time (&gc),
GNUNET_PQ_query_param_end GNUNET_PQ_query_param_end
}; };
@ -3505,6 +3552,13 @@ postgres_reserves_in_insert (void *cls,
} }
if (0 >= qs2) if (0 >= qs2)
{ {
if ( (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS == qs2) &&
(GNUNET_DB_STATUS_SUCCESS_NO_RESULTS != qs1) )
{
GNUNET_break (0); /* should be impossible: reserve was fresh,
but transaction already known */
return GNUNET_DB_STATUS_HARD_ERROR;
}
/* Transaction was already known or error. We are finished. */ /* Transaction was already known or error. We are finished. */
return qs2; return qs2;
} }
@ -3514,6 +3568,22 @@ postgres_reserves_in_insert (void *cls,
/* we were wrong with our optimistic assumption: /* we were wrong with our optimistic assumption:
reserve does exist, need to do an update instead */ reserve does exist, need to do an update instead */
{
enum GNUNET_DB_QueryStatus cs;
cs = postgres_commit (cls,
session);
if (cs < 0)
return cs;
if (GNUNET_OK !=
postgres_start (cls,
session,
"reserve-update-serializable"))
{
GNUNET_break (0);
return GNUNET_DB_STATUS_HARD_ERROR;
}
}
{ {
enum GNUNET_DB_QueryStatus reserve_exists; enum GNUNET_DB_QueryStatus reserve_exists;
@ -3560,7 +3630,7 @@ postgres_reserves_in_insert (void *cls,
updated_reserve.expiry = GNUNET_TIME_absolute_max (expiry, updated_reserve.expiry = GNUNET_TIME_absolute_max (expiry,
reserve.expiry); reserve.expiry);
(void) GNUNET_TIME_round_abs (&updated_reserve.expiry); (void) GNUNET_TIME_round_abs (&updated_reserve.expiry);
updated_reserve.gc = GNUNET_TIME_absolute_max (updated_reserve.expiry, updated_reserve.gc = GNUNET_TIME_absolute_max (gc,
reserve.gc); reserve.gc);
(void) GNUNET_TIME_round_abs (&updated_reserve.gc); (void) GNUNET_TIME_round_abs (&updated_reserve.gc);
qs3 = reserves_update (cls, qs3 = reserves_update (cls,
@ -3581,8 +3651,26 @@ postgres_reserves_in_insert (void *cls,
/* continued below */ /* continued below */
break; break;
} }
return qs3;
} }
/* Go back to original transaction mode */
{
enum GNUNET_DB_QueryStatus cs;
cs = postgres_commit (cls,
session);
if (cs < 0)
return cs;
if (GNUNET_OK !=
postgres_start_read_committed (cls,
session,
"reserve-insert-continued"))
{
GNUNET_break (0);
return GNUNET_DB_STATUS_HARD_ERROR;
}
}
return GNUNET_DB_STATUS_SUCCESS_ONE_RESULT;
} }
@ -3698,7 +3786,7 @@ postgres_insert_withdraw_info (
struct PostgresClosure *pg = cls; struct PostgresClosure *pg = cls;
struct TALER_EXCHANGEDB_Reserve reserve; struct TALER_EXCHANGEDB_Reserve reserve;
struct GNUNET_TIME_Absolute now; struct GNUNET_TIME_Absolute now;
struct GNUNET_TIME_Absolute expiry; struct GNUNET_TIME_Absolute gc;
struct GNUNET_PQ_QueryParam params[] = { struct GNUNET_PQ_QueryParam params[] = {
GNUNET_PQ_query_param_auto_from_type (&collectable->h_coin_envelope), GNUNET_PQ_query_param_auto_from_type (&collectable->h_coin_envelope),
GNUNET_PQ_query_param_auto_from_type (&collectable->denom_pub_hash), GNUNET_PQ_query_param_auto_from_type (&collectable->denom_pub_hash),
@ -3749,9 +3837,9 @@ postgres_insert_withdraw_info (
TALER_B2S (&collectable->reserve_pub)); TALER_B2S (&collectable->reserve_pub));
return GNUNET_DB_STATUS_HARD_ERROR; return GNUNET_DB_STATUS_HARD_ERROR;
} }
expiry = GNUNET_TIME_absolute_add (now, gc = GNUNET_TIME_absolute_add (now,
pg->legal_reserve_expiration_time); pg->legal_reserve_expiration_time);
reserve.gc = GNUNET_TIME_absolute_max (expiry, reserve.gc = GNUNET_TIME_absolute_max (gc,
reserve.gc); reserve.gc);
(void) GNUNET_TIME_round_abs (&reserve.gc); (void) GNUNET_TIME_round_abs (&reserve.gc);
qs = reserves_update (cls, qs = reserves_update (cls,
@ -8474,6 +8562,7 @@ postgres_insert_recoup_request (
{ {
struct PostgresClosure *pg = cls; struct PostgresClosure *pg = cls;
struct GNUNET_TIME_Absolute expiry; struct GNUNET_TIME_Absolute expiry;
struct GNUNET_TIME_Absolute gc;
struct TALER_EXCHANGEDB_Reserve reserve; struct TALER_EXCHANGEDB_Reserve reserve;
struct GNUNET_PQ_QueryParam params[] = { struct GNUNET_PQ_QueryParam params[] = {
GNUNET_PQ_query_param_auto_from_type (&coin->coin_pub), GNUNET_PQ_query_param_auto_from_type (&coin->coin_pub),
@ -8517,9 +8606,9 @@ postgres_insert_recoup_request (
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Inserting recoup for coin %s\n", "Inserting recoup for coin %s\n",
TALER_B2S (&coin->coin_pub)); TALER_B2S (&coin->coin_pub));
expiry = GNUNET_TIME_absolute_add (timestamp, gc = GNUNET_TIME_absolute_add (timestamp,
pg->legal_reserve_expiration_time); pg->legal_reserve_expiration_time);
reserve.gc = GNUNET_TIME_absolute_max (expiry, reserve.gc = GNUNET_TIME_absolute_max (gc,
reserve.gc); reserve.gc);
(void) GNUNET_TIME_round_abs (&reserve.gc); (void) GNUNET_TIME_round_abs (&reserve.gc);
expiry = GNUNET_TIME_absolute_add (timestamp, expiry = GNUNET_TIME_absolute_add (timestamp,
@ -10549,6 +10638,7 @@ libtaler_plugin_exchangedb_postgres_init (void *cls)
plugin->drop_tables = &postgres_drop_tables; plugin->drop_tables = &postgres_drop_tables;
plugin->create_tables = &postgres_create_tables; plugin->create_tables = &postgres_create_tables;
plugin->start = &postgres_start; plugin->start = &postgres_start;
plugin->start_read_committed = &postgres_start_read_committed;
plugin->commit = &postgres_commit; plugin->commit = &postgres_commit;
plugin->preflight = &postgres_preflight; plugin->preflight = &postgres_preflight;
plugin->rollback = &postgres_rollback; plugin->rollback = &postgres_rollback;