batch modifications

This commit is contained in:
Joseph 2022-11-23 10:41:57 -05:00
parent 87198f124c
commit b6476ac881
No known key found for this signature in database
GPG Key ID: E709789D3076B5CC
3 changed files with 175 additions and 37 deletions

@ -1 +1 @@
Subproject commit 20f8eb7a72e2160409f0f78264ec5198e9caa193 Subproject commit 212ee0a78adc43cb5c04d6ea96ccc2fe74fed62b

View File

@ -16,7 +16,7 @@
/** /**
* @file exchangedb/pg_batch_reserves_in_insert.c * @file exchangedb/pg_batch_reserves_in_insert.c
* @brief Implementation of the reserves_in_insert function for Postgres * @brief Implementation of the reserves_in_insert function for Postgres
* @author JOSEPHxu * @author Joseph XU
*/ */
#include "platform.h" #include "platform.h"
#include "taler_error_codes.h" #include "taler_error_codes.h"
@ -35,15 +35,12 @@
/** /**
* Generate event notification for the reserve * Generate event notification for the reserve change.
* change.
* *
* @param pg plugin state
* @param reserve_pub reserve to notfiy on * @param reserve_pub reserve to notfiy on
*/ */
static void static char *
notify_on_reserve (struct PostgresClosure *pg, compute_notify_on_reserve (const struct TALER_ReservePublicKeyP *reserve_pub)
const struct TALER_ReservePublicKeyP *reserve_pub)
{ {
struct TALER_ReserveEventP rep = { struct TALER_ReserveEventP rep = {
.header.size = htons (sizeof (rep)), .header.size = htons (sizeof (rep)),
@ -51,12 +48,7 @@ notify_on_reserve (struct PostgresClosure *pg,
.reserve_pub = *reserve_pub .reserve_pub = *reserve_pub
}; };
GNUNET_log (GNUNET_ERROR_TYPE_INFO, return GNUNET_PG_get_event_notify_channel (&rep.header);
"Notifying on reserve!\n");
TEH_PG_event_notify (pg,
&rep.header,
NULL,
0);
} }
@ -75,8 +67,11 @@ TEH_PG_batch_reserves_in_insert (void *cls,
uint64_t reserve_uuid; uint64_t reserve_uuid;
bool conflicted; bool conflicted;
bool transaction_duplicate; bool transaction_duplicate;
bool need_update = false;
struct GNUNET_TIME_Timestamp reserve_expiration struct GNUNET_TIME_Timestamp reserve_expiration
= GNUNET_TIME_relative_to_timestamp (pg->idle_reserve_expiration_time); = GNUNET_TIME_relative_to_timestamp (pg->idle_reserve_expiration_time);
bool conflicts[reserves_length];
char *notify_s[reserves_length];
PREPARE (pg, PREPARE (pg,
"reserve_create", "reserve_create",
@ -84,8 +79,8 @@ TEH_PG_batch_reserves_in_insert (void *cls,
"out_reserve_found AS conflicted" "out_reserve_found AS conflicted"
",transaction_duplicate" ",transaction_duplicate"
",ruuid AS reserve_uuid" ",ruuid AS reserve_uuid"
" FROM exchange_do_batch_reserves_in" " FROM batch_reserves_insert"
" ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11);"); " ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12);");
expiry = GNUNET_TIME_absolute_to_timestamp ( expiry = GNUNET_TIME_absolute_to_timestamp (
GNUNET_TIME_absolute_add (reserves->execution_time.abs_time, GNUNET_TIME_absolute_add (reserves->execution_time.abs_time,
pg->idle_reserve_expiration_time)); pg->idle_reserve_expiration_time));
@ -98,34 +93,51 @@ TEH_PG_batch_reserves_in_insert (void *cls,
GNUNET_STRINGS_relative_time_to_string ( GNUNET_STRINGS_relative_time_to_string (
pg->idle_reserve_expiration_time, pg->idle_reserve_expiration_time,
GNUNET_NO)); GNUNET_NO));
{
if (GNUNET_OK !=
TEH_PG_start_read_committed(pg,
"READ_COMMITED"))
{
GNUNET_break (0);
return GNUNET_DB_STATUS_HARD_ERROR;
}
}
/* Optimistically assume this is a new reserve, create balance for the first /* Optimistically assume this is a new reserve, create balance for the first
time; we do this before adding the actual transaction to "reserves_in", time; we do this before adding the actual transaction to "reserves_in",
as for a new reserve it can't be a duplicate 'add' operation, and as as for a new reserve it can't be a duplicate 'add' operation, and as
the 'add' operation needs the reserve entry as a foreign key. */ the 'add' operation needs the reserve entry as a foreign key. */
for (unsigned int i = 0; i<reserves_length; i++) for (unsigned int i = 0; i<reserves_length; i++)
{
const struct TALER_EXCHANGEDB_ReserveInInfo *reserve = &reserves[i];
notify_s[i] = compute_notify_on_reserve (&reserve->reserve_pub);
}
for (unsigned int i=0;i<reserves_length;i++)
{ {
const struct TALER_EXCHANGEDB_ReserveInInfo *reserve = &reserves[i]; const struct TALER_EXCHANGEDB_ReserveInInfo *reserve = &reserves[i];
struct GNUNET_PQ_QueryParam params[] = { struct GNUNET_PQ_QueryParam params[] = {
GNUNET_PQ_query_param_auto_from_type (&reserve->reserve_pub), /*$1*/ GNUNET_PQ_query_param_auto_from_type (&reserve->reserve_pub),
GNUNET_PQ_query_param_timestamp (&expiry), /*$4*/ GNUNET_PQ_query_param_timestamp (&expiry),
GNUNET_PQ_query_param_timestamp (&gc), /*$5*/ GNUNET_PQ_query_param_timestamp (&gc),
GNUNET_PQ_query_param_uint64 (&reserve->wire_reference), /*6*/ GNUNET_PQ_query_param_uint64 (&reserve->wire_reference),
TALER_PQ_query_param_amount (&reserve->balance), /*7+8*/ TALER_PQ_query_param_amount (&reserve->balance),
GNUNET_PQ_query_param_string (reserve->exchange_account_name), /*9*/ GNUNET_PQ_query_param_string (reserve->exchange_account_name),
GNUNET_PQ_query_param_timestamp (&reserve->execution_time), /*10*/ GNUNET_PQ_query_param_timestamp (&reserve->execution_time),
GNUNET_PQ_query_param_auto_from_type (&h_payto), /*11*/ GNUNET_PQ_query_param_auto_from_type (&h_payto),
GNUNET_PQ_query_param_string (reserve->sender_account_details),/*12*/ GNUNET_PQ_query_param_string (reserve->sender_account_details),
GNUNET_PQ_query_param_timestamp (&reserve_expiration),/*13*/ GNUNET_PQ_query_param_timestamp (&reserve_expiration),
GNUNET_PQ_query_param_string (notify_s[i]),
GNUNET_PQ_query_param_end GNUNET_PQ_query_param_end
}; };
/* We should get all our results into results[]*/
struct GNUNET_PQ_ResultSpec rs[] = { struct GNUNET_PQ_ResultSpec rs[] = {
GNUNET_PQ_result_spec_uint64 ("reserve_uuid",
&reserve_uuid),
GNUNET_PQ_result_spec_bool ("conflicted", GNUNET_PQ_result_spec_bool ("conflicted",
&conflicted), &conflicted),
GNUNET_PQ_result_spec_bool ("transaction_duplicate", GNUNET_PQ_result_spec_bool ("transaction_duplicate",
&transaction_duplicate), &transaction_duplicate),
GNUNET_PQ_result_spec_uint64 ("reserve_uuid",
&reserve_uuid),
GNUNET_PQ_result_spec_end GNUNET_PQ_result_spec_end
}; };
@ -137,15 +149,92 @@ TEH_PG_batch_reserves_in_insert (void *cls,
params, params,
rs); rs);
if (qs1 < 0) if (qs1 < 0)
{
GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
"Failed to create reserves (%d)\n",
qs1);
return qs1; return qs1;
notify_on_reserve (pg, }
&reserve->reserve_pub); GNUNET_assert (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS != qs1);
GNUNET_assert (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS != qs1); results[i] = (transaction_duplicate)
results[i] = (transaction_duplicate)
? GNUNET_DB_STATUS_SUCCESS_NO_RESULTS ? GNUNET_DB_STATUS_SUCCESS_NO_RESULTS
: GNUNET_DB_STATUS_SUCCESS_ONE_RESULT; : GNUNET_DB_STATUS_SUCCESS_ONE_RESULT;
if ( (! conflicted) && transaction_duplicate) conflicts[i] = conflicted;
TEH_PG_rollback (pg); if (!conflicts[i] && transaction_duplicate)
{
GNUNET_break (0);
TEH_PG_rollback (pg);
return GNUNET_DB_STATUS_HARD_ERROR;
}
need_update |= conflicted;
} }
// commit
{
enum GNUNET_DB_QueryStatus cs;
cs = TEH_PG_commit (pg);
if (cs < 0)
return cs;
}
if (!need_update)
goto exit;
// begin serializable
{
if (GNUNET_OK !=
TEH_PG_start(pg,
"reserve-insert-continued"))
{
GNUNET_break (0);
return GNUNET_DB_STATUS_HARD_ERROR;
}
}
enum GNUNET_DB_QueryStatus qs2;
PREPARE (pg,
"reserves_in_add_transaction",
"SELECT batch_reserves_update"
" ($1,$2,$3,$4,$5,$6,$7,$8,$9);");
for (unsigned int i=0;i<reserves_length;i++)
{
if (! conflicts[i])
continue;
{
const struct TALER_EXCHANGEDB_ReserveInInfo *reserve = &reserves[i];
struct GNUNET_PQ_QueryParam params[] = {
GNUNET_PQ_query_param_auto_from_type (&reserve->reserve_pub),
GNUNET_PQ_query_param_timestamp (&expiry),
GNUNET_PQ_query_param_uint64 (&reserve->wire_reference),
TALER_PQ_query_param_amount (&reserve->balance),
GNUNET_PQ_query_param_string (reserve->exchange_account_name),
GNUNET_PQ_query_param_bool (conflicted),
GNUNET_PQ_query_param_auto_from_type (&h_payto),
GNUNET_PQ_query_param_string (notify_s[i]),
GNUNET_PQ_query_param_end
};
qs2 = GNUNET_PQ_eval_prepared_non_select (pg->conn,
"reserves_in_add_transaction",
params);
if (qs2<0)
{
GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
"Failed to update reserves (%d)\n",
qs2);
return qs2;
}
}
}
{
enum GNUNET_DB_QueryStatus cs;
cs = TEH_PG_commit (pg);
if (cs < 0)
return cs;
}
exit:
for (unsigned int i=0;i<reserves_length;i++)
GNUNET_free (notify_s[i]);
return reserves_length; return reserves_length;
} }

