From 3cf2d4cba919203065f210f80f3f081948ad257a Mon Sep 17 00:00:00 2001 From: Florian Dold Date: Thu, 9 Feb 2023 22:44:36 +0100 Subject: [PATCH] wallet-core: expose withdrawal progress, towards huge withdrawal test --- packages/taler-harness/src/harness/harness.ts | 1 - ...drawal-high.ts => test-withdrawal-huge.ts} | 26 +++++--- .../src/integrationtests/testrunner.ts | 4 +- packages/taler-util/src/notifications.ts | 2 + packages/taler-util/src/twrpc-impl.node.ts | 24 ++------ .../src/operations/withdraw.ts | 59 +++++++++++++++---- packages/taler-wallet-core/src/remote.ts | 1 - 7 files changed, 75 insertions(+), 42 deletions(-) rename packages/taler-harness/src/integrationtests/{test-withdrawal-high.ts => test-withdrawal-huge.ts} (78%) diff --git a/packages/taler-harness/src/harness/harness.ts b/packages/taler-harness/src/harness/harness.ts index e95dea0c6..b2c73c9ab 100644 --- a/packages/taler-harness/src/harness/harness.ts +++ b/packages/taler-harness/src/harness/harness.ts @@ -1981,7 +1981,6 @@ export class WalletClient { walletClient.args.onNotification(n); } waiter.notify(n); - console.log("got notification from wallet-core in WalletClient"); }, }); this.remoteWallet = w; diff --git a/packages/taler-harness/src/integrationtests/test-withdrawal-high.ts b/packages/taler-harness/src/integrationtests/test-withdrawal-huge.ts similarity index 78% rename from packages/taler-harness/src/integrationtests/test-withdrawal-high.ts rename to packages/taler-harness/src/integrationtests/test-withdrawal-huge.ts index deb0e6dde..248931d71 100644 --- a/packages/taler-harness/src/integrationtests/test-withdrawal-high.ts +++ b/packages/taler-harness/src/integrationtests/test-withdrawal-huge.ts @@ -19,20 +19,21 @@ */ import { GlobalTestState, - WalletCli, setupDb, ExchangeService, FakebankService, + WalletService, + WalletClient, } from "../harness/harness.js"; import { WalletApiOperation } from "@gnu-taler/taler-wallet-core"; import { CoinConfig, defaultCoinConfig } from "../harness/denomStructures.js"; -import { URL } from "@gnu-taler/taler-util"; +import { NotificationType, URL } from "@gnu-taler/taler-util"; /** * Withdraw a high amount. Mostly intended * as a perf test. */ -export async function runWithdrawalHighTest(t: GlobalTestState) { +export async function runWithdrawalHugeTest(t: GlobalTestState) { // Set up test environment const db = await setupDb(t); @@ -71,7 +72,16 @@ export async function runWithdrawalHighTest(t: GlobalTestState) { console.log("setup done!"); - const wallet = new WalletCli(t); + const walletService = new WalletService(t, { name: "w1" }); + await walletService.start(); + await walletService.pingUntilAvailable(); + + const wallet = new WalletClient({ + unixPath: walletService.socketPath, + }); + await wallet.connect(); + + const withdrawalFinishedCond = wallet.waitForNotificationCond((wn) => wn.type === NotificationType.WithdrawGroupFinished); await wallet.client.call(WalletApiOperation.AddExchange, { exchangeBaseUrl: exchange.baseUrl, @@ -85,15 +95,13 @@ export async function runWithdrawalHighTest(t: GlobalTestState) { await exchange.runWirewatchOnce(); - await wallet.runUntilDone(); + await withdrawalFinishedCond; // Check balance const balResp = await wallet.client.call(WalletApiOperation.GetBalances, {}); console.log(balResp); - - await t.shutdown(); } -runWithdrawalHighTest.suites = ["wallet-perf"]; -runWithdrawalHighTest.excludeByDefault = true; +runWithdrawalHugeTest.suites = ["wallet-perf"]; +runWithdrawalHugeTest.excludeByDefault = true; diff --git a/packages/taler-harness/src/integrationtests/testrunner.ts b/packages/taler-harness/src/integrationtests/testrunner.ts index 3d70e6860..70008e386 100644 --- a/packages/taler-harness/src/integrationtests/testrunner.ts +++ b/packages/taler-harness/src/integrationtests/testrunner.ts @@ -95,7 +95,7 @@ import { runAgeRestrictionsPeerTest } from "./test-age-restrictions-peer.js"; import { runWalletNotificationsTest } from "./test-wallet-notifications.js"; import { runAgeRestrictionsMixedMerchantTest } from "./test-age-restrictions-mixed-merchant.js"; import { runWalletCryptoWorkerTest } from "./test-wallet-cryptoworker.js"; -import { runWithdrawalHighTest } from "./test-withdrawal-high.js"; +import { runWithdrawalHugeTest } from "./test-withdrawal-huge.js"; import { runKycTest } from "./test-kyc.js"; import { runPaymentAbortTest } from "./test-payment-abort.js"; import { runWithdrawalFeesTest } from "./test-withdrawal-fees.js"; @@ -190,7 +190,7 @@ const allTests: TestMainFunction[] = [ runWithdrawalBankIntegratedTest, runWithdrawalFakebankTest, runWithdrawalFeesTest, - runWithdrawalHighTest, + runWithdrawalHugeTest, ]; export interface TestRunSpec { diff --git a/packages/taler-util/src/notifications.ts b/packages/taler-util/src/notifications.ts index c50cc72de..bc1c4b71f 100644 --- a/packages/taler-util/src/notifications.ts +++ b/packages/taler-util/src/notifications.ts @@ -83,6 +83,8 @@ export interface ReserveNotYetFoundNotification { export interface CoinWithdrawnNotification { type: NotificationType.CoinWithdrawn; + numWithdrawn: number; + numTotal: number; } export interface RefundStartedNotification { diff --git a/packages/taler-util/src/twrpc-impl.node.ts b/packages/taler-util/src/twrpc-impl.node.ts index b6333da51..30e362e5b 100644 --- a/packages/taler-util/src/twrpc-impl.node.ts +++ b/packages/taler-util/src/twrpc-impl.node.ts @@ -30,7 +30,6 @@ const logger = new Logger("twrpc-impl.node.ts"); function readStreamLinewise(args: ReadLinewiseArgs): void { let chunks: Uint8Array[] = []; args.sock.on("data", (buf: Uint8Array) => { - logger.info(`received ${buf.length} bytes`); // Process all newlines in the newly received buffer while (1) { const newlineIdx = buf.indexOf("\n".charCodeAt(0)); @@ -78,28 +77,23 @@ export async function connectRpc(args: RpcConnectArgs): Promise { sock: client, onLine(line) { const lineStr = bytesToString(line); - logger.info(`got line from server: ${lineStr}`); // Are we currently parsing the body of a request? if (!parsingBody) { const strippedLine = lineStr.trim(); if (strippedLine == "%message") { - logger.info("got message start"); parsingBody = "message"; } else if (strippedLine == "%hello-from-server") { - logger.info("got hello from server"); } else if (strippedLine.startsWith("%error:")) { - logger.info("got error from server, disconnecting"); client.end(); res.onDisconnect(); } else { - logger.info("got unknown request"); + logger.warn("got unknown request"); client.write("%error: invalid message\n"); client.end(); } } else if (parsingBody == "message") { const strippedLine = lineStr.trim(); if (strippedLine == "%end") { - logger.info("finished request"); let req = bodyChunks.join(""); let reqJson: any = undefined; try { @@ -109,7 +103,6 @@ export async function connectRpc(args: RpcConnectArgs): Promise { logger.info(`message was: ${req}`); } if (reqJson !== undefined) { - logger.info(`request: ${req}`); res.onMessage(reqJson); } else { client.write("%error: invalid JSON"); @@ -149,7 +142,6 @@ export async function runRpcServer(args: RpcServerArgs): Promise { let parsingBody: string | undefined = undefined; let bodyChunks: string[] = []; - logger.info("got new connection"); sock.write("%hello-from-server\n"); const handlers = args.onConnect({ sendResponse(message) { @@ -161,21 +153,19 @@ export async function runRpcServer(args: RpcServerArgs): Promise { }); sock.on("error", (err) => { - logger.info(`connection error: ${err}`); + logger.error(`connection error: ${err}`); }); function processLine(line: Uint8Array) { const lineStr = bytesToString(line); - logger.info(`got line: ${lineStr}`); if (!parsingBody) { const strippedLine = lineStr.trim(); if (strippedLine == "%request") { - logger.info("got request start"); parsingBody = "request"; } else if (strippedLine === "%hello-from-client") { - console.log("got hello from client"); + // Nothing to do, ignore hello } else if (strippedLine.startsWith("%error:")) { - console.log("got error from client"); + logger.warn("got error from client"); sock.end(); handlers.onDisconnect(); } else { @@ -186,7 +176,6 @@ export async function runRpcServer(args: RpcServerArgs): Promise { } else if (parsingBody == "request") { const strippedLine = lineStr.trim(); if (strippedLine == "%end") { - logger.info("finished request"); let req = bodyChunks.join(""); let reqJson: any = undefined; try { @@ -195,7 +184,6 @@ export async function runRpcServer(args: RpcServerArgs): Promise { logger.warn("JSON request from client was invalid"); } if (reqJson !== undefined) { - logger.info(`request: ${req}`); handlers.onMessage(reqJson); } else { sock.write("%error: invalid JSON"); @@ -207,7 +195,7 @@ export async function runRpcServer(args: RpcServerArgs): Promise { bodyChunks.push(lineStr); } } else { - logger.info("invalid parser state"); + logger.error("invalid parser state"); sock.write("%error: internal error\n"); sock.end(); } @@ -219,7 +207,7 @@ export async function runRpcServer(args: RpcServerArgs): Promise { }); sock.on("close", (hadError: boolean) => { - logger.info(`connection closed, hadError=${hadError}`); + logger.trace(`connection closed, hadError=${hadError}`); handlers.onDisconnect(); }); }); diff --git a/packages/taler-wallet-core/src/operations/withdraw.ts b/packages/taler-wallet-core/src/operations/withdraw.ts index 667b97361..caa280fe5 100644 --- a/packages/taler-wallet-core/src/operations/withdraw.ts +++ b/packages/taler-wallet-core/src/operations/withdraw.ts @@ -462,9 +462,10 @@ async function processPlanchetGenerate( */ async function processPlanchetExchangeRequest( ws: InternalWalletState, - withdrawalGroup: WithdrawalGroupRecord, + wgContext: WithdrawalGroupContext, coinIdx: number, ): Promise { + const withdrawalGroup: WithdrawalGroupRecord = wgContext.wgRecord; logger.info( `processing planchet exchange request ${withdrawalGroup.withdrawalGroupId}/${coinIdx}`, ); @@ -593,8 +594,9 @@ async function processPlanchetExchangeRequest( */ async function processPlanchetExchangeBatchRequest( ws: InternalWalletState, - withdrawalGroup: WithdrawalGroupRecord, + wgContext: WithdrawalGroupContext, ): Promise { + const withdrawalGroup: WithdrawalGroupRecord = wgContext.wgRecord; logger.info( `processing planchet exchange batch request ${withdrawalGroup.withdrawalGroupId}`, ); @@ -671,10 +673,11 @@ async function processPlanchetExchangeBatchRequest( async function processPlanchetVerifyAndStoreCoin( ws: InternalWalletState, - withdrawalGroup: WithdrawalGroupRecord, + wgContext: WithdrawalGroupContext, coinIdx: number, resp: WithdrawResponse, ): Promise { + const withdrawalGroup = wgContext.wgRecord; const d = await ws.db .mktx((x) => [x.withdrawalGroups, x.planchets, x.denominations]) .runReadOnly(async (tx) => { @@ -786,6 +789,8 @@ async function processPlanchetVerifyAndStoreCoin( const planchetCoinPub = planchet.coinPub; + wgContext.planchetsFinished.add(planchet.coinPub); + // Check if this is the first time that the whole // withdrawal succeeded. If so, mark the withdrawal // group as finished. @@ -811,6 +816,8 @@ async function processPlanchetVerifyAndStoreCoin( if (firstSuccess) { ws.notify({ type: NotificationType.CoinWithdrawn, + numTotal: wgContext.numPlanchets, + numWithdrawn: wgContext.planchetsFinished.size, }); } } @@ -983,6 +990,21 @@ enum BankStatusResultCode { Aborted = "aborted", } +/** + * Withdrawal context that is kept in-memory. + * + * Used to store some cached info during a withdrawal operation. + */ +export interface WithdrawalGroupContext { + numPlanchets: number; + planchetsFinished: Set; + + /** + * Cached withdrawal group record from the database. + */ + wgRecord: WithdrawalGroupRecord; +} + export async function processWithdrawalGroup( ws: InternalWalletState, withdrawalGroupId: string, @@ -1122,8 +1144,27 @@ export async function processWithdrawalGroup( .map((x) => x.count) .reduce((a, b) => a + b); + const wgContext: WithdrawalGroupContext = { + numPlanchets: numTotalCoins, + planchetsFinished: new Set(), + wgRecord: withdrawalGroup, + }; + let work: Promise[] = []; + await ws.db + .mktx((x) => [x.planchets]) + .runReadOnly(async (tx) => { + const planchets = await tx.planchets.indexes.byGroup.getAll( + withdrawalGroupId, + ); + for (const p of planchets) { + if (p.planchetStatus === PlanchetStatus.WithdrawalDone) { + wgContext.planchetsFinished.add(p.coinPub); + } + } + }); + for (let i = 0; i < numTotalCoins; i++) { work.push(processPlanchetGenerate(ws, withdrawalGroup, i)); } @@ -1134,7 +1175,7 @@ export async function processWithdrawalGroup( work = []; if (ws.batchWithdrawal) { - const resp = await processPlanchetExchangeBatchRequest(ws, withdrawalGroup); + const resp = await processPlanchetExchangeBatchRequest(ws, wgContext); if (!resp) { throw Error("unable to do batch withdrawal"); } @@ -1142,7 +1183,7 @@ export async function processWithdrawalGroup( work.push( processPlanchetVerifyAndStoreCoin( ws, - withdrawalGroup, + wgContext, coinIdx, resp.ev_sigs[coinIdx], ), @@ -1150,16 +1191,12 @@ export async function processWithdrawalGroup( } } else { for (let coinIdx = 0; coinIdx < numTotalCoins; coinIdx++) { - const resp = await processPlanchetExchangeRequest( - ws, - withdrawalGroup, - coinIdx, - ); + const resp = await processPlanchetExchangeRequest(ws, wgContext, coinIdx); if (!resp) { continue; } work.push( - processPlanchetVerifyAndStoreCoin(ws, withdrawalGroup, coinIdx, resp), + processPlanchetVerifyAndStoreCoin(ws, wgContext, coinIdx, resp), ); } } diff --git a/packages/taler-wallet-core/src/remote.ts b/packages/taler-wallet-core/src/remote.ts index a240d4606..2628fea07 100644 --- a/packages/taler-wallet-core/src/remote.ts +++ b/packages/taler-wallet-core/src/remote.ts @@ -110,7 +110,6 @@ export async function createRemoteWallet( } h.promiseCapability.resolve(m as any); } else if (type === "notification") { - logger.info("got notification"); if (args.notificationHandler) { args.notificationHandler((m as any).payload); }