fixing parallel fakebank to ensure transactions are ordered, fixing indices/constraint preservation after DB update to 0002

This commit is contained in:
Christian Grothoff 2021-06-21 00:17:16 +02:00
parent 108bf57d04
commit 9c51720cbf
No known key found for this signature in database
GPG Key ID: 939E6BE1E29FC3CC
10 changed files with 598 additions and 357 deletions

View File

@ -81,13 +81,6 @@ struct Account
*/
char *account_name;
/**
* Lock for modifying transaction list of this account.
* Note that per-transaction locks MUST be acquired before
* per-account locks (if both are acquired).
*/
pthread_mutex_t lock;
/**
* Current account balance.
*/
@ -157,11 +150,6 @@ struct Transaction
*/
uint64_t row_id;
/**
* Lock for accessing this transaction array entry.
*/
pthread_mutex_t lock;
/**
* What does the @e subject contain?
*/
@ -257,7 +245,7 @@ struct TALER_FAKEBANK_Handle
/**
* We store transactions in a revolving array.
*/
struct Transaction *transactions;
struct Transaction **transactions;
/**
* HTTP server we run to pretend to be the "test" bank.
@ -302,6 +290,12 @@ struct TALER_FAKEBANK_Handle
*/
pthread_mutex_t uuid_map_lock;
/**
* Lock for accessing the internals of
* accounts and transaction array entries.
*/
pthread_mutex_t big_lock;
/**
* Current transaction counter.
*/
@ -380,9 +374,6 @@ lookup_account (struct TALER_FAKEBANK_Handle *h,
if (NULL == account)
{
account = GNUNET_new (struct Account);
GNUNET_assert (0 ==
pthread_mutex_init (&account->lock,
NULL));
account->account_name = GNUNET_strdup (name);
GNUNET_assert (GNUNET_OK ==
TALER_amount_get_zero (h->currency,
@ -409,7 +400,7 @@ check_log (struct TALER_FAKEBANK_Handle *h)
{
for (uint64_t i = 0; i<h->ram_limit; i++)
{
struct Transaction *t = &h->transactions[i];
struct Transaction *t = h->transactions[i];
if (t->unchecked)
continue;
@ -541,62 +532,10 @@ TALER_FAKEBANK_check_credit (struct TALER_FAKEBANK_Handle *h,
}
/**
* Clean up space used by old transaction @a t.
* The transaction @a t must already be locked
* when calling this function!
*
* @param[in,out] h bank handle
* @param[in] t transaction to clean up
*/
static void
clean_transaction (struct TALER_FAKEBANK_Handle *h,
struct Transaction *t)
{
struct Account *da = t->debit_account;
struct Account *ca = t->credit_account;
if (NULL == da)
return; /* nothing to be cleaned */
/* slot was already in use, must clean out old
entry first! */
GNUNET_assert (0 ==
pthread_mutex_lock (&da->lock));
GNUNET_CONTAINER_MDLL_remove (out,
da->out_head,
da->out_tail,
t);
GNUNET_assert (0 ==
pthread_mutex_unlock (&da->lock));
GNUNET_assert (0 ==
pthread_mutex_lock (&ca->lock));
GNUNET_CONTAINER_MDLL_remove (in,
ca->in_head,
ca->in_tail,
t);
GNUNET_assert (0 ==
pthread_mutex_unlock (&ca->lock));
if (T_DEBIT == t->type)
{
GNUNET_assert (0 ==
pthread_mutex_lock (&h->uuid_map_lock));
GNUNET_assert (GNUNET_OK ==
GNUNET_CONTAINER_multihashmap_remove (h->uuid_map,
&t->request_uid,
t));
GNUNET_assert (0 ==
pthread_mutex_unlock (&h->uuid_map_lock));
}
t->debit_account = NULL;
t->credit_account = NULL;
}
/**
* Update @a account balance by @a amount.
*
* The @a account must already be locked when calling
* The @a big_lock must already be locked when calling
* this function.
*
* @param[in,out] account account to update
@ -642,16 +581,24 @@ update_balance (struct Account *account,
* The transaction @a t must already be locked
* when calling this function!
*
* @param[in] t transaction to clean up
* @param[in,out] h bank handle
* @param[in,out] t transaction to add to account lists
*/
static void
post_transaction (struct Transaction *t)
post_transaction (struct TALER_FAKEBANK_Handle *h,
struct Transaction *t)
{
struct Account *debit_acc = t->debit_account;
struct Account *credit_acc = t->credit_account;
uint64_t row_id;
struct Transaction *old;
GNUNET_assert (0 ==
pthread_mutex_lock (&debit_acc->lock));
pthread_mutex_lock (&h->big_lock));
row_id = ++h->serial_counter;
old = h->transactions[row_id % h->ram_limit];
h->transactions[row_id % h->ram_limit] = t;
t->row_id = row_id;
GNUNET_CONTAINER_MDLL_insert_tail (out,
debit_acc->out_head,
debit_acc->out_tail,
@ -659,10 +606,6 @@ post_transaction (struct Transaction *t)
update_balance (debit_acc,
&t->amount,
true);
GNUNET_assert (0 ==
pthread_mutex_unlock (&debit_acc->lock));
GNUNET_assert (0 ==
pthread_mutex_lock (&credit_acc->lock));
GNUNET_CONTAINER_MDLL_insert_tail (in,
credit_acc->in_head,
credit_acc->in_tail,
@ -670,8 +613,39 @@ post_transaction (struct Transaction *t)
update_balance (credit_acc,
&t->amount,
false);
if (NULL != old)
{
struct Account *da;
struct Account *ca;
da = old->debit_account;
ca = old->credit_account;
/* slot was already in use, must clean out old
entry first! */
GNUNET_CONTAINER_MDLL_remove (out,
da->out_head,
da->out_tail,
old);
GNUNET_CONTAINER_MDLL_remove (in,
ca->in_head,
ca->in_tail,
old);
}
GNUNET_assert (0 ==
pthread_mutex_unlock (&credit_acc->lock));
pthread_mutex_unlock (&h->big_lock));
if ( (NULL != old) &&
(T_DEBIT == old->type) )
{
GNUNET_assert (0 ==
pthread_mutex_lock (&h->uuid_map_lock));
GNUNET_assert (GNUNET_OK ==
GNUNET_CONTAINER_multihashmap_remove (h->uuid_map,
&old->request_uid,
old));
GNUNET_assert (0 ==
pthread_mutex_unlock (&h->uuid_map_lock));
}
GNUNET_free (old);
}
@ -725,12 +699,8 @@ make_transfer (
pthread_mutex_lock (&h->uuid_map_lock));
t = GNUNET_CONTAINER_multihashmap_get (h->uuid_map,
request_uid);
GNUNET_assert (0 ==
pthread_mutex_unlock (&h->uuid_map_lock));
if (NULL != t)
{
GNUNET_assert (0 ==
pthread_mutex_lock (&t->lock));
if ( (debit_acc != t->debit_account) ||
(credit_acc != t->credit_account) ||
(0 != TALER_amount_cmp (amount,
@ -742,30 +712,18 @@ make_transfer (
/* Transaction exists, but with different details. */
GNUNET_break (0);
GNUNET_assert (0 ==
pthread_mutex_unlock (&t->lock));
pthread_mutex_unlock (&h->uuid_map_lock));
return GNUNET_SYSERR;
}
*ret_row_id = t->row_id;
GNUNET_assert (0 ==
pthread_mutex_unlock (&t->lock));
pthread_mutex_unlock (&h->uuid_map_lock));
return GNUNET_OK;
}
}
*ret_row_id = __sync_fetch_and_add (&h->serial_counter,
1);
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Making transfer %llu from %s to %s over %s and subject %s; for exchange: %s\n",
(unsigned long long) *ret_row_id,
debit_account,
credit_account,
TALER_amount2s (amount),
TALER_B2S (subject),
exchange_base_url);
t = &h->transactions[*ret_row_id % h->ram_limit];
GNUNET_assert (0 ==
pthread_mutex_lock (&t->lock));
clean_transaction (h,
t);
pthread_mutex_unlock (&h->uuid_map_lock));
}
t = GNUNET_new (struct Transaction);
t->unchecked = true;
t->debit_account = debit_acc;
t->credit_account = credit_acc;
@ -783,7 +741,8 @@ make_transfer (
&t->request_uid);
else
t->request_uid = *request_uid;
post_transaction (t);
post_transaction (h,
t);
GNUNET_assert (0 ==
pthread_mutex_lock (&h->uuid_map_lock));
GNUNET_assert (GNUNET_OK ==
@ -794,8 +753,15 @@ make_transfer (
GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
GNUNET_assert (0 ==
pthread_mutex_unlock (&h->uuid_map_lock));
GNUNET_assert (0 ==
pthread_mutex_unlock (&t->lock));
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Making transfer %llu from %s to %s over %s and subject %s; for exchange: %s\n",
(unsigned long long) t->row_id,
debit_account,
credit_account,
TALER_amount2s (amount),
TALER_B2S (subject),
exchange_base_url);
*ret_row_id = t->row_id;
return GNUNET_OK;
}
@ -826,7 +792,6 @@ make_admin_transfer (
const struct GNUNET_PeerIdentity *pid;
struct Account *debit_acc;
struct Account *credit_acc;
uint64_t ret;
GNUNET_static_assert (sizeof (*pid) ==
sizeof (*reserve_pub));
@ -859,34 +824,21 @@ make_admin_transfer (
return GNUNET_NO;
}
ret = __sync_fetch_and_add (&h->serial_counter,
1);
if (NULL != row_id)
*row_id = ret;
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"Making transfer from %s to %s over %s and subject %s at row %llu\n",
debit_account,
credit_account,
TALER_amount2s (amount),
TALER_B2S (reserve_pub),
(unsigned long long) ret);
t = &h->transactions[ret % h->ram_limit];
GNUNET_assert (0 ==
pthread_mutex_lock (&t->lock));
clean_transaction (h,
t);
t = GNUNET_new (struct Transaction);
t->unchecked = true;
t->debit_account = debit_acc;
t->credit_account = credit_acc;
t->amount = *amount;
t->row_id = ret;
t->date = GNUNET_TIME_absolute_get ();
(void) GNUNET_TIME_round_abs (&t->date);
if (NULL != timestamp)
*timestamp = t->date;
t->type = T_CREDIT;
t->subject.credit.reserve_pub = *reserve_pub;
post_transaction (t);
post_transaction (h,
t);
if (NULL != row_id)
*row_id = t->row_id;
GNUNET_assert (0 ==
pthread_mutex_lock (&h->rpubs_lock));
GNUNET_assert (GNUNET_OK ==
@ -897,8 +849,13 @@ make_admin_transfer (
GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
GNUNET_assert (0 ==
pthread_mutex_unlock (&h->rpubs_lock));
GNUNET_assert (0 ==
pthread_mutex_unlock (&t->lock));
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"Making transfer from %s to %s over %s and subject %s at row %llu\n",
debit_account,
credit_account,
TALER_amount2s (amount),
TALER_B2S (reserve_pub),
(unsigned long long) t->row_id);
return GNUNET_OK;
}
@ -908,7 +865,7 @@ TALER_FAKEBANK_check_empty (struct TALER_FAKEBANK_Handle *h)
{
for (uint64_t i = 0; i<h->ram_limit; i++)
{
struct Transaction *t = &h->transactions[i];
struct Transaction *t = h->transactions[i];
if (t->unchecked)
{
@ -929,8 +886,6 @@ free_account (void *cls,
{
struct Account *account = val;
GNUNET_assert (0 ==
pthread_mutex_destroy (&account->lock));
GNUNET_free (account->account_name);
GNUNET_free (account);
return GNUNET_OK;
@ -966,14 +921,16 @@ TALER_FAKEBANK_stop (struct TALER_FAKEBANK_Handle *h)
}
GNUNET_CONTAINER_multihashmap_destroy (h->uuid_map);
GNUNET_CONTAINER_multipeermap_destroy (h->rpubs);
for (uint64_t i = 0; i<h->ram_limit; i++)
pthread_mutex_destroy (&h->transactions[i].lock);
GNUNET_assert (0 ==
pthread_mutex_destroy (&h->big_lock));
GNUNET_assert (0 ==
pthread_mutex_destroy (&h->uuid_map_lock));
GNUNET_assert (0 ==
pthread_mutex_destroy (&h->accounts_lock));
GNUNET_assert (0 ==
pthread_mutex_destroy (&h->rpubs_lock));
for (uint64_t i = 0; i<h->ram_limit; i++)
GNUNET_free (h->transactions[i]);
GNUNET_free (h->transactions);
GNUNET_free (h->my_baseurl);
GNUNET_free (h->currency);
@ -1424,20 +1381,40 @@ handle_debit_history (struct TALER_FAKEBANK_Handle *h,
acc = lookup_account (h,
account);
GNUNET_asprintf (&debit_payto,
"payto://x-taler-bank/localhost/%s",
account);
history = json_array ();
if (NULL == history)
{
GNUNET_break (0);
GNUNET_free (debit_payto);
return MHD_NO;
}
GNUNET_assert (0 ==
pthread_mutex_lock (&h->big_lock));
if (! ha.have_start)
{
GNUNET_assert (0 ==
pthread_mutex_lock (&acc->lock));
pos = (0 > ha.delta)
? acc->out_tail
: acc->out_head;
}
else
{
struct Transaction *t = &h->transactions[ha.start_idx % h->ram_limit];
struct Transaction *t = h->transactions[ha.start_idx % h->ram_limit];
if (NULL == t)
{
GNUNET_assert (0 ==
pthread_mutex_lock (&t->lock));
pthread_mutex_unlock (&h->big_lock));
GNUNET_free (debit_payto);
/* FIXME: suspend for long-polling instead */
return TALER_MHD_reply_json_pack (connection,
MHD_HTTP_OK,
"{s:o}",
"outgoing_transactions",
history);
}
if (t->debit_account != acc)
{
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
@ -1445,23 +1422,17 @@ handle_debit_history (struct TALER_FAKEBANK_Handle *h,
(unsigned long long) ha.start_idx,
account);
GNUNET_assert (0 ==
pthread_mutex_unlock (&t->lock));
pthread_mutex_unlock (&h->big_lock));
GNUNET_free (debit_payto);
json_decref (history);
return MHD_NO;
}
GNUNET_assert (0 ==
pthread_mutex_lock (&acc->lock));
GNUNET_assert (0 ==
pthread_mutex_unlock (&t->lock));
/* range is exclusive, skip the matching entry */
if (0 > ha.delta)
pos = t->prev_out;
else
pos = t->next_out;
}
GNUNET_asprintf (&debit_payto,
"payto://x-taler-bank/localhost/%s",
account);
history = json_array ();
while ( (0 != ha.delta) &&
(NULL != pos) )
{
@ -1472,10 +1443,6 @@ handle_debit_history (struct TALER_FAKEBANK_Handle *h,
GNUNET_asprintf (&credit_payto,
"payto://x-taler-bank/localhost/%s",
pos->credit_account->account_name);
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"Appending credit_payto (%s) from credit_account (%s) within fakebank\n",
credit_payto,
pos->credit_account->account_name);
trans = json_pack (
"{s:I, s:o, s:o, s:s, s:s, s:s, s:o}",
"row_id", (json_int_t) pos->row_id,
@ -1487,6 +1454,7 @@ handle_debit_history (struct TALER_FAKEBANK_Handle *h,
pos->subject.debit.exchange_base_url,
"wtid", GNUNET_JSON_from_data_auto (
&pos->subject.debit.wtid));
GNUNET_assert (NULL != trans);
GNUNET_free (credit_payto);
GNUNET_assert (0 ==
json_array_append_new (history,
@ -1501,7 +1469,7 @@ handle_debit_history (struct TALER_FAKEBANK_Handle *h,
pos = pos->next_out;
}
GNUNET_assert (0 ==
pthread_mutex_unlock (&acc->lock));
pthread_mutex_unlock (&h->big_lock));
GNUNET_free (debit_payto);
return TALER_MHD_reply_json_pack (connection,
MHD_HTTP_OK,
@ -1539,20 +1507,35 @@ handle_credit_history (struct TALER_FAKEBANK_Handle *h,
}
acc = lookup_account (h,
account);
history = json_array ();
GNUNET_assert (NULL != history);
GNUNET_asprintf (&credit_payto,
"payto://x-taler-bank/localhost/%s",
account);
GNUNET_assert (0 ==
pthread_mutex_lock (&h->big_lock));
if (! ha.have_start)
{
GNUNET_assert (0 ==
pthread_mutex_lock (&acc->lock));
pos = (0 > ha.delta)
? acc->in_tail
: acc->in_head;
}
else
{
struct Transaction *t = &h->transactions[ha.start_idx % h->ram_limit];
struct Transaction *t = h->transactions[ha.start_idx % h->ram_limit];
if (NULL == t)
{
GNUNET_assert (0 ==
pthread_mutex_lock (&t->lock));
pthread_mutex_unlock (&h->big_lock));
GNUNET_free (credit_payto);
/* FIXME: suspend for long-polling instead */
return TALER_MHD_reply_json_pack (connection,
MHD_HTTP_OK,
"{s:o}",
"incoming_transactions",
history);
}
if (t->credit_account != acc)
{
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
@ -1560,23 +1543,17 @@ handle_credit_history (struct TALER_FAKEBANK_Handle *h,
(unsigned long long) ha.start_idx,
account);
GNUNET_assert (0 ==
pthread_mutex_unlock (&t->lock));
pthread_mutex_unlock (&h->big_lock));
json_decref (history);
GNUNET_free (credit_payto);
return MHD_NO;
}
GNUNET_assert (0 ==
pthread_mutex_lock (&acc->lock));
GNUNET_assert (0 ==
pthread_mutex_unlock (&t->lock));
/* range is exclusive, skip the matching entry */
if (0 > ha.delta)
pos = t->prev_in;
else
pos = t->next_in;
}
GNUNET_asprintf (&credit_payto,
"payto://x-taler-bank/localhost/%s",
account);
history = json_array ();
while ( (0 != ha.delta) &&
(NULL != pos) )
{
@ -1587,12 +1564,6 @@ handle_credit_history (struct TALER_FAKEBANK_Handle *h,
GNUNET_asprintf (&debit_payto,
"payto://x-taler-bank/localhost/%s",
pos->debit_account->account_name);
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"Returning transaction %s->%s (%s) at %llu\n",
pos->debit_account->account_name,
pos->credit_account->account_name,
TALER_B2S (&pos->subject.credit.reserve_pub),
(unsigned long long) pos->row_id);
trans = json_pack (
"{s:I, s:o, s:o, s:s, s:s, s:o}",
"row_id", (json_int_t) pos->row_id,
@ -1602,6 +1573,7 @@ handle_credit_history (struct TALER_FAKEBANK_Handle *h,
"debit_account", debit_payto,
"reserve_pub", GNUNET_JSON_from_data_auto (
&pos->subject.credit.reserve_pub));
GNUNET_assert (NULL != trans);
GNUNET_free (debit_payto);
GNUNET_assert (0 ==
json_array_append_new (history,
@ -1616,7 +1588,7 @@ handle_credit_history (struct TALER_FAKEBANK_Handle *h,
pos = pos->next_in;
}
GNUNET_assert (0 ==
pthread_mutex_unlock (&acc->lock));
pthread_mutex_unlock (&h->big_lock));
GNUNET_free (credit_payto);
return TALER_MHD_reply_json_pack (connection,
MHD_HTTP_OK,
@ -1909,7 +1881,7 @@ TALER_FAKEBANK_start2 (uint16_t port,
{
struct TALER_FAKEBANK_Handle *h;
if (SIZE_MAX / sizeof (struct Transaction) < ram_limit)
if (SIZE_MAX / sizeof (struct Transaction *) < ram_limit)
{
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
"This CPU architecture does not support keeping %llu transactions in RAM\n",
@ -1921,7 +1893,7 @@ TALER_FAKEBANK_start2 (uint16_t port,
h->port = port;
h->force_close = close_connections;
h->ram_limit = ram_limit;
h->serial_counter = 1;
h->serial_counter = 0;
GNUNET_assert (0 ==
pthread_mutex_init (&h->accounts_lock,
NULL));
@ -1931,8 +1903,11 @@ TALER_FAKEBANK_start2 (uint16_t port,
GNUNET_assert (0 ==
pthread_mutex_init (&h->uuid_map_lock,
NULL));
GNUNET_assert (0 ==
pthread_mutex_init (&h->big_lock,
NULL));
h->transactions
= GNUNET_malloc_large (sizeof (struct Transaction)
= GNUNET_malloc_large (sizeof (struct Transaction *)
* ram_limit);
if (NULL == h->transactions)
{
@ -1961,12 +1936,6 @@ TALER_FAKEBANK_start2 (uint16_t port,
TALER_FAKEBANK_stop (h);
return NULL;
}
for (uint64_t i = 0; i<ram_limit; i++)
{
GNUNET_assert (0 ==
pthread_mutex_init (&h->transactions[i].lock,
NULL));
}
h->currency = GNUNET_strdup (currency);
GNUNET_asprintf (&h->my_baseurl,
"http://localhost:%u/",

View File

@ -28,7 +28,7 @@ DB = postgres
# exchange (or the twister) is actually listening.
base_url = "http://localhost:8081/"
WIREWATCH_IDLE_SLEEP_INTERVAL = 5 ms
WIREWATCH_IDLE_SLEEP_INTERVAL = 1500 ms
[exchange-offline]
MASTER_PRIV_FILE = ${TALER_DATA_HOME}/exchange/offline-keys/master.priv

View File

@ -497,6 +497,20 @@ parallel_benchmark (void)
{
if (use_fakebank)
{
unsigned long long pnum;
if (GNUNET_OK !=
GNUNET_CONFIGURATION_get_value_number (cfg,
"bank",
"HTTP_PORT",
&pnum))
{
GNUNET_log_config_invalid (GNUNET_ERROR_TYPE_ERROR,
"bank",
"HTTP_PORT",
"must be valid port number");
return GNUNET_SYSERR;
}
/* start fakebank */
fakebank = fork ();
if (0 == fakebank)
@ -515,7 +529,33 @@ parallel_benchmark (void)
return GNUNET_SYSERR;
}
/* wait for fakebank to be ready */
sleep (1 + history_size / 65536);
{
char *bank_url;
int ret;
GNUNET_asprintf (&bank_url,
"http://localhost:%u/",
(unsigned int) (uint16_t) pnum);
ret = TALER_TESTING_wait_httpd_ready (bank_url);
GNUNET_free (bank_url);
if (0 != ret)
{
int wstatus;
kill (fakebank,
SIGTERM);
if (fakebank !=
waitpid (fakebank,
&wstatus,
0))
{
GNUNET_log_strerror (GNUNET_ERROR_TYPE_ERROR,
"waitpid");
}
fakebank = -1;
exit (ret);
}
}
}
else
{
@ -587,7 +627,11 @@ parallel_benchmark (void)
(MODE_BANK == mode) )
{
printf ("Press ENTER to stop!\n");
if (MODE_BANK != mode)
duration = GNUNET_TIME_absolute_get_duration (start_time);
(void) getchar ();
if (MODE_BANK == mode)
duration = GNUNET_TIME_absolute_get_duration (start_time);
}
if ( (MODE_BANK == mode) ||
@ -817,7 +861,7 @@ main (int argc,
/* If we're the bank, we're done now. No need to print results. */
return (GNUNET_OK == result) ? 0 : result;
}
duration = GNUNET_TIME_absolute_get_duration (start_time);
if (GNUNET_OK == result)
{
struct rusage usage;

View File

@ -29,13 +29,12 @@
#include "taler_json_lib.h"
#include "taler_bank_service.h"
#define DEBUG_LOGGING 0
/**
* What is the initial batch size we use for credit history
* What is the maximum batch size we use for credit history
* requests with the bank. See `batch_size` below.
*/
#define INITIAL_BATCH_SIZE 1024
#define MAXIMUM_BATCH_SIZE 1024
/**
* Information we keep for each supported account.
@ -81,35 +80,49 @@ struct WireAccount
* Encoded offset in the wire transfer list from where
* to start the next query with the bank.
*/
uint64_t last_row_off;
uint64_t batch_start;
/**
* Latest row offset seen in this transaction, becomes
* the new #last_row_off upon commit.
* the new #batch_start upon commit.
*/
uint64_t latest_row_off;
/**
* Offset where our current shard ends.
* Offset where our current shard begins (inclusive).
*/
uint64_t shard_start;
/**
* Offset where our current shard ends (exclusive).
*/
uint64_t shard_end;
/**
* When did we start with the shard?
*/
struct GNUNET_TIME_Absolute shard_start_time;
/**
* Name of our job in the shard table.
*/
char *job_name;
/**
* How many transactions do we retrieve per batch?
*/
unsigned int batch_size;
/**
* How much do we incremnt @e batch_size on success?
*/
unsigned int batch_increment;
/**
* How many transactions did we see in the current batch?
*/
unsigned int current_batch_size;
/**
* Are we running from scratch and should re-process all transactions
* for this account?
*/
bool reset_mode;
/**
* Should we delay the next request to the wire plugin a bit? Set to
* false if we actually did some work.
@ -157,13 +170,29 @@ static struct TALER_EXCHANGEDB_Plugin *db_plugin;
/**
* How long should we sleep when idle before trying to find more work?
* Also used for how long we wait to grab a shard before trying it again.
* The value should be set to a bit above the average time it takes to
* process a shard.
*/
static struct GNUNET_TIME_Relative wirewatch_idle_sleep_interval;
/**
* Modulus to apply to group shards.
* How long did we take to finish the last shard?
*/
static unsigned int shard_size = 1024;
static struct GNUNET_TIME_Relative shard_delay;
/**
* Modulus to apply to group shards. The shard size must ultimately be a
* multiple of the batch size. Thus, if this is not a multiple of the
* #MAXIMUM_BATCH_SIZE, the batch size will be set to the #shard_size.
*/
static unsigned int shard_size = MAXIMUM_BATCH_SIZE;
/**
* How many workers should we plan our scheduling with?
*/
static unsigned int max_workers = 16;
/**
* Value to return from main(). 0 on success, non-zero on
@ -186,11 +215,6 @@ static enum
*/
static int test_mode;
/**
* Are we running from scratch and should re-process all transactions?
*/
static int reset_mode;
/**
* Current task waiting for execution, if any.
*/
@ -221,6 +245,7 @@ shutdown_task (void *cls)
wa);
TALER_BANK_auth_free (&wa->auth);
GNUNET_free (wa->section_name);
GNUNET_free (wa->job_name);
GNUNET_free (wa);
}
}
@ -263,7 +288,6 @@ add_account_cb (void *cls,
if (GNUNET_YES != ai->credit_enabled)
return; /* not enabled for us, skip */
wa = GNUNET_new (struct WireAccount);
wa->reset_mode = reset_mode;
if (GNUNET_OK !=
TALER_BANK_auth_parse_cfg (cfg,
ai->section_name,
@ -276,7 +300,12 @@ add_account_cb (void *cls,
return;
}
wa->section_name = GNUNET_strdup (ai->section_name);
wa->batch_size = INITIAL_BATCH_SIZE;
GNUNET_asprintf (&wa->job_name,
"wirewatch-%s",
ai->section_name);
wa->batch_size = MAXIMUM_BATCH_SIZE;
if (0 != shard_size % wa->batch_size)
wa->batch_size = shard_size;
GNUNET_CONTAINER_DLL_insert (wa_head,
wa_tail,
wa);
@ -333,6 +362,127 @@ static void
find_transfers (void *cls);
/**
* We encountered a serialization error.
* Rollback the transaction and try again
*
* @param wa account we are transacting on
*/
static void
handle_soft_error (struct WireAccount *wa)
{
db_plugin->rollback (db_plugin->cls,
wa->session);
if (1 < wa->batch_size)
{
wa->batch_size /= 2;
wa->batch_increment = 0;
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"Reduced batch size to %llu due to serialization issue\n",
(unsigned long long) wa->batch_size);
}
GNUNET_assert (NULL == task);
task = GNUNET_SCHEDULER_add_now (&find_transfers,
NULL);
}
/**
* We are finished with the current transaction, try
* to commit and then schedule the next iteration.
*
* @param wa wire account to commit for
*/
static void
do_commit (struct WireAccount *wa)
{
enum GNUNET_DB_QueryStatus qs;
if (wa->shard_end <= wa->latest_row_off)
{
/* shard is complete, mark this as well */
qs = db_plugin->complete_shard (db_plugin->cls,
wa->session,
wa->job_name,
wa->shard_start,
wa->shard_end);
switch (qs)
{
case GNUNET_DB_STATUS_HARD_ERROR:
GNUNET_break (0);
db_plugin->rollback (db_plugin->cls,
wa->session);
GNUNET_SCHEDULER_shutdown ();
return;
case GNUNET_DB_STATUS_SOFT_ERROR:
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"Got DB soft error for complete_shard. Rolling back.\n");
handle_soft_error (wa);
return;
case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
/* already existed, ok, let's just continue */
break;
case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
/* normal case */
shard_delay = GNUNET_TIME_absolute_get_duration (wa->shard_start_time);
break;
}
}
qs = db_plugin->commit (db_plugin->cls,
wa->session);
switch (qs)
{
case GNUNET_DB_STATUS_HARD_ERROR:
GNUNET_break (0);
GNUNET_SCHEDULER_shutdown ();
return;
case GNUNET_DB_STATUS_SOFT_ERROR:
/* reduce transaction size to reduce rollback probability */
handle_soft_error (wa);
return;
case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
/* normal case */
break;
}
/* transaction success, update #last_row_off */
wa->batch_start = wa->latest_row_off;
wa->session = NULL; /* should not be needed */
if (wa->batch_size < MAXIMUM_BATCH_SIZE)
{
wa->batch_increment++;
wa->batch_size = GNUNET_MIN (MAXIMUM_BATCH_SIZE,
wa->batch_size + wa->batch_increment);
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Increasing batch size to %llu\n",
(unsigned long long) wa->batch_size);
}
if ( (wa->delay) &&
(test_mode) &&
(NULL == wa->next) )
{
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"Shutdown due to test mode!\n");
GNUNET_SCHEDULER_shutdown ();
return;
}
if (wa->delay)
{
wa->delayed_until
= GNUNET_TIME_relative_to_absolute (wirewatch_idle_sleep_interval);
wa_pos = wa_pos->next;
if (NULL == wa_pos)
wa_pos = wa_head;
GNUNET_assert (NULL != wa_pos);
}
GNUNET_assert (NULL == task);
task = GNUNET_SCHEDULER_add_at (wa_pos->delayed_until,
&find_transfers,
NULL);
}
/**
* Callbacks of this type are used to serve the result of asking
* the bank for the transaction history.
@ -370,89 +520,38 @@ history_cb (void *cls,
}
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"End of list. Committing progress!\n");
qs = db_plugin->commit (db_plugin->cls,
do_commit (wa);
return GNUNET_OK; /* will be ignored anyway */
}
if (serial_id < wa->latest_row_off)
{
/* we are done with the current shard, commit and stop this iteration! */
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
"Serial ID %llu not monotonic (got %llu before). Failing!\n",
(unsigned long long) serial_id,
(unsigned long long) wa->latest_row_off);
db_plugin->rollback (db_plugin->cls,
session);
if (GNUNET_DB_STATUS_HARD_ERROR == qs)
{
GNUNET_SCHEDULER_shutdown ();
return GNUNET_OK;
wa->hh = NULL;
return GNUNET_SYSERR;
}
if (GNUNET_DB_STATUS_SOFT_ERROR == qs)
if (serial_id > wa->shard_end)
{
/* reduce transaction size to reduce rollback probability */
if (2 > wa->batch_size)
{
wa->batch_size /= 2;
/* we are done with the current shard, commit and stop this iteration! */
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"Reduced batch size to %llu due to serialization issue\n",
(unsigned long long) wa->batch_size);
}
/* try again */
GNUNET_assert (NULL == task);
task = GNUNET_SCHEDULER_add_now (&find_transfers,
NULL);
return GNUNET_OK; /* will be ignored anyway */
}
GNUNET_break (0 <= qs);
/* transaction success, update #last_row_off */
wa->last_row_off = wa->latest_row_off;
wa->latest_row_off = 0; /* should not be needed */
wa->session = NULL; /* should not be needed */
if (wa->batch_size < INITIAL_BATCH_SIZE)
{
wa->batch_size += 1;
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"Increasing batch size to %llu\n",
(unsigned long long) wa->batch_size);
}
if ( (wa->delay) &&
(test_mode) &&
(NULL == wa->next) )
{
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"Shutdown due to test mode!\n");
GNUNET_SCHEDULER_shutdown ();
return GNUNET_OK;
}
if (wa->delay)
{
wa->delayed_until
= GNUNET_TIME_relative_to_absolute (wirewatch_idle_sleep_interval);
wa_pos = wa_pos->next;
if (NULL == wa_pos)
wa_pos = wa_head;
GNUNET_assert (NULL != wa_pos);
}
task = GNUNET_SCHEDULER_add_at (wa_pos->delayed_until,
&find_transfers,
NULL);
return GNUNET_OK; /* will be ignored anyway */
"Serial ID %llu past shard end at %llu, ending iteration early!\n",
(unsigned long long) serial_id,
(unsigned long long) wa->shard_end);
wa->latest_row_off = serial_id - 1;
do_commit (wa);
wa->hh = NULL;
return GNUNET_SYSERR;
}
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Adding wire transfer over %s with (hashed) subject `%s'\n",
TALER_amount2s (&details->amount),
TALER_B2S (&details->reserve_pub));
/**
* Debug block.
*/
#if DEBUG_LOGGING
{
/** Should be 53, give 80 just to be extra conservative (and aligned). */
#define PUBSIZE 80
char wtid_s[PUBSIZE];
GNUNET_break (NULL !=
GNUNET_STRINGS_data_to_string (&details->reserve_pub,
sizeof (details->reserve_pub),
&wtid_s[0],
PUBSIZE));
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"Plain text subject (= reserve_pub): %s\n",
wtid_s);
}
#endif
/* FIXME-PERFORMANCE: Consider using Postgres multi-valued insert here,
for up to 15x speed-up according to
https://dba.stackexchange.com/questions/224989/multi-row-insert-vs-transactional-single-row-inserts#225006
@ -466,26 +565,27 @@ history_cb (void *cls,
details->debit_account_url,
wa->section_name,
serial_id);
if (GNUNET_DB_STATUS_HARD_ERROR == qs)
switch (qs)
{
case GNUNET_DB_STATUS_HARD_ERROR:
GNUNET_break (0);
db_plugin->rollback (db_plugin->cls,
session);
GNUNET_SCHEDULER_shutdown ();
wa->hh = NULL;
return GNUNET_SYSERR;
}
if (GNUNET_DB_STATUS_SOFT_ERROR == qs)
{
GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
case GNUNET_DB_STATUS_SOFT_ERROR:
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"Got DB soft error for reserves_in_insert. Rolling back.\n");
db_plugin->rollback (db_plugin->cls,
session);
handle_soft_error (wa);
wa->hh = NULL;
GNUNET_assert (NULL == task);
task = GNUNET_SCHEDULER_add_now (&find_transfers,
NULL);
return GNUNET_SYSERR;
case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
/* already existed, ok, let's just continue */
break;
case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
/* normal case */
break;
}
wa->delay = false;
wa->latest_row_off = serial_id;
@ -515,8 +615,59 @@ find_transfers (void *cls)
GNUNET_SCHEDULER_shutdown ();
return;
}
db_plugin->preflight (db_plugin->cls,
session);
wa_pos->delay = true;
wa_pos->current_batch_size = 0; /* reset counter */
wa_pos->session = session;
if (wa_pos->shard_end <= wa_pos->batch_start)
{
uint64_t start;
uint64_t end;
struct GNUNET_TIME_Relative delay;
/* advance to next shard */
delay.rel_value_us = GNUNET_CRYPTO_random_u64 (
GNUNET_CRYPTO_QUALITY_WEAK,
4 * GNUNET_TIME_relative_max (
wirewatch_idle_sleep_interval,
GNUNET_TIME_relative_multiply (shard_delay,
max_workers)).rel_value_us);
qs = db_plugin->begin_shard (db_plugin->cls,
wa_pos->job_name,
delay,
shard_size,
&start,
&end);
switch (qs)
{
case GNUNET_DB_STATUS_HARD_ERROR:
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
"Failed to obtain starting point for montoring from database!\n");
global_ret = GR_DATABASE_SELECT_LATEST_HARD_FAIL;
GNUNET_SCHEDULER_shutdown ();
return;
case GNUNET_DB_STATUS_SOFT_ERROR:
/* try again */
task = GNUNET_SCHEDULER_add_delayed (wirewatch_idle_sleep_interval,
&find_transfers,
NULL);
return;
case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
GNUNET_break (0);
task = GNUNET_SCHEDULER_add_delayed (wirewatch_idle_sleep_interval,
&find_transfers,
NULL);
return;
case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
wa_pos->shard_start_time = GNUNET_TIME_absolute_get ();
wa_pos->shard_start = start;
wa_pos->shard_end = end;
wa_pos->batch_start = start;
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"Starting with shard at %llu\n",
(unsigned long long) start);
break;
}
}
if (GNUNET_OK !=
db_plugin->start (db_plugin->cls,
session,
@ -528,51 +679,13 @@ find_transfers (void *cls)
GNUNET_SCHEDULER_shutdown ();
return;
}
wa_pos->delay = true;
wa_pos->current_batch_size = 0; /* reset counter */
wa_pos->session = session;
if (wa_pos->shard_end == wa_pos->last_row_off)
{
/* advance to next shard */
// FIXME: if other processes are running in parallel,
// update 'last_row_off' to next free shard!
wa_pos->shard_end = wa_pos->last_row_off + shard_size;
}
if (! wa_pos->reset_mode)
{
// FIXME: need good way to fetch
// shard data here!
qs = db_plugin->get_latest_reserve_in_reference (db_plugin->cls,
session,
wa_pos->section_name,
&wa_pos->last_row_off);
if (GNUNET_DB_STATUS_HARD_ERROR == qs)
{
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
"Failed to obtain starting point for montoring from database!\n");
db_plugin->rollback (db_plugin->cls,
session);
global_ret = GR_DATABASE_SELECT_LATEST_HARD_FAIL;
GNUNET_SCHEDULER_shutdown ();
return;
}
if (GNUNET_DB_STATUS_SOFT_ERROR == qs)
{
/* try again */
db_plugin->rollback (db_plugin->cls,
session);
task = GNUNET_SCHEDULER_add_now (&find_transfers,
NULL);
return;
}
}
wa_pos->reset_mode = true;
limit = GNUNET_MIN (wa_pos->batch_size,
wa_pos->shard_end - wa_pos->last_row_off);
wa_pos->shard_end - wa_pos->batch_start);
GNUNET_assert (NULL == wa_pos->hh);
wa_pos->latest_row_off = wa_pos->batch_start;
wa_pos->hh = TALER_BANK_credit_history (ctx,
&wa_pos->auth,
wa_pos->last_row_off,
wa_pos->batch_start,
limit,
&history_cb,
wa_pos);
@ -644,10 +757,6 @@ main (int argc,
char *const *argv)
{
struct GNUNET_GETOPT_CommandLineOption options[] = {
GNUNET_GETOPT_option_flag ('r',
"reset",
"start fresh with all transactions in the history",
&reset_mode),
GNUNET_GETOPT_option_uint ('S',
"size",
"SIZE",
@ -659,6 +768,11 @@ main (int argc,
"test",
"run in test mode and exit when idle",
&test_mode),
GNUNET_GETOPT_option_uint ('w',
"workers",
"COUNT",
"Plan work load with up to COUNT worker processes (default: 16)",
&max_workers),
GNUNET_GETOPT_OPTION_END
};
enum GNUNET_GenericReturnValue ret;

View File

@ -27,6 +27,7 @@ DROP TABLE IF EXISTS auditor_denom_sigs CASCADE;
DROP TABLE IF EXISTS exchange_sign_keys CASCADE;
DROP TABLE IF EXISTS wire_accounts CASCADE;
DROP TABLE IF EXISTS signkey_revocations CASCADE;
DROP TABLE IF EXISTS work_shards CASCADE;
-- And we're out of here...
COMMIT;

View File

@ -389,6 +389,9 @@ COMMENT ON TABLE recoup
COMMENT ON COLUMN recoup.coin_pub
IS 'Do not CASCADE ON DROP on the coin_pub, as we may keep the coin alive!';
-- Note: this first index is redundant;
-- It is implicitly removed by the exchange-0002.sql
-- schema changes.
CREATE INDEX IF NOT EXISTS recoup_by_coin_index
ON recoup
(coin_pub);
@ -415,6 +418,8 @@ CREATE TABLE IF NOT EXISTS recoup_refresh
COMMENT ON COLUMN recoup_refresh.coin_pub
IS 'Do not CASCADE ON DROP on the coin_pub, as we may keep the coin alive!';
-- Note: this index is redundant; implicitly removed
-- by the exchange-0002.sql update!
CREATE INDEX IF NOT EXISTS recoup_refresh_by_coin_index
ON recoup_refresh
(coin_pub);

View File

@ -80,7 +80,9 @@ UPDATE reserves_in
ALTER TABLE reserves_in
ALTER COLUMN reserve_uuid SET NOT NULL;
ALTER TABLE reserves_in
DROP COLUMN reserve_pub;
DROP COLUMN reserve_pub,
ADD CONSTRAINT unique_in PRIMARY KEY (reserve_uuid, wire_reference);
ALTER TABLE reserves_out
ADD COLUMN reserve_uuid INT8 REFERENCES reserves (reserve_uuid) ON DELETE CASCADE;
UPDATE reserves_out
@ -93,6 +95,12 @@ ALTER TABLE reserves_out
DROP COLUMN reserve_pub;
ALTER TABLE reserves_close
ADD COLUMN reserve_uuid INT8 REFERENCES reserves (reserve_uuid) ON DELETE CASCADE;
CREATE INDEX IF NOT EXISTS reserves_out_reserve_uuid_index
ON reserves_out
(reserve_uuid);
COMMENT ON INDEX reserves_out_reserve_uuid_index
IS 'for get_reserves_out';
UPDATE reserves_close
SET reserve_uuid=r.reserve_uuid
FROM reserves_close rclose
@ -101,6 +109,11 @@ ALTER TABLE reserves_close
ALTER COLUMN reserve_uuid SET NOT NULL;
ALTER TABLE reserves_close
DROP COLUMN reserve_pub;
CREATE INDEX IF NOT EXISTS reserves_close_by_uuid
ON reserves_close
(reserve_uuid);
-- change all foreign keys using 'denom_pub_hash' to using 'denominations_serial' instead
ALTER TABLE reserves_out
@ -113,6 +126,11 @@ ALTER TABLE reserves_out
ALTER COLUMN denominations_serial SET NOT NULL;
ALTER TABLE reserves_out
DROP COLUMN denom_pub_hash;
CREATE INDEX IF NOT EXISTS reserves_out_for_get_withdraw_info
ON reserves_out
(denominations_serial
,h_blind_ev
);
ALTER TABLE known_coins
ADD COLUMN denominations_serial INT8 REFERENCES denominations (denominations_serial) ON DELETE CASCADE;
@ -124,6 +142,9 @@ ALTER TABLE known_coins
ALTER COLUMN denominations_serial SET NOT NULL;
ALTER TABLE known_coins
DROP COLUMN denom_pub_hash;
CREATE INDEX IF NOT EXISTS known_coins_by_denomination
ON known_coins
(denominations_serial);
ALTER TABLE denomination_revocations
ADD COLUMN denominations_serial INT8 REFERENCES denominations (denominations_serial) ON DELETE CASCADE;
@ -137,6 +158,9 @@ ALTER TABLE denomination_revocations
DROP COLUMN denom_pub_hash;
ALTER TABLE denomination_revocations
ADD CONSTRAINT denominations_serial_pk PRIMARY KEY (denominations_serial);
CREATE INDEX IF NOT EXISTS denomination_revocations_by_denomination
ON denomination_revocations
(denominations_serial);
ALTER TABLE refresh_revealed_coins
ADD COLUMN denominations_serial INT8 REFERENCES denominations (denominations_serial) ON DELETE CASCADE;
@ -148,6 +172,9 @@ ALTER TABLE refresh_revealed_coins
ALTER COLUMN denominations_serial SET NOT NULL;
ALTER TABLE refresh_revealed_coins
DROP COLUMN denom_pub_hash;
CREATE INDEX IF NOT EXISTS refresh_revealed_coins_denominations_index
ON refresh_revealed_coins
(denominations_serial);
-- Change all foreign keys involving 'coin_pub' to use known_coin_id instead.
ALTER TABLE recoup_refresh
@ -161,6 +188,7 @@ ALTER TABLE recoup_refresh
ALTER TABLE recoup_refresh
DROP COLUMN coin_pub;
ALTER TABLE recoup
ADD COLUMN known_coin_id INT8 REFERENCES known_coins (known_coin_id) ON DELETE CASCADE;
UPDATE recoup
@ -172,6 +200,7 @@ ALTER TABLE recoup
ALTER TABLE recoup
DROP COLUMN coin_pub;
ALTER TABLE refresh_commitments
ADD COLUMN old_known_coin_id INT8 REFERENCES known_coins (known_coin_id) ON DELETE CASCADE;
UPDATE refresh_commitments
@ -182,6 +211,10 @@ ALTER TABLE refresh_commitments
ALTER COLUMN old_known_coin_id SET NOT NULL;
ALTER TABLE refresh_commitments
DROP COLUMN old_coin_pub;
CREATE INDEX IF NOT EXISTS refresh_commitments_old_coin_pub_index
ON refresh_commitments
(old_known_coin_id);
ALTER TABLE deposits
ADD COLUMN known_coin_id INT8 REFERENCES known_coins (known_coin_id) ON DELETE CASCADE;
@ -190,7 +223,8 @@ UPDATE deposits
FROM deposits o
INNER JOIN known_coins d USING(coin_pub);
ALTER TABLE deposits
ALTER COLUMN known_coin_id SET NOT NULL;
ALTER COLUMN known_coin_id SET NOT NULL,
ADD CONSTRAINT deposit_unique UNIQUE (known_coin_id, merchant_pub, h_contract_terms);
ALTER TABLE deposits
DROP COLUMN coin_pub;
@ -216,6 +250,16 @@ ALTER TABLE recoup
ALTER COLUMN reserve_out_serial_id SET NOT NULL;
ALTER TABLE recoup
DROP COLUMN h_blind_ev;
CREATE INDEX IF NOT EXISTS recoup_by_h_blind_ev
ON recoup
(reserve_out_serial_id);
CREATE INDEX IF NOT EXISTS recoup_for_by_reserve
ON recoup
(known_coin_id
,reserve_out_serial_id
);
COMMENT ON COLUMN recoup.reserve_out_serial_id
IS 'Identifies the h_blind_ev of the recouped coin.';
@ -228,11 +272,20 @@ UPDATE recoup_refresh
FROM recoup_refresh o
INNER JOIN refresh_revealed_coins d ON (d.h_coin_ev = o.h_blind_ev);
ALTER TABLE recoup_refresh
ALTER COLUMN rrc_serial SET NOT NULL;
ALTER COLUMN rrc_serial SET NOT NULL,
ADD CONSTRAINT recoup_unique UNIQUE (rrc_serial);
ALTER TABLE recoup_refresh
DROP COLUMN h_blind_ev;
COMMENT ON COLUMN recoup_refresh.rrc_serial
IS 'Identifies the h_blind_ev of the recouped coin (as h_coin_ev).';
CREATE INDEX IF NOT EXISTS recoup_refresh_by_h_blind_ev
ON recoup_refresh
(rrc_serial);
CREATE INDEX IF NOT EXISTS recoup_refresh_for_by_reserve
ON recoup_refresh
(known_coin_id
,rrc_serial
);
-- Change 'rc' in refresh_transfer_keys and refresh_revealed_coins tables to 'melt_serial_id'
@ -248,6 +301,14 @@ ALTER TABLE refresh_transfer_keys
DROP COLUMN rc;
COMMENT ON COLUMN refresh_transfer_keys.melt_serial_id
IS 'Identifies the refresh commitment (rc) of the operation.';
CREATE INDEX IF NOT EXISTS refresh_transfer_keys_coin_tpub
ON refresh_transfer_keys
(melt_serial_id
,transfer_pub
);
COMMENT ON INDEX refresh_transfer_keys_coin_tpub
IS 'for get_link (unsure if this helps or hurts for performance as there should be very few transfer public keys per rc, but at least in theory this helps the ORDER BY clause)';
ALTER TABLE refresh_revealed_coins
ADD COLUMN melt_serial_id INT8 REFERENCES refresh_commitments (melt_serial_id) ON DELETE CASCADE;
@ -280,6 +341,8 @@ ALTER TABLE refunds
DROP COLUMN merchant_pub,
DROP COLUMN h_contract_terms,
DROP COLUMN known_coin_id;
ALTER TABLE refunds
ADD CONSTRAINT refunds_primary_key PRIMARY KEY (deposit_serial_id, rtransaction_id);
COMMENT ON COLUMN refunds.deposit_serial_id
IS 'Identifies ONLY the merchant_pub, h_contract_terms and known_coin_id. Multiple deposits may match a refund, this only identifies one of them.';
@ -380,7 +443,7 @@ CREATE TABLE IF NOT EXISTS work_shards
,last_attempt INT8 NOT NULL
,start_row INT8 NOT NULL
,end_row INT8 NOT NULL
,completed BOOLEAN NOT NULL
,completed BOOLEAN NOT NULL DEFAULT FALSE
,job_name VARCHAR NOT NULL
,PRIMARY KEY (job_name, start_row)
);

View File

@ -424,7 +424,8 @@ postgres_get_session (void *cls)
",gc_date"
" FROM reserves"
" WHERE reserve_pub=$1"
" LIMIT 1;",
" LIMIT 1"
";", // FOR UPDATE;", // FIXME: helpful?
1),
/* Used in #postgres_reserves_in_insert() when the reserve is new */
GNUNET_PQ_make_prepare ("reserve_create",
@ -2463,7 +2464,6 @@ postgres_get_session (void *cls)
" end_row"
" FROM work_shards"
" WHERE job_name=$1"
" AND completed=FALSE"
" ORDER BY end_row DESC"
" LIMIT 1;",
1),
@ -3529,7 +3529,8 @@ postgres_reserves_in_insert (void *cls,
balance; we do this after checking for duplication, as
otherwise we might have to actually pay the cost to roll this
back for duplicate transactions; like this, we should virtually
never actually have to rollback anything. */struct TALER_EXCHANGEDB_Reserve updated_reserve;
never actually have to rollback anything. */
struct TALER_EXCHANGEDB_Reserve updated_reserve;
updated_reserve.pub = reserve.pub;
if (0 >
@ -10356,6 +10357,10 @@ postgres_begin_shard (void *cls,
};
now = GNUNET_TIME_absolute_get ();
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"Trying to claim shard %llu-%llu\n",
(unsigned long long) *start_row,
(unsigned long long) *end_row);
qs = GNUNET_PQ_eval_prepared_non_select (session->conn,
"claim_next_shard",
params);
@ -10374,7 +10379,8 @@ postgres_begin_shard (void *cls,
/* continued below */
break;
case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
GNUNET_break (0);
/* someone else got this shard already,
try again */
postgres_rollback (cls,
session);
continue;
@ -10434,6 +10440,10 @@ postgres_complete_shard (void *cls,
};
(void) cls;
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"Completing shard %llu-%llu\n",
(unsigned long long) start_row,
(unsigned long long) end_row);
return GNUNET_PQ_eval_prepared_non_select (session->conn,
"complete_shard",
params);

View File

@ -172,6 +172,18 @@ int
TALER_TESTING_wait_exchange_ready (const char *base_url);
/**
* Wait for an HTTPD service to have started. Waits for at
* most 10s, after that returns 77 to indicate an error.
*
* @param base_url what URL should we expect the exchange
* to be running at
* @return 0 on success
*/
int
TALER_TESTING_wait_httpd_ready (const char *base_url);
/**
* Wait for the auditor to have started. Waits for at
* most 10s, after that returns 77 to indicate an error.

View File

@ -446,14 +446,6 @@ TALER_TESTING_find_pk (const struct TALER_EXCHANGE_Keys *keys,
}
/**
* Wait for the exchange to have started. Waits for at
* most 10s, after that returns 77 to indicate an error.
*
* @param base_url what URL should we expect the exchange
* to be running at
* @return 0 on success
*/
int
TALER_TESTING_wait_exchange_ready (const char *base_url)
{
@ -464,20 +456,51 @@ TALER_TESTING_wait_exchange_ready (const char *base_url)
"wget -q -t 1 -T 1 %sseed -o /dev/null -O /dev/null",
base_url); // make sure ends with '/'
/* give child time to start and bind against the socket */
fprintf (stderr,
"Waiting for `taler-exchange-httpd' to be ready (check with: %s)\n",
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Waiting for `taler-exchange-httpd` service to be ready (check with: %s)\n",
wget_cmd);
iter = 0;
do
{
if (10 == iter)
{
fprintf (stderr,
"Failed to launch `taler-exchange-httpd' (or `wget')\n");
GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
"Failed to launch `taler-exchange-httpd` service (or `wget')\n");
GNUNET_free (wget_cmd);
return 77;
}
sleep (1);
iter++;
}
while (0 != system (wget_cmd));
GNUNET_free (wget_cmd);
return 0;
}
int
TALER_TESTING_wait_httpd_ready (const char *base_url)
{
char *wget_cmd;
unsigned int iter;
GNUNET_asprintf (&wget_cmd,
"wget -q -t 1 -T 1 %s -o /dev/null -O /dev/null",
base_url); // make sure ends with '/'
/* give child time to start and bind against the socket */
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Waiting for HTTP service to be ready (check with: %s)\n",
wget_cmd);
iter = 0;
do
{
if (10 == iter)
{
GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
"Failed to launch HTTP service (or `wget')\n");
GNUNET_free (wget_cmd);
return 77;
}
fprintf (stderr, ".\n");
sleep (1);
iter++;
}