implement taler-exchange-transfer DB sharding logic

This commit is contained in:
Christian Grothoff 2021-09-05 15:25:46 +02:00
parent adc6c53b5c
commit ae8d481e1c
No known key found for this signature in database
GPG Key ID: 939E6BE1E29FC3CC
10 changed files with 469 additions and 125 deletions

View File

@ -24,7 +24,7 @@ DB = postgres
# exchange (or the twister) is actually listening. # exchange (or the twister) is actually listening.
BASE_URL = "http://localhost:8081/" BASE_URL = "http://localhost:8081/"
AGGREGATOR_SHARD_SIZE = 268435456 AGGREGATOR_SHARD_SIZE = 67108864
#AGGREGATOR_SHARD_SIZE = 2147483648 #AGGREGATOR_SHARD_SIZE = 2147483648

View File

@ -41,10 +41,17 @@ PORT = 8081
BASE_URL = http://localhost:8081/ BASE_URL = http://localhost:8081/
# How long should the aggregator (and closer, and transfer) # How long should the aggregator sleep if it has nothing to do?
# sleep if it has nothing to do?
AGGREGATOR_IDLE_SLEEP_INTERVAL = 60 s AGGREGATOR_IDLE_SLEEP_INTERVAL = 60 s
# How long should the transfer tool
# sleep if it has nothing to do?
TRANSFER_IDLE_SLEEP_INTERVAL = 60 s
# How long should the closer tool
# sleep if it has nothing to do?
CLOSER_IDLE_SLEEP_INTERVAL = 60 s
# Values of 0 or above 2^31 disable sharding, which # Values of 0 or above 2^31 disable sharding, which
# is a sane default for most use-cases. # is a sane default for most use-cases.
# When changing this value, you MUST stop all # When changing this value, you MUST stop all

View File

@ -1034,9 +1034,22 @@ run_shard (void *cls)
&s->shard_end); &s->shard_end);
if (0 >= qs) if (0 >= qs)
{ {
if (GNUNET_DB_STATUS_SOFT_ERROR == qs)
{
static struct GNUNET_TIME_Relative delay;
GNUNET_free (s);
delay = GNUNET_TIME_randomized_backoff (delay,
GNUNET_TIME_UNIT_SECONDS);
task = GNUNET_SCHEDULER_add_delayed (delay,
&run_shard,
NULL);
return;
}
GNUNET_log (GNUNET_ERROR_TYPE_ERROR, GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
"Failed to begin shard!\n"); "Failed to begin shard (%d)!\n",
GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR != qs); qs);
GNUNET_break (GNUNET_DB_STATUS_HARD_ERROR != qs);
global_ret = EXIT_FAILURE; global_ret = EXIT_FAILURE;
GNUNET_SCHEDULER_shutdown (); GNUNET_SCHEDULER_shutdown ();
return; return;

View File

