wallet-core: improve crypto worker error handling

This commit is contained in:
Florian Dold 2022-09-26 14:40:06 +02:00
parent 360cb80610
commit 25eb7624b3
No known key found for this signature in database
GPG Key ID: D2E4F00F29D02A4B
5 changed files with 612 additions and 87 deletions

File diff suppressed because it is too large Load Diff

View File

@ -22,7 +22,9 @@
/**
* Imports.
*/
import { Logger } from "@gnu-taler/taler-util";
import { Logger, TalerErrorCode } from "@gnu-taler/taler-util";
import { TalerError } from "../../errors.js";
import { openPromise } from "../../util/promiseUtils.js";
import { timer, performanceNow, TimerHandle } from "../../util/timer.js";
import { nullCrypto, TalerCryptoInterface } from "../cryptoImplementation.js";
import { CryptoWorker } from "./cryptoWorkerInterface.js";
@ -32,7 +34,7 @@ const logger = new Logger("cryptoApi.ts");
/**
* State of a crypto worker.
*/
interface WorkerState {
interface WorkerInfo {
/**
* The actual worker thread.
*/
@ -64,6 +66,8 @@ interface WorkItem {
* Time when the work was submitted to a (non-busy) worker thread.
*/
startTime: BigInt;
state: WorkItemState;
}
/**
@ -92,12 +96,18 @@ export class CryptoApiStoppedError extends Error {
}
}
export enum WorkItemState {
Pending = 1,
Running = 2,
Finished = 3,
}
/**
* Dispatcher for cryptographic operations to underlying crypto workers.
*/
export class CryptoDispatcher {
private nextRpcId = 1;
private workers: WorkerState[];
private workers: WorkerInfo[];
private workQueues: WorkItem[][];
private workerFactory: CryptoWorkerFactory;
@ -141,7 +151,7 @@ export class CryptoDispatcher {
/**
* Start a worker (if not started) and set as busy.
*/
wake(ws: WorkerState, work: WorkItem): void {
wake(ws: WorkerInfo, work: WorkItem): void {
if (this.stopped) {
return;
}
@ -167,10 +177,11 @@ export class CryptoDispatcher {
};
this.resetWorkerTimeout(ws);
work.startTime = performanceNow();
work.state = WorkItemState.Running;
timer.after(0, () => worker.postMessage(msg));
}
resetWorkerTimeout(ws: WorkerState): void {
resetWorkerTimeout(ws: WorkerInfo): void {
if (ws.idleTimeoutHandle !== null) {
ws.idleTimeoutHandle.clear();
ws.idleTimeoutHandle = null;
@ -187,7 +198,7 @@ export class CryptoDispatcher {
ws.idleTimeoutHandle.unref();
}
handleWorkerError(ws: WorkerState, e: any): void {
handleWorkerError(ws: WorkerInfo, e: any): void {
if (ws.currentWorkItem) {
logger.error(`error in worker during ${ws.currentWorkItem.operation}`, e);
} else {
@ -203,6 +214,7 @@ export class CryptoDispatcher {
logger.error(e as string);
}
if (ws.currentWorkItem !== null) {
ws.currentWorkItem.state = WorkItemState.Finished;
ws.currentWorkItem.reject(e);
ws.currentWorkItem = null;
this.numBusy--;
@ -210,7 +222,7 @@ export class CryptoDispatcher {
this.findWork(ws);
}
private findWork(ws: WorkerState): void {
private findWork(ws: WorkerInfo): void {
// try to find more work for this worker
for (let i = 0; i < NUM_PRIO; i++) {
const q = this.workQueues[NUM_PRIO - i - 1];
@ -225,26 +237,38 @@ export class CryptoDispatcher {
}
}
handleWorkerMessage(ws: WorkerState, msg: any): void {
handleWorkerMessage(ws: WorkerInfo, msg: any): void {
const id = msg.data.id;
if (typeof id !== "number") {
console.error("rpc id must be number");
logger.error("rpc id must be number");
return;
}
const currentWorkItem = ws.currentWorkItem;
ws.currentWorkItem = null;
this.numBusy--;
this.findWork(ws);
if (!currentWorkItem) {
console.error("unsolicited response from worker");
logger.error("unsolicited response from worker");
return;
}
if (id !== currentWorkItem.rpcId) {
console.error(`RPC with id ${id} has no registry entry`);
logger.error(`RPC with id ${id} has no registry entry`);
return;
}
if (currentWorkItem.state === WorkItemState.Running) {
this.numBusy--;
currentWorkItem.state = WorkItemState.Finished;
if (msg.data.type === "success") {
currentWorkItem.resolve(msg.data.result);
} else if (msg.data.type === "error") {
currentWorkItem.reject(
TalerError.fromDetail(TalerErrorCode.WALLET_CRYPTO_WORKER_ERROR, {
innerError: msg.data.error,
}),
);
} else {
currentWorkItem.reject(new Error("bad message from crypto worker"));
}
}
this.findWork(ws);
}
cryptoApi: TalerCryptoInterface;
@ -258,7 +282,7 @@ export class CryptoDispatcher {
this.cryptoApi = fns;
this.workerFactory = workerFactory;
this.workers = new Array<WorkerState>(workerFactory.getConcurrency());
this.workers = new Array<WorkerInfo>(workerFactory.getConcurrency());
for (let i = 0; i < this.workers.length; i++) {
this.workers[i] = {
@ -282,36 +306,42 @@ export class CryptoDispatcher {
if (this.stopped) {
throw new CryptoApiStoppedError();
}
const p: Promise<T> = new Promise<T>((resolve, reject) => {
const rpcId = this.nextRpcId++;
const myProm = openPromise<T>();
const workItem: WorkItem = {
operation,
req,
resolve,
reject,
resolve: myProm.resolve,
reject: myProm.reject,
rpcId,
startTime: BigInt(0),
state: WorkItemState.Pending,
};
let scheduled = false;
if (this.numBusy === this.workers.length) {
// All workers are busy, queue work item
const q = this.workQueues[priority];
if (!q) {
throw Error("assertion failed");
}
this.workQueues[priority].push(workItem);
return;
scheduled = true;
}
if (!scheduled) {
for (const ws of this.workers) {
if (ws.currentWorkItem !== null) {
continue;
}
this.wake(ws, workItem);
return;
scheduled = true;
break;
}
}
if (!scheduled) {
// Could not schedule work.
throw Error("assertion failed");
});
}
// Make sure that we wait for the result while a timer is active
// to prevent the event loop from dying, as just waiting for a promise
@ -324,14 +354,20 @@ export class CryptoDispatcher {
logger.warn(`crypto RPC call ('${operation}') timed out`);
timedOut = true;
reject(new Error(`crypto RPC call ('${operation}') timed out`));
if (workItem.state === WorkItemState.Running) {
workItem.state = WorkItemState.Finished;
this.numBusy--;
}
});
p.then((x) => {
myProm.promise
.then((x) => {
if (timedOut) {
return;
}
timeout.clear();
resolve(x);
}).catch((x) => {
})
.catch((x) => {
logger.info(`crypto RPC call ${operation} threw`);
if (timedOut) {
return;

View File

@ -22,6 +22,7 @@ import { CryptoWorker } from "./cryptoWorkerInterface.js";
import os from "os";
import { Logger } from "@gnu-taler/taler-util";
import { nativeCryptoR } from "../cryptoImplementation.js";
import { getErrorDetailFromException } from "../../errors.js";
const logger = new Logger("nodeThreadWorker.ts");
@ -69,58 +70,72 @@ const workerCode = `
* a message.
*/
export function handleWorkerMessage(msg: any): void {
const handleRequest = async (): Promise<void> => {
const req = msg.req;
if (typeof req !== "object") {
console.error("request must be an object");
logger.error("request must be an object");
return;
}
const id = msg.id;
if (typeof id !== "number") {
console.error("RPC id must be number");
logger.error("RPC id must be number");
return;
}
const operation = msg.operation;
if (typeof operation !== "string") {
console.error("RPC operation must be string");
logger.error("RPC operation must be string");
return;
}
const handleRequest = async (): Promise<void> => {
const impl = nativeCryptoR;
if (!(operation in impl)) {
console.error(`crypto operation '${operation}' not found`);
logger.error(`crypto operation '${operation}' not found`);
return;
}
let responseMsg: any;
try {
const result = await (impl as any)[operation](impl, req);
responseMsg = { data: { type: "success", result, id } };
} catch (e: any) {
logger.error(`error during operation: ${e.stack ?? e.toString()}`);
responseMsg = {
data: {
type: "error",
error: getErrorDetailFromException(e),
id,
},
};
}
try {
// eslint-disable-next-line @typescript-eslint/no-var-requires
const _r = "require";
const worker_threads: typeof import("worker_threads") =
module[_r]("worker_threads");
// const worker_threads = require("worker_threads");
const p = worker_threads.parentPort;
worker_threads.parentPort?.postMessage;
if (p) {
p.postMessage({ data: { result, id } });
p.postMessage(responseMsg);
} else {
console.error("parent port not available (not running in thread?");
logger.error("parent port not available (not running in thread?");
}
} catch (e) {
console.error("error during operation", e);
} catch (e: any) {
logger.error(
`error sending back operation result: ${e.stack ?? e.toString()}`,
);
return;
}
};
handleRequest().catch((e) => {
console.error("error in node worker", e);
logger.error("error in node worker", e);
});
}
export function handleWorkerError(e: Error): void {
console.log("got error from worker", e);
logger.error(`got error from worker: ${e.stack ?? e.toString()}`);
}
export class NodeThreadCryptoWorkerFactory implements CryptoWorkerFactory {
@ -161,7 +176,7 @@ class NodeThreadCryptoWorker implements CryptoWorker {
this.nodeWorker = new worker_threads.Worker(workerCode, { eval: true });
this.nodeWorker.on("error", (err: Error) => {
console.error("error in node worker:", err);
logger.error("error in node worker:", err);
if (this.onerror) {
this.onerror(err);
}

View File

@ -15,6 +15,7 @@
*/
import { Logger } from "@gnu-taler/taler-util";
import { getErrorDetailFromException } from "../../errors.js";
import {
nativeCryptoR,
TalerCryptoInterfaceR,
@ -139,7 +140,7 @@ export class SynchronousCryptoWorker {
private dispatchMessage(msg: any): void {
if (this.onmessage) {
this.onmessage({ data: msg });
this.onmessage(msg);
}
}
@ -151,20 +152,27 @@ export class SynchronousCryptoWorker {
const impl = this.cryptoImplR;
if (!(operation in impl)) {
console.error(`crypto operation '${operation}' not found`);
logger.error(`crypto operation '${operation}' not found`);
return;
}
let result: any;
let responseMsg: any;
try {
result = await (impl as any)[operation](impl, req);
const result = await (impl as any)[operation](impl, req);
responseMsg = { data: { type: "success", result, id } };
} catch (e: any) {
logger.error(`error during operation '${operation}': ${e}`);
return;
logger.error(`error during operation: ${e.stack ?? e.toString()}`);
responseMsg = {
data: {
type: "error",
id,
error: getErrorDetailFromException(e),
},
};
}
try {
setTimeout(() => this.dispatchMessage({ result, id }), 0);
setTimeout(() => this.dispatchMessage(responseMsg), 0);
} catch (e) {
logger.error("got error during dispatch", e);
}
@ -176,22 +184,22 @@ export class SynchronousCryptoWorker {
postMessage(msg: any): void {
const req = msg.req;
if (typeof req !== "object") {
console.error("request must be an object");
logger.error("request must be an object");
return;
}
const id = msg.id;
if (typeof id !== "number") {
console.error("RPC id must be number");
logger.error("RPC id must be number");
return;
}
const operation = msg.operation;
if (typeof operation !== "string") {
console.error("RPC operation must be string");
logger.error("RPC operation must be string");
return;
}
this.handleRequest(operation, id, req).catch((e) => {
console.error("Error while handling crypto request:", e);
logger.error("Error while handling crypto request:", e);
});
}

View File

@ -73,6 +73,9 @@ export interface DetailsMap {
[TalerErrorCode.WALLET_PAY_MERCHANT_SERVER_ERROR]: {
requestError: TalerErrorDetail;
};
[TalerErrorCode.WALLET_CRYPTO_WORKER_ERROR]: {
innerError: TalerErrorDetail;
};
}
type ErrBody<Y> = Y extends keyof DetailsMap ? DetailsMap[Y] : never;