preparations for sharded wirewatch

This commit is contained in:
Christian Grothoff 2021-06-20 16:41:04 +02:00
parent 0271e84813
commit 108bf57d04
No known key found for this signature in database
GPG Key ID: 939E6BE1E29FC3CC
4 changed files with 382 additions and 12 deletions

View File

@ -528,8 +528,20 @@ find_transfers (void *cls)
GNUNET_SCHEDULER_shutdown ();
return;
}
wa_pos->delay = true;
wa_pos->current_batch_size = 0; /* reset counter */
wa_pos->session = session;
if (wa_pos->shard_end == wa_pos->last_row_off)
{
/* advance to next shard */
// FIXME: if other processes are running in parallel,
// update 'last_row_off' to next free shard!
wa_pos->shard_end = wa_pos->last_row_off + shard_size;
}
if (! wa_pos->reset_mode)
{
// FIXME: need good way to fetch
// shard data here!
qs = db_plugin->get_latest_reserve_in_reference (db_plugin->cls,
session,
wa_pos->section_name,
@ -553,16 +565,8 @@ find_transfers (void *cls)
NULL);
return;
}
wa_pos->reset_mode = GNUNET_NO;
}
wa_pos->delay = true;
wa_pos->current_batch_size = 0; /* reset counter */
wa_pos->session = session;
if (wa_pos->shard_end == wa_pos->last_row_off)
{
/* advance to next shard */
wa_pos->shard_end += shard_size;
}
wa_pos->reset_mode = true;
limit = GNUNET_MIN (wa_pos->batch_size,
wa_pos->shard_end - wa_pos->last_row_off);
GNUNET_assert (NULL == wa_pos->hh);

View File

@ -1,6 +1,6 @@
--
-- This file is part of TALER
-- Copyright (C) 2020 Taler Systems SA
-- Copyright (C) 2020-2021 Taler Systems SA
--
-- TALER is free software; you can redistribute it and/or modify it under the
-- terms of the GNU General Public License as published by the Free Software
@ -374,5 +374,37 @@ COMMENT ON TABLE signkey_revocations
IS 'remembering which online signing keys have been revoked';
CREATE TABLE IF NOT EXISTS work_shards
(shard_serial_id BIGSERIAL UNIQUE
,last_attempt INT8 NOT NULL
,start_row INT8 NOT NULL
,end_row INT8 NOT NULL
,completed BOOLEAN NOT NULL
,job_name VARCHAR NOT NULL
,PRIMARY KEY (job_name, start_row)
);
CREATE INDEX IF NOT EXISTS work_shards_index
ON work_shards
(job_name
,completed
,last_attempt
);
COMMENT ON TABLE work_shards
IS 'coordinates work between multiple processes working on the same job';
COMMENT ON COLUMN work_shards.shard_serial_id
IS 'unique serial number identifying the shard';
COMMENT ON COLUMN work_shards.last_attempt
IS 'last time a worker attempted to work on the shard';
COMMENT ON COLUMN work_shards.completed
IS 'set to TRUE once the shard is finished by a worker';
COMMENT ON COLUMN work_shards.start_row
IS 'row at which the shard scope starts, inclusive';
COMMENT ON COLUMN work_shards.end_row
IS 'row at which the shard scope ends, exclusive';
COMMENT ON COLUMN work_shards.job_name
IS 'unique name of the job the workers on this shard are performing';
-- Complete transaction
COMMIT;

View File

