wallet-core: implement partial withdrawal batching, don't block when generating planchets

This commit is contained in:
Florian Dold 2023-02-10 13:21:37 +01:00
parent c4180e1290
commit 18c30b9a00
No known key found for this signature in database
GPG Key ID: D2E4F00F29D02A4B
4 changed files with 210 additions and 206 deletions

View File

@ -1361,7 +1361,12 @@ export class ExchangeService implements ExchangeServiceInterface {
this.exchangeWirewatchProc = this.globalState.spawnService(
"taler-exchange-wirewatch",
["-c", this.configFilename, ...this.timetravelArgArr],
[
"-c",
this.configFilename,
"--longpoll-timeout=5s",
...this.timetravelArgArr,
],
`exchange-wirewatch-${this.name}`,
);
@ -1951,6 +1956,9 @@ export class WalletService {
],
`wallet-${this.opts.name}`,
);
logger.info(
`hint: connect to wallet using taler-wallet-cli --wallet-connection=${unixPath}`,
);
}
async pingUntilAvailable(): Promise<void> {

View File

@ -87,9 +87,10 @@ export async function runWithdrawalHugeTest(t: GlobalTestState) {
exchangeBaseUrl: exchange.baseUrl,
});
// Results in about 1K coins withdrawn
await wallet.client.call(WalletApiOperation.WithdrawFakebank, {
exchange: exchange.baseUrl,
amount: "TESTKUDOS:5000",
amount: "TESTKUDOS:10000",
bank: bank.baseUrl,
});

View File

