wallet-core: expose withdrawal progress, towards huge withdrawal test

This commit is contained in:
Florian Dold 2023-02-09 22:44:36 +01:00
parent a8c5a9696c
commit 3cf2d4cba9
No known key found for this signature in database
GPG Key ID: D2E4F00F29D02A4B
7 changed files with 75 additions and 42 deletions

View File

@ -1981,7 +1981,6 @@ export class WalletClient {
walletClient.args.onNotification(n);
}
waiter.notify(n);
console.log("got notification from wallet-core in WalletClient");
},
});
this.remoteWallet = w;

View File

@ -19,20 +19,21 @@
*/
import {
GlobalTestState,
WalletCli,
setupDb,
ExchangeService,
FakebankService,
WalletService,
WalletClient,
} from "../harness/harness.js";
import { WalletApiOperation } from "@gnu-taler/taler-wallet-core";
import { CoinConfig, defaultCoinConfig } from "../harness/denomStructures.js";
import { URL } from "@gnu-taler/taler-util";
import { NotificationType, URL } from "@gnu-taler/taler-util";
/**
* Withdraw a high amount. Mostly intended
* as a perf test.
*/
export async function runWithdrawalHighTest(t: GlobalTestState) {
export async function runWithdrawalHugeTest(t: GlobalTestState) {
// Set up test environment
const db = await setupDb(t);
@ -71,7 +72,16 @@ export async function runWithdrawalHighTest(t: GlobalTestState) {
console.log("setup done!");
const wallet = new WalletCli(t);
const walletService = new WalletService(t, { name: "w1" });
await walletService.start();
await walletService.pingUntilAvailable();
const wallet = new WalletClient({
unixPath: walletService.socketPath,
});
await wallet.connect();
const withdrawalFinishedCond = wallet.waitForNotificationCond((wn) => wn.type === NotificationType.WithdrawGroupFinished);
await wallet.client.call(WalletApiOperation.AddExchange, {
exchangeBaseUrl: exchange.baseUrl,
@ -85,15 +95,13 @@ export async function runWithdrawalHighTest(t: GlobalTestState) {
await exchange.runWirewatchOnce();
await wallet.runUntilDone();
await withdrawalFinishedCond;
// Check balance
const balResp = await wallet.client.call(WalletApiOperation.GetBalances, {});
console.log(balResp);
await t.shutdown();
}
runWithdrawalHighTest.suites = ["wallet-perf"];
runWithdrawalHighTest.excludeByDefault = true;
runWithdrawalHugeTest.suites = ["wallet-perf"];
runWithdrawalHugeTest.excludeByDefault = true;

View File

@ -95,7 +95,7 @@ import { runAgeRestrictionsPeerTest } from "./test-age-restrictions-peer.js";
import { runWalletNotificationsTest } from "./test-wallet-notifications.js";
import { runAgeRestrictionsMixedMerchantTest } from "./test-age-restrictions-mixed-merchant.js";
import { runWalletCryptoWorkerTest } from "./test-wallet-cryptoworker.js";
import { runWithdrawalHighTest } from "./test-withdrawal-high.js";
import { runWithdrawalHugeTest } from "./test-withdrawal-huge.js";
import { runKycTest } from "./test-kyc.js";
import { runPaymentAbortTest } from "./test-payment-abort.js";
import { runWithdrawalFeesTest } from "./test-withdrawal-fees.js";
@ -190,7 +190,7 @@ const allTests: TestMainFunction[] = [
runWithdrawalBankIntegratedTest,
runWithdrawalFakebankTest,
runWithdrawalFeesTest,
runWithdrawalHighTest,
runWithdrawalHugeTest,
];
export interface TestRunSpec {

View File

@ -83,6 +83,8 @@ export interface ReserveNotYetFoundNotification {
export interface CoinWithdrawnNotification {
type: NotificationType.CoinWithdrawn;
numWithdrawn: number;
numTotal: number;
}
export interface RefundStartedNotification {

View File

@ -30,7 +30,6 @@ const logger = new Logger("twrpc-impl.node.ts");
function readStreamLinewise(args: ReadLinewiseArgs): void {
let chunks: Uint8Array[] = [];
args.sock.on("data", (buf: Uint8Array) => {
logger.info(`received ${buf.length} bytes`);
// Process all newlines in the newly received buffer
while (1) {
const newlineIdx = buf.indexOf("\n".charCodeAt(0));
@ -78,28 +77,23 @@ export async function connectRpc<T>(args: RpcConnectArgs<T>): Promise<T> {
sock: client,
onLine(line) {
const lineStr = bytesToString(line);
logger.info(`got line from server: ${lineStr}`);
// Are we currently parsing the body of a request?
if (!parsingBody) {
const strippedLine = lineStr.trim();
if (strippedLine == "%message") {
logger.info("got message start");
parsingBody = "message";
} else if (strippedLine == "%hello-from-server") {
logger.info("got hello from server");
} else if (strippedLine.startsWith("%error:")) {
logger.info("got error from server, disconnecting");
client.end();
res.onDisconnect();
} else {
logger.info("got unknown request");
logger.warn("got unknown request");
client.write("%error: invalid message\n");
client.end();
}
} else if (parsingBody == "message") {
const strippedLine = lineStr.trim();
if (strippedLine == "%end") {
logger.info("finished request");
let req = bodyChunks.join("");
let reqJson: any = undefined;
try {
@ -109,7 +103,6 @@ export async function connectRpc<T>(args: RpcConnectArgs<T>): Promise<T> {
logger.info(`message was: ${req}`);
}
if (reqJson !== undefined) {
logger.info(`request: ${req}`);
res.onMessage(reqJson);
} else {
client.write("%error: invalid JSON");
@ -149,7 +142,6 @@ export async function runRpcServer(args: RpcServerArgs): Promise<void> {
let parsingBody: string | undefined = undefined;
let bodyChunks: string[] = [];
logger.info("got new connection");
sock.write("%hello-from-server\n");
const handlers = args.onConnect({
sendResponse(message) {
@ -161,21 +153,19 @@ export async function runRpcServer(args: RpcServerArgs): Promise<void> {
});
sock.on("error", (err) => {
logger.info(`connection error: ${err}`);
logger.error(`connection error: ${err}`);
});
function processLine(line: Uint8Array) {
const lineStr = bytesToString(line);
logger.info(`got line: ${lineStr}`);
if (!parsingBody) {
const strippedLine = lineStr.trim();
if (strippedLine == "%request") {
logger.info("got request start");
parsingBody = "request";
} else if (strippedLine === "%hello-from-client") {
console.log("got hello from client");
// Nothing to do, ignore hello
} else if (strippedLine.startsWith("%error:")) {
console.log("got error from client");
logger.warn("got error from client");
sock.end();
handlers.onDisconnect();
} else {
@ -186,7 +176,6 @@ export async function runRpcServer(args: RpcServerArgs): Promise<void> {
} else if (parsingBody == "request") {
const strippedLine = lineStr.trim();
if (strippedLine == "%end") {
logger.info("finished request");
let req = bodyChunks.join("");
let reqJson: any = undefined;
try {
@ -195,7 +184,6 @@ export async function runRpcServer(args: RpcServerArgs): Promise<void> {
logger.warn("JSON request from client was invalid");
}
if (reqJson !== undefined) {
logger.info(`request: ${req}`);
handlers.onMessage(reqJson);
} else {
sock.write("%error: invalid JSON");
@ -207,7 +195,7 @@ export async function runRpcServer(args: RpcServerArgs): Promise<void> {
bodyChunks.push(lineStr);
}
} else {
logger.info("invalid parser state");
logger.error("invalid parser state");
sock.write("%error: internal error\n");
sock.end();
}
@ -219,7 +207,7 @@ export async function runRpcServer(args: RpcServerArgs): Promise<void> {
});
sock.on("close", (hadError: boolean) => {
logger.info(`connection closed, hadError=${hadError}`);
logger.trace(`connection closed, hadError=${hadError}`);
handlers.onDisconnect();
});
});

View File

@ -462,9 +462,10 @@ async function processPlanchetGenerate(
*/
async function processPlanchetExchangeRequest(
ws: InternalWalletState,
withdrawalGroup: WithdrawalGroupRecord,
wgContext: WithdrawalGroupContext,
coinIdx: number,
): Promise<WithdrawResponse | undefined> {
const withdrawalGroup: WithdrawalGroupRecord = wgContext.wgRecord;
logger.info(
`processing planchet exchange request ${withdrawalGroup.withdrawalGroupId}/${coinIdx}`,
);
@ -593,8 +594,9 @@ async function processPlanchetExchangeRequest(
*/
async function processPlanchetExchangeBatchRequest(
ws: InternalWalletState,
withdrawalGroup: WithdrawalGroupRecord,
wgContext: WithdrawalGroupContext,
): Promise<WithdrawBatchResponse | undefined> {
const withdrawalGroup: WithdrawalGroupRecord = wgContext.wgRecord;
logger.info(
`processing planchet exchange batch request ${withdrawalGroup.withdrawalGroupId}`,
);
@ -671,10 +673,11 @@ async function processPlanchetExchangeBatchRequest(
async function processPlanchetVerifyAndStoreCoin(
ws: InternalWalletState,
withdrawalGroup: WithdrawalGroupRecord,
wgContext: WithdrawalGroupContext,
coinIdx: number,
resp: WithdrawResponse,
): Promise<void> {
const withdrawalGroup = wgContext.wgRecord;
const d = await ws.db
.mktx((x) => [x.withdrawalGroups, x.planchets, x.denominations])
.runReadOnly(async (tx) => {
@ -786,6 +789,8 @@ async function processPlanchetVerifyAndStoreCoin(
const planchetCoinPub = planchet.coinPub;
wgContext.planchetsFinished.add(planchet.coinPub);
// Check if this is the first time that the whole
// withdrawal succeeded. If so, mark the withdrawal
// group as finished.
@ -811,6 +816,8 @@ async function processPlanchetVerifyAndStoreCoin(
if (firstSuccess) {
ws.notify({
type: NotificationType.CoinWithdrawn,
numTotal: wgContext.numPlanchets,
numWithdrawn: wgContext.planchetsFinished.size,
});
}
}
@ -983,6 +990,21 @@ enum BankStatusResultCode {
Aborted = "aborted",
}
/**
* Withdrawal context that is kept in-memory.
*
* Used to store some cached info during a withdrawal operation.
*/
export interface WithdrawalGroupContext {
numPlanchets: number;
planchetsFinished: Set<string>;
/**
* Cached withdrawal group record from the database.
*/
wgRecord: WithdrawalGroupRecord;
}
export async function processWithdrawalGroup(
ws: InternalWalletState,
withdrawalGroupId: string,
@ -1122,8 +1144,27 @@ export async function processWithdrawalGroup(
.map((x) => x.count)
.reduce((a, b) => a + b);
const wgContext: WithdrawalGroupContext = {
numPlanchets: numTotalCoins,
planchetsFinished: new Set<string>(),
wgRecord: withdrawalGroup,
};
let work: Promise<void>[] = [];
await ws.db
.mktx((x) => [x.planchets])
.runReadOnly(async (tx) => {
const planchets = await tx.planchets.indexes.byGroup.getAll(
withdrawalGroupId,
);
for (const p of planchets) {
if (p.planchetStatus === PlanchetStatus.WithdrawalDone) {
wgContext.planchetsFinished.add(p.coinPub);
}
}
});
for (let i = 0; i < numTotalCoins; i++) {
work.push(processPlanchetGenerate(ws, withdrawalGroup, i));
}
@ -1134,7 +1175,7 @@ export async function processWithdrawalGroup(
work = [];
if (ws.batchWithdrawal) {
const resp = await processPlanchetExchangeBatchRequest(ws, withdrawalGroup);
const resp = await processPlanchetExchangeBatchRequest(ws, wgContext);
if (!resp) {
throw Error("unable to do batch withdrawal");
}
@ -1142,7 +1183,7 @@ export async function processWithdrawalGroup(
work.push(
processPlanchetVerifyAndStoreCoin(
ws,
withdrawalGroup,
wgContext,
coinIdx,
resp.ev_sigs[coinIdx],
),
@ -1150,16 +1191,12 @@ export async function processWithdrawalGroup(
}
} else {
for (let coinIdx = 0; coinIdx < numTotalCoins; coinIdx++) {
const resp = await processPlanchetExchangeRequest(
ws,
withdrawalGroup,
coinIdx,
);
const resp = await processPlanchetExchangeRequest(ws, wgContext, coinIdx);
if (!resp) {
continue;
}
work.push(
processPlanchetVerifyAndStoreCoin(ws, withdrawalGroup, coinIdx, resp),
processPlanchetVerifyAndStoreCoin(ws, wgContext, coinIdx, resp),
);
}
}

View File

@ -110,7 +110,6 @@ export async function createRemoteWallet(
}
h.promiseCapability.resolve(m as any);
} else if (type === "notification") {
logger.info("got notification");
if (args.notificationHandler) {
args.notificationHandler((m as any).payload);
}