wallet: cancellation for deposit

This commit is contained in:
Florian Dold 2022-03-28 23:59:16 +02:00
parent 80e43db2ca
commit f5d194dfc6
No known key found for this signature in database
GPG Key ID: D2E4F00F29D02A4B
6 changed files with 103 additions and 26 deletions

View File

@ -550,7 +550,7 @@ export interface ExchangeRecord {
/** /**
* Retry status for fetching updated information about the exchange. * Retry status for fetching updated information about the exchange.
*/ */
retryInfo: RetryInfo; retryInfo?: RetryInfo;
} }
/** /**

View File

@ -35,6 +35,7 @@ import {
AmountJson, AmountJson,
DenominationPubKey, DenominationPubKey,
TalerProtocolTimestamp, TalerProtocolTimestamp,
CancellationToken,
} from "@gnu-taler/taler-util"; } from "@gnu-taler/taler-util";
import { CryptoDispatcher } from "./crypto/workers/cryptoDispatcher.js"; import { CryptoDispatcher } from "./crypto/workers/cryptoDispatcher.js";
import { TalerCryptoInterface } from "./crypto/cryptoImplementation.js"; import { TalerCryptoInterface } from "./crypto/cryptoImplementation.js";
@ -200,9 +201,14 @@ export interface InternalWalletState {
memoGetBalance: AsyncOpMemoSingle<BalancesResponse>; memoGetBalance: AsyncOpMemoSingle<BalancesResponse>;
memoProcessRefresh: AsyncOpMemoMap<void>; memoProcessRefresh: AsyncOpMemoMap<void>;
memoProcessRecoup: AsyncOpMemoMap<void>; memoProcessRecoup: AsyncOpMemoMap<void>;
memoProcessDeposit: AsyncOpMemoMap<void>;
cryptoApi: TalerCryptoInterface; cryptoApi: TalerCryptoInterface;
/**
* Cancellation token for the currently running
* deposit operation, if any.
*/
taskCancellationSourceForDeposit?: CancellationToken.Source;
timerGroup: TimerGroup; timerGroup: TimerGroup;
stopped: boolean; stopped: boolean;

View File

