add wirewatch shutdown shard cleanup logic

This commit is contained in:
Christian Grothoff 2022-04-14 00:00:50 +02:00
parent 5882e6b56b
commit a13f90a15e
No known key found for this signature in database
GPG Key ID: 939E6BE1E29FC3CC
3 changed files with 63 additions and 1 deletions

View File

@ -226,6 +226,8 @@ shutdown_task (void *cls)
while (NULL != (wa = wa_head)) while (NULL != (wa = wa_head))
{ {
enum GNUNET_DB_QueryStatus qs;
if (NULL != wa->hh) if (NULL != wa->hh)
{ {
TALER_BANK_credit_history_cancel (wa->hh); TALER_BANK_credit_history_cancel (wa->hh);
@ -239,7 +241,13 @@ shutdown_task (void *cls)
db_plugin->rollback (db_plugin->cls); db_plugin->rollback (db_plugin->cls);
wa->started_transaction = false; wa->started_transaction = false;
} }
// FIXME: delete shard lock here (#7124) qs = db_plugin->abort_shard (db_plugin->cls,
wa_pos->job_name,
wa_pos->shard_start,
wa_pos->shard_end);
if (qs <= 0)
GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
"Failed to abort work shard on shutdown\n");
GNUNET_free (wa->job_name); GNUNET_free (wa->job_name);
GNUNET_free (wa); GNUNET_free (wa);
} }

View File

@ -3354,6 +3354,15 @@ prepare_statements (struct PostgresClosure *pg)
" ORDER BY end_row DESC" " ORDER BY end_row DESC"
" LIMIT 1;", " LIMIT 1;",
1), 1),
/* Used in #postgres_abort_shard() */
GNUNET_PQ_make_prepare (
"abort_shard",
"UPDATE work_shards"
" SET last_attempt=0"
" WHERE job_name = $1 "
" AND start_row = $2 "
" AND end_row = $3;",
3),
/* Used in #postgres_begin_shard() */ /* Used in #postgres_begin_shard() */
GNUNET_PQ_make_prepare ( GNUNET_PQ_make_prepare (
"claim_next_shard", "claim_next_shard",
@ -12586,6 +12595,35 @@ commit:
} }
/**
* Function called to abort work on a shard.
*
* @param cls the @e cls of this struct with the plugin-specific state
* @param job_name name of the operation to abort a word shard for
* @param start_row inclusive start row of the shard
* @param end_row exclusive end row of the shard
* @return transaction status code
*/
static enum GNUNET_DB_QueryStatus
postgres_abort_shard (void *cls,
const char *job_name,
uint64_t start_row,
uint64_t end_row)
{
struct PostgresClosure *pg = cls;
struct GNUNET_PQ_QueryParam params[] = {
GNUNET_PQ_query_param_string (job_name),
GNUNET_PQ_query_param_uint64 (&start_row),
GNUNET_PQ_query_param_uint64 (&end_row),
GNUNET_PQ_query_param_end
};
return GNUNET_PQ_eval_prepared_non_select (pg->conn,
"abort_shard",
params);
}
/** /**
* Function called to persist that work on a shard was completed. * Function called to persist that work on a shard was completed.
* *
@ -13889,6 +13927,8 @@ libtaler_plugin_exchangedb_postgres_init (void *cls)
= &postgres_insert_records_by_table; = &postgres_insert_records_by_table;
plugin->begin_shard plugin->begin_shard
= &postgres_begin_shard; = &postgres_begin_shard;
plugin->abort_shard
= &postgres_abort_shard;
plugin->complete_shard plugin->complete_shard
= &postgres_complete_shard; = &postgres_complete_shard;
plugin->begin_revolving_shard plugin->begin_revolving_shard

View File

@ -4296,6 +4296,20 @@ struct TALER_EXCHANGEDB_Plugin
uint64_t *start_row, uint64_t *start_row,
uint64_t *end_row); uint64_t *end_row);
/**
* Function called to abort work on a shard.
*
* @param cls the @e cls of this struct with the plugin-specific state
* @param job_name name of the operation to abort a word shard for
* @param start_row inclusive start row of the shard
* @param end_row exclusive end row of the shard
* @return transaction status code
*/
enum GNUNET_DB_QueryStatus
(*abort_shard)(void *cls,
const char *job_name,
uint64_t start_row,
uint64_t end_row);
/** /**
* Function called to persist that work on a shard was completed. * Function called to persist that work on a shard was completed.