wallet-core: do not block when accepting a manual withdrawal

This commit is contained in:
Florian Dold 2022-09-23 18:56:21 +02:00
parent 9811e19252
commit 72336b149b
No known key found for this signature in database
GPG Key ID: D2E4F00F29D02A4B
8 changed files with 172 additions and 69 deletions

View File

@ -1378,6 +1378,7 @@ export interface DepositGroupFees {
wire: AmountJson;
refresh: AmountJson;
}
export interface CreateDepositGroupRequest {
depositPaytoUri: string;
amount: AmountString;

View File

@ -46,7 +46,7 @@
"dependencies": {
"@gnu-taler/taler-util": "workspace:*",
"@gnu-taler/taler-wallet-core": "workspace:*",
"axios": "^0.25.0",
"axios": "^0.27.2",
"source-map-support": "^0.5.21",
"tslib": "^2.3.1"
}

View File

@ -20,6 +20,11 @@
import { GlobalTestState } from "../harness/harness.js";
import { createSimpleTestkudosEnvironment } from "../harness/helpers.js";
import { WalletApiOperation, BankApi } from "@gnu-taler/taler-wallet-core";
import {
AbsoluteTime,
Duration,
TalerProtocolTimestamp,
} from "@gnu-taler/taler-util";
/**
* Run test for basic, bank-integrated withdrawal.
@ -38,6 +43,9 @@ export async function runTestWithdrawalManualTest(t: GlobalTestState) {
exchangeBaseUrl: exchange.baseUrl,
});
const tStart = AbsoluteTime.now();
// We expect this to return immediately.
const wres = await wallet.client.call(
WalletApiOperation.AcceptManualWithdrawal,
{
@ -46,9 +54,14 @@ export async function runTestWithdrawalManualTest(t: GlobalTestState) {
},
);
// Check that the request did not go into long-polling.
const duration = AbsoluteTime.difference(tStart, AbsoluteTime.now());
if (duration.d_ms > 5 * 1000) {
throw Error("withdrawal took too long (longpolling issue)");
}
const reservePub: string = wres.reservePub;
// Bug.
await BankApi.adminAddIncoming(bank, {
exchangeBankAccount,
amount: "TESTKUDOS:10",

View File

@ -64,7 +64,7 @@
"@gnu-taler/idb-bridge": "workspace:*",
"@gnu-taler/taler-util": "workspace:*",
"@types/node": "^17.0.17",
"axios": "^0.25.0",
"axios": "^0.27.2",
"big-integer": "^1.6.51",
"fflate": "^0.7.3",
"source-map-support": "^0.5.21",

View File

@ -127,6 +127,12 @@ export interface RecoupOperations {
export type NotificationListener = (n: WalletNotification) => void;
export interface ActiveLongpollInfo {
[opId: string]: {
cancel: () => void;
};
}
/**
* Internal, shard wallet state that is used by the implementation
* of wallet operations.
@ -135,12 +141,10 @@ export type NotificationListener = (n: WalletNotification) => void;
* as it's an opaque implementation detail.
*/
export interface InternalWalletState {
memoProcessReserve: AsyncOpMemoMap<void>;
memoMakePlanchet: AsyncOpMemoMap<void>;
memoGetPending: AsyncOpMemoSingle<PendingOperationsResponse>;
memoGetBalance: AsyncOpMemoSingle<BalancesResponse>;
memoProcessRefresh: AsyncOpMemoMap<void>;
memoProcessRecoup: AsyncOpMemoMap<void>;
/**
* Active longpoll operations.
*/
activeLongpoll: ActiveLongpollInfo;
cryptoApi: TalerCryptoInterface;

View File

@ -28,6 +28,7 @@ import {
Amounts,
AmountString,
BankWithdrawDetails,
CancellationToken,
canonicalizeBaseUrl,
codecForBankWithdrawalOperationPostResponse,
codecForReserveStatus,
@ -106,7 +107,12 @@ import {
WALLET_BANK_INTEGRATION_PROTOCOL_VERSION,
WALLET_EXCHANGE_PROTOCOL_VERSION,
} from "../versions.js";
import { makeCoinAvailable, storeOperationPending } from "../wallet.js";
import {
makeCoinAvailable,
runOperationWithErrorReporting,
storeOperationError,
storeOperationPending,
} from "../wallet.js";
import {
getExchangeDetails,
getExchangePaytoUri,
@ -962,6 +968,7 @@ export async function updateWithdrawalDenoms(
async function queryReserve(
ws: InternalWalletState,
withdrawalGroupId: string,
cancellationToken: CancellationToken,
): Promise<{ ready: boolean }> {
const withdrawalGroup = await getWithdrawalGroupRecordTx(ws.db, {
withdrawalGroupId,
@ -982,6 +989,7 @@ async function queryReserve(
const resp = await ws.http.get(reserveUrl.href, {
timeout: getReserveRequestTimeout(withdrawalGroup),
cancellationToken,
});
const result = await readSuccessResponseJsonOrErrorCode(
@ -1044,6 +1052,16 @@ export async function processWithdrawalGroup(
throw Error(`withdrawal group ${withdrawalGroupId} not found`);
}
const retryTag = RetryTags.forWithdrawal(withdrawalGroup);
// We're already running!
if (ws.activeLongpoll[retryTag]) {
logger.info("withdrawal group already in long-polling, returning!");
return {
type: OperationAttemptResultType.Longpoll,
};
}
switch (withdrawalGroup.status) {
case WithdrawalGroupStatus.RegisteringBank:
await processReserveBankStatus(ws, withdrawalGroupId);
@ -1051,15 +1069,45 @@ export async function processWithdrawalGroup(
forceNow: true,
});
case WithdrawalGroupStatus.QueryingStatus: {
const res = await queryReserve(ws, withdrawalGroupId);
if (res.ready) {
return await processWithdrawalGroup(ws, withdrawalGroupId, {
forceNow: true,
});
const doQueryAsync = async () => {
if (ws.stopped) {
logger.info("not long-polling reserve, wallet already stopped");
await storeOperationPending(ws, retryTag);
return;
}
const cts = CancellationToken.create();
let res: { ready: boolean } | undefined = undefined;
try {
ws.activeLongpoll[retryTag] = {
cancel: () => {
logger.info("cancel of reserve longpoll requested");
cts.cancel();
},
};
res = await queryReserve(ws, withdrawalGroupId, cts.token);
} catch (e) {
await storeOperationError(
ws,
retryTag,
getErrorDetailFromException(e),
);
return;
}
delete ws.activeLongpoll[retryTag];
logger.info(
`active longpoll keys (2) ${Object.keys(ws.activeLongpoll)}`,
);
if (!res.ready) {
await storeOperationPending(ws, retryTag);
}
ws.latch.trigger();
};
doQueryAsync();
logger.info(
"returning early from withdrawal for long-polling in background",
);
return {
type: OperationAttemptResultType.Pending,
result: undefined,
type: OperationAttemptResultType.Longpoll,
};
}
case WithdrawalGroupStatus.WaitConfirmBank: {
@ -1912,10 +1960,16 @@ export async function createManualWithdrawal(
return await getFundingPaytoUris(tx, withdrawalGroup.withdrawalGroupId);
});
// Start withdrawal in the background.
await processWithdrawalGroup(ws, withdrawalGroupId, { forceNow: true }).catch(
(err) => {
logger.error("Processing withdrawal (after creation) failed:", err);
// Start withdrawal in the background (do not await!)
// FIXME: We could also interrupt the task look if it is waiting and
// rely on retry handling to re-process the withdrawal group.
runOperationWithErrorReporting(
ws,
RetryTags.forWithdrawal(withdrawalGroup),
async () => {
return await processWithdrawalGroup(ws, withdrawalGroupId, {
forceNow: true,
});
},
);

View File

@ -64,38 +64,37 @@ import {
codecForSetWalletDeviceIdRequest,
codecForTestPayArgs,
codecForTrackDepositGroupRequest,
codecForTransactionByIdRequest,
codecForTransactionsRequest,
codecForWithdrawFakebankRequest,
codecForWithdrawTestBalance,
CoinDumpJson,
CoreApiResponse,
DenominationInfo,
Duration,
durationFromSpec,
durationMin,
ExchangeFullDetails,
ExchangeListItem,
ExchangesListResponse,
FeeDescription,
GetExchangeTosResult,
j2s,
KnownBankAccounts,
Logger,
ManualWithdrawalDetails,
NotificationType,
OperationMap,
parsePaytoUri,
PaytoUri,
RefreshReason,
TalerErrorCode,
URL,
WalletNotification,
WalletCoreVersion,
ExchangeListItem,
OperationMap,
FeeDescription,
TalerErrorDetail,
codecForTransactionByIdRequest,
DenominationInfo,
KnownBankAccountsInfo,
codecForAddKnownBankAccounts,
codecForForgetKnownBankAccounts,
URL,
WalletCoreVersion,
WalletNotification,
} from "@gnu-taler/taler-util";
import { TalerCryptoInterface } from "./crypto/cryptoImplementation.js";
import {
@ -119,6 +118,7 @@ import {
TalerError,
} from "./errors.js";
import {
ActiveLongpollInfo,
ExchangeOperations,
InternalWalletState,
MerchantInfo,
@ -235,7 +235,6 @@ import {
OperationAttemptResult,
OperationAttemptResultType,
RetryInfo,
runOperationHandlerForResult,
} from "./util/retries.js";
import { TimerAPI, TimerGroup } from "./util/timer.js";
import {
@ -392,27 +391,21 @@ export async function storeOperationPending(
});
}
/**
* Execute one operation based on the pending operation info record.
*
* Store success/failure result in the database.
*/
async function processOnePendingOperation(
export async function runOperationWithErrorReporting(
ws: InternalWalletState,
pending: PendingTaskInfo,
forceNow = false,
opId: string,
f: () => Promise<OperationAttemptResult>,
): Promise<void> {
logger.trace(`running pending ${JSON.stringify(pending, undefined, 2)}`);
let maybeError: TalerErrorDetail | undefined;
try {
const resp = await callOperationHandler(ws, pending, forceNow);
const resp = await f();
switch (resp.type) {
case OperationAttemptResultType.Error:
return await storeOperationError(ws, pending.id, resp.errorDetail);
return await storeOperationError(ws, opId, resp.errorDetail);
case OperationAttemptResultType.Finished:
return await storeOperationFinished(ws, pending.id);
return await storeOperationFinished(ws, opId);
case OperationAttemptResultType.Pending:
return await storeOperationPending(ws, pending.id);
return await storeOperationPending(ws, opId);
case OperationAttemptResultType.Longpoll:
break;
}
@ -421,7 +414,7 @@ async function processOnePendingOperation(
logger.warn("operation processed resulted in error");
logger.warn(`error was: ${j2s(e.errorDetail)}`);
maybeError = e.errorDetail;
return await storeOperationError(ws, pending.id, maybeError!);
return await storeOperationError(ws, opId, maybeError!);
} else if (e instanceof Error) {
// This is a bug, as we expect pending operations to always
// do their own error handling and only throw WALLET_PENDING_OPERATION_FAILED
@ -435,7 +428,7 @@ async function processOnePendingOperation(
},
`unexpected exception (message: ${e.message})`,
);
return await storeOperationError(ws, pending.id, maybeError);
return await storeOperationError(ws, opId, maybeError);
} else {
logger.error("Uncaught exception, value is not even an error.");
maybeError = makeErrorDetail(
@ -443,7 +436,7 @@ async function processOnePendingOperation(
{},
`unexpected exception (not even an error)`,
);
return await storeOperationError(ws, pending.id, maybeError);
return await storeOperationError(ws, opId, maybeError);
}
}
}
@ -460,7 +453,10 @@ export async function runPending(
if (!forceNow && !AbsoluteTime.isExpired(p.timestampDue)) {
continue;
}
await processOnePendingOperation(ws, p, forceNow);
await runOperationWithErrorReporting(ws, p.id, async () => {
logger.trace(`running pending ${JSON.stringify(p, undefined, 2)}`);
return await callOperationHandler(ws, p, forceNow);
});
}
}
@ -563,7 +559,10 @@ async function runTaskLoop(
if (!AbsoluteTime.isExpired(p.timestampDue)) {
continue;
}
await processOnePendingOperation(ws, p);
await runOperationWithErrorReporting(ws, p.id, async () => {
logger.trace(`running pending ${JSON.stringify(p, undefined, 2)}`);
return await callOperationHandler(ws, p);
});
ws.notify({
type: NotificationType.PendingOperationProcessed,
});
@ -1613,13 +1612,10 @@ export class Wallet {
* This ties together all the operation implementations.
*/
class InternalWalletStateImpl implements InternalWalletState {
memoProcessReserve: AsyncOpMemoMap<void> = new AsyncOpMemoMap();
memoMakePlanchet: AsyncOpMemoMap<void> = new AsyncOpMemoMap();
memoGetPending: AsyncOpMemoSingle<PendingOperationsResponse> =
new AsyncOpMemoSingle();
memoGetBalance: AsyncOpMemoSingle<BalancesResponse> = new AsyncOpMemoSingle();
memoProcessRefresh: AsyncOpMemoMap<void> = new AsyncOpMemoMap();
memoProcessRecoup: AsyncOpMemoMap<void> = new AsyncOpMemoMap();
/**
* @see {@link InternalWalletState.activeLongpoll}
*/
activeLongpoll: ActiveLongpollInfo = {};
cryptoApi: TalerCryptoInterface;
cryptoDispatcher: CryptoDispatcher;
@ -1719,9 +1715,14 @@ class InternalWalletStateImpl implements InternalWalletState {
* Stop ongoing processing.
*/
stop(): void {
logger.info("stopping (at internal wallet state)");
this.stopped = true;
this.timerGroup.stopCurrentAndFutureTimers();
this.cryptoDispatcher.stop();
for (const key of Object.keys(this.activeLongpoll)) {
logger.info(`cancelling active longpoll ${key}`);
this.activeLongpoll[key].cancel();
}
}
async runUntilDone(

View File

@ -197,7 +197,7 @@ importers:
'@rollup/plugin-node-resolve': ^13.3.0
'@rollup/plugin-replace': ^4.0.0
'@types/node': ^17.0.17
axios: ^0.25.0
axios: ^0.27.2
prettier: ^2.5.1
rimraf: ^3.0.2
rollup: ^2.79.0
@ -210,7 +210,7 @@ importers:
dependencies:
'@gnu-taler/taler-util': link:../taler-util
'@gnu-taler/taler-wallet-core': link:../taler-wallet-core
axios: 0.25.0
axios: 0.27.2
source-map-support: 0.5.21
tslib: 2.3.1
devDependencies:
@ -237,7 +237,7 @@ importers:
'@typescript-eslint/eslint-plugin': ^5.36.1
'@typescript-eslint/parser': ^5.36.1
ava: ^4.0.1
axios: ^0.25.0
axios: ^0.27.2
big-integer: ^1.6.51
c8: ^7.11.0
eslint: ^8.8.0
@ -262,7 +262,7 @@ importers:
'@gnu-taler/idb-bridge': link:../idb-bridge
'@gnu-taler/taler-util': link:../taler-util
'@types/node': 17.0.17
axios: 0.25.0
axios: 0.27.2
big-integer: 1.6.51
fflate: 0.7.3
source-map-support: 0.5.21
@ -4380,8 +4380,7 @@ packages:
dev: true
/asynckit/0.4.0:
resolution: {integrity: sha1-x57Zf380y48robyXkLzDZkdLS3k=}
dev: true
resolution: {integrity: sha512-Oei9OH4tRh0YqU3GxhX79dM/mwVgvbZJaSNaRk+bshkj0S5cfHcgYakreBjrHwatXKbz+IoIdYLxrKim2MjW0Q==}
/at-least-node/1.0.0:
resolution: {integrity: sha512-+q/t7Ekv1EDY2l6Gda6LLiX14rU9TV20Wa3ofeQmwPFZbOMo9DXrLbOjFaaclkXKWidIaopwAObQDqwWtGUjqg==}
@ -4550,10 +4549,11 @@ packages:
- debug
dev: true
/axios/0.25.0:
resolution: {integrity: sha512-cD8FOb0tRH3uuEe6+evtAbgJtfxr7ly3fQjYcMcuPlgkwVS9xboaVIpcDV+cYQe+yGykgwZCs1pzjntcGa6l5g==}
/axios/0.27.2:
resolution: {integrity: sha512-t+yRIyySRTp/wua5xEr+z1q60QmLq8ABsS5O9Me1AsE5dfKqgnCFzwiCZZ/cGNd1lq4/7akDWMxdhVlucjmnOQ==}
dependencies:
follow-redirects: 1.14.8
follow-redirects: 1.15.2
form-data: 4.0.0
transitivePeerDependencies:
- debug
dev: false
@ -5535,7 +5535,6 @@ packages:
engines: {node: '>= 0.8'}
dependencies:
delayed-stream: 1.0.0
dev: true
/commander/2.17.1:
resolution: {integrity: sha512-wPMUt6FnH2yzG95SA6mzjQOEKUU3aLaDEmzs1ti+1E9h+CsrZghRlqEM/EJ4KscsQVG8uNN4uVreUeT8+drlgg==}
@ -6382,9 +6381,8 @@ packages:
dev: true
/delayed-stream/1.0.0:
resolution: {integrity: sha1-3zrhmayt+31ECqrgsp4icrJOxhk=}
resolution: {integrity: sha512-ZySD7Nf91aLB0RxL4KGrKHBXl7Eds1DAmEdcoVawXnLD7SDhpNgtuII2aAkg7a7QS41jxPSZ17p4VdGnMHk3MQ==}
engines: {node: '>=0.4.0'}
dev: true
/depd/1.1.2:
resolution: {integrity: sha1-m81S4UwJd2PnSbJ0xDRu0uVgtak=}
@ -7627,6 +7625,17 @@ packages:
peerDependenciesMeta:
debug:
optional: true
dev: true
/follow-redirects/1.15.2:
resolution: {integrity: sha512-VQLG33o04KaQ8uYi2tVNbdrWp1QWxNNea+nmIB4EVM28v0hmP17z7aG1+wAkNzVq4KeXTq3221ye5qTJP91JwA==}
engines: {node: '>=4.0'}
peerDependencies:
debug: '*'
peerDependenciesMeta:
debug:
optional: true
dev: false
/for-each/0.3.3:
resolution: {integrity: sha512-jqYfLp7mo9vIyQf8ykW2v7A+2N4QjeCeI5+Dz9XraiO1ign81wjiH7Fb9vSOWvQfNtmSa4H2RoQTrrXivdUZmw==}
@ -7687,6 +7696,15 @@ packages:
mime-types: 2.1.34
dev: true
/form-data/4.0.0:
resolution: {integrity: sha512-ETEklSGi5t0QMZuiXoA/Q6vcnxcLQP5vdugSpuAyi6SVGi2clPPp+xgEhuMaHC+zGgn31Kd235W35f7Hykkaww==}
engines: {node: '>= 6'}
dependencies:
asynckit: 0.4.0
combined-stream: 1.0.8
mime-types: 2.1.35
dev: false
/formdata-polyfill/4.0.10:
resolution: {integrity: sha512-buewHzMvYL29jdeQTVILecSaZKnt/RJWjoZCF5OW60Z67/GmSLBkOFM7qh1PI3zFNtJbaZL5eQu1vLfazOwj4g==}
engines: {node: '>=12.20.0'}
@ -9593,6 +9611,11 @@ packages:
engines: {node: '>= 0.6'}
dev: true
/mime-db/1.52.0:
resolution: {integrity: sha512-sPU4uV7dYlvtWJxwwxHD0PuihVNiE7TyAbQ5SWxDCB9mUYvOgroQOwYQQOKPJ8CIbE+1ETVlOoK1UC2nU3gYvg==}
engines: {node: '>= 0.6'}
dev: false
/mime-types/2.1.34:
resolution: {integrity: sha512-6cP692WwGIs9XXdOO4++N+7qjqv0rqxxVvJ3VHPh/Sc9mVZcQP+ZGhkKiTvWMQRr2tbHkJP/Yn7Y0npb3ZBs4A==}
engines: {node: '>= 0.6'}
@ -9600,6 +9623,13 @@ packages:
mime-db: 1.51.0
dev: true
/mime-types/2.1.35:
resolution: {integrity: sha512-ZDY+bPm5zTTF+YpCrAU9nK0UgICYPT0QtT1NZWFv4s++TNkcgVaT0g6+4R2uI4MjQjzysHB1zxuWL50hzaeXiw==}
engines: {node: '>= 0.6'}
dependencies:
mime-db: 1.52.0
dev: false
/mime/1.6.0:
resolution: {integrity: sha512-x0Vn8spI+wuJ1O6S7gnbaQg8Pxh4NNHb7KSINmEWKiPE4RKOplvijn+NkmYmmRgP68mc70j2EbeTFRsrswaQeg==}
engines: {node: '>=4'}