add shard init for shard node

This commit is contained in:
Marco Boss 2022-03-31 21:31:07 +02:00
parent 0cf9a9984c
commit 75c1e10a67
No known key found for this signature in database
GPG Key ID: 89A3EC33C625C3DF
6 changed files with 130 additions and 44 deletions

View File

@ -50,9 +50,14 @@ static int gc_db;
static uint32_t num_partitions; static uint32_t num_partitions;
/** /**
* -S option: setup a sharded database * -F option: setup a sharded database, i.e. create foreign tables/server
*/ */
static uint32_t num_shards; static uint32_t num_foreign_servers;
/**
* -S option: setup a database on a shard server, creates tables with suffix shard_idx
*/
static uint32_t shard_idx;
/** /**
* Main function that will be run. * Main function that will be run.
@ -89,6 +94,20 @@ run (void *cls,
"Could not drop tables as requested. Either database was not yet initialized, or permission denied. Consult the logs. Will still try to create new tables.\n"); "Could not drop tables as requested. Either database was not yet initialized, or permission denied. Consult the logs. Will still try to create new tables.\n");
} }
} }
if (1 <
shard_idx)
{
if (GNUNET_OK != plugin->create_shard_tables (plugin->cls, shard_idx))
{
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
"Could not create shard database\n");
global_ret = EXIT_NOTINSTALLED;
}
/* We do not want to continue if we are on a shard */
TALER_EXCHANGEDB_plugin_unload (plugin);
plugin = NULL;
return;
}
if (GNUNET_OK != if (GNUNET_OK !=
plugin->create_tables (plugin->cls)) plugin->create_tables (plugin->cls))
{ {
@ -118,9 +137,10 @@ run (void *cls,
} }
} }
else if (1 < else if (1 <
num_shards) num_foreign_servers)
{ {
if (GNUNET_OK != plugin->setup_shards (plugin->cls, num_shards)) if (GNUNET_OK != plugin->setup_foreign_servers (plugin->cls,
num_foreign_servers))
{ {
GNUNET_log (GNUNET_ERROR_TYPE_ERROR, GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
"Could not setup shards. Aborting\n"); "Could not setup shards. Aborting\n");
@ -200,11 +220,16 @@ main (int argc,
"NUMBER", "NUMBER",
"Setup a partitioned database where each table which can be partitioned holds NUMBER partitions on a single DB node (NOTE: this is different from sharding)", "Setup a partitioned database where each table which can be partitioned holds NUMBER partitions on a single DB node (NOTE: this is different from sharding)",
&num_partitions), &num_partitions),
GNUNET_GETOPT_option_uint ('F',
"foreign",
"NUMBER",
"Setup a sharded database whit N foreign servers (shards) / tables",
&num_foreign_servers),
GNUNET_GETOPT_option_uint ('S', GNUNET_GETOPT_option_uint ('S',
"shard", "shard",
"NUMBER", "INDEX",
"Setup a sharded database whit N shards", "Setup a shard server, creates tables with INDEX as suffix",
&num_shards), &shard_idx),
GNUNET_GETOPT_OPTION_END GNUNET_GETOPT_OPTION_END
}; };
enum GNUNET_GenericReturnValue ret; enum GNUNET_GenericReturnValue ret;

View File

