wallet-core: implement batch withdrawal

This commit is contained in:
Florian Dold 2022-05-03 17:53:32 +02:00
parent b4e219f7ff
commit f16d2e52d5
No known key found for this signature in database
GPG Key ID: D2E4F00F29D02A4B
5 changed files with 162 additions and 15 deletions

View File

@ -904,6 +904,10 @@ export class WithdrawResponse {
ev_sig: BlindedDenominationSignature;
}
export class WithdrawBatchResponse {
ev_sigs: WithdrawResponse[];
}
/**
* Easy to process format for the public data of coins
* managed by the wallet.
@ -1452,6 +1456,11 @@ export const codecForWithdrawResponse = (): Codec<WithdrawResponse> =>
.property("ev_sig", codecForBlindedDenominationSignature())
.build("WithdrawResponse");
export const codecForWithdrawBatchResponse = (): Codec<WithdrawBatchResponse> =>
buildCodecForObject<WithdrawBatchResponse>()
.property("ev_sigs", codecForList(codecForWithdrawResponse()))
.build("WithdrawBatchResponse");
export const codecForMerchantPayResponse = (): Codec<MerchantPayResponse> =>
buildCodecForObject<MerchantPayResponse>()
.property("sig", codecForString())

View File

@ -195,6 +195,14 @@ export const walletCli = clk
type WalletCliArgsType = clk.GetArgType<typeof walletCli>;
function checkEnvFlag(name: string): boolean {
const val = process.env[name];
if (val == "1") {
return true;
}
return false;
}
async function withWallet<T>(
walletCliArgs: WalletCliArgsType,
f: (w: { client: WalletCoreApiClient; ws: Wallet }) => Promise<T>,
@ -208,6 +216,11 @@ async function withWallet<T>(
persistentStoragePath: dbPath,
httpLib: myHttpLib,
});
if (checkEnvFlag("TALER_WALLET_BATCH_WITHDRAWAL")) {
wallet.setBatchWithdrawal(true);
}
applyVerbose(walletCliArgs.wallet.verbose);
try {
const w = {

View File

@ -215,6 +215,8 @@ export interface InternalWalletState {
insecureTrustExchange: boolean;
batchWithdrawal: boolean;
/**
* Asynchronous condition to interrupt the sleep of the
* retry loop.

View File

@ -24,6 +24,7 @@ import {
AmountString,
BankWithdrawDetails,
codecForTalerConfigResponse,
codecForWithdrawBatchResponse,
codecForWithdrawOperationStatusResponse,
codecForWithdrawResponse,
DenomKeyType,
@ -42,6 +43,7 @@ import {
UnblindedSignature,
URL,
VersionMatchResult,
WithdrawBatchResponse,
WithdrawResponse,
WithdrawUriInfoResponse,
} from "@gnu-taler/taler-util";
@ -70,11 +72,7 @@ import {
readSuccessResponseJsonOrThrow,
} from "../util/http.js";
import { checkDbInvariant, checkLogicInvariant } from "../util/invariants.js";
import {
resetRetryInfo,
RetryInfo,
updateRetryInfoTimeout,
} from "../util/retries.js";
import { resetRetryInfo, RetryInfo } from "../util/retries.js";
import {
WALLET_BANK_INTEGRATION_PROTOCOL_VERSION,
WALLET_EXCHANGE_PROTOCOL_VERSION,
@ -585,6 +583,108 @@ async function processPlanchetExchangeRequest(
}
}
/**
* Send the withdrawal request for a generated planchet to the exchange.
*
* The verification of the response is done asynchronously to enable parallelism.
*/
async function processPlanchetExchangeBatchRequest(
ws: InternalWalletState,
withdrawalGroup: WithdrawalGroupRecord,
): Promise<WithdrawBatchResponse | undefined> {
logger.info(
`processing planchet exchange batch request ${withdrawalGroup.withdrawalGroupId}`,
);
const numTotalCoins = withdrawalGroup.denomsSel.selectedDenoms
.map((x) => x.count)
.reduce((a, b) => a + b);
const d = await ws.db
.mktx((x) => ({
withdrawalGroups: x.withdrawalGroups,
planchets: x.planchets,
exchanges: x.exchanges,
denominations: x.denominations,
}))
.runReadOnly(async (tx) => {
const reqBody: { planchets: ExchangeWithdrawRequest[] } = {
planchets: [],
};
const exchange = await tx.exchanges.get(withdrawalGroup.exchangeBaseUrl);
if (!exchange) {
logger.error("db inconsistent: exchange for planchet not found");
return;
}
for (let coinIdx = 0; coinIdx < numTotalCoins; coinIdx++) {
let planchet = await tx.planchets.indexes.byGroupAndIndex.get([
withdrawalGroup.withdrawalGroupId,
coinIdx,
]);
if (!planchet) {
return;
}
if (planchet.withdrawalDone) {
logger.warn("processPlanchet: planchet already withdrawn");
return;
}
const denom = await ws.getDenomInfo(
ws,
tx,
withdrawalGroup.exchangeBaseUrl,
planchet.denomPubHash,
);
if (!denom) {
logger.error("db inconsistent: denom for planchet not found");
return;
}
const planchetReq: ExchangeWithdrawRequest = {
denom_pub_hash: planchet.denomPubHash,
reserve_sig: planchet.withdrawSig,
coin_ev: planchet.coinEv,
};
reqBody.planchets.push(planchetReq);
}
return reqBody;
});
if (!d) {
return;
}
const reqUrl = new URL(
`reserves/${withdrawalGroup.reservePub}/batch-withdraw`,
withdrawalGroup.exchangeBaseUrl,
).href;
try {
const resp = await ws.http.postJson(reqUrl, d);
const r = await readSuccessResponseJsonOrThrow(
resp,
codecForWithdrawBatchResponse(),
);
return r;
} catch (e) {
const errDetail = getErrorDetailFromException(e);
logger.trace("withdrawal batch request failed", e);
logger.trace(e);
await ws.db
.mktx((x) => ({ withdrawalGroups: x.withdrawalGroups }))
.runReadWrite(async (tx) => {
let wg = await tx.withdrawalGroups.get(
withdrawalGroup.withdrawalGroupId,
);
if (!wg) {
return;
}
wg.lastError = errDetail;
await tx.withdrawalGroups.put(wg);
});
return;
}
}
async function processPlanchetVerifyAndStoreCoin(
ws: InternalWalletState,
withdrawalGroup: WithdrawalGroupRecord,
@ -931,18 +1031,35 @@ async function processWithdrawGroupImpl(
work = [];
for (let coinIdx = 0; coinIdx < numTotalCoins; coinIdx++) {
const resp = await processPlanchetExchangeRequest(
ws,
withdrawalGroup,
coinIdx,
);
if (ws.batchWithdrawal) {
const resp = await processPlanchetExchangeBatchRequest(ws, withdrawalGroup);
if (!resp) {
continue;
return;
}
for (let coinIdx = 0; coinIdx < numTotalCoins; coinIdx++) {
work.push(
processPlanchetVerifyAndStoreCoin(
ws,
withdrawalGroup,
coinIdx,
resp.ev_sigs[coinIdx],
),
);
}
} else {
for (let coinIdx = 0; coinIdx < numTotalCoins; coinIdx++) {
const resp = await processPlanchetExchangeRequest(
ws,
withdrawalGroup,
coinIdx,
);
if (!resp) {
continue;
}
work.push(
processPlanchetVerifyAndStoreCoin(ws, withdrawalGroup, coinIdx, resp),
);
}
work.push(
processPlanchetVerifyAndStoreCoin(ws, withdrawalGroup, coinIdx, resp),
);
}
await Promise.all(work);

View File

@ -1101,6 +1101,10 @@ export class Wallet {
this.ws.insecureTrustExchange = true;
}
setBatchWithdrawal(enable: boolean): void {
this.ws.batchWithdrawal = enable;
}
static async create(
db: DbAccess<typeof WalletStoresV1>,
http: HttpRequestLibrary,
@ -1158,6 +1162,8 @@ class InternalWalletStateImpl implements InternalWalletState {
insecureTrustExchange = false;
batchWithdrawal = false;
readonly timerGroup: TimerGroup;
latch = new AsyncCondition();
stopped = false;