array-based do_reserves_in_insert

This commit is contained in:
Christian Grothoff 2023-05-18 14:45:28 +02:00
parent 9f081d28d7
commit bac7123763
No known key found for this signature in database
GPG Key ID: 939E6BE1E29FC3CC
2 changed files with 222 additions and 8 deletions

View File

@ -963,3 +963,144 @@ BEGIN
CLOSE curs_transaction_exist; CLOSE curs_transaction_exist;
RETURN; RETURN;
END $$; END $$;
CREATE OR REPLACE FUNCTION exchange_do_array_reserves_insert(
IN in_gc_date INT8,
IN in_reserve_expiration INT8,
IN ina_reserve_pub BYTEA[],
IN ina_wire_ref INT8[],
IN ina_credit_val INT8[],
IN ina_credit_frac INT4[],
IN ina_exchange_account_name VARCHAR[],
IN ina_execution_date INT8[],
IN ina_wire_source_h_payto BYTEA[],
IN ina_payto_uri VARCHAR[],
IN ina_notify TEXT[],
OUT transaction_duplicate BOOLEAN,
OUT ruuid INT8)
LANGUAGE plpgsql
AS $$
DECLARE
curs REFCURSOR;
DECLARE
conflict BOOL;
DECLARE
dup BOOL;
DECLARE
uuid INT8;
DECLARE
i RECORD;
BEGIN
INSERT INTO wire_targets
(wire_target_h_payto
,payto_uri)
SELECT
wire_source_h_payto
,payto_uri
FROM
UNNEST (ina_wire_source_h_payto) AS wire_source_h_payto
,UNNEST (ina_payto_uri) AS payto_uri
ON CONFLICT DO NOTHING;
OPEN curs FOR
WITH reserve_changes AS (
SELECT
reserve_pub
,wire_ref
,credit_val
,credit_frac
,exchange_account_name
,execution_date
,wire_source_h_payto
,payto_uri
,notify
FROM
UNNEST (ina_reserve_pub) AS reserve_pub
,UNNEST (ina_wire_ref) AS wire_ref
,UNNEST (ina_credit_val) AS credit_val
,UNNEST (ina_credit_frac) AS credit_frac
,UNNEST (ina_exchange_account_name) AS exchange_account_name
,UNNEST (ina_execution_date) AS execution_date
,UNNEST (ina_wire_source_h_payto) AS wire_source_h_payto
,UNNEST (ina_notify) AS notify;
<<loop>> LOOP
FETCH FROM curs INTO i;
IF NOT FOUND
THEN
EXIT loop;
END IF;
INSERT INTO reserves
(reserve_pub
,current_balance_val
,current_balance_frac
,expiration_date
,gc_date
) VALUES (
i.reserve_pub
,i.credit_val
,i.credit_frac
,in_reserve_expiration
,in_gc_date
)
ON CONFLICT DO NOTHING
RETURNING reserve_uuid
INTO uuid;
conflict = NOT FOUND;
INSERT INTO reserves_in
(reserve_pub
,wire_reference
,credit_val
,credit_frac
,exchange_account_section
,wire_source_h_payto
,execution_date
) VALUES (
i.reserve_pub
,i.wire_reference
,i.credit_val
,i.credit_frac
,i.exchange_account_section
,i.wire_source_h_payto
,i.execution_date
ON CONFLICT DO NOTHING;
IF NOT FOUND
THEN
IF conflict
THEN
dup = TRUE;
else
dup = FALSE;
END IF;
ELSE
IF NOT conflict
THEN
EXECUTE FORMAT (
'NOTIFY %s'
,i.notify);
END IF;
dup = FALSE;
END IF;
RETURN (dup,uuid);
END LOOP loop_reserve;
CLOSE curs;
RETURN;
END $$;

View File

@ -619,12 +619,36 @@ TEH_PG_reserves_in_insert (
#if 0 #if 0
/**
* Closure for our helper_cb()
*/
struct Context struct Context
{ {
/**
* Array of reserve UUIDs to initialize.
*/
uint64_t *reserve_uuids; uint64_t *reserve_uuids;
/**
* Array with entries set to 'true' for duplicate transactions.
*/
bool *transaction_duplicates; bool *transaction_duplicates;
/**
* Array with entries set to 'true' for rows with conflicts.
*/
bool *conflicts; bool *conflicts;
/**
* Set to #GNUNET_SYSERR on failures.
*/
struct GNUNET_GenericReturnValue status; struct GNUNET_GenericReturnValue status;
/**
* Single value (no array) set to true if we need
* to follow-up with an update.
*/
bool *needs_update;
}; };
@ -665,6 +689,7 @@ helper_cb (void *cls,
ctx->status = GNUNET_SYSERR; ctx->status = GNUNET_SYSERR;
return; return;
} }
*ctx->need_update |= ctx->conflicts[i];
} }
} }
@ -685,7 +710,6 @@ TEH_PG_reserves_in_insertN (
const char *sender_account_details[GNUNET_NZL (reserves_length)]; const char *sender_account_details[GNUNET_NZL (reserves_length)];
const char *exchange_account_names[GNUNET_NZL (reserves_length)]; const char *exchange_account_names[GNUNET_NZL (reserves_length)];
uint64_t wire_references[GNUNET_NZL (reserves_length)]; uint64_t wire_references[GNUNET_NZL (reserves_length)];
uint64_t reserve_uuids[GNUNET_NZL (reserves_length)]; uint64_t reserve_uuids[GNUNET_NZL (reserves_length)];
bool transaction_duplicates[GNUNET_NZL (reserves_length)]; bool transaction_duplicates[GNUNET_NZL (reserves_length)];
bool conflicts[GNUNET_NZL (reserves_length)]; bool conflicts[GNUNET_NZL (reserves_length)];
@ -693,6 +717,7 @@ TEH_PG_reserves_in_insertN (
= GNUNET_TIME_relative_to_timestamp (pg->idle_reserve_expiration_time); = GNUNET_TIME_relative_to_timestamp (pg->idle_reserve_expiration_time);
struct GNUNET_TIME_Timestamp gc struct GNUNET_TIME_Timestamp gc
= GNUNET_TIME_relative_to_timestamp (pg->legal_reserve_expiration_time); = GNUNET_TIME_relative_to_timestamp (pg->legal_reserve_expiration_time);
bool needs_update = false;
enum GNUNET_DB_QueryStatus qs; enum GNUNET_DB_QueryStatus qs;
for (unsigned int i = 0; i<reserves_length; i++) for (unsigned int i = 0; i<reserves_length; i++)
@ -709,6 +734,25 @@ TEH_PG_reserves_in_insertN (
exchange_account_names[i] = reserve->exchange_account_name; exchange_account_names[i] = reserve->exchange_account_name;
wire_references[i] = reserve->wire_reference; wire_references[i] = reserve->wire_reference;
} }
/* NOTE: kind-of pointless to explicitly start a transaction here... */
if (GNUNET_OK !=
TEH_PG_preflight (pg))
{
GNUNET_break (0);
qs = GNUNET_DB_STATUS_HARD_ERROR;
goto finished;
}
if (GNUNET_OK !=
TEH_PG_start_read_committed (pg,
"READ_COMMITED"))
{
GNUNET_break (0);
qs = GNUNET_DB_STATUS_HARD_ERROR;
goto finished;
}
PREPARE (pg, PREPARE (pg,
"reserves_insert_with_array", "reserves_insert_with_array",
"SELECT" "SELECT"
@ -752,6 +796,7 @@ TEH_PG_reserves_in_insertN (
.reserve_uuids = reserve_uuids, .reserve_uuids = reserve_uuids,
.transaction_duplicates = transaction_duplicates, .transaction_duplicates = transaction_duplicates,
.conflicts = conflicts, .conflicts = conflicts,
.needs_update = &needs_update,
.status = GNUNET_OK .status = GNUNET_OK
}; };
@ -766,12 +811,42 @@ TEH_PG_reserves_in_insertN (
GNUNET_log (GNUNET_ERROR_TYPE_WARNING, GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
"Failed to insert into reserves (%d)\n", "Failed to insert into reserves (%d)\n",
qs); qs);
for (unsigned int i = 0; i<reserves_length; i++) goto finished;
GNUNET_free (rrs[i].notify_s);
return qs;
} }
} }
{
enum GNUNET_DB_QueryStatus cs;
cs = TEH_PG_commit (pg);
if (cs < 0)
{
GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
"Failed to commit\n");
qs = cs;
goto finished;
}
}
for (unsigned int i = 0; i<reserves_length; i++)
{
results[i] = transaction_duplicates[i]
? GNUNET_DB_STATUS_SUCCESS_NO_RESULTS
: GNUNET_DB_STATUS_SUCCESS_ONE_RESULT;
}
if (! need_update)
{
qs = reserves_length;
goto finished;
}
if (GNUNET_OK !=
TEH_PG_start (pg,
"reserve-insert-continued"))
{
GNUNET_break (0);
qs = GNUNET_DB_STATUS_HARD_ERROR;
goto finished;
}
for (unsigned int i = 0; i<reserves_length; i++) for (unsigned int i = 0; i<reserves_length; i++)
{ {
@ -806,16 +881,14 @@ TEH_PG_reserves_in_insertN (
"Failed to update reserves (%d)\n", "Failed to update reserves (%d)\n",
qs); qs);
results[i] = qs; results[i] = qs;
for (unsigned int i = 0; i<reserves_length; i++) goto finished;
GNUNET_free (rrs[i].notify_s);
return qs;
} }
results[i] = duplicate results[i] = duplicate
? GNUNET_DB_STATUS_SUCCESS_NO_RESULTS ? GNUNET_DB_STATUS_SUCCESS_NO_RESULTS
: GNUNET_DB_STATUS_SUCCESS_ONE_RESULT; : GNUNET_DB_STATUS_SUCCESS_ONE_RESULT;
} }
} }
// FIXME: convert results back for caller, too! finished:
for (unsigned int i = 0; i<reserves_length; i++) for (unsigned int i = 0; i<reserves_length; i++)
GNUNET_free (rrs[i].notify_s); GNUNET_free (rrs[i].notify_s);
return qs; return qs;