preliminary work on supporting sharding/parallel aggregation (undertested, but tests pass again)

This commit is contained in:
Christian Grothoff 2021-09-03 19:08:02 +02:00
parent 6e1877b142
commit 5149af9314
No known key found for this signature in database
GPG Key ID: 939E6BE1E29FC3CC
13 changed files with 3185 additions and 1658 deletions

View File

@ -1,3 +1,6 @@
Fri 03 Sep 2021 07:02:05 PM CEST
Add experimental aggregator sharding logic. -CG
Sat 28 Aug 2021 05:22:57 PM CEST
Fixed various memory leaks.
Fixed database initialization sequence to avoid warning on first request.

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@ -1,6 +1,6 @@
/*
This file is part of TALER
Copyright (C) 2014, 2015 Taler Systems SA
Copyright (C) 2014-2021 Taler Systems SA
TALER is free software; you can redistribute it and/or modify it under the
terms of the GNU General Public License as published by the Free Software
@ -17,6 +17,7 @@
* @file exchange-tools/taler-exchange-dbinit.c
* @brief Create tables for the exchange database.
* @author Florian Dold
* @author Christian Grothoff
*/
#include "platform.h"
#include <gnunet/gnunet_util_lib.h>
@ -33,6 +34,11 @@ static int global_ret;
*/
static int reset_db;
/**
* -s option: clear revolving shard locks
*/
static int clear_shards;
/**
* -g option: garbage collect DB reset
*/
@ -83,6 +89,14 @@ run (void *cls,
global_ret = EXIT_NOPERMISSION;
return;
}
if (clear_shards)
{
if (0 < plugin->delete_revolving_shards (plugin->cls))
{
fprintf (stderr,
"Clearing revolving shards failed!\n");
}
}
if (gc_db)
{
if (GNUNET_SYSERR == plugin->gc (plugin->cls))
@ -108,14 +122,18 @@ main (int argc,
char *const *argv)
{
const struct GNUNET_GETOPT_CommandLineOption options[] = {
GNUNET_GETOPT_option_flag ('r',
"reset",
"reset database (DANGEROUS: all existing data is lost!)",
&reset_db),
GNUNET_GETOPT_option_flag ('g',
"gc",
"garbage collect database",
&gc_db),
GNUNET_GETOPT_option_flag ('r',
"reset",
"reset database (DANGEROUS: all existing data is lost!)",
&reset_db),
GNUNET_GETOPT_option_flag ('s',
"shardunlock",
"unlock all revolving shard locks (use after system crash or shard size change while services are not running)",
&clear_shards),
GNUNET_GETOPT_OPTION_END
};
enum GNUNET_GenericReturnValue ret;

View File

@ -45,6 +45,17 @@ BASE_URL = http://localhost:8081/
# sleep if it has nothing to do?
AGGREGATOR_IDLE_SLEEP_INTERVAL = 60 s
# Values of 0 or above 2^31 disable sharding, which
# is a sane default for most use-cases.
# When changing this value, you MUST stop all
# aggregators and manually run
#
# $ taler-exchange-dbinit -s
#
# against the exchange's database. Otherwise, the
# aggregation logic will break badly!
AGGREGATOR_SHARD_SIZE = 2147483648
# How long should wirewatch sleep if it has nothing to do?
# (Set very aggressively here for the demonstrators to be
# super fast.)

View File

@ -107,6 +107,35 @@ struct AggregationUnit
};
/**
* Work shard we are processing.
*/
struct Shard
{
/**
* When did we start processing the shard?
*/
struct GNUNET_TIME_Absolute start_time;
/**
* Starting row of the shard.
*/
uint32_t shard_start;
/**
* Exclusive end row of the shard.
*/
uint32_t shard_end;
/**
* Number of starting points found in the shard.
*/
uint64_t work_counter;
};
/**
* What is the smallest unit we support for wire transfers?
* We will need to round down to a multiple of this amount.
@ -135,11 +164,19 @@ static struct TALER_EXCHANGEDB_Plugin *db_plugin;
*/
static struct GNUNET_SCHEDULER_Task *task;
/**
* How long should we sleep when idle before trying to find more work?
*/
static struct GNUNET_TIME_Relative aggregator_idle_sleep_interval;
/**
* How big are the shards we are processing? Is an inclusive offset, so every
* shard ranges from [X,X+shard_size) exclusive. So a shard covers
* shard_size slots. The maximum value for shard_size is INT32_MAX+1.
*/
static uint32_t shard_size;
/**
* Value to return from main(). 0 on success, non-zero on errors.
*/
@ -161,6 +198,15 @@ static void
run_aggregation (void *cls);
/**
* Select a shard to work on.
*
* @param cls NULL
*/
static void
run_shard (void *cls);
/**
* Free data stored in @a au, but not @a au itself (stack allocated).
*
@ -611,31 +657,57 @@ commit_or_warn (void)
}
/**
* Release lock on shard @a s in the database.
* On error, terminates this process.
*
* @param[in] s shard to free (and memory to release)
*/
static void
release_shard (struct Shard *s)
{
enum GNUNET_DB_QueryStatus qs;
qs = db_plugin->release_revolving_shard (
db_plugin->cls,
"aggregator",
s->shard_start,
s->shard_end);
GNUNET_free (s);
switch (qs)
{
case GNUNET_DB_STATUS_HARD_ERROR:
case GNUNET_DB_STATUS_SOFT_ERROR:
GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR != qs);
GNUNET_break (0);
GNUNET_SCHEDULER_shutdown ();
return;
case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
/* Strange, but let's just continue */
break;
case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
/* normal case */
break;
}
}
/**
* Main work function that queries the DB and aggregates transactions
* into larger wire transfers.
*
* @param cls NULL
* @param cls a `struct Shard *`
*/
static void
run_aggregation (void *cls)
{
struct Shard *s = cls;
struct AggregationUnit au_active;
enum GNUNET_DB_QueryStatus qs;
(void) cls;
task = NULL;
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"Checking for ready deposits to aggregate\n");
if (GNUNET_SYSERR ==
db_plugin->preflight (db_plugin->cls))
{
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
"Failed to obtain database connection!\n");
global_ret = EXIT_FAILURE;
GNUNET_SCHEDULER_shutdown ();
return;
}
if (GNUNET_OK !=
db_plugin->start_deferred_wire_out (db_plugin->cls))
{
@ -643,51 +715,71 @@ run_aggregation (void *cls)
"Failed to start database transaction!\n");
global_ret = EXIT_FAILURE;
GNUNET_SCHEDULER_shutdown ();
release_shard (s);
return;
}
memset (&au_active,
0,
sizeof (au_active));
qs = db_plugin->get_ready_deposit (db_plugin->cls,
qs = db_plugin->get_ready_deposit (
db_plugin->cls,
s->shard_start,
s->shard_end - 1, /* -1: exclusive->inclusive */
&deposit_cb,
&au_active);
if (0 >= qs)
switch (qs)
{
case GNUNET_DB_STATUS_HARD_ERROR:
cleanup_au (&au_active);
db_plugin->rollback (db_plugin->cls);
if (GNUNET_DB_STATUS_HARD_ERROR == qs)
{
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
"Failed to execute deposit iteration!\n");
"Failed to begin deposit iteration!\n");
global_ret = EXIT_FAILURE;
GNUNET_SCHEDULER_shutdown ();
release_shard (s);
return;
}
if (GNUNET_DB_STATUS_SOFT_ERROR == qs)
{
/* should re-try immediately */
case GNUNET_DB_STATUS_SOFT_ERROR:
cleanup_au (&au_active);
db_plugin->rollback (db_plugin->cls);
GNUNET_assert (NULL == task);
task = GNUNET_SCHEDULER_add_now (&run_aggregation,
NULL);
s);
return;
}
case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
{
uint64_t counter = s->work_counter;
struct GNUNET_TIME_Relative duration
= GNUNET_TIME_absolute_get_duration (s->start_time);
cleanup_au (&au_active);
db_plugin->rollback (db_plugin->cls);
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"No more ready deposits, going to sleep\n");
"Completed shard after %s\n",
GNUNET_STRINGS_relative_time_to_string (duration,
GNUNET_YES));
release_shard (s);
if (GNUNET_YES == test_mode)
{
/* in test mode, shutdown if we end up being idle */
/* in test mode, shutdown after a shard is done */
GNUNET_SCHEDULER_shutdown ();
}
else
{
/* nothing to do, sleep for a minute and try again */
GNUNET_assert (NULL == task);
task = GNUNET_SCHEDULER_add_delayed (aggregator_idle_sleep_interval,
&run_aggregation,
NULL);
}
return;
}
GNUNET_assert (NULL == task);
/* If we ended up doing zero work, sleep a bit */
if (0 == counter)
task = GNUNET_SCHEDULER_add_delayed (aggregator_idle_sleep_interval,
&run_shard,
NULL);
else
task = GNUNET_SCHEDULER_add_now (&run_shard,
NULL);
return;
}
case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
s->work_counter++;
/* continued below */
break;
}
/* Now try to find other deposits to aggregate */
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
@ -707,6 +799,7 @@ run_aggregation (void *cls)
db_plugin->rollback (db_plugin->cls);
global_ret = EXIT_FAILURE;
GNUNET_SCHEDULER_shutdown ();
release_shard (s);
return;
}
if (GNUNET_DB_STATUS_SOFT_ERROR == qs)
@ -718,7 +811,7 @@ run_aggregation (void *cls)
cleanup_au (&au_active);
GNUNET_assert (NULL == task);
task = GNUNET_SCHEDULER_add_now (&run_aggregation,
NULL);
s);
return;
}
@ -754,6 +847,7 @@ run_aggregation (void *cls)
global_ret = EXIT_FAILURE;
cleanup_au (&au_active);
GNUNET_SCHEDULER_shutdown ();
release_shard (s);
return;
}
/* Mark transactions by row_id as minor */
@ -778,7 +872,7 @@ run_aggregation (void *cls)
/* start again */
GNUNET_assert (NULL == task);
task = GNUNET_SCHEDULER_add_now (&run_aggregation,
NULL);
s);
return;
}
if (GNUNET_DB_STATUS_HARD_ERROR == qs)
@ -787,6 +881,7 @@ run_aggregation (void *cls)
cleanup_au (&au_active);
global_ret = EXIT_FAILURE;
GNUNET_SCHEDULER_shutdown ();
release_shard (s);
return;
}
/* commit */
@ -796,20 +891,13 @@ run_aggregation (void *cls)
/* start again */
GNUNET_assert (NULL == task);
task = GNUNET_SCHEDULER_add_now (&run_aggregation,
NULL);
s);
return;
}
{
char *amount_s;
amount_s = TALER_amount_to_string (&au_active.final_amount);
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"Preparing wire transfer of %s to %s\n",
amount_s,
TALER_amount2s (&au_active.final_amount),
TALER_B2S (&au_active.merchant_pub));
GNUNET_free (amount_s);
}
{
void *buf;
size_t buf_size;
@ -856,7 +944,7 @@ run_aggregation (void *cls)
/* start again */
GNUNET_assert (NULL == task);
task = GNUNET_SCHEDULER_add_now (&run_aggregation,
NULL);
s);
return;
}
if (GNUNET_DB_STATUS_HARD_ERROR == qs)
@ -866,6 +954,7 @@ run_aggregation (void *cls)
/* die hard */
global_ret = EXIT_FAILURE;
GNUNET_SCHEDULER_shutdown ();
release_shard (s);
return;
}
@ -882,29 +971,75 @@ run_aggregation (void *cls)
"Commit issue for prepared wire data; trying again later!\n");
GNUNET_assert (NULL == task);
task = GNUNET_SCHEDULER_add_now (&run_aggregation,
NULL);
s);
return;
case GNUNET_DB_STATUS_HARD_ERROR:
GNUNET_break (0);
global_ret = EXIT_FAILURE;
GNUNET_SCHEDULER_shutdown ();
release_shard (s);
return;
case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"Preparation complete, going again\n");
GNUNET_assert (NULL == task);
task = GNUNET_SCHEDULER_add_now (&run_aggregation,
NULL);
s);
return;
default:
GNUNET_break (0);
global_ret = EXIT_FAILURE;
GNUNET_SCHEDULER_shutdown ();
release_shard (s);
return;
}
}
/**
* Select a shard to work on.
*
* @param cls NULL
*/
static void
run_shard (void *cls)
{
struct Shard *s;
enum GNUNET_DB_QueryStatus qs;
(void) cls;
task = NULL;
if (GNUNET_SYSERR ==
db_plugin->preflight (db_plugin->cls))
{
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
"Failed to obtain database connection!\n");
global_ret = EXIT_FAILURE;
GNUNET_SCHEDULER_shutdown ();
return;
}
s = GNUNET_new (struct Shard);
s->start_time = GNUNET_TIME_absolute_get ();
qs = db_plugin->begin_revolving_shard (db_plugin->cls,
"aggregator",
shard_size,
1U + INT32_MAX,
&s->shard_start,
&s->shard_end);
if (0 >= qs)
{
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
"Failed to begin shard!\n");
GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR != qs);
global_ret = EXIT_FAILURE;
GNUNET_SCHEDULER_shutdown ();
return;
}
task = GNUNET_SCHEDULER_add_now (&run_aggregation,
s);
}
/**
* First task.
*
@ -919,6 +1054,7 @@ run (void *cls,
const char *cfgfile,
const struct GNUNET_CONFIGURATION_Handle *c)
{
unsigned long long ass;
(void) cls;
(void) args;
(void) cfgfile;
@ -930,8 +1066,23 @@ run (void *cls,
global_ret = EXIT_NOTCONFIGURED;
return;
}
if (GNUNET_OK !=
GNUNET_CONFIGURATION_get_value_number (cfg,
"exchange",
"AGGREGATOR_SHARD_SIZE",
&ass))
{
cfg = NULL;
global_ret = EXIT_NOTCONFIGURED;
return;
}
if ( (0 == ass) ||
(ass > INT32_MAX) )
shard_size = 1U + INT32_MAX;
else
shard_size = (uint32_t) ass;
GNUNET_assert (NULL == task);
task = GNUNET_SCHEDULER_add_now (&run_aggregation,
task = GNUNET_SCHEDULER_add_now (&run_shard,
NULL);
GNUNET_SCHEDULER_add_shutdown (&shutdown_task,
cls);

View File

@ -18,8 +18,10 @@ sql_DATA = \
exchange-0000.sql \
exchange-0001.sql \
exchange-0002.sql \
exchange-0003.sql \
drop0001.sql \
drop0002.sql
drop0002.sql \
drop0003.sql
EXTRA_DIST = \
exchangedb.conf \

View File

@ -17,8 +17,6 @@
-- Everything in one big transaction
BEGIN;
-- exchange-0002 did not create new tables, so nothing to do here.
-- Unregister patch (0002.sql)
SELECT _v.unregister_patch('exchange-0002');

View File

@ -0,0 +1,26 @@
--
-- This file is part of TALER
-- Copyright (C) 2020 Taler Systems SA
--
-- TALER is free software; you can redistribute it and/or modify it under the
-- terms of the GNU General Public License as published by the Free Software
-- Foundation; either version 3, or (at your option) any later version.
--
-- TALER is distributed in the hope that it will be useful, but WITHOUT ANY
-- WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
-- A PARTICULAR PURPOSE. See the GNU General Public License for more details.
--
-- You should have received a copy of the GNU General Public License along with
-- TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/>
--
-- Everything in one big transaction
BEGIN;
-- Unregister patch (0003.sql)
SELECT _v.unregister_patch('exchange-0003');
DROP TABLE IF EXISTS revolving_work_shards CASCADE;
-- And we're out of here...
COMMIT;

View File

@ -0,0 +1,75 @@
--
-- This file is part of TALER
-- Copyright (C) 2021 Taler Systems SA
--
-- TALER is free software; you can redistribute it and/or modify it under the
-- terms of the GNU General Public License as published by the Free Software
-- Foundation; either version 3, or (at your option) any later version.
--
-- TALER is distributed in the hope that it will be useful, but WITHOUT ANY
-- WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
-- A PARTICULAR PURPOSE. See the GNU General Public License for more details.
--
-- You should have received a copy of the GNU General Public License along with
-- TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/>
--
-- Everything in one big transaction
BEGIN;
-- Check patch versioning is in place.
SELECT _v.register_patch('exchange-0003', NULL, NULL);
ALTER TABLE deposits
ADD COLUMN shard INT4 NOT NULL DEFAULT 0;
COMMENT ON COLUMN deposits.shard
IS 'Used for load sharding. Should be set based on h_wire, merchant_pub and a service salt. Default of 0 onlyapplies for colums migrated from a previous version without sharding support. 64-bit value because we need an *unsigned* 32-bit value.';
DROP INDEX deposits_get_ready_index;
CREATE INDEX deposits_get_ready_index
ON deposits
(shard
,tiny
,done
,wire_deadline
,refund_deadline
);
COMMENT ON INDEX deposits_get_ready_index
IS 'for deposits_get_ready';
CREATE UNLOGGED TABLE IF NOT EXISTS revolving_work_shards
(shard_serial_id BIGSERIAL UNIQUE
,last_attempt INT8 NOT NULL
,start_row INT4 NOT NULL
,end_row INT4 NOT NULL
,active BOOLEAN NOT NULL DEFAULT FALSE
,job_name VARCHAR NOT NULL
,PRIMARY KEY (job_name, start_row)
);
CREATE INDEX IF NOT EXISTS revolving_work_shards_index
ON revolving_work_shards
(job_name
,active
,last_attempt
);
COMMENT ON TABLE revolving_work_shards
IS 'coordinates work between multiple processes working on the same job with partitions that need to be repeatedly processed; unlogged because on system crashes the locks represented by this table will have to be cleared anyway, typically using "taler-exchange-dbinit -s"';
COMMENT ON COLUMN revolving_work_shards.shard_serial_id
IS 'unique serial number identifying the shard';
COMMENT ON COLUMN revolving_work_shards.last_attempt
IS 'last time a worker attempted to work on the shard';
COMMENT ON COLUMN revolving_work_shards.active
IS 'set to TRUE when a worker is active on the shard';
COMMENT ON COLUMN revolving_work_shards.start_row
IS 'row at which the shard scope starts, inclusive';
COMMENT ON COLUMN revolving_work_shards.end_row
IS 'row at which the shard scope ends, exclusive';
COMMENT ON COLUMN revolving_work_shards.job_name
IS 'unique name of the job the workers on this shard are performing';
-- Complete transaction
COMMIT;

View File

@ -874,11 +874,12 @@ prepare_statements (struct PostgresClosure *pg)
",coin_sig"
",wire"
",exchange_timestamp"
",shard"
") SELECT known_coin_id, $2, $3, $4, $5, $6, "
" $7, $8, $9, $10, $11, $12"
" $7, $8, $9, $10, $11, $12, $13"
" FROM known_coins"
" WHERE coin_pub=$1;",
12),
13),
/* Fetch an existing deposit request, used to ensure idempotency
during /deposit processing. Used in #postgres_have_deposit(). */
GNUNET_PQ_make_prepare ("get_deposit",
@ -958,13 +959,18 @@ prepare_statements (struct PostgresClosure *pg)
" FROM deposits"
" JOIN known_coins kc USING (known_coin_id)"
" JOIN denominations denom USING (denominations_serial)"
" WHERE tiny=FALSE"
" WHERE "
" shard >= $2"
" AND shard <= $3"
" AND tiny=FALSE"
" AND done=FALSE"
" AND wire_deadline<=$1"
" AND refund_deadline<$1"
" ORDER BY wire_deadline ASC"
" ORDER BY "
" shard ASC"
" ,wire_deadline ASC"
" LIMIT 1;",
1),
3),
/* Used in #postgres_iterate_matching_deposits() */
GNUNET_PQ_make_prepare ("deposits_iterate_matching",
"SELECT"
@ -2399,6 +2405,18 @@ prepare_statements (struct PostgresClosure *pg)
" ORDER BY last_attempt ASC"
" LIMIT 1;",
2),
/* Used in #postgres_begin_revolving_shard() */
GNUNET_PQ_make_prepare ("get_open_revolving_shard",
"SELECT"
" start_row"
",end_row"
" FROM revolving_work_shards"
" WHERE job_name=$1"
" AND active=FALSE"
" ORDER BY last_attempt ASC"
" LIMIT 1;",
2),
/* Used in #postgres_begin_shard() */
GNUNET_PQ_make_prepare ("reclaim_shard",
"UPDATE work_shards"
" SET last_attempt=$2"
@ -2406,6 +2424,16 @@ prepare_statements (struct PostgresClosure *pg)
" AND start_row=$3"
" AND end_row=$4",
4),
/* Used in #postgres_begin_revolving_shard() */
GNUNET_PQ_make_prepare ("reclaim_revolving_shard",
"UPDATE revolving_work_shards"
" SET last_attempt=$2"
" ,active=TRUE"
" WHERE job_name=$1"
" AND start_row=$3"
" AND end_row=$4",
4),
/* Used in #postgres_begin_shard() */
GNUNET_PQ_make_prepare ("get_last_shard",
"SELECT"
" end_row"
@ -2414,6 +2442,16 @@ prepare_statements (struct PostgresClosure *pg)
" ORDER BY end_row DESC"
" LIMIT 1;",
1),
/* Used in #postgres_begin_revolving_shard() */
GNUNET_PQ_make_prepare ("get_last_revolving_shard",
"SELECT"
" end_row"
" FROM revolving_work_shards"
" WHERE job_name=$1"
" ORDER BY end_row DESC"
" LIMIT 1;",
1),
/* Used in #postgres_begin_shard() */
GNUNET_PQ_make_prepare ("claim_next_shard",
"INSERT INTO work_shards"
"(job_name"
@ -2423,6 +2461,17 @@ prepare_statements (struct PostgresClosure *pg)
") VALUES "
"($1, $2, $3, $4);",
4),
/* Used in #postgres_claim_revolving_shard() */
GNUNET_PQ_make_prepare ("create_revolving_shard",
"INSERT INTO revolving_work_shards"
"(job_name"
",last_attempt"
",start_row"
",end_row"
",active"
") VALUES "
"($1, $2, $3, $4, TRUE);",
4),
/* Used in #postgres_complete_shard() */
GNUNET_PQ_make_prepare ("complete_shard",
"UPDATE work_shards"
@ -2431,6 +2480,18 @@ prepare_statements (struct PostgresClosure *pg)
" AND start_row=$2"
" AND end_row=$3",
3),
/* Used in #postgres_complete_shard() */
GNUNET_PQ_make_prepare ("release_revolving_shard",
"UPDATE revolving_work_shards"
" SET active=FALSE"
" WHERE job_name=$1"
" AND start_row=$2"
" AND end_row=$3",
3),
/* Used in #postgres_delete_revolving_shards() */
GNUNET_PQ_make_prepare ("delete_revolving_shards",
"DELETE FROM revolving_work_shards",
0),
GNUNET_PQ_PREPARED_STATEMENT_END
};
@ -4462,12 +4523,16 @@ postgres_mark_deposit_done (void *cls,
* execution time must be in the past.
*
* @param cls the @e cls of this struct with the plugin-specific state
* @param start_shard_row minimum shard row to select
* @param end_shard_row maximum shard row to select (inclusive)
* @param deposit_cb function to call for ONE such deposit
* @param deposit_cb_cls closure for @a deposit_cb
* @return transaction status code
*/
static enum GNUNET_DB_QueryStatus
postgres_get_ready_deposit (void *cls,
uint32_t start_shard_row,
uint32_t end_shard_row,
TALER_EXCHANGEDB_DepositIterator deposit_cb,
void *deposit_cb_cls)
{
@ -4475,6 +4540,8 @@ postgres_get_ready_deposit (void *cls,
struct GNUNET_TIME_Absolute now = GNUNET_TIME_absolute_get ();
struct GNUNET_PQ_QueryParam params[] = {
TALER_PQ_query_param_absolute_time (&now),
GNUNET_PQ_query_param_uint32 (&start_shard_row),
GNUNET_PQ_query_param_uint32 (&end_shard_row),
GNUNET_PQ_query_param_end
};
struct TALER_Amount amount_with_fee;
@ -4504,6 +4571,8 @@ postgres_get_ready_deposit (void *cls,
enum GNUNET_DB_QueryStatus qs;
(void) GNUNET_TIME_round_abs (&now);
GNUNET_assert (start_shard_row < end_shard_row);
GNUNET_assert (end_shard_row <= INT32_MAX);
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"Finding ready deposits by deadline %s (%llu)\n",
GNUNET_STRINGS_absolute_time_to_string (now),
@ -4900,6 +4969,35 @@ postgres_ensure_coin_known (void *cls,
}
/**
* Compute the shard number of a given @a deposit
*
* @param deposit deposit to compute shard for
* @return shard number
*/
static uint32_t
compute_shard (const struct TALER_EXCHANGEDB_Deposit *deposit)
{
uint32_t res;
GNUNET_assert (GNUNET_YES ==
GNUNET_CRYPTO_kdf (&res,
sizeof (res),
&deposit->h_wire,
sizeof (deposit->h_wire),
&deposit->merchant_pub,
sizeof (deposit->merchant_pub),
NULL, 0));
/* interpret hash result as NBO for platform independence,
convert to HBO and map to [0..2^31-1] range */
res = ntohl (res);
if (res > INT32_MAX)
res += INT32_MIN;
GNUNET_assert (res <= INT32_MAX);
return res;
}
/**
* Insert information about deposited coin into the database.
*
@ -4914,6 +5012,7 @@ postgres_insert_deposit (void *cls,
const struct TALER_EXCHANGEDB_Deposit *deposit)
{
struct PostgresClosure *pg = cls;
uint32_t shard = compute_shard (deposit);
struct GNUNET_PQ_QueryParam params[] = {
GNUNET_PQ_query_param_auto_from_type (&deposit->coin.coin_pub),
TALER_PQ_query_param_amount (&deposit->amount_with_fee),
@ -4926,9 +5025,11 @@ postgres_insert_deposit (void *cls,
GNUNET_PQ_query_param_auto_from_type (&deposit->csig),
TALER_PQ_query_param_json (deposit->receiver_wire_account),
TALER_PQ_query_param_absolute_time (&exchange_timestamp),
GNUNET_PQ_query_param_uint32 (&shard),
GNUNET_PQ_query_param_end
};
GNUNET_assert (shard <= INT32_MAX);
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"Inserting deposit to be executed at %s (%llu/%llu)\n",
GNUNET_STRINGS_absolute_time_to_string (deposit->wire_deadline),
@ -6933,18 +7034,19 @@ postgres_wire_prepare_data_get (void *cls,
/**
* Start a transaction where we transiently violate the foreign
* Starts a READ COMMITTED transaction where we transiently violate the foreign
* constraints on the "wire_out" table as we insert aggregations
* and only add the wire transfer out at the end.
*
* @param cls the @e cls of this struct with the plugin-specific state
* @return #GNUNET_OK on success
*/
static int
static enum GNUNET_GenericReturnValue
postgres_start_deferred_wire_out (void *cls)
{
struct PostgresClosure *pg = cls;
struct GNUNET_PQ_ExecuteStatement es[] = {
GNUNET_PQ_make_execute ("START TRANSACTION ISOLATION LEVEL READ COMMITTED"),
GNUNET_PQ_make_execute ("SET CONSTRAINTS wire_out_ref DEFERRED"),
GNUNET_PQ_EXECUTE_STATEMENT_END
};
@ -6952,10 +7054,6 @@ postgres_start_deferred_wire_out (void *cls)
if (GNUNET_SYSERR ==
postgres_preflight (pg))
return GNUNET_SYSERR;
if (GNUNET_OK !=
postgres_start (pg,
"deferred wire out"))
return GNUNET_SYSERR;
if (GNUNET_OK !=
GNUNET_PQ_exec_statements (pg->conn,
es))
@ -6966,6 +7064,7 @@ postgres_start_deferred_wire_out (void *cls)
postgres_rollback (pg);
return GNUNET_SYSERR;
}
pg->transaction_name = "deferred wire out";
return GNUNET_OK;
}
@ -8041,7 +8140,7 @@ struct RecoupSerialContext
/**
* Status code, set to #GNUNET_SYSERR on hard errors.
*/
int status;
enum GNUNET_GenericReturnValue status;
};
@ -10380,6 +10479,268 @@ postgres_complete_shard (void *cls,
}
/**
* Function called to grab a revolving work shard on an operation @a op. Runs
* in its own transaction. Returns the oldest inactive shard.
*
* @param cls the @e cls of this struct with the plugin-specific state
* @param job_name name of the operation to grab a revolving shard for
* @param shard_size desired shard size
* @param shard_limit exclusive end of the shard range
* @param[out] start_row inclusive start row of the shard (returned)
* @param[out] end_row exclusive end row of the shard (returned)
* @return transaction status code
*/
static enum GNUNET_DB_QueryStatus
postgres_begin_revolving_shard (void *cls,
const char *job_name,
uint32_t shard_size,
uint32_t shard_limit,
uint32_t *start_row,
uint32_t *end_row)
{
struct PostgresClosure *pg = cls;
GNUNET_assert (shard_limit <= 1U + (uint32_t) INT32_MAX);
GNUNET_assert (shard_limit > 0);
GNUNET_assert (shard_size > 0);
for (unsigned int retries = 0; retries<3; retries++)
{
if (GNUNET_OK !=
postgres_start (pg,
"begin_revolving_shard"))
{
GNUNET_break (0);
return GNUNET_DB_STATUS_HARD_ERROR;
}
/* First, find last 'end_row' */
{
enum GNUNET_DB_QueryStatus qs;
struct GNUNET_PQ_QueryParam params[] = {
GNUNET_PQ_query_param_string (job_name),
GNUNET_PQ_query_param_end
};
struct GNUNET_PQ_ResultSpec rs[] = {
GNUNET_PQ_result_spec_uint32 ("end_row",
start_row),
GNUNET_PQ_result_spec_end
};
qs = GNUNET_PQ_eval_prepared_singleton_select (pg->conn,
"get_last_revolving_shard",
params,
rs);
switch (qs)
{
case GNUNET_DB_STATUS_HARD_ERROR:
GNUNET_break (0);
postgres_rollback (pg);
return qs;
case GNUNET_DB_STATUS_SOFT_ERROR:
postgres_rollback (pg);
continue;
case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
break;
case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
*start_row = 0; /* base-case: no shards yet */
break; /* continued below */
}
} /* get_last_shard */
if (*start_row < shard_limit)
{
/* Claim fresh shard */
enum GNUNET_DB_QueryStatus qs;
struct GNUNET_TIME_Absolute now;
struct GNUNET_PQ_QueryParam params[] = {
GNUNET_PQ_query_param_string (job_name),
GNUNET_PQ_query_param_absolute_time (&now),
GNUNET_PQ_query_param_uint32 (start_row),
GNUNET_PQ_query_param_uint32 (end_row),
GNUNET_PQ_query_param_end
};
*end_row = GNUNET_MIN (shard_limit,
*start_row + shard_size);
now = GNUNET_TIME_absolute_get ();
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"Trying to claim shard %llu-%llu\n",
(unsigned long long) *start_row,
(unsigned long long) *end_row);
qs = GNUNET_PQ_eval_prepared_non_select (pg->conn,
"create_revolving_shard",
params);
switch (qs)
{
case GNUNET_DB_STATUS_HARD_ERROR:
GNUNET_break (0);
postgres_rollback (pg);
return qs;
case GNUNET_DB_STATUS_SOFT_ERROR:
postgres_rollback (pg);
continue;
case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
/* continued below (with commit) */
break;
case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
/* someone else got this shard already,
try again */
postgres_rollback (pg);
continue;
}
} /* end create fresh reovlving shard */
else
{
/* claim oldest existing shard */
enum GNUNET_DB_QueryStatus qs;
struct GNUNET_PQ_QueryParam params[] = {
GNUNET_PQ_query_param_string (job_name),
GNUNET_PQ_query_param_end
};
struct GNUNET_PQ_ResultSpec rs[] = {
GNUNET_PQ_result_spec_uint32 ("start_row",
start_row),
GNUNET_PQ_result_spec_uint32 ("end_row",
end_row),
GNUNET_PQ_result_spec_end
};
qs = GNUNET_PQ_eval_prepared_singleton_select (pg->conn,
"get_open_revolving_shard",
params,
rs);
switch (qs)
{
case GNUNET_DB_STATUS_HARD_ERROR:
GNUNET_break (0);
postgres_rollback (pg);
return qs;
case GNUNET_DB_STATUS_SOFT_ERROR:
postgres_rollback (pg);
continue;
case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
/* no open shards available */
postgres_rollback (pg);
return qs;
case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
{
enum GNUNET_DB_QueryStatus qs;
struct GNUNET_TIME_Absolute now = GNUNET_TIME_absolute_get ();
struct GNUNET_PQ_QueryParam params[] = {
GNUNET_PQ_query_param_string (job_name),
GNUNET_PQ_query_param_absolute_time (&now),
GNUNET_PQ_query_param_uint32 (start_row),
GNUNET_PQ_query_param_uint32 (end_row),
GNUNET_PQ_query_param_end
};
qs = GNUNET_PQ_eval_prepared_non_select (pg->conn,
"reclaim_revolving_shard",
params);
switch (qs)
{
case GNUNET_DB_STATUS_HARD_ERROR:
GNUNET_break (0);
postgres_rollback (pg);
return qs;
case GNUNET_DB_STATUS_SOFT_ERROR:
postgres_rollback (pg);
continue;
case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
break; /* continue with commit */
case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
GNUNET_break (0); /* logic error, should be impossible */
postgres_rollback (pg);
return GNUNET_DB_STATUS_HARD_ERROR;
}
}
break; /* continue with commit */
}
} /* end claim oldest existing shard */
/* commit */
{
enum GNUNET_DB_QueryStatus qs;
qs = postgres_commit (pg);
switch (qs)
{
case GNUNET_DB_STATUS_HARD_ERROR:
GNUNET_break (0);
postgres_rollback (pg);
return qs;
case GNUNET_DB_STATUS_SOFT_ERROR:
postgres_rollback (pg);
continue;
case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
return GNUNET_DB_STATUS_SUCCESS_ONE_RESULT;
}
}
} /* retry 'for' loop */
return GNUNET_DB_STATUS_SOFT_ERROR;
}
/**
* Function called to release a revolving shard
* back into the work pool. Clears the
* "completed" flag.
*
* @param cls the @e cls of this struct with the plugin-specific state
* @param job_name name of the operation to grab a word shard for
* @param start_row inclusive start row of the shard
* @param end_row exclusive end row of the shard
* @return transaction status code
*/
enum GNUNET_DB_QueryStatus
postgres_release_revolving_shard (void *cls,
const char *job_name,
uint32_t start_row,
uint32_t end_row)
{
struct PostgresClosure *pg = cls;
struct GNUNET_PQ_QueryParam params[] = {
GNUNET_PQ_query_param_string (job_name),
GNUNET_PQ_query_param_uint32 (&start_row),
GNUNET_PQ_query_param_uint32 (&end_row),
GNUNET_PQ_query_param_end
};
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"Releasing revolving shard %s %u-%u\n",
job_name,
(unsigned int) start_row,
(unsigned int) end_row);
return GNUNET_PQ_eval_prepared_non_select (pg->conn,
"release_revolving_shard",
params);
}
/**
* Function called to delete all revolving shards.
* To be used after a crash or when the shard size is
* changed.
*
* @param cls the @e cls of this struct with the plugin-specific state
* @return transaction status code
*/
enum GNUNET_DB_QueryStatus
postgres_delete_revolving_shards (void *cls)
{
struct PostgresClosure *pg = cls;
struct GNUNET_PQ_QueryParam params[] = {
GNUNET_PQ_query_param_end
};
return GNUNET_PQ_eval_prepared_non_select (pg->conn,
"delete_revolving_shards",
params);
}
/**
* Initialize Postgres database subsystem.
*
@ -10592,6 +10953,12 @@ libtaler_plugin_exchangedb_postgres_init (void *cls)
= &postgres_begin_shard;
plugin->complete_shard
= &postgres_complete_shard;
plugin->begin_revolving_shard
= &postgres_begin_revolving_shard;
plugin->release_revolving_shard
= &postgres_release_revolving_shard;
plugin->delete_revolving_shards
= &postgres_delete_revolving_shards;
return plugin;
}

View File

@ -804,29 +804,22 @@ static uint64_t deposit_rowid;
* @param cls closure a `struct TALER_EXCHANGEDB_Deposit *`
* @param rowid unique ID for the deposit in our DB, used for marking
* it as 'tiny' or 'done'
* @param exchange_timestamp when did the deposit happen
* @param wallet_timestamp when did the wallet sign the contract
* @param merchant_pub public key of the merchant
* @param coin_pub public key of the coin
* @param amount_with_fee amount that was deposited including fee
* @param deposit_fee amount the exchange gets to keep as transaction fees
* @param h_contract_terms hash of the proposal data known to merchant and customer
* @param wire_deadline by which the merchant advised that he would like the
* wire transfer to be executed
* @param wire wire details for the merchant
* @return transaction status code, #GNUNET_DB_STATUS_SUCCESS_ONE_RESULT to continue to iterate
*/
static enum GNUNET_DB_QueryStatus
deposit_cb (void *cls,
uint64_t rowid,
struct GNUNET_TIME_Absolute exchange_timestamp,
struct GNUNET_TIME_Absolute wallet_timestamp,
const struct TALER_MerchantPublicKeyP *merchant_pub,
const struct TALER_CoinSpendPublicKeyP *coin_pub,
const struct TALER_Amount *amount_with_fee,
const struct TALER_Amount *deposit_fee,
const struct GNUNET_HashCode *h_contract_terms,
struct GNUNET_TIME_Absolute wire_deadline,
const json_t *wire)
{
struct TALER_EXCHANGEDB_Deposit *deposit = cls;
@ -1896,9 +1889,11 @@ run (void *cls)
&matching_deposit_cb,
&deposit,
2));
sleep (2); /* giv deposit time to be ready */
sleep (2); /* give deposit time to be ready */
FAILIF (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT !=
plugin->get_ready_deposit (plugin->cls,
0,
INT32_MAX,
&deposit_cb,
&deposit));
FAILIF (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS !=
@ -1911,11 +1906,15 @@ run (void *cls)
deposit_rowid));
FAILIF (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS !=
plugin->get_ready_deposit (plugin->cls,
0,
INT32_MAX,
&deposit_cb,
&deposit));
plugin->rollback (plugin->cls);
FAILIF (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT !=
plugin->get_ready_deposit (plugin->cls,
0,
INT32_MAX,
&deposit_cb,
&deposit));
FAILIF (GNUNET_OK !=

View File

@ -2601,12 +2601,16 @@ struct TALER_EXCHANGEDB_Plugin
* execution time and refund deadlines must both be in the past.
*
* @param cls the @e cls of this struct with the plugin-specific state
* @param start_shard_row minimum shard row to select
* @param end_shard_row maximum shard row to select (inclusive)
* @param deposit_cb function to call for ONE such deposit
* @param deposit_cb_cls closure for @a deposit_cb
* @return transaction status code
*/
enum GNUNET_DB_QueryStatus
(*get_ready_deposit)(void *cls,
uint32_t start_shard_row,
uint32_t end_shard_row,
TALER_EXCHANGEDB_DepositIterator deposit_cb,
void *deposit_cb_cls);
@ -2978,14 +2982,14 @@ struct TALER_EXCHANGEDB_Plugin
/**
* Start a transaction where we transiently violate the foreign
* Starts a READ COMMITTED transaction where we transiently violate the foreign
* constraints on the "wire_out" table as we insert aggregations
* and only add the wire transfer out at the end.
*
* @param cls the @e cls of this struct with the plugin-specific state
* @return #GNUNET_OK on success
*/
int
enum GNUNET_GenericReturnValue
(*start_deferred_wire_out)(void *cls);
@ -3746,6 +3750,57 @@ struct TALER_EXCHANGEDB_Plugin
uint64_t start_row,
uint64_t end_row);
/**
* Function called to grab a revolving work shard on an operation @a op. Runs
* in its own transaction. Returns the oldest inactive shard.
*
* @param cls the @e cls of this struct with the plugin-specific state
* @param job_name name of the operation to grab a revolving shard for
* @param shard_size desired shard size
* @param shard_limit exclusive end of the shard range
* @param[out] start_row inclusive start row of the shard (returned)
* @param[out] end_row exclusive end row of the shard (returned)
* @return transaction status code
*/
enum GNUNET_DB_QueryStatus
(*begin_revolving_shard)(void *cls,
const char *job_name,
uint32_t shard_size,
uint32_t shard_limit,
uint32_t *start_row,
uint32_t *end_row);
/**
* Function called to release a revolving shard back into the work pool.
* Clears the "completed" flag.
*
* @param cls the @e cls of this struct with the plugin-specific state
* @param job_name name of the operation to grab a word shard for
* @param start_row inclusive start row of the shard
* @param end_row exclusive end row of the shard
* @return transaction status code
*/
enum GNUNET_DB_QueryStatus
(*release_revolving_shard)(void *cls,
const char *job_name,
uint32_t start_row,
uint32_t end_row);
/**
* Function called to delete all revolving shards.
* To be used after a crash or when the shard size is
* changed.
*
* @param cls the @e cls of this struct with the plugin-specific state
* @return transaction status code
*/
enum GNUNET_DB_QueryStatus
(*delete_revolving_shards)(void *cls);
};
#endif /* _TALER_EXCHANGE_DB_H */