@ -21,6 +21,7 @@ import {
AbsoluteTime, AbsoluteTime,
AmountJson, AmountJson,
Amounts, Amounts,
CancellationToken,
canonicalJson, canonicalJson,
codecForDepositSuccess, codecForDepositSuccess,
ContractTerms, ContractTerms,
@ -125,23 +126,34 @@ async function reportDepositGroupError(
export async function processDepositGroup( export async function processDepositGroup(
ws: InternalWalletState, ws: InternalWalletState,
depositGroupId: string, depositGroupId: string,
forceNow = false, options: {
forceNow?: boolean;
cancellationToken?: CancellationToken;
} = {},
): Promise<void> { ): Promise<void> {
await ws.memoProcessDeposit.memo(depositGroupId, async () => { if (ws.taskCancellationSourceForDeposit) {
const onOpErr = (err: TalerErrorDetail): Promise<void> => ws.taskCancellationSourceForDeposit.cancel();
reportDepositGroupError(ws, depositGroupId, err); }
return await guardOperationException( const onOpErr = (err: TalerErrorDetail): Promise<void> =>
async () => await processDepositGroupImpl(ws, depositGroupId, forceNow), reportDepositGroupError(ws, depositGroupId, err);
onOpErr, return await guardOperationException(
); async () => await processDepositGroupImpl(ws, depositGroupId, options),
}); onOpErr,
);
} }
/**
* @see {processDepositGroup}
*/
async function processDepositGroupImpl( async function processDepositGroupImpl(
ws: InternalWalletState, ws: InternalWalletState,
depositGroupId: string, depositGroupId: string,
forceNow = false, options: {
forceNow?: boolean;
cancellationToken?: CancellationToken;
} = {},
): Promise<void> { ): Promise<void> {
const forceNow = options.forceNow ?? false;
const depositGroup = await ws.db const depositGroup = await ws.db
.mktx((x) => ({ .mktx((x) => ({
depositGroups: x.depositGroups, depositGroups: x.depositGroups,
@ -170,6 +182,8 @@ async function processDepositGroupImpl(
"", "",
); );
// Check for cancellation before expensive operations.
options.cancellationToken?.throwIfCancelled();
const depositPermissions = await generateDepositPermissions( const depositPermissions = await generateDepositPermissions(
ws, ws,
depositGroup.payCoinSelection, depositGroup.payCoinSelection,
@ -196,9 +210,13 @@ async function processDepositGroupImpl(
denom_pub_hash: perm.h_denom, denom_pub_hash: perm.h_denom,
merchant_pub: depositGroup.merchantPub, merchant_pub: depositGroup.merchantPub,
}; };
// Check for cancellation before making network request.
options.cancellationToken?.throwIfCancelled();
const url = new URL(`coins/${perm.coin_pub}/deposit`, perm.exchange_url); const url = new URL(`coins/${perm.coin_pub}/deposit`, perm.exchange_url);
logger.info(`depositing to ${url}`); logger.info(`depositing to ${url}`);
const httpResp = await ws.http.postJson(url.href, requestBody); const httpResp = await ws.http.postJson(url.href, requestBody, {
cancellationToken: options.cancellationToken,
});
await readSuccessResponseJsonOrThrow(httpResp, codecForDepositSuccess()); await readSuccessResponseJsonOrThrow(httpResp, codecForDepositSuccess());
await ws.db await ws.db
.mktx((x) => ({ depositGroups: x.depositGroups })) .mktx((x) => ({ depositGroups: x.depositGroups }))

View File

@ -61,7 +61,11 @@ import {
readSuccessResponseTextOrThrow, readSuccessResponseTextOrThrow,
} from "../util/http.js"; } from "../util/http.js";
import { DbAccess, GetReadOnlyAccess } from "../util/query.js"; import { DbAccess, GetReadOnlyAccess } from "../util/query.js";
import { initRetryInfo, updateRetryInfoTimeout } from "../util/retries.js"; import {
initRetryInfo,
RetryInfo,
updateRetryInfoTimeout,
} from "../util/retries.js";
import { import {
WALLET_CACHE_BREAKER_CLIENT_VERSION, WALLET_CACHE_BREAKER_CLIENT_VERSION,
WALLET_EXCHANGE_PROTOCOL_VERSION, WALLET_EXCHANGE_PROTOCOL_VERSION,
@ -102,7 +106,7 @@ function denominationRecordFromKeys(
return d; return d;
} }
async function handleExchangeUpdateError( async function reportExchangeUpdateError(
ws: InternalWalletState, ws: InternalWalletState,
baseUrl: string, baseUrl: string,
err: TalerErrorDetail, err: TalerErrorDetail,
@ -114,14 +118,44 @@ async function handleExchangeUpdateError(
if (!exchange) { if (!exchange) {
return; return;
} }
exchange.retryInfo.retryCounter++;
updateRetryInfoTimeout(exchange.retryInfo);
exchange.lastError = err; exchange.lastError = err;
await tx.exchanges.put(exchange); await tx.exchanges.put(exchange);
}); });
if (err) { ws.notify({ type: NotificationType.ExchangeOperationError, error: err });
ws.notify({ type: NotificationType.ExchangeOperationError, error: err }); }
}
async function resetExchangeUpdateRetry(
ws: InternalWalletState,
baseUrl: string,
): Promise<void> {
await ws.db
.mktx((x) => ({ exchanges: x.exchanges }))
.runReadWrite(async (tx) => {
const exchange = await tx.exchanges.get(baseUrl);
if (!exchange) {
return;
}
delete exchange.lastError;
exchange.retryInfo = initRetryInfo();
await tx.exchanges.put(exchange);
});
}
async function incrementExchangeUpdateRetry(
ws: InternalWalletState,
baseUrl: string,
): Promise<void> {
await ws.db
.mktx((x) => ({ exchanges: x.exchanges }))
.runReadWrite(async (tx) => {
const exchange = await tx.exchanges.get(baseUrl);
if (!exchange) {
return;
}
delete exchange.lastError;
exchange.retryInfo = RetryInfo.increment(exchange.retryInfo);
await tx.exchanges.put(exchange);
});
} }
export function getExchangeRequestTimeout(): Duration { export function getExchangeRequestTimeout(): Duration {
@ -349,7 +383,7 @@ export async function updateExchangeFromUrl(
exchangeDetails: ExchangeDetailsRecord; exchangeDetails: ExchangeDetailsRecord;
}> { }> {
const onOpErr = (e: TalerErrorDetail): Promise<void> => const onOpErr = (e: TalerErrorDetail): Promise<void> =>
handleExchangeUpdateError(ws, baseUrl, e); reportExchangeUpdateError(ws, baseUrl, e);
return await guardOperationException( return await guardOperationException(
() => updateExchangeFromUrlImpl(ws, baseUrl, acceptedFormat, forceNow), () => updateExchangeFromUrlImpl(ws, baseUrl, acceptedFormat, forceNow),
onOpErr, onOpErr,
@ -543,6 +577,12 @@ async function updateExchangeFromUrlImpl(
return { exchange, exchangeDetails }; return { exchange, exchangeDetails };
} }
if (forceNow) {
await resetExchangeUpdateRetry(ws, baseUrl);
} else {
await incrementExchangeUpdateRetry(ws, baseUrl);
}
logger.info("updating exchange /keys info"); logger.info("updating exchange /keys info");
const timeout = getExchangeRequestTimeout(); const timeout = getExchangeRequestTimeout();
@ -624,8 +664,8 @@ async function updateExchangeFromUrlImpl(
termsOfServiceAcceptedTimestamp: TalerProtocolTimestamp.now(), termsOfServiceAcceptedTimestamp: TalerProtocolTimestamp.now(),
}; };
// FIXME: only update if pointer got updated // FIXME: only update if pointer got updated
r.lastError = undefined; delete r.lastError;
r.retryInfo = initRetryInfo(); delete r.retryInfo;
r.lastUpdate = TalerProtocolTimestamp.now(); r.lastUpdate = TalerProtocolTimestamp.now();
r.nextUpdate = keysInfo.expiry; r.nextUpdate = keysInfo.expiry;
// New denominations might be available. // New denominations might be available.

View File

@ -444,7 +444,9 @@ export async function retryTransaction(
switch (type) { switch (type) {
case TransactionType.Deposit: case TransactionType.Deposit:
const depositGroupId = rest[0]; const depositGroupId = rest[0];
processDepositGroup(ws, depositGroupId, true); processDepositGroup(ws, depositGroupId, {
forceNow: true,
});
break; break;
case TransactionType.Withdrawal: case TransactionType.Withdrawal:
const withdrawalGroupId = rest[0]; const withdrawalGroupId = rest[0];

View File

@ -78,6 +78,7 @@ import {
URL, URL,
WalletNotification, WalletNotification,
Duration, Duration,
CancellationToken,
} from "@gnu-taler/taler-util"; } from "@gnu-taler/taler-util";
import { timeStamp } from "console"; import { timeStamp } from "console";
import { import {
@ -271,9 +272,19 @@ async function processOnePendingOperation(
case PendingTaskType.ExchangeCheckRefresh: case PendingTaskType.ExchangeCheckRefresh:
await autoRefresh(ws, pending.exchangeBaseUrl); await autoRefresh(ws, pending.exchangeBaseUrl);
break; break;
case PendingTaskType.Deposit: case PendingTaskType.Deposit: {
await processDepositGroup(ws, pending.depositGroupId); const cts = CancellationToken.create();
ws.taskCancellationSourceForDeposit = cts;
try {
await processDepositGroup(ws, pending.depositGroupId, {
cancellationToken: cts.token,
});
} finally {
cts.dispose();
delete ws.taskCancellationSourceForDeposit;
}
break; break;
}
case PendingTaskType.Backup: case PendingTaskType.Backup:
await processBackupForProvider(ws, pending.backupProviderBaseUrl); await processBackupForProvider(ws, pending.backupProviderBaseUrl);
break; break;