@ -1,6 +1,6 @@
/*
This file is part of TALER
Copyright (C) 2014--2020 Taler Systems SA
Copyright (C) 2014--2021 Taler Systems SA
TALER is free software; you can redistribute it and/or modify it under the
terms of the GNU General Public License as published by the Free Software
@ -2438,6 +2438,52 @@ postgres_get_session (void *cls)
") VALUES "
"($1, $2, $3, $4, $5, $6, $7, $8);",
8),
/* Used in #postgres_begin_shard() */
GNUNET_PQ_make_prepare ("get_open_shard",
"SELECT"
" start_row"
",end_row"
" FROM work_shards"
" WHERE job_name=$1"
" AND last_attempt<$2"
" AND completed=FALSE"
" ORDER BY last_attempt ASC"
" LIMIT 1;",
2),
GNUNET_PQ_make_prepare ("reclaim_shard",
"UPDATE work_shards"
" SET last_attempt=$2"
" WHERE job_name=$1"
" AND start_row=$3"
" AND end_row=$4",
4),
GNUNET_PQ_make_prepare ("get_last_shard",
"SELECT"
" end_row"
" FROM work_shards"
" WHERE job_name=$1"
" AND completed=FALSE"
" ORDER BY end_row DESC"
" LIMIT 1;",
1),
GNUNET_PQ_make_prepare ("claim_next_shard",
"INSERT INTO work_shards"
"(job_name"
",last_attempt"
",start_row"
",end_row"
") VALUES "
"($1, $2, $3, $4);",
4),
/* Used in #postgres_complete_shard() */
GNUNET_PQ_make_prepare ("complete_shard",
"UPDATE work_shards"
" SET completed=TRUE"
" WHERE job_name=$1"
" AND start_row=$2"
" AND end_row=$3",
3),
GNUNET_PQ_PREPARED_STATEMENT_END
};
@ -10149,6 +10195,251 @@ postgres_insert_records_by_table (void *cls,
}
/**
* Function called to grab a work shard on an operation @a op. Runs in its
* own transaction (hence no session provided).
*
* @param cls the @e cls of this struct with the plugin-specific state
* @param job_name name of the operation to grab a word shard for
* @param delay minimum age of a shard to grab
* @param size desired shard size
* @param[out] start_row inclusive start row of the shard (returned)
* @param[out] end_row exclusive end row of the shard (returned)
* @return transaction status code
*/
static enum GNUNET_DB_QueryStatus
postgres_begin_shard (void *cls,
const char *job_name,
struct GNUNET_TIME_Relative delay,
uint64_t shard_size,
uint64_t *start_row,
uint64_t *end_row)
{
struct TALER_EXCHANGEDB_Session *session;
session = postgres_get_session (cls);
if (NULL == session)
return GNUNET_DB_STATUS_HARD_ERROR;
for (unsigned int retries = 0; retries<3; retries++)
{
if (GNUNET_OK !=
postgres_start (cls,
session,
"begin_shard"))
{
GNUNET_break (0);
return GNUNET_DB_STATUS_HARD_ERROR;
}
{
struct GNUNET_TIME_Absolute past;
enum GNUNET_DB_QueryStatus qs;
struct GNUNET_PQ_QueryParam params[] = {
GNUNET_PQ_query_param_string (job_name),
GNUNET_PQ_query_param_absolute_time (&past),
GNUNET_PQ_query_param_end
};
struct GNUNET_PQ_ResultSpec rs[] = {
GNUNET_PQ_result_spec_uint64 ("start_row",
start_row),
GNUNET_PQ_result_spec_uint64 ("end_row",
end_row),
GNUNET_PQ_result_spec_end
};
past = GNUNET_TIME_absolute_subtract (GNUNET_TIME_absolute_get (),
delay);
qs = GNUNET_PQ_eval_prepared_singleton_select (session->conn,
"get_open_shard",
params,
rs);
switch (qs)
{
case GNUNET_DB_STATUS_HARD_ERROR:
GNUNET_break (0);
postgres_rollback (cls,
session);
return qs;
case GNUNET_DB_STATUS_SOFT_ERROR:
postgres_rollback (cls,
session);
continue;
case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
{
enum GNUNET_DB_QueryStatus qs;
struct GNUNET_TIME_Absolute now;
struct GNUNET_PQ_QueryParam params[] = {
GNUNET_PQ_query_param_string (job_name),
GNUNET_PQ_query_param_absolute_time (&now),
GNUNET_PQ_query_param_uint64 (start_row),
GNUNET_PQ_query_param_uint64 (end_row),
GNUNET_PQ_query_param_end
};
now = GNUNET_TIME_absolute_get ();
qs = GNUNET_PQ_eval_prepared_non_select (session->conn,
"reclaim_shard",
params);
switch (qs)
{
case GNUNET_DB_STATUS_HARD_ERROR:
GNUNET_break (0);
postgres_rollback (cls,
session);
return qs;
case GNUNET_DB_STATUS_SOFT_ERROR:
postgres_rollback (cls,
session);
continue;
case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
goto commit;
case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
GNUNET_break (0); /* logic error, should be impossible */
postgres_rollback (cls,
session);
return GNUNET_DB_STATUS_HARD_ERROR;
}
}
break; /* actually unreachable */
case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
break; /* continued below */
}
} /* get_open_shard */
/* No open shard, find last 'end_row' */
{
enum GNUNET_DB_QueryStatus qs;
struct GNUNET_PQ_QueryParam params[] = {
GNUNET_PQ_query_param_string (job_name),
GNUNET_PQ_query_param_end
};
struct GNUNET_PQ_ResultSpec rs[] = {
GNUNET_PQ_result_spec_uint64 ("end_row",
start_row),
GNUNET_PQ_result_spec_end
};
qs = GNUNET_PQ_eval_prepared_singleton_select (session->conn,
"get_last_shard",
params,
rs);
switch (qs)
{
case GNUNET_DB_STATUS_HARD_ERROR:
GNUNET_break (0);
postgres_rollback (cls,
session);
return qs;
case GNUNET_DB_STATUS_SOFT_ERROR:
postgres_rollback (cls,
session);
continue;
case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
break;
case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
*start_row = 0; /* base-case: no shards yet */
break; /* continued below */
}
*end_row = *start_row + shard_size;
} /* get_last_shard */
/* Claim fresh shard */
{
enum GNUNET_DB_QueryStatus qs;
struct GNUNET_TIME_Absolute now;
struct GNUNET_PQ_QueryParam params[] = {
GNUNET_PQ_query_param_string (job_name),
GNUNET_PQ_query_param_absolute_time (&now),
GNUNET_PQ_query_param_uint64 (start_row),
GNUNET_PQ_query_param_uint64 (end_row),
GNUNET_PQ_query_param_end
};
now = GNUNET_TIME_absolute_get ();
qs = GNUNET_PQ_eval_prepared_non_select (session->conn,
"claim_next_shard",
params);
switch (qs)
{
case GNUNET_DB_STATUS_HARD_ERROR:
GNUNET_break (0);
postgres_rollback (cls,
session);
return qs;
case GNUNET_DB_STATUS_SOFT_ERROR:
postgres_rollback (cls,
session);
continue;
case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
/* continued below */
break;
case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
GNUNET_break (0);
postgres_rollback (cls,
session);
continue;
}
} /* claim_next_shard */
/* commit */
commit:
{
enum GNUNET_DB_QueryStatus qs;
qs = postgres_commit (cls,
session);
switch (qs)
{
case GNUNET_DB_STATUS_HARD_ERROR:
GNUNET_break (0);
postgres_rollback (cls,
session);
return qs;
case GNUNET_DB_STATUS_SOFT_ERROR:
postgres_rollback (cls,
session);
continue;
case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
return GNUNET_DB_STATUS_SUCCESS_ONE_RESULT;
}
}
} /* retry 'for' loop */
return GNUNET_DB_STATUS_SOFT_ERROR;
}
/**
* Function called to persist that work on a shard was completed.
*
* @param cls the @e cls of this struct with the plugin-specific state
* @param session a session
* @param job_name name of the operation to grab a word shard for
* @param start_row inclusive start row of the shard
* @param end_row exclusive end row of the shard
* @return transaction status code
*/
enum GNUNET_DB_QueryStatus
postgres_complete_shard (void *cls,
struct TALER_EXCHANGEDB_Session *session,
const char *job_name,
uint64_t start_row,
uint64_t end_row)
{
struct GNUNET_PQ_QueryParam params[] = {
GNUNET_PQ_query_param_string (job_name),
GNUNET_PQ_query_param_uint64 (&start_row),
GNUNET_PQ_query_param_uint64 (&end_row),
GNUNET_PQ_query_param_end
};
(void) cls;
return GNUNET_PQ_eval_prepared_non_select (session->conn,
"complete_shard",
params);
}
/**
* Initialize Postgres database subsystem.
*
@ -10353,6 +10644,10 @@ libtaler_plugin_exchangedb_postgres_init (void *cls)
= &postgres_lookup_records_by_table;
plugin->insert_records_by_table
= &postgres_insert_records_by_table;
plugin->begin_shard
= &postgres_begin_shard;
plugin->complete_shard
= &postgres_complete_shard;
return plugin;
}

View File

@ -1,6 +1,6 @@
/*
This file is part of TALER
Copyright (C) 2014-2020 Taler Systems SA
Copyright (C) 2014-2021 Taler Systems SA
TALER is free software; you can redistribute it and/or modify it under the
terms of the GNU General Public License as published by the Free Software
@ -3820,6 +3820,45 @@ struct TALER_EXCHANGEDB_Plugin
struct TALER_EXCHANGEDB_Session *session,
const struct TALER_EXCHANGEDB_TableData *td);
/**
* Function called to grab a work shard on an operation @a op. Runs in its
* own transaction (hence no session provided).
*
* @param cls the @e cls of this struct with the plugin-specific state
* @param job_name name of the operation to grab a word shard for
* @param delay minimum age of a shard to grab
* @param size desired shard size
* @param[out] start_row inclusive start row of the shard (returned)
* @param[out] end_row exclusive end row of the shard (returned)
* @return transaction status code
*/
enum GNUNET_DB_QueryStatus
(*begin_shard)(void *cls,
const char *job_name,
struct GNUNET_TIME_Relative delay,
uint64_t shard_size,
uint64_t *start_row,
uint64_t *end_row);
/**
* Function called to persist that work on a shard was completed.
*
* @param cls the @e cls of this struct with the plugin-specific state
* @param session a session
* @param job_name name of the operation to grab a word shard for
* @param start_row inclusive start row of the shard
* @param end_row exclusive end row of the shard
* @return transaction status code
*/
enum GNUNET_DB_QueryStatus
(*complete_shard)(void *cls,
struct TALER_EXCHANGEDB_Session *session,
const char *job_name,
uint64_t start_row,
uint64_t end_row);
};
#endif /* _TALER_EXCHANGE_DB_H */