From 18c30b9a00a4e5dee629f4e06c261509ff7ba455 Mon Sep 17 00:00:00 2001 From: Florian Dold Date: Fri, 10 Feb 2023 13:21:37 +0100 Subject: [PATCH] wallet-core: implement partial withdrawal batching, don't block when generating planchets --- packages/taler-harness/src/harness/harness.ts | 10 +- .../integrationtests/test-withdrawal-huge.ts | 3 +- packages/taler-util/src/taler-types.ts | 19 +- .../src/operations/withdraw.ts | 384 +++++++++--------- 4 files changed, 210 insertions(+), 206 deletions(-) diff --git a/packages/taler-harness/src/harness/harness.ts b/packages/taler-harness/src/harness/harness.ts index 3403c266e..4e5d8238c 100644 --- a/packages/taler-harness/src/harness/harness.ts +++ b/packages/taler-harness/src/harness/harness.ts @@ -1361,7 +1361,12 @@ export class ExchangeService implements ExchangeServiceInterface { this.exchangeWirewatchProc = this.globalState.spawnService( "taler-exchange-wirewatch", - ["-c", this.configFilename, ...this.timetravelArgArr], + [ + "-c", + this.configFilename, + "--longpoll-timeout=5s", + ...this.timetravelArgArr, + ], `exchange-wirewatch-${this.name}`, ); @@ -1951,6 +1956,9 @@ export class WalletService { ], `wallet-${this.opts.name}`, ); + logger.info( + `hint: connect to wallet using taler-wallet-cli --wallet-connection=${unixPath}`, + ); } async pingUntilAvailable(): Promise { diff --git a/packages/taler-harness/src/integrationtests/test-withdrawal-huge.ts b/packages/taler-harness/src/integrationtests/test-withdrawal-huge.ts index 579d727b1..437d799b8 100644 --- a/packages/taler-harness/src/integrationtests/test-withdrawal-huge.ts +++ b/packages/taler-harness/src/integrationtests/test-withdrawal-huge.ts @@ -87,9 +87,10 @@ export async function runWithdrawalHugeTest(t: GlobalTestState) { exchangeBaseUrl: exchange.baseUrl, }); + // Results in about 1K coins withdrawn await wallet.client.call(WalletApiOperation.WithdrawFakebank, { exchange: exchange.baseUrl, - amount: "TESTKUDOS:5000", + amount: "TESTKUDOS:10000", bank: bank.baseUrl, }); diff --git a/packages/taler-util/src/taler-types.ts b/packages/taler-util/src/taler-types.ts index a9303ed9c..bb15f0494 100644 --- a/packages/taler-util/src/taler-types.ts +++ b/packages/taler-util/src/taler-types.ts @@ -951,12 +951,12 @@ export const codecForBlindedDenominationSignature = () => .alternative(DenomKeyType.Rsa, codecForRsaBlindedDenominationSignature()) .build("BlindedDenominationSignature"); -export class WithdrawResponse { +export class ExchangeWithdrawResponse { ev_sig: BlindedDenominationSignature; } -export class WithdrawBatchResponse { - ev_sigs: WithdrawResponse[]; +export class ExchangeWithdrawBatchResponse { + ev_sigs: ExchangeWithdrawResponse[]; } export interface MerchantPayResponse { @@ -1476,13 +1476,13 @@ export const codecForRecoupConfirmation = (): Codec => .property("old_coin_pub", codecOptional(codecForString())) .build("RecoupConfirmation"); -export const codecForWithdrawResponse = (): Codec => - buildCodecForObject() +export const codecForWithdrawResponse = (): Codec => + buildCodecForObject() .property("ev_sig", codecForBlindedDenominationSignature()) .build("WithdrawResponse"); -export const codecForWithdrawBatchResponse = (): Codec => - buildCodecForObject() +export const codecForWithdrawBatchResponse = (): Codec => + buildCodecForObject() .property("ev_sigs", codecForList(codecForWithdrawResponse())) .build("WithdrawBatchResponse"); @@ -1753,6 +1753,11 @@ export interface ExchangeWithdrawRequest { coin_ev: CoinEnvelope; } +export interface ExchangeBatchWithdrawRequest { + planchets: ExchangeWithdrawRequest[]; +} + + export interface ExchangeRefreshRevealRequest { new_denoms_h: HashCodeString[]; coin_evs: CoinEnvelope[]; diff --git a/packages/taler-wallet-core/src/operations/withdraw.ts b/packages/taler-wallet-core/src/operations/withdraw.ts index caa280fe5..987a5e062 100644 --- a/packages/taler-wallet-core/src/operations/withdraw.ts +++ b/packages/taler-wallet-core/src/operations/withdraw.ts @@ -59,9 +59,11 @@ import { TransactionType, UnblindedSignature, URL, - WithdrawBatchResponse, - WithdrawResponse, + ExchangeWithdrawBatchResponse, + ExchangeWithdrawResponse, WithdrawUriInfoResponse, + ExchangeBatchWithdrawRequest, + WalletNotification, } from "@gnu-taler/taler-util"; import { EddsaKeypair } from "../crypto/cryptoImplementation.js"; import { @@ -93,6 +95,7 @@ import { import { walletCoreDebugFlags } from "../util/debugFlags.js"; import { HttpRequestLibrary, + HttpResponse, readSuccessResponseJsonOrErrorCode, readSuccessResponseJsonOrThrow, throwUnexpectedRequestError, @@ -455,21 +458,43 @@ async function processPlanchetGenerate( }); } +interface WithdrawalRequestBatchArgs { + /** + * Use the batched request on the network level. + * Not supported by older exchanges. + */ + useBatchRequest: boolean; + + coinStartIndex: number; + + batchSize: number; +} + +interface WithdrawalBatchResult { + coinIdxs: number[]; + batchResp: ExchangeWithdrawBatchResponse; +} + /** * Send the withdrawal request for a generated planchet to the exchange. * * The verification of the response is done asynchronously to enable parallelism. */ -async function processPlanchetExchangeRequest( +async function processPlanchetExchangeBatchRequest( ws: InternalWalletState, wgContext: WithdrawalGroupContext, - coinIdx: number, -): Promise { + args: WithdrawalRequestBatchArgs, +): Promise { const withdrawalGroup: WithdrawalGroupRecord = wgContext.wgRecord; logger.info( - `processing planchet exchange request ${withdrawalGroup.withdrawalGroupId}/${coinIdx}`, + `processing planchet exchange batch request ${withdrawalGroup.withdrawalGroupId}, start=${args.coinStartIndex}, len=${args.batchSize}`, ); - const d = await ws.db + + const batchReq: ExchangeBatchWithdrawRequest = { planchets: [] }; + // Indices of coins that are included in the batch request + const coinIdxs: number[] = []; + + await ws.db .mktx((x) => [ x.withdrawalGroups, x.planchets, @@ -477,96 +502,88 @@ async function processPlanchetExchangeRequest( x.denominations, ]) .runReadOnly(async (tx) => { - let planchet = await tx.planchets.indexes.byGroupAndIndex.get([ - withdrawalGroup.withdrawalGroupId, - coinIdx, - ]); - if (!planchet) { - return; + for ( + let coinIdx = args.coinStartIndex; + coinIdx < args.coinStartIndex + args.batchSize && + coinIdx < wgContext.numPlanchets; + coinIdx++ + ) { + let planchet = await tx.planchets.indexes.byGroupAndIndex.get([ + withdrawalGroup.withdrawalGroupId, + coinIdx, + ]); + if (!planchet) { + continue; + } + if (planchet.planchetStatus === PlanchetStatus.WithdrawalDone) { + logger.warn("processPlanchet: planchet already withdrawn"); + continue; + } + const denom = await ws.getDenomInfo( + ws, + tx, + withdrawalGroup.exchangeBaseUrl, + planchet.denomPubHash, + ); + + if (!denom) { + logger.error("db inconsistent: denom for planchet not found"); + continue; + } + + const planchetReq: ExchangeWithdrawRequest = { + denom_pub_hash: planchet.denomPubHash, + reserve_sig: planchet.withdrawSig, + coin_ev: planchet.coinEv, + }; + batchReq.planchets.push(planchetReq); + coinIdxs.push(coinIdx); } - if (planchet.planchetStatus === PlanchetStatus.WithdrawalDone) { - logger.warn("processPlanchet: planchet already withdrawn"); - return; - } - const exchange = await tx.exchanges.get(withdrawalGroup.exchangeBaseUrl); - if (!exchange) { - logger.error("db inconsistent: exchange for planchet not found"); - return; - } - - const denom = await ws.getDenomInfo( - ws, - tx, - withdrawalGroup.exchangeBaseUrl, - planchet.denomPubHash, - ); - - if (!denom) { - logger.error("db inconsistent: denom for planchet not found"); - return; - } - - logger.trace( - `processing planchet #${coinIdx} in withdrawal ${withdrawalGroup.withdrawalGroupId}`, - ); - - const reqBody: ExchangeWithdrawRequest = { - denom_pub_hash: planchet.denomPubHash, - reserve_sig: planchet.withdrawSig, - coin_ev: planchet.coinEv, - }; - const reqUrl = new URL( - `reserves/${withdrawalGroup.reservePub}/withdraw`, - exchange.baseUrl, - ).href; - - return { reqUrl, reqBody }; }); - if (!d) { - return; + if (batchReq.planchets.length == 0) { + logger.warn("empty withdrawal batch"); + return { + batchResp: { ev_sigs: [] }, + coinIdxs: [], + }; } - const { reqUrl, reqBody } = d; - try { - const resp = await ws.http.postJson(reqUrl, reqBody); - if (resp.status === HttpStatusCode.UnavailableForLegalReasons) { - logger.info("withdrawal requires KYC"); - const respJson = await resp.json(); - const uuidResp = codecForWalletKycUuid().decode(respJson); - logger.info(`kyc uuid response: ${j2s(uuidResp)}`); - await ws.db - .mktx((x) => [x.planchets, x.withdrawalGroups]) - .runReadWrite(async (tx) => { + async function handleKycRequired(resp: HttpResponse, startIdx: number) { + logger.info("withdrawal requires KYC"); + const respJson = await resp.json(); + const uuidResp = codecForWalletKycUuid().decode(respJson); + logger.info(`kyc uuid response: ${j2s(uuidResp)}`); + await ws.db + .mktx((x) => [x.planchets, x.withdrawalGroups]) + .runReadWrite(async (tx) => { + for (let i = 0; i < startIdx; i++) { let planchet = await tx.planchets.indexes.byGroupAndIndex.get([ withdrawalGroup.withdrawalGroupId, - coinIdx, + coinIdxs[i], ]); if (!planchet) { - return; + continue; } planchet.planchetStatus = PlanchetStatus.KycRequired; - const wg2 = await tx.withdrawalGroups.get( - withdrawalGroup.withdrawalGroupId, - ); - if (!wg2) { - return; - } - wg2.kycPending = { - paytoHash: uuidResp.h_payto, - requirementRow: uuidResp.requirement_row, - }; await tx.planchets.put(planchet); - await tx.withdrawalGroups.put(wg2); - }); - return; - } - const r = await readSuccessResponseJsonOrThrow( - resp, - codecForWithdrawResponse(), - ); - return r; - } catch (e) { + } + const wg2 = await tx.withdrawalGroups.get( + withdrawalGroup.withdrawalGroupId, + ); + if (!wg2) { + return; + } + wg2.kycPending = { + paytoHash: uuidResp.h_payto, + requirementRow: uuidResp.requirement_row, + }; + await tx.withdrawalGroups.put(wg2); + }); + return; + } + + async function storeCoinError(e: any, coinIdx: number) { const errDetail = getErrorDetailFromException(e); logger.trace("withdrawal request failed", e); logger.trace(String(e)); @@ -583,101 +600,81 @@ async function processPlanchetExchangeRequest( planchet.lastError = errDetail; await tx.planchets.put(planchet); }); - return; } -} -/** - * Send the withdrawal request for a generated planchet to the exchange. - * - * The verification of the response is done asynchronously to enable parallelism. - */ -async function processPlanchetExchangeBatchRequest( - ws: InternalWalletState, - wgContext: WithdrawalGroupContext, -): Promise { - const withdrawalGroup: WithdrawalGroupRecord = wgContext.wgRecord; - logger.info( - `processing planchet exchange batch request ${withdrawalGroup.withdrawalGroupId}`, - ); - const numTotalCoins = withdrawalGroup.denomsSel.selectedDenoms - .map((x) => x.count) - .reduce((a, b) => a + b); - const d = await ws.db - .mktx((x) => [ - x.withdrawalGroups, - x.planchets, - x.exchanges, - x.denominations, - ]) - .runReadOnly(async (tx) => { - const reqBody: { planchets: ExchangeWithdrawRequest[] } = { - planchets: [], + // FIXME: handle individual error codes better! + + if (args.useBatchRequest) { + const reqUrl = new URL( + `reserves/${withdrawalGroup.reservePub}/batch-withdraw`, + withdrawalGroup.exchangeBaseUrl, + ).href; + + try { + const resp = await ws.http.postJson(reqUrl, batchReq); + if (resp.status === HttpStatusCode.UnavailableForLegalReasons) { + await handleKycRequired(resp, 0); + } + const r = await readSuccessResponseJsonOrThrow( + resp, + codecForWithdrawBatchResponse(), + ); + return { + coinIdxs, + batchResp: r, }; - const exchange = await tx.exchanges.get(withdrawalGroup.exchangeBaseUrl); - if (!exchange) { - logger.error("db inconsistent: exchange for planchet not found"); - return; - } - - for (let coinIdx = 0; coinIdx < numTotalCoins; coinIdx++) { - let planchet = await tx.planchets.indexes.byGroupAndIndex.get([ - withdrawalGroup.withdrawalGroupId, - coinIdx, - ]); - if (!planchet) { - return; - } - if (planchet.planchetStatus === PlanchetStatus.WithdrawalDone) { - logger.warn("processPlanchet: planchet already withdrawn"); - return; - } - const denom = await ws.getDenomInfo( - ws, - tx, + } catch (e) { + await storeCoinError(e, coinIdxs[0]); + return { + batchResp: { ev_sigs: [] }, + coinIdxs: [], + }; + } + } else { + // We emulate the batch response here by making multiple individual requests + const responses: ExchangeWithdrawBatchResponse = { + ev_sigs: [], + }; + for (let i = 0; i < batchReq.planchets.length; i++) { + try { + const p = batchReq.planchets[i]; + const reqUrl = new URL( + `reserves/${withdrawalGroup.reservePub}/withdraw`, withdrawalGroup.exchangeBaseUrl, - planchet.denomPubHash, - ); - - if (!denom) { - logger.error("db inconsistent: denom for planchet not found"); - return; + ).href; + const resp = await ws.http.postJson(reqUrl, p); + if (resp.status === HttpStatusCode.UnavailableForLegalReasons) { + await handleKycRequired(resp, i); + // We still return blinded coins that we could actually withdraw. + return { + coinIdxs, + batchResp: responses, + }; } - - const planchetReq: ExchangeWithdrawRequest = { - denom_pub_hash: planchet.denomPubHash, - reserve_sig: planchet.withdrawSig, - coin_ev: planchet.coinEv, - }; - reqBody.planchets.push(planchetReq); + const r = await readSuccessResponseJsonOrThrow( + resp, + codecForWithdrawResponse(), + ); + responses.ev_sigs.push(r); + } catch (e) { + await storeCoinError(e, coinIdxs[i]); } - return reqBody; - }); - - if (!d) { - return; + } + return { + coinIdxs, + batchResp: responses, + }; } - - const reqUrl = new URL( - `reserves/${withdrawalGroup.reservePub}/batch-withdraw`, - withdrawalGroup.exchangeBaseUrl, - ).href; - - const resp = await ws.http.postJson(reqUrl, d); - const r = await readSuccessResponseJsonOrThrow( - resp, - codecForWithdrawBatchResponse(), - ); - return r; } async function processPlanchetVerifyAndStoreCoin( ws: InternalWalletState, wgContext: WithdrawalGroupContext, coinIdx: number, - resp: WithdrawResponse, + resp: ExchangeWithdrawResponse, ): Promise { const withdrawalGroup = wgContext.wgRecord; + logger.info(`checking and storing planchet idx=${coinIdx}`); const d = await ws.db .mktx((x) => [x.withdrawalGroups, x.planchets, x.denominations]) .runReadOnly(async (tx) => { @@ -791,6 +788,14 @@ async function processPlanchetVerifyAndStoreCoin( wgContext.planchetsFinished.add(planchet.coinPub); + // We create the notification here, as the async transaction below + // allows other planchet withdrawals to change wgContext.planchetsFinished + const notification: WalletNotification = { + type: NotificationType.CoinWithdrawn, + numTotal: wgContext.numPlanchets, + numWithdrawn: wgContext.planchetsFinished.size, + } + // Check if this is the first time that the whole // withdrawal succeeded. If so, mark the withdrawal // group as finished. @@ -814,11 +819,7 @@ async function processPlanchetVerifyAndStoreCoin( }); if (firstSuccess) { - ws.notify({ - type: NotificationType.CoinWithdrawn, - numTotal: wgContext.numPlanchets, - numWithdrawn: wgContext.planchetsFinished.size, - }); + ws.notify(notification); } } @@ -1150,8 +1151,6 @@ export async function processWithdrawalGroup( wgRecord: withdrawalGroup, }; - let work: Promise[] = []; - await ws.db .mktx((x) => [x.planchets]) .runReadOnly(async (tx) => { @@ -1165,44 +1164,35 @@ export async function processWithdrawalGroup( } }); + // We sequentially generate planchets, so that + // large withdrawal groups don't make the wallet unresponsive. for (let i = 0; i < numTotalCoins; i++) { - work.push(processPlanchetGenerate(ws, withdrawalGroup, i)); + await processPlanchetGenerate(ws, withdrawalGroup, i); } - // Generate coins concurrently (parallelism only happens in the crypto API workers) - await Promise.all(work); + const maxBatchSize = 100; - work = []; - - if (ws.batchWithdrawal) { - const resp = await processPlanchetExchangeBatchRequest(ws, wgContext); - if (!resp) { - throw Error("unable to do batch withdrawal"); - } - for (let coinIdx = 0; coinIdx < numTotalCoins; coinIdx++) { + for (let i = 0; i < numTotalCoins; i += maxBatchSize) { + const resp = await processPlanchetExchangeBatchRequest(ws, wgContext, { + batchSize: maxBatchSize, + coinStartIndex: i, + useBatchRequest: ws.batchWithdrawal, + }); + let work: Promise[] = []; + work = []; + for (let j = 0; j < resp.coinIdxs.length; j++) { work.push( processPlanchetVerifyAndStoreCoin( ws, wgContext, - coinIdx, - resp.ev_sigs[coinIdx], + resp.coinIdxs[j], + resp.batchResp.ev_sigs[j], ), ); } - } else { - for (let coinIdx = 0; coinIdx < numTotalCoins; coinIdx++) { - const resp = await processPlanchetExchangeRequest(ws, wgContext, coinIdx); - if (!resp) { - continue; - } - work.push( - processPlanchetVerifyAndStoreCoin(ws, wgContext, coinIdx, resp), - ); - } + await Promise.all(work); } - await Promise.all(work); - let numFinished = 0; let numKycRequired = 0; let finishedForFirstTime = false;