View File

@ -91,9 +91,22 @@ run (void *cls)
result = 77; result = 77;
goto cleanup; goto cleanup;
} }
<<<<<<< HEAD
for (unsigned int i = 0; i< 7; i++) for (unsigned int i = 0; i< 7; i++)
=======
if (GNUNET_OK !=
plugin->setup_partitions (plugin->cls,
num_partitions))
{ {
static unsigned int batches[] = {1, 1, 2, 4, 16, 64, 256}; GNUNET_break (0);
result = 77;
goto cleanup;
}
for (unsigned int i = 0; i< 8; i++)
>>>>>>> 26922c6d (batch modifications)
{
static unsigned int batches[] = {1, 1,0, 2, 4, 16, 64, 256};
const char *sndr = "payto://x-taler-bank/localhost:8080/1"; const char *sndr = "payto://x-taler-bank/localhost:8080/1";
struct TALER_Amount value; struct TALER_Amount value;
unsigned int batch_size = batches[i]; unsigned int batch_size = batches[i];
@ -101,6 +114,7 @@ run (void *cls)
struct GNUNET_TIME_Timestamp ts; struct GNUNET_TIME_Timestamp ts;
struct GNUNET_TIME_Relative duration; struct GNUNET_TIME_Relative duration;
struct TALER_EXCHANGEDB_ReserveInInfo reserves[batch_size]; struct TALER_EXCHANGEDB_ReserveInInfo reserves[batch_size];
/* struct TALER_EXCHANGEDB_ReserveInInfo reserves2[batch_size];*/
enum GNUNET_DB_QueryStatus results[batch_size]; enum GNUNET_DB_QueryStatus results[batch_size];
GNUNET_assert (GNUNET_OK == GNUNET_assert (GNUNET_OK ==
TALER_string_to_amount (CURRENCY ":1.000010", TALER_string_to_amount (CURRENCY ":1.000010",
@ -109,9 +123,14 @@ run (void *cls)
ts = GNUNET_TIME_timestamp_get (); ts = GNUNET_TIME_timestamp_get ();
for (unsigned int r = 0; r<10; r++) for (unsigned int r = 0; r<10; r++)
{ {
<<<<<<< HEAD
plugin->start_read_committed (plugin->cls, plugin->start_read_committed (plugin->cls,
"test_by_j"); "test_by_j");
=======
plugin->start (plugin->cls,
"test_by_exchange_j");
>>>>>>> 26922c6d (batch modifications)
for (unsigned int k = 0; k<batch_size; k++) for (unsigned int k = 0; k<batch_size; k++)
{ {
RND_BLK (&reserves[k].reserve_pub); RND_BLK (&reserves[k].reserve_pub);
@ -120,6 +139,7 @@ run (void *cls)
reserves[k].sender_account_details = sndr; reserves[k].sender_account_details = sndr;
reserves[k].exchange_account_name = "name"; reserves[k].exchange_account_name = "name";
reserves[k].wire_reference = k; reserves[k].wire_reference = k;
<<<<<<< HEAD
} }
FAILIF (batch_size != FAILIF (batch_size !=
@ -129,13 +149,42 @@ run (void *cls)
results)); results));
plugin->commit (plugin->cls); plugin->commit (plugin->cls);
=======
}
FAILIF (batch_size !=
plugin->batch_reserves_in_insert (plugin->cls,
reserves,
batch_size,
results));
/*plugin->commit (plugin->cls);*/
>>>>>>> 26922c6d (batch modifications)
} }
/*
for (unsigned int s=0;s<10;s++)
{
for (unsigned int k = 0; k<batch_size; k++)
{
RND_BLK (&reserves2[k].reserve_pub);
reserves2[k].balance = value;
reserves2[k].execution_time = ts;
reserves2[k].sender_account_details = sndr;
reserves2[k].exchange_account_name = "name";
reserves2[k].wire_reference = k;
}
FAILIF (batch_size !=
plugin->batch_reserves_in_insert (plugin->cls,
reserves2,
batch_size,
results));
}*/
duration = GNUNET_TIME_absolute_get_duration (now); duration = GNUNET_TIME_absolute_get_duration (now);
fprintf (stdout, fprintf (stdout,
"for a batchsize equal to %d it took %s\n", "for a batchsize equal to %d it took %s\n",
batch_size, batch_size,
GNUNET_STRINGS_relative_time_to_string (duration, GNUNET_STRINGS_relative_time_to_string (duration,
GNUNET_NO) ); GNUNET_NO) );
} }
result = 0; result = 0;
drop: drop:
@ -155,7 +204,6 @@ main (int argc,
char *config_filename; char *config_filename;
char *testname; char *testname;
struct GNUNET_CONFIGURATION_Handle *cfg; struct GNUNET_CONFIGURATION_Handle *cfg;
(void) argc; (void) argc;
result = -1; result = -1;
if (NULL == (plugin_name = strrchr (argv[0], (int) '-'))) if (NULL == (plugin_name = strrchr (argv[0], (int) '-')))
@ -163,6 +211,7 @@ main (int argc,
GNUNET_break (0); GNUNET_break (0);
return -1; return -1;
} }
GNUNET_log_setup (argv[0], GNUNET_log_setup (argv[0],
"WARNING", "WARNING",
NULL); NULL);