@ -1484,6 +1484,7 @@ $$;
--------------------- Sharding --------------------------- --------------------- Sharding ---------------------------
---------------------- Shards ---------------------------- ---------------------- Shards ----------------------------
CREATE OR REPLACE FUNCTION setup_shard( CREATE OR REPLACE FUNCTION setup_shard(
shard_suffix VARCHAR shard_suffix VARCHAR
) )
@ -1553,6 +1554,7 @@ END
$$; $$;
------------------------------ Master ---------------------------------- ------------------------------ Master ----------------------------------
CREATE OR REPLACE FUNCTION create_foreign_table( CREATE OR REPLACE FUNCTION create_foreign_table(
source_table_name VARCHAR source_table_name VARCHAR
,modulus INTEGER ,modulus INTEGER
@ -1849,18 +1851,21 @@ COMMENT ON FUNCTION create_shard_server
IS 'Create a shard server on the master IS 'Create a shard server on the master
node with all foreign tables and user mappings'; node with all foreign tables and user mappings';
CREATE OR REPLACE FUNCTION create_shards( CREATE OR REPLACE FUNCTION create_foreign_servers(
num_shards INTEGER amount INTEGER
,domain VARCHAR DEFAULT 'perf.taler' ,domain VARCHAR DEFAULT 'perf.taler'
) )
RETURNS VOID RETURNS VOID
LANGUAGE plpgsql LANGUAGE plpgsql
AS $$ AS $$
BEGIN BEGIN
FOR i IN 1..num_shards LOOP
PERFORM master_prepare_sharding();
FOR i IN 1..amount LOOP
PERFORM create_shard_server( PERFORM create_shard_server(
i i
,num_shards ,amount
,i ,i
,'shard-' || i::varchar || '.' || domain ,'shard-' || i::varchar || '.' || domain
,'taler' ,'taler'

View File

@ -1484,6 +1484,7 @@ $$;
--------------------- Sharding --------------------------- --------------------- Sharding ---------------------------
---------------------- Shards ---------------------------- ---------------------- Shards ----------------------------
CREATE OR REPLACE FUNCTION setup_shard( CREATE OR REPLACE FUNCTION setup_shard(
shard_suffix VARCHAR shard_suffix VARCHAR
) )
@ -1553,6 +1554,7 @@ END
$$; $$;
------------------------------ Master ---------------------------------- ------------------------------ Master ----------------------------------
CREATE OR REPLACE FUNCTION create_foreign_table( CREATE OR REPLACE FUNCTION create_foreign_table(
source_table_name VARCHAR source_table_name VARCHAR
,modulus INTEGER ,modulus INTEGER
@ -1849,18 +1851,21 @@ COMMENT ON FUNCTION create_shard_server
IS 'Create a shard server on the master IS 'Create a shard server on the master
node with all foreign tables and user mappings'; node with all foreign tables and user mappings';
CREATE OR REPLACE FUNCTION create_shards( CREATE OR REPLACE FUNCTION create_foreign_servers(
num_shards INTEGER amount INTEGER
,domain VARCHAR DEFAULT 'perf.taler' ,domain VARCHAR DEFAULT 'perf.taler'
) )
RETURNS VOID RETURNS VOID
LANGUAGE plpgsql LANGUAGE plpgsql
AS $$ AS $$
BEGIN BEGIN
FOR i IN 1..num_shards LOOP
PERFORM master_prepare_sharding();
FOR i IN 1..amount LOOP
PERFORM create_shard_server( PERFORM create_shard_server(
i i
,num_shards ,amount
,i ,i
,'shard-' || i::varchar || '.' || domain ,'shard-' || i::varchar || '.' || domain
,'taler' ,'taler'

View File

@ -198,6 +198,49 @@ postgres_create_tables (void *cls)
} }
/**
* Create tables of a shard node with index idx
*
* @param cls the `struct PostgresClosure` with the plugin-specific state
* @param idx the shards index, will be appended as suffix to all tables
* @return #GNUNET_OK upon success; #GNUNET_SYSERR upon failure
*/
static enum GNUNET_GenericReturnValue
postgres_create_shard_tables (void *cls,
uint32_t idx)
{
struct PostgresClosure *pg = cls;
struct GNUNET_PQ_Context *conn;
enum GNUNET_GenericReturnValue ret;
struct GNUNET_PQ_QueryParam params[] = {
GNUNET_PQ_query_param_uint32 (&idx),
GNUNET_PQ_query_param_end
};
struct GNUNET_PQ_PreparedStatement ps[] = {
GNUNET_PQ_make_prepare ("create_shard_tables",
"SELECT"
" setup_shard"
" ($1);",
1),
GNUNET_PQ_PREPARED_STATEMENT_END
};
conn = GNUNET_PQ_connect_with_cfg (pg->cfg,
"exchangedb-postgres",
"shard-",
NULL,
ps);
if (NULL == conn)
return GNUNET_SYSERR;
if (0 > GNUNET_PQ_eval_prepared_non_select (conn,
"create_shard_tables",
params))
ret = GNUNET_SYSERR;
GNUNET_PQ_disconnect (conn);
return ret;
}
/** /**
* Setup partitions of already existing tables * Setup partitions of already existing tables
* *
@ -207,7 +250,7 @@ postgres_create_tables (void *cls)
*/ */
static enum GNUNET_GenericReturnValue static enum GNUNET_GenericReturnValue
postgres_setup_partitions (void *cls, postgres_setup_partitions (void *cls,
const uint32_t num) uint32_t num)
{ {
struct PostgresClosure *pg = cls; struct PostgresClosure *pg = cls;
struct GNUNET_PQ_Context *conn; struct GNUNET_PQ_Context *conn;
@ -227,7 +270,7 @@ postgres_setup_partitions (void *cls,
conn = GNUNET_PQ_connect_with_cfg (pg->cfg, conn = GNUNET_PQ_connect_with_cfg (pg->cfg,
"exchangedb-postgres", "exchangedb-postgres",
"exchage-", NULL,
NULL, NULL,
ps); ps);
if (NULL == conn) if (NULL == conn)
@ -243,15 +286,15 @@ postgres_setup_partitions (void *cls,
/** /**
* Setup shards for already existing tables * Setup foreign servers (shards) for already existing tables
* *
* @param cls the `struct PostgresClosure` with the plugin-specific state * @param cls the `struct PostgresClosure` with the plugin-specific state
* @param num the number of shard servers to create for each partitioned table * @param num the number of foreign servers (shards) to create for each partitioned table
* @return #GNUNET_OK upon success; #GNUNET_SYSERR upon failure * @return #GNUNET_OK upon success; #GNUNET_SYSERR upon failure
*/ */
static enum GNUNET_GenericReturnValue static enum GNUNET_GenericReturnValue
postgres_setup_shards (void *cls, postgres_setup_foreign_servers (void *cls,
const uint32_t num) uint32_t num)
{ {
struct PostgresClosure *pg = cls; struct PostgresClosure *pg = cls;
struct GNUNET_PQ_Context *conn; struct GNUNET_PQ_Context *conn;
@ -261,14 +304,9 @@ postgres_setup_shards (void *cls,
GNUNET_PQ_query_param_end GNUNET_PQ_query_param_end
}; };
struct GNUNET_PQ_PreparedStatement ps[] = { struct GNUNET_PQ_PreparedStatement ps[] = {
GNUNET_PQ_make_prepare ("setup_shards", GNUNET_PQ_make_prepare ("create_foreign_servers",
"SELECT" "SELECT"
" master_prepare_sharding" " create_foreign_servers"
" ();",
0),
GNUNET_PQ_make_prepare ("create_shards",
"SELECT"
" create_shards"
" ($1);", " ($1);",
1), 1),
GNUNET_PQ_PREPARED_STATEMENT_END GNUNET_PQ_PREPARED_STATEMENT_END
@ -276,18 +314,13 @@ postgres_setup_shards (void *cls,
conn = GNUNET_PQ_connect_with_cfg (pg->cfg, conn = GNUNET_PQ_connect_with_cfg (pg->cfg,
"exchangedb-postgres", "exchangedb-postgres",
"exchage-", NULL,
NULL, NULL,
ps); ps);
if (NULL == conn) if (NULL == conn)
return GNUNET_SYSERR; return GNUNET_SYSERR;
ret = GNUNET_OK;
if (0 > GNUNET_PQ_eval_prepared_non_select (conn, if (0 > GNUNET_PQ_eval_prepared_non_select (conn,
"setup_shards", "create_foreign_servers",
NULL))
ret = GNUNET_SYSERR;
if (0 > GNUNET_PQ_eval_prepared_non_select (conn,
"create_shards",
params)) params))
ret = GNUNET_SYSERR; ret = GNUNET_SYSERR;
GNUNET_PQ_disconnect (conn); GNUNET_PQ_disconnect (conn);
@ -13114,8 +13147,9 @@ libtaler_plugin_exchangedb_postgres_init (void *cls)
plugin->cls = pg; plugin->cls = pg;
plugin->drop_tables = &postgres_drop_tables; plugin->drop_tables = &postgres_drop_tables;
plugin->create_tables = &postgres_create_tables; plugin->create_tables = &postgres_create_tables;
plugin->create_shard_tables = &postgres_create_shard_tables;
plugin->setup_partitions = &postgres_setup_partitions; plugin->setup_partitions = &postgres_setup_partitions;
plugin->setup_shards = &postgres_setup_shards; plugin->setup_foreign_servers = &postgres_setup_foreign_servers;
plugin->start = &postgres_start; plugin->start = &postgres_start;
plugin->start_read_committed = &postgres_start_read_committed; plugin->start_read_committed = &postgres_start_read_committed;
plugin->commit = &postgres_commit; plugin->commit = &postgres_commit;

View File

@ -1484,6 +1484,7 @@ $$;
--------------------- Sharding --------------------------- --------------------- Sharding ---------------------------
---------------------- Shards ---------------------------- ---------------------- Shards ----------------------------
CREATE OR REPLACE FUNCTION setup_shard( CREATE OR REPLACE FUNCTION setup_shard(
shard_suffix VARCHAR shard_suffix VARCHAR
) )
@ -1553,6 +1554,7 @@ END
$$; $$;
------------------------------ Master ---------------------------------- ------------------------------ Master ----------------------------------
CREATE OR REPLACE FUNCTION create_foreign_table( CREATE OR REPLACE FUNCTION create_foreign_table(
source_table_name VARCHAR source_table_name VARCHAR
,modulus INTEGER ,modulus INTEGER
@ -1849,18 +1851,21 @@ COMMENT ON FUNCTION create_shard_server
IS 'Create a shard server on the master IS 'Create a shard server on the master
node with all foreign tables and user mappings'; node with all foreign tables and user mappings';
CREATE OR REPLACE FUNCTION create_shards( CREATE OR REPLACE FUNCTION create_foreign_servers(
num_shards INTEGER amount INTEGER
,domain VARCHAR DEFAULT 'perf.taler' ,domain VARCHAR DEFAULT 'perf.taler'
) )
RETURNS VOID RETURNS VOID
LANGUAGE plpgsql LANGUAGE plpgsql
AS $$ AS $$
BEGIN BEGIN
FOR i IN 1..num_shards LOOP
PERFORM master_prepare_sharding();
FOR i IN 1..amount LOOP
PERFORM create_shard_server( PERFORM create_shard_server(
i i
,num_shards ,amount
,i ,i
,'shard-' || i::varchar || '.' || domain ,'shard-' || i::varchar || '.' || domain
,'taler' ,'taler'

View File

@ -2229,6 +2229,17 @@ struct TALER_EXCHANGEDB_Plugin
enum GNUNET_GenericReturnValue enum GNUNET_GenericReturnValue
(*create_tables)(void *cls); (*create_tables)(void *cls);
/**
* Initialize the database of a shard node
*
* @param cls the @e cls of this struct with the plugin-specific state
* @param idx the current shard index, will be appended to tables as suffix
* @return #GNUNET_OK upon success; #GNUNET_SYSERR upon failure
*/
enum GNUNET_GenericReturnValue
(*create_shard_tables)(void *cls,
uint32_t idx);
/** /**
* Change already present tables of the database to num partitions * Change already present tables of the database to num partitions
* Only has an effect if there are default partitions only * Only has an effect if there are default partitions only
@ -2239,10 +2250,11 @@ struct TALER_EXCHANGEDB_Plugin
*/ */
enum GNUNET_GenericReturnValue enum GNUNET_GenericReturnValue
(*setup_partitions)(void *cls, (*setup_partitions)(void *cls,
const uint32_t num); uint32_t num);
/** /**
* Change already present tables of the database to num shards * Change already present tables of the database to num foreign tables on
* num foreign servers (shards).
* Only has an effect if there are default partitions only * Only has an effect if there are default partitions only
* *
* @param cls the @e cls of this struct with the plugin-specific state * @param cls the @e cls of this struct with the plugin-specific state
@ -2253,8 +2265,8 @@ struct TALER_EXCHANGEDB_Plugin
* @return #GNUNET_OK upon success; #GNUNET_SYSERR upon failure * @return #GNUNET_OK upon success; #GNUNET_SYSERR upon failure
*/ */
enum GNUNET_GenericReturnValue enum GNUNET_GenericReturnValue
(*setup_shards)(void *cls, (*setup_foreign_servers)(void *cls,
const uint32_t num); uint32_t num);
/** /**
* Start a transaction. * Start a transaction.