wallet: address crypto worker hang and warning about worker termination

This commit is contained in:
Florian Dold 2022-03-23 13:11:36 +01:00
parent 739c2f9337
commit c539d1803c
No known key found for this signature in database
GPG Key ID: D2E4F00F29D02A4B
24 changed files with 95 additions and 338 deletions

View File

@ -1124,7 +1124,9 @@ testCli.subcommand("tvgcheck", "tvgcheck").action(async (args) => {
testCli.subcommand("cryptoworker", "cryptoworker").action(async (args) => {
const workerFactory = new NodeThreadCryptoWorkerFactory();
const cryptoApi = new CryptoApi(workerFactory);
const res = await cryptoApi.hashString("foo");
const input = "foo";
console.log(`testing crypto worker by hashing string '${input}'`);
const res = await cryptoApi.hashString(input);
console.log(res);
});

View File

@ -1,258 +0,0 @@
/*
This file is part of GNU Taler
(C) 2019 GNUnet e.V.
GNU Taler is free software; you can redistribute it and/or modify it under the
terms of the GNU General Public License as published by the Free Software
Foundation; either version 3, or (at your option) any later version.
GNU Taler is distributed in the hope that it will be useful, but WITHOUT ANY
WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
A PARTICULAR PURPOSE. See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License along with
GNU Taler; see the file COPYING. If not, see <http://www.gnu.org/licenses/>
*/
/**
* Common interface of the internal wallet state. This object is passed
* to the various operations (exchange management, withdrawal, refresh, reserve
* management, etc.).
*
* Some operations can be accessed via this state object. This allows mutual
* recursion between operations, without having cyclic dependencies between
* the respective TypeScript files.
*
* (You can think of this as a "header file" for the wallet implementation.)
*/
/**
* Imports.
*/
import {
WalletNotification,
BalancesResponse,
AmountJson,
DenominationPubKey,
AbsoluteTime,
TalerProtocolTimestamp,
} from "@gnu-taler/taler-util";
import { CryptoApi } from "./crypto/workers/cryptoApi.js";
import { ExchangeDetailsRecord, ExchangeRecord, WalletStoresV1 } from "./db.js";
import { PendingOperationsResponse } from "./pending-types.js";
import { AsyncOpMemoMap, AsyncOpMemoSingle } from "./util/asyncMemo.js";
import { HttpRequestLibrary } from "./util/http.js";
import { AsyncCondition } from "./util/promiseUtils.js";
import {
DbAccess,
GetReadOnlyAccess,
GetReadWriteAccess,
} from "./util/query.js";
import { TimerGroup } from "./util/timer.js";
export const EXCHANGE_COINS_LOCK = "exchange-coins-lock";
export const EXCHANGE_RESERVES_LOCK = "exchange-reserves-lock";
export interface TrustInfo {
isTrusted: boolean;
isAudited: boolean;
}
export interface MerchantInfo {
protocolVersionCurrent: number;
}
/**
* Interface for merchant-related operations.
*/
export interface MerchantOperations {
getMerchantInfo(
ws: InternalWalletState,
merchantBaseUrl: string,
): Promise<MerchantInfo>;
}
export interface ReserveOperations {
processReserve(
ws: InternalWalletState,
reservePub: string,
forceNow?: boolean,
): Promise<void>;
}
/**
* Interface for exchange-related operations.
*/
export interface ExchangeOperations {
// FIXME: Should other operations maybe always use
// updateExchangeFromUrl?
getExchangeDetails(
tx: GetReadOnlyAccess<{
exchanges: typeof WalletStoresV1.exchanges;
exchangeDetails: typeof WalletStoresV1.exchangeDetails;
}>,
exchangeBaseUrl: string,
): Promise<ExchangeDetailsRecord | undefined>;
getExchangeTrust(
ws: InternalWalletState,
exchangeInfo: ExchangeRecord,
): Promise<TrustInfo>;
updateExchangeFromUrl(
ws: InternalWalletState,
baseUrl: string,
acceptedFormat?: string[],
forceNow?: boolean,
): Promise<{
exchange: ExchangeRecord;
exchangeDetails: ExchangeDetailsRecord;
}>;
}
export interface RecoupOperations {
createRecoupGroup(
ws: InternalWalletState,
tx: GetReadWriteAccess<{
recoupGroups: typeof WalletStoresV1.recoupGroups;
denominations: typeof WalletStoresV1.denominations;
refreshGroups: typeof WalletStoresV1.refreshGroups;
coins: typeof WalletStoresV1.coins;
}>,
coinPubs: string[],
): Promise<string>;
processRecoupGroup(
ws: InternalWalletState,
recoupGroupId: string,
forceNow?: boolean,
): Promise<void>;
}
export interface DenomInfo {
/**
* Value of one coin of the denomination.
*/
value: AmountJson;
/**
* The denomination public key.
*/
denomPub: DenominationPubKey;
/**
* Hash of the denomination public key.
* Stored in the database for faster lookups.
*/
denomPubHash: string;
/**
* Fee for withdrawing.
*/
feeWithdraw: AmountJson;
/**
* Fee for depositing.
*/
feeDeposit: AmountJson;
/**
* Fee for refreshing.
*/
feeRefresh: AmountJson;
/**
* Fee for refunding.
*/
feeRefund: AmountJson;
/**
* Validity start date of the denomination.
*/
stampStart: TalerProtocolTimestamp;
/**
* Date after which the currency can't be withdrawn anymore.
*/
stampExpireWithdraw: TalerProtocolTimestamp;
/**
* Date after the denomination officially doesn't exist anymore.
*/
stampExpireLegal: TalerProtocolTimestamp;
/**
* Data after which coins of this denomination can't be deposited anymore.
*/
stampExpireDeposit: TalerProtocolTimestamp;
}
export type NotificationListener = (n: WalletNotification) => void;
/**
* Internal, shard wallet state that is used by the implementation
* of wallet operations.
*
* FIXME: This should not be exported anywhere from the taler-wallet-core package,
* as it's an opaque implementation detail.
*/
export interface InternalWalletState {
memoProcessReserve: AsyncOpMemoMap<void>;
memoMakePlanchet: AsyncOpMemoMap<void>;
memoGetPending: AsyncOpMemoSingle<PendingOperationsResponse>;
memoGetBalance: AsyncOpMemoSingle<BalancesResponse>;
memoProcessRefresh: AsyncOpMemoMap<void>;
memoProcessRecoup: AsyncOpMemoMap<void>;
memoProcessDeposit: AsyncOpMemoMap<void>;
cryptoApi: CryptoApi;
timerGroup: TimerGroup;
stopped: boolean;
insecureTrustExchange: boolean;
/**
* Asynchronous condition to interrupt the sleep of the
* retry loop.
*
* Used to allow processing of new work faster.
*/
latch: AsyncCondition;
listeners: NotificationListener[];
initCalled: boolean;
merchantInfoCache: Record<string, MerchantInfo>;
exchangeOps: ExchangeOperations;
recoupOps: RecoupOperations;
merchantOps: MerchantOperations;
reserveOps: ReserveOperations;
getDenomInfo(
ws: InternalWalletState,
tx: GetReadOnlyAccess<{
denominations: typeof WalletStoresV1.denominations;
}>,
exchangeBaseUrl: string,
denomPubHash: string,
): Promise<DenomInfo | undefined>;
db: DbAccess<typeof WalletStoresV1>;
http: HttpRequestLibrary;
notify(n: WalletNotification): void;
addNotificationListener(f: (n: WalletNotification) => void): void;
/**
* Stop ongoing processing.
*/
stop(): void;
/**
* Run an async function after acquiring a list of locks, identified
* by string tokens.
*/
runSequentialized<T>(tokens: string[], f: () => Promise<T>): Promise<T>;
runUntilDone(req?: { maxRetries?: number }): Promise<void>;
}

View File

@ -75,7 +75,7 @@ interface WorkerState {
/**
* Timer to terminate the worker if it's not busy enough.
*/
terminationTimerHandle: timer.TimerHandle | null;
idleTimeoutHandle: timer.TimerHandle | null;
}
interface WorkItem {
@ -114,6 +114,13 @@ export interface CryptoWorkerFactory {
getConcurrency(): number;
}
export class CryptoApiStoppedError extends Error {
constructor() {
super("Crypto API stopped");
Object.setPrototypeOf(this, CryptoApiStoppedError.prototype);
}
}
/**
* Crypto API that interfaces manages a background crypto thread
* for the execution of expensive operations.
@ -140,25 +147,25 @@ export class CryptoApi {
*/
terminateWorkers(): void {
for (const worker of this.workers) {
if (worker.w) {
logger.trace("terminating worker");
worker.w.terminate();
if (worker.terminationTimerHandle) {
worker.terminationTimerHandle.clear();
worker.terminationTimerHandle = null;
if (worker.idleTimeoutHandle) {
worker.idleTimeoutHandle.clear();
worker.idleTimeoutHandle = null;
}
if (worker.currentWorkItem) {
worker.currentWorkItem.reject(Error("explicitly terminated"));
worker.currentWorkItem = null;
}
if (worker.w) {
logger.trace("terminating worker");
worker.w.terminate();
worker.w = null;
}
}
}
stop(): void {
this.terminateWorkers();
this.stopped = true;
this.terminateWorkers();
}
/**
@ -166,8 +173,7 @@ export class CryptoApi {
*/
wake(ws: WorkerState, work: WorkItem): void {
if (this.stopped) {
logger.trace("cryptoApi is stopped");
return;
throw new CryptoApiStoppedError();
}
if (ws.currentWorkItem !== null) {
throw Error("assertion failed");
@ -195,19 +201,20 @@ export class CryptoApi {
}
resetWorkerTimeout(ws: WorkerState): void {
if (ws.terminationTimerHandle !== null) {
ws.terminationTimerHandle.clear();
ws.terminationTimerHandle = null;
if (ws.idleTimeoutHandle !== null) {
ws.idleTimeoutHandle.clear();
ws.idleTimeoutHandle = null;
}
const destroy = (): void => {
logger.trace("destroying crypto worker after idle timeout");
// terminate worker if it's idle
if (ws.w && ws.currentWorkItem === null) {
ws.w.terminate();
ws.w = null;
}
};
ws.terminationTimerHandle = timer.after(15 * 1000, destroy);
//ws.terminationTimerHandle.unref();
ws.idleTimeoutHandle = timer.after(15 * 1000, destroy);
ws.idleTimeoutHandle.unref();
}
handleWorkerError(ws: WorkerState, e: any): void {
@ -277,7 +284,7 @@ export class CryptoApi {
for (let i = 0; i < this.workers.length; i++) {
this.workers[i] = {
currentWorkItem: null,
terminationTimerHandle: null,
idleTimeoutHandle: null,
w: null,
};
}
@ -293,6 +300,9 @@ export class CryptoApi {
priority: number,
...args: any[]
): Promise<T> {
if (this.stopped) {
throw new CryptoApiStoppedError();
}
const p: Promise<T> = new Promise<T>((resolve, reject) => {
const rpcId = this.nextRpcId++;
const workItem: WorkItem = {
@ -324,7 +334,33 @@ export class CryptoApi {
throw Error("assertion failed");
});
return p;
// Make sure that we wait for the result while a timer is active
// to prevent the event loop from dying, as just waiting for a promise
// does not keep the process alive in Node.
// (The worker child process won't keep us alive either, because we un-ref
// it to make sure it doesn't keep us alive if there is no work.)
return new Promise<T>((resolve, reject) => {
let timedOut = false;
const timeout = timer.after(5000, () => {
logger.warn("crypto RPC call timed out");
timedOut = true;
reject(new Error("crypto RPC call timed out"));
});
p.then((x) => {
if (timedOut) {
return;
}
timeout.clear();
resolve(x);
});
p.catch((x) => {
if (timedOut) {
return;
}
timeout.clear();
reject(x);
});
});
}
createPlanchet(req: PlanchetCreationRequest): Promise<WithdrawalPlanchet> {

View File

@ -161,32 +161,3 @@ export function getErrorDetailFromException(e: any): TalerErrorDetail {
);
return err;
}
/**
* Run an operation and call the onOpError callback
* when there was an exception or operation error that must be reported.
* The cause will be re-thrown to the caller.
*/
export async function guardOperationException<T>(
op: () => Promise<T>,
onOpError: (e: TalerErrorDetail) => Promise<void>,
): Promise<T> {
try {
return await op();
} catch (e: any) {
if (
e instanceof TalerError &&
e.hasErrorCode(TalerErrorCode.WALLET_PENDING_OPERATION_FAILED)
) {
throw e;
}
const opErr = getErrorDetailFromException(e);
await onOpError(opErr);
throw TalerError.fromDetail(
TalerErrorCode.WALLET_PENDING_OPERATION_FAILED,
{
innerError: e.errorDetail,
},
);
}
}

View File

@ -41,7 +41,7 @@ export { SynchronousCryptoWorker } from "./crypto/workers/synchronousWorker.js";
export * from "./pending-types.js";
export * from "./util/debugFlags.js";
export { InternalWalletState } from "./common.js";
export { InternalWalletState } from "./internal-wallet-state.js";
export * from "./wallet-api-types.js";
export * from "./wallet.js";

View File

@ -57,7 +57,7 @@ import {
stringToBytes,
AbsoluteTime,
} from "@gnu-taler/taler-util";
import { InternalWalletState } from "../../common.js";
import { InternalWalletState } from "../../internal-wallet-state.js";
import {
AbortStatus,
CoinSourceType,

View File

@ -58,7 +58,7 @@ import {
} from "../../util/invariants.js";
import { Logger } from "@gnu-taler/taler-util";
import { initRetryInfo } from "../../util/retries.js";
import { InternalWalletState } from "../../common.js";
import { InternalWalletState } from "../../internal-wallet-state.js";
import { provideBackupState } from "./state.js";
import { makeEventId, TombstoneTag } from "../transactions.js";
import { getExchangeDetails } from "../exchanges.js";

View File

@ -55,7 +55,7 @@ import {
TalerProtocolTimestamp,
} from "@gnu-taler/taler-util";
import { gunzipSync, gzipSync } from "fflate";
import { InternalWalletState } from "../../common.js";
import { InternalWalletState } from "../../internal-wallet-state.js";
import { kdf } from "@gnu-taler/taler-util";
import { secretbox, secretbox_open } from "@gnu-taler/taler-util";
import {
@ -80,7 +80,6 @@ import {
WalletStoresV1,
WALLET_BACKUP_STATE_KEY,
} from "../../db.js";
import { guardOperationException } from "../../errors.js";
import {
readSuccessResponseJsonOrThrow,
readTalerErrorResponse,
@ -99,6 +98,7 @@ import {
import { exportBackup } from "./export.js";
import { BackupCryptoPrecomputedData, importBackup } from "./import.js";
import { getWalletBackupState, provideBackupState } from "./state.js";
import { guardOperationException } from "../common.js";
const logger = new Logger("operations/backup.ts");

View File

@ -23,7 +23,7 @@ import {
} from "../../db.js";
import { checkDbInvariant } from "../../util/invariants.js";
import { GetReadOnlyAccess } from "../../util/query.js";
import { InternalWalletState } from "../../common.js";
import { InternalWalletState } from "../../internal-wallet-state.js";
export async function provideBackupState(
ws: InternalWalletState,

View File

@ -25,7 +25,7 @@ import {
} from "@gnu-taler/taler-util";
import { CoinStatus, WalletStoresV1 } from "../db.js";
import { GetReadOnlyAccess } from "../util/query.js";
import { InternalWalletState } from "../common.js";
import { InternalWalletState } from "../internal-wallet-state.js";
const logger = new Logger("operations/balance.ts");

View File

@ -42,9 +42,8 @@ import {
TrackDepositGroupResponse,
URL,
} from "@gnu-taler/taler-util";
import { InternalWalletState } from "../common.js";
import { InternalWalletState } from "../internal-wallet-state.js";
import { DepositGroupRecord, OperationStatus } from "../db.js";
import { guardOperationException } from "../errors.js";
import { PayCoinSelection, selectPayCoins } from "../util/coinSelection.js";
import { readSuccessResponseJsonOrThrow } from "../util/http.js";
import { initRetryInfo, updateRetryInfoTimeout } from "../util/retries.js";
@ -57,6 +56,7 @@ import {
getTotalPaymentCost,
} from "./pay.js";
import { getTotalRefreshCost } from "./refresh.js";
import { guardOperationException } from "./common.js";
/**
* Logger.

View File

@ -64,12 +64,13 @@ import {
} from "../util/http.js";
import { DbAccess, GetReadOnlyAccess } from "../util/query.js";
import { initRetryInfo, updateRetryInfoTimeout } from "../util/retries.js";
import { guardOperationException, TalerError } from "../errors.js";
import { InternalWalletState, TrustInfo } from "../common.js";
import { TalerError } from "../errors.js";
import { InternalWalletState, TrustInfo } from "../internal-wallet-state.js";
import {
WALLET_CACHE_BREAKER_CLIENT_VERSION,
WALLET_EXCHANGE_PROTOCOL_VERSION,
} from "../versions.js";
import { guardOperationException } from "./common.js";
const logger = new Logger("exchanges.ts");

View File

@ -24,7 +24,7 @@ import {
codecForMerchantConfigResponse,
LibtoolVersion,
} from "@gnu-taler/taler-util";
import { InternalWalletState, MerchantInfo } from "../common.js";
import { InternalWalletState, MerchantInfo } from "../internal-wallet-state.js";
import { readSuccessResponseJsonOrThrow } from "../index.js";
const logger = new Logger("taler-wallet-core:merchants.ts");

View File

@ -55,7 +55,7 @@ import {
TransactionType,
URL,
} from "@gnu-taler/taler-util";
import { EXCHANGE_COINS_LOCK, InternalWalletState } from "../common.js";
import { EXCHANGE_COINS_LOCK, InternalWalletState } from "../internal-wallet-state.js";
import {
AbortStatus,
AllowedAuditorInfo,
@ -71,7 +71,6 @@ import {
WalletStoresV1,
} from "../db.js";
import {
guardOperationException,
makeErrorDetail,
makePendingOperationFailedError,
TalerError,
@ -100,6 +99,7 @@ import {
} from "../util/retries.js";
import { getExchangeDetails } from "./exchanges.js";
import { createRefreshGroup, getTotalRefreshCost } from "./refresh.js";
import { guardOperationException } from "./common.js";
/**
* Logger.

View File

@ -36,7 +36,7 @@ import {
ReserveType,
} from "../pending-types.js";
import { AbsoluteTime } from "@gnu-taler/taler-util";
import { InternalWalletState } from "../common.js";
import { InternalWalletState } from "../internal-wallet-state.js";
import { GetReadOnlyAccess } from "../util/query.js";
async function gatherExchangePending(

View File

@ -49,11 +49,11 @@ import {
import { readSuccessResponseJsonOrThrow } from "../util/http.js";
import { Logger, URL } from "@gnu-taler/taler-util";
import { initRetryInfo, updateRetryInfoTimeout } from "../util/retries.js";
import { guardOperationException } from "../errors.js";
import { createRefreshGroup, processRefreshGroup } from "./refresh.js";
import { getReserveRequestTimeout, processReserve } from "./reserves.js";
import { InternalWalletState } from "../common.js";
import { InternalWalletState } from "../internal-wallet-state.js";
import { GetReadWriteAccess } from "../util/query.js";
import { guardOperationException } from "./common.js";
const logger = new Logger("operations/recoup.ts");

View File

@ -61,13 +61,12 @@ import {
AbsoluteTime,
URL,
} from "@gnu-taler/taler-util";
import { guardOperationException } from "../errors.js";
import { updateExchangeFromUrl } from "./exchanges.js";
import {
DenomInfo,
EXCHANGE_COINS_LOCK,
InternalWalletState,
} from "../common.js";
} from "../internal-wallet-state.js";
import {
isWithdrawableDenom,
selectWithdrawalDenominations,
@ -78,6 +77,8 @@ import {
} from "../crypto/cryptoTypes.js";
import { GetReadWriteAccess } from "../util/query.js";
import { CryptoApi } from "../index.browser.js";
import { guardOperationException } from "./common.js";
import { CryptoApiStoppedError } from "../crypto/workers/cryptoApi.js";
const logger = new Logger("refresh.ts");
@ -944,6 +945,9 @@ export async function createRefreshGroup(
logger.info(`created refresh group ${refreshGroupId}`);
processRefreshGroup(ws, refreshGroupId).catch((e) => {
if (e instanceof CryptoApiStoppedError) {
return;
}
logger.warn(`processing refresh group ${refreshGroupId} failed: ${e}`);
});

View File

@ -59,9 +59,9 @@ import { readSuccessResponseJsonOrThrow } from "../util/http.js";
import { checkDbInvariant } from "../util/invariants.js";
import { GetReadWriteAccess } from "../util/query.js";
import { initRetryInfo, updateRetryInfoTimeout } from "../util/retries.js";
import { guardOperationException } from "../errors.js";
import { createRefreshGroup, getTotalRefreshCost } from "./refresh.js";
import { InternalWalletState } from "../common.js";
import { InternalWalletState } from "../internal-wallet-state.js";
import { guardOperationException } from "./common.js";
const logger = new Logger("refund.ts");

View File

@ -38,7 +38,7 @@ import {
AbsoluteTime,
URL,
} from "@gnu-taler/taler-util";
import { InternalWalletState } from "../common.js";
import { InternalWalletState } from "../internal-wallet-state.js";
import {
OperationStatus,
ReserveBankInfo,
@ -47,7 +47,7 @@ import {
WalletStoresV1,
WithdrawalGroupRecord,
} from "../db.js";
import { guardOperationException, TalerError } from "../errors.js";
import { TalerError } from "../errors.js";
import { assertUnreachable } from "../util/assertUnreachable.js";
import {
readSuccessResponseJsonOrErrorCode,
@ -74,6 +74,7 @@ import {
selectWithdrawalDenominations,
updateWithdrawalDenoms,
} from "./withdraw.js";
import { guardOperationException } from "./common.js";
const logger = new Logger("taler-wallet-core:reserves.ts");

View File

@ -35,7 +35,7 @@ import {
PreparePayResultType,
} from "@gnu-taler/taler-util";
import { createTalerWithdrawReserve } from "./reserves.js";
import { InternalWalletState } from "../common.js";
import { InternalWalletState } from "../internal-wallet-state.js";
import { confirmPay, preparePayForUri } from "./pay.js";
import { getBalances } from "./balance.js";
import { applyRefund } from "./refund.js";

View File

@ -44,9 +44,9 @@ import {
import { j2s } from "@gnu-taler/taler-util";
import { checkDbInvariant, checkLogicInvariant } from "../util/invariants.js";
import { initRetryInfo, updateRetryInfoTimeout } from "../util/retries.js";
import { guardOperationException, makeErrorDetail } from "../errors.js";
import { makeErrorDetail } from "../errors.js";
import { updateExchangeFromUrl } from "./exchanges.js";
import { InternalWalletState } from "../common.js";
import { InternalWalletState } from "../internal-wallet-state.js";
import {
getExchangeWithdrawalInfo,
updateWithdrawalDenoms,
@ -59,6 +59,7 @@ import {
readSuccessResponseJsonOrThrow,
} from "../util/http.js";
import { encodeCrock, getRandomBytes } from "@gnu-taler/taler-util";
import { guardOperationException } from "./common.js";
const logger = new Logger("operations/tip.ts");

View File

@ -31,7 +31,7 @@ import {
WithdrawalDetails,
WithdrawalType,
} from "@gnu-taler/taler-util";
import { InternalWalletState } from "../common.js";
import { InternalWalletState } from "../internal-wallet-state.js";
import {
AbortStatus,
RefundState,

View File

@ -65,16 +65,15 @@ import {
import { initRetryInfo, updateRetryInfoTimeout } from "../util/retries.js";
import {
getErrorDetailFromException,
guardOperationException,
makeErrorDetail,
makePendingOperationFailedError,
TalerError,
} from "../errors.js";
import { InternalWalletState } from "../common.js";
import { InternalWalletState } from "../internal-wallet-state.js";
import {
WALLET_BANK_INTEGRATION_PROTOCOL_VERSION,
WALLET_EXCHANGE_PROTOCOL_VERSION,
} from "../versions.js";
import { guardOperationException } from "./common.js";
/**
* Logger for this file.

View File

@ -89,7 +89,7 @@ import {
NotificationListener,
RecoupOperations,
ReserveOperations,
} from "./common.js";
} from "./internal-wallet-state.js";
import { CryptoApi, CryptoWorkerFactory } from "./crypto/workers/cryptoApi.js";
import {
AuditorTrustRecord,