wallet-core: test crypto dispatcher, fix timeout handling
This commit is contained in:
parent
9c63d67781
commit
60374078f4
@ -0,0 +1,130 @@
|
|||||||
|
/*
|
||||||
|
This file is part of GNU Taler
|
||||||
|
(C) 2023 Taler Systems S.A.
|
||||||
|
|
||||||
|
GNU Taler is free software; you can redistribute it and/or modify it under the
|
||||||
|
terms of the GNU General Public License as published by the Free Software
|
||||||
|
Foundation; either version 3, or (at your option) any later version.
|
||||||
|
|
||||||
|
GNU Taler is distributed in the hope that it will be useful, but WITHOUT ANY
|
||||||
|
WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
|
||||||
|
A PARTICULAR PURPOSE. See the GNU General Public License for more details.
|
||||||
|
|
||||||
|
You should have received a copy of the GNU General Public License along with
|
||||||
|
GNU Taler; see the file COPYING. If not, see <http://www.gnu.org/licenses/>
|
||||||
|
*/
|
||||||
|
|
||||||
|
import test from "ava";
|
||||||
|
import { CryptoDispatcher, CryptoWorkerFactory } from "./crypto-dispatcher.js";
|
||||||
|
import { CryptoWorker, CryptoWorkerResponseMessage } from "./cryptoWorkerInterface.js";
|
||||||
|
import { SynchronousCryptoWorkerFactoryNode } from "./synchronousWorkerFactoryNode.js";
|
||||||
|
import { processRequestWithImpl } from "./worker-common.js";
|
||||||
|
|
||||||
|
|
||||||
|
export class MyCryptoWorker implements CryptoWorker {
|
||||||
|
/**
|
||||||
|
* Function to be called when we receive a message from the worker thread.
|
||||||
|
*/
|
||||||
|
onmessage: undefined | ((m: any) => void) = undefined;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Function to be called when we receive an error from the worker thread.
|
||||||
|
*/
|
||||||
|
onerror: undefined | ((m: any) => void) = undefined;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Add an event listener for either an "error" or "message" event.
|
||||||
|
*/
|
||||||
|
addEventListener(event: "message" | "error", fn: (x: any) => void): void {
|
||||||
|
switch (event) {
|
||||||
|
case "message":
|
||||||
|
this.onmessage = fn;
|
||||||
|
break;
|
||||||
|
case "error":
|
||||||
|
this.onerror = fn;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private dispatchMessage(msg: any): void {
|
||||||
|
if (this.onmessage) {
|
||||||
|
this.onmessage(msg);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Send a message to the worker thread.
|
||||||
|
*/
|
||||||
|
postMessage(msg: any): void {
|
||||||
|
const handleRequest = async () => {
|
||||||
|
let responseMsg: CryptoWorkerResponseMessage;
|
||||||
|
if (msg.operation === "testSuccess") {
|
||||||
|
responseMsg = {
|
||||||
|
id: msg.id,
|
||||||
|
type: "success",
|
||||||
|
result: {
|
||||||
|
testResult: 42,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else if (msg.operation === "testError") {
|
||||||
|
responseMsg = {
|
||||||
|
id: msg.id,
|
||||||
|
type: "error",
|
||||||
|
error: {
|
||||||
|
code: 42,
|
||||||
|
hint: "bla",
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else if (msg.operation === "testTimeout") {
|
||||||
|
// Don't respond
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
setTimeout(() => this.dispatchMessage(responseMsg), 0);
|
||||||
|
} catch (e) {
|
||||||
|
console.error("got error during dispatch", e);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
handleRequest().catch((e) => {
|
||||||
|
console.error("Error while handling crypto request:", e);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Forcibly terminate the worker thread.
|
||||||
|
*/
|
||||||
|
terminate(): void {
|
||||||
|
// This is a no-op.
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
export class MyCryptoWorkerFactory implements CryptoWorkerFactory {
|
||||||
|
startWorker(): CryptoWorker {
|
||||||
|
return new MyCryptoWorker();
|
||||||
|
}
|
||||||
|
|
||||||
|
getConcurrency(): number {
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
test("continues after error", async (t) => {
|
||||||
|
const cryptoDisp = new CryptoDispatcher(
|
||||||
|
new MyCryptoWorkerFactory(),
|
||||||
|
);
|
||||||
|
const resp1 = await cryptoDisp.doRpc("testSuccess", 0, {});
|
||||||
|
t.assert((resp1 as any).testResult === 42);
|
||||||
|
const exc = await t.throwsAsync(async() => {
|
||||||
|
const resp2 = await cryptoDisp.doRpc("testError", 0, {});
|
||||||
|
});
|
||||||
|
|
||||||
|
// Check that it still works after one error.
|
||||||
|
const resp2 = await cryptoDisp.doRpc("testSuccess", 0, {});
|
||||||
|
t.assert((resp2 as any).testResult === 42);
|
||||||
|
|
||||||
|
// Check that it still works after timeout.
|
||||||
|
const resp3 = await cryptoDisp.doRpc("testSuccess", 0, {});
|
||||||
|
t.assert((resp3 as any).testResult === 42);
|
||||||
|
});
|
@ -203,13 +203,7 @@ export class CryptoDispatcher {
|
|||||||
ws.idleTimeoutHandle.unref();
|
ws.idleTimeoutHandle.unref();
|
||||||
}
|
}
|
||||||
|
|
||||||
handleWorkerError(ws: WorkerInfo, e: any): void {
|
private resetWorker(ws: WorkerInfo, e: any): void {
|
||||||
if (ws.currentWorkItem) {
|
|
||||||
logger.error(`error in worker during ${ws.currentWorkItem.operation}`, e);
|
|
||||||
} else {
|
|
||||||
logger.error("error in worker", e);
|
|
||||||
}
|
|
||||||
logger.error(e.message);
|
|
||||||
try {
|
try {
|
||||||
if (ws.w) {
|
if (ws.w) {
|
||||||
ws.w.terminate();
|
ws.w.terminate();
|
||||||
@ -227,6 +221,16 @@ export class CryptoDispatcher {
|
|||||||
this.findWork(ws);
|
this.findWork(ws);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
handleWorkerError(ws: WorkerInfo, e: any): void {
|
||||||
|
if (ws.currentWorkItem) {
|
||||||
|
logger.error(`error in worker during ${ws.currentWorkItem.operation}`, e);
|
||||||
|
} else {
|
||||||
|
logger.error("error in worker", e);
|
||||||
|
}
|
||||||
|
logger.error(e.message);
|
||||||
|
this.resetWorker(ws, e);
|
||||||
|
}
|
||||||
|
|
||||||
private findWork(ws: WorkerInfo): void {
|
private findWork(ws: WorkerInfo): void {
|
||||||
// try to find more work for this worker
|
// try to find more work for this worker
|
||||||
for (let i = 0; i < NUM_PRIO; i++) {
|
for (let i = 0; i < NUM_PRIO; i++) {
|
||||||
@ -304,7 +308,7 @@ export class CryptoDispatcher {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private doRpc<T>(
|
doRpc<T>(
|
||||||
operation: string,
|
operation: string,
|
||||||
priority: number,
|
priority: number,
|
||||||
req: unknown,
|
req: unknown,
|
||||||
@ -355,30 +359,22 @@ export class CryptoDispatcher {
|
|||||||
// (The worker child process won't keep us alive either, because we un-ref
|
// (The worker child process won't keep us alive either, because we un-ref
|
||||||
// it to make sure it doesn't keep us alive if there is no work.)
|
// it to make sure it doesn't keep us alive if there is no work.)
|
||||||
return new Promise<T>((resolve, reject) => {
|
return new Promise<T>((resolve, reject) => {
|
||||||
let timedOut = false;
|
let timeoutHandle: TimerHandle | undefined = undefined;
|
||||||
const timeout = timer.after(10000, () => {
|
const timeoutMs = 5000;
|
||||||
logger.warn(`crypto RPC call ('${operation}') timed out`);
|
const onTimeout = () => {
|
||||||
timedOut = true;
|
// FIXME: Maybe destroy and re-init worker if request is in processing
|
||||||
reject(new Error(`crypto RPC call ('${operation}') timed out`));
|
// state and really taking too long?
|
||||||
if (workItem.state === WorkItemState.Running) {
|
logger.warn(`crypto RPC call ('${operation}') has been queued for a long time`);
|
||||||
workItem.state = WorkItemState.Finished;
|
timeoutHandle = timer.after(timeoutMs, onTimeout);
|
||||||
this.numBusy--;
|
};
|
||||||
}
|
|
||||||
});
|
|
||||||
myProm.promise
|
myProm.promise
|
||||||
.then((x) => {
|
.then((x) => {
|
||||||
if (timedOut) {
|
timeoutHandle?.clear();
|
||||||
return;
|
|
||||||
}
|
|
||||||
timeout.clear();
|
|
||||||
resolve(x);
|
resolve(x);
|
||||||
})
|
})
|
||||||
.catch((x) => {
|
.catch((x) => {
|
||||||
logger.info(`crypto RPC call ${operation} threw`);
|
logger.info(`crypto RPC call ${operation} threw`);
|
||||||
if (timedOut) {
|
timeoutHandle?.clear();
|
||||||
return;
|
|
||||||
}
|
|
||||||
timeout.clear();
|
|
||||||
reject(x);
|
reject(x);
|
||||||
});
|
});
|
||||||
});
|
});
|
@ -21,7 +21,7 @@ import { Logger } from "@gnu-taler/taler-util";
|
|||||||
import os from "os";
|
import os from "os";
|
||||||
import url from "url";
|
import url from "url";
|
||||||
import { nativeCryptoR } from "../cryptoImplementation.js";
|
import { nativeCryptoR } from "../cryptoImplementation.js";
|
||||||
import { CryptoWorkerFactory } from "./cryptoDispatcher.js";
|
import { CryptoWorkerFactory } from "./crypto-dispatcher.js";
|
||||||
import { CryptoWorker } from "./cryptoWorkerInterface.js";
|
import { CryptoWorker } from "./cryptoWorkerInterface.js";
|
||||||
import { processRequestWithImpl } from "./worker-common.js";
|
import { processRequestWithImpl } from "./worker-common.js";
|
||||||
|
|
||||||
|
@ -17,7 +17,7 @@
|
|||||||
/**
|
/**
|
||||||
* Imports.
|
* Imports.
|
||||||
*/
|
*/
|
||||||
import { CryptoWorkerFactory } from "./cryptoDispatcher.js";
|
import { CryptoWorkerFactory } from "./crypto-dispatcher.js";
|
||||||
import { CryptoWorker } from "./cryptoWorkerInterface.js";
|
import { CryptoWorker } from "./cryptoWorkerInterface.js";
|
||||||
import { SynchronousCryptoWorkerNode } from "./synchronousWorkerNode.js";
|
import { SynchronousCryptoWorkerNode } from "./synchronousWorkerNode.js";
|
||||||
|
|
||||||
|
@ -17,7 +17,7 @@
|
|||||||
/**
|
/**
|
||||||
* Imports.
|
* Imports.
|
||||||
*/
|
*/
|
||||||
import { CryptoWorkerFactory } from "./cryptoDispatcher.js";
|
import { CryptoWorkerFactory } from "./crypto-dispatcher.js";
|
||||||
import { CryptoWorker } from "./cryptoWorkerInterface.js";
|
import { CryptoWorker } from "./cryptoWorkerInterface.js";
|
||||||
import { SynchronousCryptoWorkerPlain } from "./synchronousWorkerPlain.js";
|
import { SynchronousCryptoWorkerPlain } from "./synchronousWorkerPlain.js";
|
||||||
|
|
||||||
|
@ -37,7 +37,7 @@ export type { CryptoWorker } from "./crypto/workers/cryptoWorkerInterface.js";
|
|||||||
export {
|
export {
|
||||||
CryptoWorkerFactory,
|
CryptoWorkerFactory,
|
||||||
CryptoDispatcher,
|
CryptoDispatcher,
|
||||||
} from "./crypto/workers/cryptoDispatcher.js";
|
} from "./crypto/workers/crypto-dispatcher.js";
|
||||||
|
|
||||||
export * from "./pending-types.js";
|
export * from "./pending-types.js";
|
||||||
|
|
||||||
|
@ -41,7 +41,7 @@ import {
|
|||||||
CoinRefreshRequest,
|
CoinRefreshRequest,
|
||||||
RefreshReason,
|
RefreshReason,
|
||||||
} from "@gnu-taler/taler-util";
|
} from "@gnu-taler/taler-util";
|
||||||
import { CryptoDispatcher } from "./crypto/workers/cryptoDispatcher.js";
|
import { CryptoDispatcher } from "./crypto/workers/crypto-dispatcher.js";
|
||||||
import { TalerCryptoInterface } from "./crypto/cryptoImplementation.js";
|
import { TalerCryptoInterface } from "./crypto/cryptoImplementation.js";
|
||||||
import { ExchangeDetailsRecord, ExchangeRecord, WalletStoresV1 } from "./db.js";
|
import { ExchangeDetailsRecord, ExchangeRecord, WalletStoresV1 } from "./db.js";
|
||||||
import { PendingOperationsResponse } from "./pending-types.js";
|
import { PendingOperationsResponse } from "./pending-types.js";
|
||||||
|
@ -52,7 +52,7 @@ import {
|
|||||||
DerivedRefreshSession,
|
DerivedRefreshSession,
|
||||||
RefreshNewDenomInfo,
|
RefreshNewDenomInfo,
|
||||||
} from "../crypto/cryptoTypes.js";
|
} from "../crypto/cryptoTypes.js";
|
||||||
import { CryptoApiStoppedError } from "../crypto/workers/cryptoDispatcher.js";
|
import { CryptoApiStoppedError } from "../crypto/workers/crypto-dispatcher.js";
|
||||||
import {
|
import {
|
||||||
CoinRecord,
|
CoinRecord,
|
||||||
CoinSourceType,
|
CoinSourceType,
|
||||||
|
@ -106,7 +106,7 @@ import { TalerCryptoInterface } from "./crypto/cryptoImplementation.js";
|
|||||||
import {
|
import {
|
||||||
CryptoDispatcher,
|
CryptoDispatcher,
|
||||||
CryptoWorkerFactory,
|
CryptoWorkerFactory,
|
||||||
} from "./crypto/workers/cryptoDispatcher.js";
|
} from "./crypto/workers/crypto-dispatcher.js";
|
||||||
import { clearDatabase } from "./db-utils.js";
|
import { clearDatabase } from "./db-utils.js";
|
||||||
import {
|
import {
|
||||||
AuditorTrustRecord,
|
AuditorTrustRecord,
|
||||||
|
Loading…
Reference in New Issue
Block a user