@ -60,7 +60,7 @@ static struct GNUNET_SCHEDULER_Task *task;
/** /**
* How long should we sleep when idle before trying to find more work? * How long should we sleep when idle before trying to find more work?
*/ */
static struct GNUNET_TIME_Relative aggregator_idle_sleep_interval; static struct GNUNET_TIME_Relative closer_idle_sleep_interval;
/** /**
* Value to return from main(). 0 on success, non-zero * Value to return from main(). 0 on success, non-zero
@ -112,8 +112,8 @@ shutdown_task (void *cls)
* *
* @return #GNUNET_OK on success * @return #GNUNET_OK on success
*/ */
static int static enum GNUNET_GenericReturnValue
parse_wirewatch_config (void) parse_closer_config (void)
{ {
if (GNUNET_OK != if (GNUNET_OK !=
GNUNET_CONFIGURATION_get_value_string (cfg, GNUNET_CONFIGURATION_get_value_string (cfg,
@ -129,12 +129,12 @@ parse_wirewatch_config (void)
if (GNUNET_OK != if (GNUNET_OK !=
GNUNET_CONFIGURATION_get_value_time (cfg, GNUNET_CONFIGURATION_get_value_time (cfg,
"exchange", "exchange",
"AGGREGATOR_IDLE_SLEEP_INTERVAL", "CLOSER_IDLE_SLEEP_INTERVAL",
&aggregator_idle_sleep_interval)) &closer_idle_sleep_interval))
{ {
GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR, GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
"exchange", "exchange",
"AGGREGATOR_IDLE_SLEEP_INTERVAL"); "CLOSER_IDLE_SLEEP_INTERVAL");
return GNUNET_SYSERR; return GNUNET_SYSERR;
} }
if ( (GNUNET_OK != if ( (GNUNET_OK !=
@ -444,7 +444,7 @@ run_reserve_closures (void *cls)
} }
else else
{ {
task = GNUNET_SCHEDULER_add_delayed (aggregator_idle_sleep_interval, task = GNUNET_SCHEDULER_add_delayed (closer_idle_sleep_interval,
&run_reserve_closures, &run_reserve_closures,
NULL); NULL);
} }
@ -480,7 +480,7 @@ run (void *cls,
(void) cfgfile; (void) cfgfile;
cfg = c; cfg = c;
if (GNUNET_OK != parse_wirewatch_config ()) if (GNUNET_OK != parse_closer_config ())
{ {
cfg = NULL; cfg = NULL;
global_ret = EXIT_NOTCONFIGURED; global_ret = EXIT_NOTCONFIGURED;

View File

@ -1,6 +1,6 @@
/* /*
This file is part of TALER This file is part of TALER
Copyright (C) 2016-2020 Taler Systems SA Copyright (C) 2016-2021 Taler Systems SA
TALER is free software; you can redistribute it and/or modify it under the TALER is free software; you can redistribute it and/or modify it under the
terms of the GNU Affero General Public License as published by the Free Software terms of the GNU Affero General Public License as published by the Free Software
@ -27,6 +27,46 @@
#include "taler_json_lib.h" #include "taler_json_lib.h"
#include "taler_bank_service.h" #include "taler_bank_service.h"
/**
* What is the maximum batch size we use for credit history
* requests with the bank. See `batch_size` below.
*/
#define MAXIMUM_BATCH_SIZE 1024
/**
* Information about our work shard.
*/
struct Shard
{
/**
* Time when we started to work on this shard.
*/
struct GNUNET_TIME_Absolute shard_start_time;
/**
* Offset the shard begins at.
*/
uint64_t shard_start;
/**
* Exclusive offset where the shard ends.
*/
uint64_t shard_end;
/**
* Offset where our current batch begins.
*/
uint64_t batch_start;
/**
* Highest row processed in the current batch.
*/
uint64_t batch_end;
};
/** /**
* Data we keep to #run_transfers(). There is at most * Data we keep to #run_transfers(). There is at most
@ -37,6 +77,18 @@
struct WirePrepareData struct WirePrepareData
{ {
/**
* All transfers done in the same transaction
* are kept in a DLL.
*/
struct WirePrepareData *next;
/**
* All transfers done in the same transaction
* are kept in a DLL.
*/
struct WirePrepareData *prev;
/** /**
* Wire execution handle. * Wire execution handle.
*/ */
@ -71,10 +123,21 @@ static struct TALER_EXCHANGEDB_Plugin *db_plugin;
static struct GNUNET_SCHEDULER_Task *task; static struct GNUNET_SCHEDULER_Task *task;
/** /**
* If we are currently executing a transfer, information about * If we are currently executing transfers, information about
* the active transfer is here. Otherwise, this variable is NULL. * the active transfers is here. Otherwise, this variable is NULL.
*/ */
static struct WirePrepareData *wpd; static struct WirePrepareData *wpd_head;
/**
* If we are currently executing transfers, information about
* the active transfers is here. Otherwise, this variable is NULL.
*/
static struct WirePrepareData *wpd_tail;
/**
* Information about our work shard.
*/
static struct Shard *shard;
/** /**
* Handle to the context for interacting with the bank / wire gateway. * Handle to the context for interacting with the bank / wire gateway.
@ -86,11 +149,6 @@ static struct GNUNET_CURL_Context *ctx;
*/ */
static struct GNUNET_CURL_RescheduleContext *rc; static struct GNUNET_CURL_RescheduleContext *rc;
/**
* How long should we sleep when idle before trying to find more work?
*/
static struct GNUNET_TIME_Relative aggregator_idle_sleep_interval;
/** /**
* Value to return from main(). 0 on success, non-zero on errors. * Value to return from main(). 0 on success, non-zero on errors.
*/ */
@ -101,6 +159,54 @@ static int global_ret;
*/ */
static int test_mode; static int test_mode;
/**
* How long should we sleep when idle before trying to find more work?
* Also used for how long we wait to grab a shard before trying it again.
* The value should be set to a bit above the average time it takes to
* process a shard.
*/
static struct GNUNET_TIME_Relative transfer_idle_sleep_interval;
/**
* How long did we take to finish the last shard?
*/
static struct GNUNET_TIME_Relative shard_delay;
/**
* Modulus to apply to group shards. The shard size must ultimately be a
* multiple of the batch size. Thus, if this is not a multiple of the
* #MAXIMUM_BATCH_SIZE, the batch size will be set to the #shard_size.
*/
static unsigned int shard_size = MAXIMUM_BATCH_SIZE;
/**
* How many workers should we plan our scheduling with?
*/
static unsigned int max_workers = 16;
/**
* Clean up all active bank interactions.
*/
static void
cleanup_wpd (void)
{
struct WirePrepareData *wpd;
while (NULL != (wpd = wpd_head))
{
GNUNET_CONTAINER_DLL_remove (wpd_head,
wpd_tail,
wpd);
if (NULL != wpd->eh)
{
TALER_BANK_transfer_cancel (wpd->eh);
wpd->eh = NULL;
}
GNUNET_free (wpd);
}
}
/** /**
* We're being aborted with CTRL-C (or SIGTERM). Shut down. * We're being aborted with CTRL-C (or SIGTERM). Shut down.
@ -128,17 +234,9 @@ shutdown_task (void *cls)
GNUNET_SCHEDULER_cancel (task); GNUNET_SCHEDULER_cancel (task);
task = NULL; task = NULL;
} }
if (NULL != wpd) cleanup_wpd ();
{ GNUNET_free (shard);
if (NULL != wpd->eh) db_plugin->rollback (db_plugin->cls); /* just in case */
{
TALER_BANK_transfer_cancel (wpd->eh);
wpd->eh = NULL;
}
db_plugin->rollback (db_plugin->cls);
GNUNET_free (wpd);
wpd = NULL;
}
TALER_EXCHANGEDB_plugin_unload (db_plugin); TALER_EXCHANGEDB_plugin_unload (db_plugin);
db_plugin = NULL; db_plugin = NULL;
TALER_EXCHANGEDB_unload_accounts (); TALER_EXCHANGEDB_unload_accounts ();
@ -151,18 +249,18 @@ shutdown_task (void *cls)
* *
* @return #GNUNET_OK on success * @return #GNUNET_OK on success
*/ */
static int static enum GNUNET_GenericReturnValue
parse_wirewatch_config (void) parse_transfer_config (void)
{ {
if (GNUNET_OK != if (GNUNET_OK !=
GNUNET_CONFIGURATION_get_value_time (cfg, GNUNET_CONFIGURATION_get_value_time (cfg,
"exchange", "exchange",
"AGGREGATOR_IDLE_SLEEP_INTERVAL", "TRANSFER_IDLE_SLEEP_INTERVAL",
&aggregator_idle_sleep_interval)) &transfer_idle_sleep_interval))
{ {
GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR, GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
"exchange", "exchange",
"AGGREGATOR_IDLE_SLEEP_INTERVAL"); "TRANSFER_IDLE_SLEEP_INTERVAL");
return GNUNET_SYSERR; return GNUNET_SYSERR;
} }
if (NULL == if (NULL ==
@ -218,13 +316,22 @@ static void
run_transfers (void *cls); run_transfers (void *cls);
/**
* Select shard to process.
*
* @param cls NULL
*/
static void
select_shard (void *cls);
/** /**
* Function called with the result from the execute step. * Function called with the result from the execute step.
* On success, we mark the respective wire transfer as finished, * On success, we mark the respective wire transfer as finished,
* and in general we afterwards continue to #run_transfers(), * and in general we afterwards continue to #run_transfers(),
* except for irrecoverable errors. * except for irrecoverable errors.
* *
* @param cls NULL * @param cls `struct WirePrepareData` we are working on
* @param http_status_code #MHD_HTTP_OK on success * @param http_status_code #MHD_HTTP_OK on success
* @param ec taler error code * @param ec taler error code
* @param row_id unique ID of the wire transfer in the bank's records * @param row_id unique ID of the wire transfer in the bank's records
@ -237,15 +344,18 @@ wire_confirm_cb (void *cls,
uint64_t row_id, uint64_t row_id,
struct GNUNET_TIME_Absolute wire_timestamp) struct GNUNET_TIME_Absolute wire_timestamp)
{ {
struct WirePrepareData *wpd = cls;
enum GNUNET_DB_QueryStatus qs; enum GNUNET_DB_QueryStatus qs;
(void) cls;
(void) row_id; (void) row_id;
(void) wire_timestamp; (void) wire_timestamp;
wpd->eh = NULL; wpd->eh = NULL;
switch (http_status_code) switch (http_status_code)
{ {
case MHD_HTTP_OK: case MHD_HTTP_OK:
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"Wire transfer %llu completed successfully\n",
(unsigned long long) wpd->row_id);
qs = db_plugin->wire_prepare_data_mark_finished (db_plugin->cls, qs = db_plugin->wire_prepare_data_mark_finished (db_plugin->cls,
wpd->row_id); wpd->row_id);
/* continued below */ /* continued below */
@ -262,38 +372,43 @@ wire_confirm_cb (void *cls,
break; break;
default: default:
GNUNET_log (GNUNET_ERROR_TYPE_ERROR, GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
"Wire transaction failed: %u/%d\n", "Wire transfer %llu failed: %u/%d\n",
(unsigned long long) wpd->row_id,
http_status_code, http_status_code,
ec); ec);
db_plugin->rollback (db_plugin->cls); db_plugin->rollback (db_plugin->cls);
global_ret = EXIT_FAILURE; global_ret = EXIT_FAILURE;
GNUNET_SCHEDULER_shutdown (); GNUNET_SCHEDULER_shutdown ();
GNUNET_free (wpd);
wpd = NULL;
return; return;
} }
if (0 >= qs) shard->batch_end = GNUNET_MAX (wpd->row_id,
shard->batch_end);
switch (qs)
{ {
GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs); case GNUNET_DB_STATUS_SOFT_ERROR:
db_plugin->rollback (db_plugin->cls); db_plugin->rollback (db_plugin->cls);
if (GNUNET_DB_STATUS_SOFT_ERROR == qs) cleanup_wpd ();
{ GNUNET_assert (NULL == task);
/* try again */ task = GNUNET_SCHEDULER_add_now (&run_transfers,
GNUNET_assert (NULL == task); NULL);
task = GNUNET_SCHEDULER_add_now (&run_transfers,
NULL);
}
else
{
global_ret = EXIT_FAILURE;
GNUNET_SCHEDULER_shutdown ();
}
GNUNET_free (wpd);
wpd = NULL;
return; return;
case GNUNET_DB_STATUS_HARD_ERROR:
db_plugin->rollback (db_plugin->cls);
cleanup_wpd ();
global_ret = EXIT_FAILURE;
GNUNET_SCHEDULER_shutdown ();
return;
case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
GNUNET_CONTAINER_DLL_remove (wpd_head,
wpd_tail,
wpd);
GNUNET_free (wpd);
break;
} }
GNUNET_free (wpd); if (NULL != wpd_head)
wpd = NULL; return; /* wait for other queries to complete */
/* batch done */
switch (commit_or_warn ()) switch (commit_or_warn ())
{ {
case GNUNET_DB_STATUS_SOFT_ERROR: case GNUNET_DB_STATUS_SOFT_ERROR:
@ -308,8 +423,9 @@ wire_confirm_cb (void *cls,
GNUNET_SCHEDULER_shutdown (); GNUNET_SCHEDULER_shutdown ();
return; return;
case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
shard->batch_start = shard->batch_end + 1;
GNUNET_log (GNUNET_ERROR_TYPE_INFO, GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"Wire transfer complete\n"); "Batch complete\n");
/* continue with #run_transfers(), just to guard /* continue with #run_transfers(), just to guard
against the unlikely case that there are more. */ against the unlikely case that there are more. */
GNUNET_assert (NULL == task); GNUNET_assert (NULL == task);
@ -343,6 +459,7 @@ wire_prepare_cb (void *cls,
size_t buf_size) size_t buf_size)
{ {
const struct TALER_EXCHANGEDB_AccountInfo *wa; const struct TALER_EXCHANGEDB_AccountInfo *wa;
struct WirePrepareData *wpd;
(void) cls; (void) cls;
if ( (NULL == wire_method) || if ( (NULL == wire_method) ||
@ -351,9 +468,14 @@ wire_prepare_cb (void *cls,
GNUNET_break (0); GNUNET_break (0);
db_plugin->rollback (db_plugin->cls); db_plugin->rollback (db_plugin->cls);
global_ret = EXIT_FAILURE; global_ret = EXIT_FAILURE;
goto cleanup; GNUNET_SCHEDULER_shutdown ();
return;
} }
wpd = GNUNET_new (struct WirePrepareData);
wpd->row_id = rowid; wpd->row_id = rowid;
GNUNET_CONTAINER_DLL_insert (wpd_head,
wpd_tail,
wpd);
GNUNET_log (GNUNET_ERROR_TYPE_INFO, GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"Starting wire transfer %llu\n", "Starting wire transfer %llu\n",
(unsigned long long) rowid); (unsigned long long) rowid);
@ -365,7 +487,8 @@ wire_prepare_cb (void *cls,
GNUNET_break (0); GNUNET_break (0);
db_plugin->rollback (db_plugin->cls); db_plugin->rollback (db_plugin->cls);
global_ret = EXIT_NOTCONFIGURED; global_ret = EXIT_NOTCONFIGURED;
goto cleanup; GNUNET_SCHEDULER_shutdown ();
return;
} }
wa = wpd->wa; wa = wpd->wa;
wpd->eh = TALER_BANK_transfer (ctx, wpd->eh = TALER_BANK_transfer (ctx,
@ -373,19 +496,15 @@ wire_prepare_cb (void *cls,
buf, buf,
buf_size, buf_size,
&wire_confirm_cb, &wire_confirm_cb,
NULL); wpd);
if (NULL == wpd->eh) if (NULL == wpd->eh)
{ {
GNUNET_break (0); /* Irrecoverable */ GNUNET_break (0); /* Irrecoverable */
db_plugin->rollback (db_plugin->cls); db_plugin->rollback (db_plugin->cls);
global_ret = EXIT_FAILURE; global_ret = EXIT_FAILURE;
goto cleanup; GNUNET_SCHEDULER_shutdown ();
return;
} }
return;
cleanup:
GNUNET_SCHEDULER_shutdown ();
GNUNET_free (wpd);
wpd = NULL;
} }
@ -399,23 +518,55 @@ static void
run_transfers (void *cls) run_transfers (void *cls)
{ {
enum GNUNET_DB_QueryStatus qs; enum GNUNET_DB_QueryStatus qs;
int64_t limit;
(void) cls; (void) cls;
task = NULL; task = NULL;
GNUNET_log (GNUNET_ERROR_TYPE_INFO, limit = shard->shard_end - shard->batch_start;
"Checking for pending wire transfers\n"); if (0 >= limit)
if (GNUNET_SYSERR ==
db_plugin->preflight (db_plugin->cls))
{ {
GNUNET_log (GNUNET_ERROR_TYPE_ERROR, GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"Failed to obtain database connection!\n"); "Shard [%llu,%llu) completed\n",
global_ret = EXIT_FAILURE; (unsigned long long) shard->shard_start,
GNUNET_SCHEDULER_shutdown (); (unsigned long long) shard->batch_end);
qs = db_plugin->complete_shard (db_plugin->cls,
"transfer",
shard->shard_start,
shard->batch_end + 1);
switch (qs)
{
case GNUNET_DB_STATUS_HARD_ERROR:
GNUNET_break (0);
GNUNET_free (shard);
GNUNET_SCHEDULER_shutdown ();
return;
case GNUNET_DB_STATUS_SOFT_ERROR:
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"Got DB soft error for complete_shard. Rolling back.\n");
GNUNET_free (shard);
task = GNUNET_SCHEDULER_add_now (&select_shard,
NULL);
return;
case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
/* already existed, ok, let's just continue */
break;
case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
/* normal case */
break;
}
shard_delay = GNUNET_TIME_absolute_get_duration (shard->shard_start_time);
GNUNET_free (shard);
task = GNUNET_SCHEDULER_add_now (&select_shard,
NULL);
return; return;
} }
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"Checking for %lld pending wire transfers [%llu-...)\n",
(long long) limit,
(unsigned long long) shard->batch_start);
if (GNUNET_OK != if (GNUNET_OK !=
db_plugin->start (db_plugin->cls, db_plugin->start_read_committed (db_plugin->cls,
"aggregator run transfer")) "aggregator run transfer"))
{ {
GNUNET_log (GNUNET_ERROR_TYPE_ERROR, GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
"Failed to start database transaction!\n"); "Failed to start database transaction!\n");
@ -423,30 +574,29 @@ run_transfers (void *cls)
GNUNET_SCHEDULER_shutdown (); GNUNET_SCHEDULER_shutdown ();
return; return;
} }
wpd = GNUNET_new (struct WirePrepareData);
qs = db_plugin->wire_prepare_data_get (db_plugin->cls, qs = db_plugin->wire_prepare_data_get (db_plugin->cls,
shard->batch_start,
limit,
&wire_prepare_cb, &wire_prepare_cb,
NULL); NULL);
if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT == qs)
return; /* continued via continuation set in #wire_prepare_cb() */
db_plugin->rollback (db_plugin->cls);
GNUNET_free (wpd);
wpd = NULL;
switch (qs) switch (qs)
{ {
case GNUNET_DB_STATUS_HARD_ERROR: case GNUNET_DB_STATUS_HARD_ERROR:
db_plugin->rollback (db_plugin->cls);
GNUNET_break (0); GNUNET_break (0);
global_ret = EXIT_FAILURE; global_ret = EXIT_FAILURE;
GNUNET_SCHEDULER_shutdown (); GNUNET_SCHEDULER_shutdown ();
return; return;
case GNUNET_DB_STATUS_SOFT_ERROR: case GNUNET_DB_STATUS_SOFT_ERROR:
/* try again */ /* try again */
db_plugin->rollback (db_plugin->cls);
GNUNET_assert (NULL == task); GNUNET_assert (NULL == task);
task = GNUNET_SCHEDULER_add_now (&run_transfers, task = GNUNET_SCHEDULER_add_now (&run_transfers,
NULL); NULL);
return; return;
case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
/* no more prepared wire transfers, go sleep a bit! */ /* no more prepared wire transfers, go sleep a bit! */
db_plugin->rollback (db_plugin->cls);
GNUNET_assert (NULL == task); GNUNET_assert (NULL == task);
if (GNUNET_YES == test_mode) if (GNUNET_YES == test_mode)
{ {
@ -458,18 +608,95 @@ run_transfers (void *cls)
{ {
GNUNET_log (GNUNET_ERROR_TYPE_INFO, GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"No more pending wire transfers, going idle\n"); "No more pending wire transfers, going idle\n");
task = GNUNET_SCHEDULER_add_delayed (aggregator_idle_sleep_interval, task = GNUNET_SCHEDULER_add_delayed (transfer_idle_sleep_interval,
&run_transfers, &run_transfers,
NULL); NULL);
} }
return; return;
case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT: default:
/* should be impossible */ /* continued in wire_prepare_cb() */
GNUNET_assert (0); return;
} }
} }
/**
* Select shard to process.
*
* @param cls NULL
*/
static void
select_shard (void *cls)
{
enum GNUNET_DB_QueryStatus qs;
struct GNUNET_TIME_Relative delay;
uint64_t start;
uint64_t end;
(void) cls;
task = NULL;
if (GNUNET_SYSERR ==
db_plugin->preflight (db_plugin->cls))
{
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
"Failed to obtain database connection!\n");
global_ret = EXIT_FAILURE;
GNUNET_SCHEDULER_shutdown ();
return;
}
if (0 == max_workers)
delay = GNUNET_TIME_UNIT_ZERO;
else
delay.rel_value_us = GNUNET_CRYPTO_random_u64 (
GNUNET_CRYPTO_QUALITY_WEAK,
4 * GNUNET_TIME_relative_max (
transfer_idle_sleep_interval,
GNUNET_TIME_relative_multiply (shard_delay,
max_workers)).rel_value_us);
qs = db_plugin->begin_shard (db_plugin->cls,
"transfer",
delay,
shard_size,
&start,
&end);
switch (qs)
{
case GNUNET_DB_STATUS_HARD_ERROR:
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
"Failed to obtain starting point for montoring from database!\n");
global_ret = EXIT_FAILURE;
GNUNET_SCHEDULER_shutdown ();
return;
case GNUNET_DB_STATUS_SOFT_ERROR:
/* try again */
task = GNUNET_SCHEDULER_add_delayed (transfer_idle_sleep_interval,
&select_shard,
NULL);
return;
case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
GNUNET_break (0);
task = GNUNET_SCHEDULER_add_delayed (transfer_idle_sleep_interval,
&select_shard,
NULL);
return;
case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
/* continued below */
break;
}
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"Starting with shard [%llu,%llu)\n",
(unsigned long long) start,
(unsigned long long) end);
shard = GNUNET_new (struct Shard);
shard->shard_start_time = GNUNET_TIME_absolute_get ();
shard->shard_start = start;
shard->shard_end = end;
shard->batch_start = start;
task = GNUNET_SCHEDULER_add_now (&run_transfers,
NULL);
}
/** /**
* First task. * First task.
* *
@ -489,7 +716,7 @@ run (void *cls,
(void) cfgfile; (void) cfgfile;
cfg = c; cfg = c;
if (GNUNET_OK != parse_wirewatch_config ()) if (GNUNET_OK != parse_transfer_config ())
{ {
cfg = NULL; cfg = NULL;
global_ret = EXIT_NOTCONFIGURED; global_ret = EXIT_NOTCONFIGURED;
@ -503,9 +730,17 @@ run (void *cls,
GNUNET_break (0); GNUNET_break (0);
return; return;
} }
if (GNUNET_SYSERR ==
db_plugin->preflight (db_plugin->cls))
{
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
"Failed to obtain database connection!\n");
global_ret = EXIT_FAILURE;
GNUNET_SCHEDULER_shutdown ();
return;
}
GNUNET_assert (NULL == task); GNUNET_assert (NULL == task);
task = GNUNET_SCHEDULER_add_now (&run_transfers, task = GNUNET_SCHEDULER_add_now (&select_shard,
NULL); NULL);
GNUNET_SCHEDULER_add_shutdown (&shutdown_task, GNUNET_SCHEDULER_add_shutdown (&shutdown_task,
cls); cls);
@ -524,12 +759,22 @@ main (int argc,
char *const *argv) char *const *argv)
{ {
struct GNUNET_GETOPT_CommandLineOption options[] = { struct GNUNET_GETOPT_CommandLineOption options[] = {
GNUNET_GETOPT_option_uint ('S',
"size",
"SIZE",
"Size to process per shard (default: 1024)",
&shard_size),
GNUNET_GETOPT_option_timetravel ('T', GNUNET_GETOPT_option_timetravel ('T',
"timetravel"), "timetravel"),
GNUNET_GETOPT_option_flag ('t', GNUNET_GETOPT_option_flag ('t',
"test", "test",
"run in test mode and exit when idle", "run in test mode and exit when idle",
&test_mode), &test_mode),
GNUNET_GETOPT_option_uint ('w',
"workers",
"COUNT",
"Plan work load with up to COUNT worker processes (default: 16)",
&max_workers),
GNUNET_GETOPT_option_version (VERSION "-" VCS_VERSION), GNUNET_GETOPT_option_version (VERSION "-" VCS_VERSION),
GNUNET_GETOPT_OPTION_END GNUNET_GETOPT_OPTION_END
}; };

View File

@ -752,6 +752,7 @@ main (int argc,
"COUNT", "COUNT",
"Plan work load with up to COUNT worker processes (default: 16)", "Plan work load with up to COUNT worker processes (default: 16)",
&max_workers), &max_workers),
GNUNET_GETOPT_option_version (VERSION "-" VCS_VERSION),
GNUNET_GETOPT_OPTION_END GNUNET_GETOPT_OPTION_END
}; };
enum GNUNET_GenericReturnValue ret; enum GNUNET_GenericReturnValue ret;

View File

@ -1188,11 +1188,12 @@ prepare_statements (struct PostgresClosure *pg)
",type" ",type"
",buf" ",buf"
" FROM prewire" " FROM prewire"
" WHERE finished=FALSE" " WHERE prewire_uuid >= $1"
" AND finished=FALSE"
" AND failed=FALSE" " AND failed=FALSE"
" ORDER BY prewire_uuid ASC" " ORDER BY prewire_uuid ASC"
" LIMIT 1;", " LIMIT $2;",
0), 2),
/* Used in #postgres_select_deposits_missing_wire */ /* Used in #postgres_select_deposits_missing_wire */
GNUNET_PQ_make_prepare ("deposits_get_overdue", GNUNET_PQ_make_prepare ("deposits_get_overdue",
"SELECT" "SELECT"
@ -6984,52 +6985,116 @@ postgres_wire_prepare_data_mark_failed (
} }
/**
* Closure for #prewire_cb().
*/
struct PrewireContext
{
/**
* Function to call on each result.
*/
TALER_EXCHANGEDB_WirePreparationIterator cb;
/**
* Closure for @a cb.
*/
void *cb_cls;
/**
* #GNUNET_OK if everything went fine.
*/
enum GNUNET_GenericReturnValue status;
};
/**
* Invoke the callback for each result.
*
* @param cls a `struct MissingWireContext *`
* @param result SQL result
* @param num_results number of rows in @a result
*/
static void
prewire_cb (void *cls,
PGresult *result,
unsigned int num_results)
{
struct PrewireContext *pc = cls;
for (unsigned int i = 0; i < num_results; i++)
{
uint64_t prewire_uuid;
char *type;
void *buf = NULL;
size_t buf_size;
struct GNUNET_PQ_ResultSpec rs[] = {
GNUNET_PQ_result_spec_uint64 ("prewire_uuid",
&prewire_uuid),
GNUNET_PQ_result_spec_string ("type",
&type),
GNUNET_PQ_result_spec_variable_size ("buf",
&buf,
&buf_size),
GNUNET_PQ_result_spec_end
};
if (GNUNET_OK !=
GNUNET_PQ_extract_result (result,
rs,
i))
{
GNUNET_break (0);
pc->status = GNUNET_SYSERR;
return;
}
pc->cb (pc->cb_cls,
prewire_uuid,
type,
buf,
buf_size);
GNUNET_PQ_cleanup_result (rs);
}
}
/** /**
* Function called to get an unfinished wire transfer * Function called to get an unfinished wire transfer
* preparation data. Fetches at most one item. * preparation data. Fetches at most one item.
* *
* @param cls closure * @param cls closure
* @param start_row offset to query table at
* @param limit maximum number of results to return
* @param cb function to call for ONE unfinished item * @param cb function to call for ONE unfinished item
* @param cb_cls closure for @a cb * @param cb_cls closure for @a cb
* @return transaction status code * @return transaction status code
*/ */
static enum GNUNET_DB_QueryStatus static enum GNUNET_DB_QueryStatus
postgres_wire_prepare_data_get (void *cls, postgres_wire_prepare_data_get (void *cls,
uint64_t start_row,
uint64_t limit,
TALER_EXCHANGEDB_WirePreparationIterator cb, TALER_EXCHANGEDB_WirePreparationIterator cb,
void *cb_cls) void *cb_cls)
{ {
struct PostgresClosure *pg = cls; struct PostgresClosure *pg = cls;
enum GNUNET_DB_QueryStatus qs;
struct GNUNET_PQ_QueryParam params[] = { struct GNUNET_PQ_QueryParam params[] = {
GNUNET_PQ_query_param_uint64 (&start_row),
GNUNET_PQ_query_param_uint64 (&limit),
GNUNET_PQ_query_param_end GNUNET_PQ_query_param_end
}; };
uint64_t prewire_uuid; struct PrewireContext pc = {
char *type; .cb = cb,
void *buf = NULL; .cb_cls = cb_cls,
size_t buf_size; .status = GNUNET_OK
struct GNUNET_PQ_ResultSpec rs[] = {
GNUNET_PQ_result_spec_uint64 ("prewire_uuid",
&prewire_uuid),
GNUNET_PQ_result_spec_string ("type",
&type),
GNUNET_PQ_result_spec_variable_size ("buf",
&buf,
&buf_size),
GNUNET_PQ_result_spec_end
}; };
enum GNUNET_DB_QueryStatus qs;
qs = GNUNET_PQ_eval_prepared_singleton_select (pg->conn, qs = GNUNET_PQ_eval_prepared_multi_select (pg->conn,
"wire_prepare_data_get", "wire_prepare_data_get",
params, params,
rs); &prewire_cb,
if (0 >= qs) &pc);
return qs; if (GNUNET_OK != pc.status)
cb (cb_cls, return GNUNET_DB_STATUS_HARD_ERROR;
prewire_uuid,
type,
buf,
buf_size);
GNUNET_PQ_cleanup_result (rs);
return qs; return qs;
} }

View File

@ -117,6 +117,8 @@ test_wire_prepare (void)
{ {
FAILIF (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS != FAILIF (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS !=
plugin->wire_prepare_data_get (plugin->cls, plugin->wire_prepare_data_get (plugin->cls,
0,
1,
&dead_prepare_cb, &dead_prepare_cb,
NULL)); NULL));
FAILIF (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT != FAILIF (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT !=
@ -126,10 +128,14 @@ test_wire_prepare (void)
11)); 11));
FAILIF (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT != FAILIF (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT !=
plugin->wire_prepare_data_get (plugin->cls, plugin->wire_prepare_data_get (plugin->cls,
0,
1,
&mark_prepare_cb, &mark_prepare_cb,
NULL)); NULL));
FAILIF (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS != FAILIF (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS !=
plugin->wire_prepare_data_get (plugin->cls, plugin->wire_prepare_data_get (plugin->cls,
0,
1,
&dead_prepare_cb, &dead_prepare_cb,
NULL)); NULL));
return GNUNET_OK; return GNUNET_OK;

View File

@ -2964,15 +2964,19 @@ struct TALER_EXCHANGEDB_Plugin
/** /**
* Function called to get an unfinished wire transfer * Function called to get an unfinished wire transfer
* preparation data. Fetches at most one item. * preparation data.
* *
* @param cls closure * @param cls closure
* @param cb function to call for ONE unfinished item * @param start_row offset to query table at
* @param limit maximum number of results to return
* @param cb function to call for unfinished work
* @param cb_cls closure for @a cb * @param cb_cls closure for @a cb
* @return transaction status code * @return transaction status code
*/ */
enum GNUNET_DB_QueryStatus enum GNUNET_DB_QueryStatus
(*wire_prepare_data_get)(void *cls, (*wire_prepare_data_get)(void *cls,
uint64_t start_row,
uint64_t limit,
TALER_EXCHANGEDB_WirePreparationIterator cb, TALER_EXCHANGEDB_WirePreparationIterator cb,
void *cb_cls); void *cb_cls);

View File

@ -66,6 +66,9 @@ transfer_run (void *cls,
"taler-exchange-transfer", "taler-exchange-transfer",
"taler-exchange-transfer", "taler-exchange-transfer",
"-c", as->config_filename, "-c", as->config_filename,
"-L", "INFO",
"-S", "1",
"-w", "0",
"-t", /* exit when done */ "-t", /* exit when done */
NULL); NULL);
if (NULL == as->transfer_proc) if (NULL == as->transfer_proc)