make wirewatch batch size adaptive to transaction success/failure

This commit is contained in:
Christian Grothoff 2018-08-11 11:29:45 +02:00
parent 96c2fb8e10
commit a1f289512e
No known key found for this signature in database
GPG Key ID: 939E6BE1E29FC3CC
2 changed files with 22 additions and 3 deletions

View File

@ -1,5 +1,5 @@
{ {
"url": "payto://x-taler-bank/localhost:8082/2", "url": "payto://x-taler-bank/localhost:8082/2",
"salt": "B1902XWPRPR4K8MTNPQVRG3ZB7FV8967VTKAGPQ3XTJJEM46BHB7XDSQHRXB3287BCYQACHS59WY7QJQ7A7GMM981AGG1E32EHGB72R", "salt": "RJG7PDRM3YFFQ9YXHE5034R2HF9X68PQKC2W0CQWV4BQ50RYQT339GWN33601S53C3GGG35DG9C6479H4PGMZ9SVJ7A22RK99S4BN78",
"master_sig": "NMNK7RQVXCH73MDQXQYRWP206VFNHAPV3CX8FPFZKFC3NGFD3ZVX6AK7F55TTADR2YNN3TB5TTZDS23MCBDC028DCXP44SFG4P3T22G" "master_sig": "H9GKSPFDQVCP8NBW0X4ZMWEFHZGZNTWZETWRTMYK3831HYTDKWR7R5KY2YJ7XW6HNZ27Q9NXW2DGJWJ016WBK01AKWA6MRNFVPQ6G2G"
} }

View File

@ -165,6 +165,16 @@ static int test_mode;
*/ */
static int reset_mode; static int reset_mode;
/**
* How many transactions do we retrieve per batch?
*/
static unsigned int batch_size = 1024;
/**
* How many transactions did we see in the current batch?
*/
static unsigned int current_batch_size;
/** /**
* Next task to run, if any. * Next task to run, if any.
*/ */
@ -407,6 +417,9 @@ history_cb (void *cls,
/* do we need to rollback explicitly on commit failure!? */ /* do we need to rollback explicitly on commit failure!? */
db_plugin->rollback (db_plugin->cls, db_plugin->rollback (db_plugin->cls,
session); session);
/* reduce transaction size to reduce rollback probability */
if (2 > current_batch_size)
current_batch_size /= 2;
/* try again */ /* try again */
GNUNET_assert (NULL == task); GNUNET_assert (NULL == task);
task = GNUNET_SCHEDULER_add_now (&find_transfers, task = GNUNET_SCHEDULER_add_now (&find_transfers,
@ -421,6 +434,10 @@ history_cb (void *cls,
last_row_off_size = latest_row_off_size; last_row_off_size = latest_row_off_size;
latest_row_off = NULL; latest_row_off = NULL;
latest_row_off_size = 0; latest_row_off_size = 0;
/* if successful at limit, try increasing transaction batch size (AIMD) */
if (current_batch_size == batch_size)
batch_size++;
} }
GNUNET_break (0 <= qs); GNUNET_break (0 <= qs);
if ( (GNUNET_YES == delay) && if ( (GNUNET_YES == delay) &&
@ -489,6 +506,7 @@ history_cb (void *cls,
"Adding wire transfer over %s with subject `%s'\n", "Adding wire transfer over %s with subject `%s'\n",
TALER_amount2s (&details->amount), TALER_amount2s (&details->amount),
TALER_B2S (&details->wtid)); TALER_B2S (&details->wtid));
current_batch_size++;
/* Wire transfer identifier == reserve public key */ /* Wire transfer identifier == reserve public key */
GNUNET_assert (sizeof (reserve_pub) == sizeof (details->wtid)); GNUNET_assert (sizeof (reserve_pub) == sizeof (details->wtid));
memcpy (&reserve_pub, memcpy (&reserve_pub,
@ -603,12 +621,13 @@ find_transfers (void *cls)
( (NULL != last_row_off) && ( (NULL != last_row_off) &&
(0 != last_row_off_size) ) ); (0 != last_row_off_size) ) );
delay = GNUNET_YES; delay = GNUNET_YES;
current_batch_size = 0;
hh = wa_pos->wire_plugin->get_history (wa_pos->wire_plugin->cls, hh = wa_pos->wire_plugin->get_history (wa_pos->wire_plugin->cls,
wa_pos->section_name, wa_pos->section_name,
TALER_BANK_DIRECTION_CREDIT, TALER_BANK_DIRECTION_CREDIT,
last_row_off, last_row_off,
last_row_off_size, last_row_off_size,
1024, batch_size,
&history_cb, &history_cb,
session); session);
if (NULL == hh) if (NULL == hh)