@ -951,12 +951,12 @@ export const codecForBlindedDenominationSignature = () =>
.alternative(DenomKeyType.Rsa, codecForRsaBlindedDenominationSignature())
.build("BlindedDenominationSignature");
export class WithdrawResponse {
export class ExchangeWithdrawResponse {
ev_sig: BlindedDenominationSignature;
}
export class WithdrawBatchResponse {
ev_sigs: WithdrawResponse[];
export class ExchangeWithdrawBatchResponse {
ev_sigs: ExchangeWithdrawResponse[];
}
export interface MerchantPayResponse {
@ -1476,13 +1476,13 @@ export const codecForRecoupConfirmation = (): Codec<RecoupConfirmation> =>
.property("old_coin_pub", codecOptional(codecForString()))
.build("RecoupConfirmation");
export const codecForWithdrawResponse = (): Codec<WithdrawResponse> =>
buildCodecForObject<WithdrawResponse>()
export const codecForWithdrawResponse = (): Codec<ExchangeWithdrawResponse> =>
buildCodecForObject<ExchangeWithdrawResponse>()
.property("ev_sig", codecForBlindedDenominationSignature())
.build("WithdrawResponse");
export const codecForWithdrawBatchResponse = (): Codec<WithdrawBatchResponse> =>
buildCodecForObject<WithdrawBatchResponse>()
export const codecForWithdrawBatchResponse = (): Codec<ExchangeWithdrawBatchResponse> =>
buildCodecForObject<ExchangeWithdrawBatchResponse>()
.property("ev_sigs", codecForList(codecForWithdrawResponse()))
.build("WithdrawBatchResponse");
@ -1753,6 +1753,11 @@ export interface ExchangeWithdrawRequest {
coin_ev: CoinEnvelope;
}
export interface ExchangeBatchWithdrawRequest {
planchets: ExchangeWithdrawRequest[];
}
export interface ExchangeRefreshRevealRequest {
new_denoms_h: HashCodeString[];
coin_evs: CoinEnvelope[];

View File

@ -59,9 +59,11 @@ import {
TransactionType,
UnblindedSignature,
URL,
WithdrawBatchResponse,
WithdrawResponse,
ExchangeWithdrawBatchResponse,
ExchangeWithdrawResponse,
WithdrawUriInfoResponse,
ExchangeBatchWithdrawRequest,
WalletNotification,
} from "@gnu-taler/taler-util";
import { EddsaKeypair } from "../crypto/cryptoImplementation.js";
import {
@ -93,6 +95,7 @@ import {
import { walletCoreDebugFlags } from "../util/debugFlags.js";
import {
HttpRequestLibrary,
HttpResponse,
readSuccessResponseJsonOrErrorCode,
readSuccessResponseJsonOrThrow,
throwUnexpectedRequestError,
@ -455,21 +458,43 @@ async function processPlanchetGenerate(
});
}
interface WithdrawalRequestBatchArgs {
/**
* Use the batched request on the network level.
* Not supported by older exchanges.
*/
useBatchRequest: boolean;
coinStartIndex: number;
batchSize: number;
}
interface WithdrawalBatchResult {
coinIdxs: number[];
batchResp: ExchangeWithdrawBatchResponse;
}
/**
* Send the withdrawal request for a generated planchet to the exchange.
*
* The verification of the response is done asynchronously to enable parallelism.
*/
async function processPlanchetExchangeRequest(
async function processPlanchetExchangeBatchRequest(
ws: InternalWalletState,
wgContext: WithdrawalGroupContext,
coinIdx: number,
): Promise<WithdrawResponse | undefined> {
args: WithdrawalRequestBatchArgs,
): Promise<WithdrawalBatchResult> {
const withdrawalGroup: WithdrawalGroupRecord = wgContext.wgRecord;
logger.info(
`processing planchet exchange request ${withdrawalGroup.withdrawalGroupId}/${coinIdx}`,
`processing planchet exchange batch request ${withdrawalGroup.withdrawalGroupId}, start=${args.coinStartIndex}, len=${args.batchSize}`,
);
const d = await ws.db
const batchReq: ExchangeBatchWithdrawRequest = { planchets: [] };
// Indices of coins that are included in the batch request
const coinIdxs: number[] = [];
await ws.db
.mktx((x) => [
x.withdrawalGroups,
x.planchets,
@ -477,23 +502,23 @@ async function processPlanchetExchangeRequest(
x.denominations,
])
.runReadOnly(async (tx) => {
for (
let coinIdx = args.coinStartIndex;
coinIdx < args.coinStartIndex + args.batchSize &&
coinIdx < wgContext.numPlanchets;
coinIdx++
) {
let planchet = await tx.planchets.indexes.byGroupAndIndex.get([
withdrawalGroup.withdrawalGroupId,
coinIdx,
]);
if (!planchet) {
return;
continue;
}
if (planchet.planchetStatus === PlanchetStatus.WithdrawalDone) {
logger.warn("processPlanchet: planchet already withdrawn");
return;
continue;
}
const exchange = await tx.exchanges.get(withdrawalGroup.exchangeBaseUrl);
if (!exchange) {
logger.error("db inconsistent: exchange for planchet not found");
return;
}
const denom = await ws.getDenomInfo(
ws,
tx,
@ -503,34 +528,28 @@ async function processPlanchetExchangeRequest(
if (!denom) {
logger.error("db inconsistent: denom for planchet not found");
return;
continue;
}
logger.trace(
`processing planchet #${coinIdx} in withdrawal ${withdrawalGroup.withdrawalGroupId}`,
);
const reqBody: ExchangeWithdrawRequest = {
const planchetReq: ExchangeWithdrawRequest = {
denom_pub_hash: planchet.denomPubHash,
reserve_sig: planchet.withdrawSig,
coin_ev: planchet.coinEv,
};
const reqUrl = new URL(
`reserves/${withdrawalGroup.reservePub}/withdraw`,
exchange.baseUrl,
).href;
return { reqUrl, reqBody };
batchReq.planchets.push(planchetReq);
coinIdxs.push(coinIdx);
}
});
if (!d) {
return;
if (batchReq.planchets.length == 0) {
logger.warn("empty withdrawal batch");
return {
batchResp: { ev_sigs: [] },
coinIdxs: [],
};
}
const { reqUrl, reqBody } = d;
try {
const resp = await ws.http.postJson(reqUrl, reqBody);
if (resp.status === HttpStatusCode.UnavailableForLegalReasons) {
async function handleKycRequired(resp: HttpResponse, startIdx: number) {
logger.info("withdrawal requires KYC");
const respJson = await resp.json();
const uuidResp = codecForWalletKycUuid().decode(respJson);
@ -538,14 +557,17 @@ async function processPlanchetExchangeRequest(
await ws.db
.mktx((x) => [x.planchets, x.withdrawalGroups])
.runReadWrite(async (tx) => {
for (let i = 0; i < startIdx; i++) {
let planchet = await tx.planchets.indexes.byGroupAndIndex.get([
withdrawalGroup.withdrawalGroupId,
coinIdx,
coinIdxs[i],
]);
if (!planchet) {
return;
continue;
}
planchet.planchetStatus = PlanchetStatus.KycRequired;
await tx.planchets.put(planchet);
}
const wg2 = await tx.withdrawalGroups.get(
withdrawalGroup.withdrawalGroupId,
);
@ -556,17 +578,12 @@ async function processPlanchetExchangeRequest(
paytoHash: uuidResp.h_payto,
requirementRow: uuidResp.requirement_row,
};
await tx.planchets.put(planchet);
await tx.withdrawalGroups.put(wg2);
});
return;
}
const r = await readSuccessResponseJsonOrThrow(
resp,
codecForWithdrawResponse(),
);
return r;
} catch (e) {
async function storeCoinError(e: any, coinIdx: number) {
const errDetail = getErrorDetailFromException(e);
logger.trace("withdrawal request failed", e);
logger.trace(String(e));
@ -583,101 +600,81 @@ async function processPlanchetExchangeRequest(
planchet.lastError = errDetail;
await tx.planchets.put(planchet);
});
return;
}
}
/**
* Send the withdrawal request for a generated planchet to the exchange.
*
* The verification of the response is done asynchronously to enable parallelism.
*/
async function processPlanchetExchangeBatchRequest(
ws: InternalWalletState,
wgContext: WithdrawalGroupContext,
): Promise<WithdrawBatchResponse | undefined> {
const withdrawalGroup: WithdrawalGroupRecord = wgContext.wgRecord;
logger.info(
`processing planchet exchange batch request ${withdrawalGroup.withdrawalGroupId}`,
);
const numTotalCoins = withdrawalGroup.denomsSel.selectedDenoms
.map((x) => x.count)
.reduce((a, b) => a + b);
const d = await ws.db
.mktx((x) => [
x.withdrawalGroups,
x.planchets,
x.exchanges,
x.denominations,
])
.runReadOnly(async (tx) => {
const reqBody: { planchets: ExchangeWithdrawRequest[] } = {
planchets: [],
};
const exchange = await tx.exchanges.get(withdrawalGroup.exchangeBaseUrl);
if (!exchange) {
logger.error("db inconsistent: exchange for planchet not found");
return;
}
for (let coinIdx = 0; coinIdx < numTotalCoins; coinIdx++) {
let planchet = await tx.planchets.indexes.byGroupAndIndex.get([
withdrawalGroup.withdrawalGroupId,
coinIdx,
]);
if (!planchet) {
return;
}
if (planchet.planchetStatus === PlanchetStatus.WithdrawalDone) {
logger.warn("processPlanchet: planchet already withdrawn");
return;
}
const denom = await ws.getDenomInfo(
ws,
tx,
withdrawalGroup.exchangeBaseUrl,
planchet.denomPubHash,
);
if (!denom) {
logger.error("db inconsistent: denom for planchet not found");
return;
}
const planchetReq: ExchangeWithdrawRequest = {
denom_pub_hash: planchet.denomPubHash,
reserve_sig: planchet.withdrawSig,
coin_ev: planchet.coinEv,
};
reqBody.planchets.push(planchetReq);
}
return reqBody;
});
if (!d) {
return;
}
// FIXME: handle individual error codes better!
if (args.useBatchRequest) {
const reqUrl = new URL(
`reserves/${withdrawalGroup.reservePub}/batch-withdraw`,
withdrawalGroup.exchangeBaseUrl,
).href;
const resp = await ws.http.postJson(reqUrl, d);
try {
const resp = await ws.http.postJson(reqUrl, batchReq);
if (resp.status === HttpStatusCode.UnavailableForLegalReasons) {
await handleKycRequired(resp, 0);
}
const r = await readSuccessResponseJsonOrThrow(
resp,
codecForWithdrawBatchResponse(),
);
return r;
return {
coinIdxs,
batchResp: r,
};
} catch (e) {
await storeCoinError(e, coinIdxs[0]);
return {
batchResp: { ev_sigs: [] },
coinIdxs: [],
};
}
} else {
// We emulate the batch response here by making multiple individual requests
const responses: ExchangeWithdrawBatchResponse = {
ev_sigs: [],
};
for (let i = 0; i < batchReq.planchets.length; i++) {
try {
const p = batchReq.planchets[i];
const reqUrl = new URL(
`reserves/${withdrawalGroup.reservePub}/withdraw`,
withdrawalGroup.exchangeBaseUrl,
).href;
const resp = await ws.http.postJson(reqUrl, p);
if (resp.status === HttpStatusCode.UnavailableForLegalReasons) {
await handleKycRequired(resp, i);
// We still return blinded coins that we could actually withdraw.
return {
coinIdxs,
batchResp: responses,
};
}
const r = await readSuccessResponseJsonOrThrow(
resp,
codecForWithdrawResponse(),
);
responses.ev_sigs.push(r);
} catch (e) {
await storeCoinError(e, coinIdxs[i]);
}
}
return {
coinIdxs,
batchResp: responses,
};
}
}
async function processPlanchetVerifyAndStoreCoin(
ws: InternalWalletState,
wgContext: WithdrawalGroupContext,
coinIdx: number,
resp: WithdrawResponse,
resp: ExchangeWithdrawResponse,
): Promise<void> {
const withdrawalGroup = wgContext.wgRecord;
logger.info(`checking and storing planchet idx=${coinIdx}`);
const d = await ws.db
.mktx((x) => [x.withdrawalGroups, x.planchets, x.denominations])
.runReadOnly(async (tx) => {
@ -791,6 +788,14 @@ async function processPlanchetVerifyAndStoreCoin(
wgContext.planchetsFinished.add(planchet.coinPub);
// We create the notification here, as the async transaction below
// allows other planchet withdrawals to change wgContext.planchetsFinished
const notification: WalletNotification = {
type: NotificationType.CoinWithdrawn,
numTotal: wgContext.numPlanchets,
numWithdrawn: wgContext.planchetsFinished.size,
}
// Check if this is the first time that the whole
// withdrawal succeeded. If so, mark the withdrawal
// group as finished.
@ -814,11 +819,7 @@ async function processPlanchetVerifyAndStoreCoin(
});
if (firstSuccess) {
ws.notify({
type: NotificationType.CoinWithdrawn,
numTotal: wgContext.numPlanchets,
numWithdrawn: wgContext.planchetsFinished.size,
});
ws.notify(notification);
}
}
@ -1150,8 +1151,6 @@ export async function processWithdrawalGroup(
wgRecord: withdrawalGroup,
};
let work: Promise<void>[] = [];
await ws.db
.mktx((x) => [x.planchets])
.runReadOnly(async (tx) => {
@ -1165,43 +1164,34 @@ export async function processWithdrawalGroup(
}
});
// We sequentially generate planchets, so that
// large withdrawal groups don't make the wallet unresponsive.
for (let i = 0; i < numTotalCoins; i++) {
work.push(processPlanchetGenerate(ws, withdrawalGroup, i));
await processPlanchetGenerate(ws, withdrawalGroup, i);
}
// Generate coins concurrently (parallelism only happens in the crypto API workers)
await Promise.all(work);
const maxBatchSize = 100;
for (let i = 0; i < numTotalCoins; i += maxBatchSize) {
const resp = await processPlanchetExchangeBatchRequest(ws, wgContext, {
batchSize: maxBatchSize,
coinStartIndex: i,
useBatchRequest: ws.batchWithdrawal,
});
let work: Promise<void>[] = [];
work = [];
if (ws.batchWithdrawal) {
const resp = await processPlanchetExchangeBatchRequest(ws, wgContext);
if (!resp) {
throw Error("unable to do batch withdrawal");
}
for (let coinIdx = 0; coinIdx < numTotalCoins; coinIdx++) {
for (let j = 0; j < resp.coinIdxs.length; j++) {
work.push(
processPlanchetVerifyAndStoreCoin(
ws,
wgContext,
coinIdx,
resp.ev_sigs[coinIdx],
resp.coinIdxs[j],
resp.batchResp.ev_sigs[j],
),
);
}
} else {
for (let coinIdx = 0; coinIdx < numTotalCoins; coinIdx++) {
const resp = await processPlanchetExchangeRequest(ws, wgContext, coinIdx);
if (!resp) {
continue;
}
work.push(
processPlanchetVerifyAndStoreCoin(ws, wgContext, coinIdx, resp),
);
}
}
await Promise.all(work);
}
let numFinished = 0;
let numKycRequired = 0;