wallet: make retries more robust and consistent

This commit is contained in:
Florian Dold 2022-03-29 13:47:32 +02:00
parent be489b6b3e
commit c265e7d019
No known key found for this signature in database
GPG Key ID: D2E4F00F29D02A4B
23 changed files with 476 additions and 443 deletions

View File

@ -205,4 +205,12 @@ export class Logger {
);
}
}
reportBreak(): void {
if (!this.shouldLogError()) {
return;
}
const location = new Error("programming error");
this.error(`assertion failed: ${location.stack}`);
}
}

View File

@ -20,6 +20,7 @@
* Imports.
*/
import {
DEFAULT_REQUEST_TIMEOUT_MS,
Headers,
HttpRequestLibrary,
HttpRequestOptions,
@ -65,13 +66,16 @@ export class NodeHttpLib implements HttpRequestLibrary {
`request to origin ${parsedUrl.origin} was throttled`,
);
}
let timeout: number | undefined;
let timeoutMs: number | undefined;
if (typeof opt?.timeout?.d_ms === "number") {
timeout = opt.timeout.d_ms;
timeoutMs = opt.timeout.d_ms;
} else {
timeoutMs = DEFAULT_REQUEST_TIMEOUT_MS;
}
// FIXME: Use AbortController / etc. to handle cancellation
let resp: AxiosResponse;
try {
resp = await Axios({
let respPromise = Axios({
method,
url: url,
responseType: "arraybuffer",
@ -79,9 +83,13 @@ export class NodeHttpLib implements HttpRequestLibrary {
validateStatus: () => true,
transformResponse: (x) => x,
data: body,
timeout,
timeout: timeoutMs,
maxRedirects: 0,
});
if (opt?.cancellationToken) {
respPromise = opt.cancellationToken.racePromise(respPromise);
}
resp = await respPromise;
} catch (e: any) {
throw TalerError.fromDetail(
TalerErrorCode.WALLET_NETWORK_ERROR,
@ -94,11 +102,13 @@ export class NodeHttpLib implements HttpRequestLibrary {
}
const makeText = async (): Promise<string> => {
opt?.cancellationToken?.throwIfCancelled();
const respText = new Uint8Array(resp.data);
return bytesToString(respText);
};
const makeJson = async (): Promise<any> => {
opt?.cancellationToken?.throwIfCancelled();
let responseJson;
const respText = await makeText();
try {
@ -130,6 +140,7 @@ export class NodeHttpLib implements HttpRequestLibrary {
return responseJson;
};
const makeBytes = async () => {
opt?.cancellationToken?.throwIfCancelled();
if (typeof resp.data.byteLength !== "number") {
throw Error("expected array buffer");
}
@ -150,6 +161,7 @@ export class NodeHttpLib implements HttpRequestLibrary {
bytes: makeBytes,
};
}
async get(url: string, opt?: HttpRequestOptions): Promise<HttpResponse> {
return this.fetch(url, {
method: "GET",

View File

@ -77,7 +77,9 @@ export interface ReserveOperations {
processReserve(
ws: InternalWalletState,
reservePub: string,
forceNow?: boolean,
options?: {
forceNow?: boolean;
},
): Promise<void>;
}
@ -101,8 +103,10 @@ export interface ExchangeOperations {
updateExchangeFromUrl(
ws: InternalWalletState,
baseUrl: string,
acceptedFormat?: string[],
forceNow?: boolean,
options?: {
forceNow?: boolean;
cancellationToken?: CancellationToken;
},
): Promise<{
exchange: ExchangeRecord;
exchangeDetails: ExchangeDetailsRecord;
@ -123,7 +127,9 @@ export interface RecoupOperations {
processRecoupGroup(
ws: InternalWalletState,
recoupGroupId: string,
forceNow?: boolean,
options?: {
forceNow?: boolean;
},
): Promise<void>;
}
@ -201,13 +207,8 @@ export interface InternalWalletState {
memoGetBalance: AsyncOpMemoSingle<BalancesResponse>;
memoProcessRefresh: AsyncOpMemoMap<void>;
memoProcessRecoup: AsyncOpMemoMap<void>;
cryptoApi: TalerCryptoInterface;
/**
* Cancellation token for the currently running
* deposit operation, if any.
*/
taskCancellationSourceForDeposit?: CancellationToken.Source;
cryptoApi: TalerCryptoInterface;
timerGroup: TimerGroup;
stopped: boolean;

View File

@ -17,7 +17,7 @@ Generally, the code to process a pending operation should first increment the
retryInfo (and reset the lastError) and then process the operation. This way,
it is impossble to forget incrementing the retryInfo.
For each retriable operation, there are usually `reset<Op>Retry`, `increment<Op>Retry` and
For each retriable operation, there are usually `setup<Op>Retry`, `increment<Op>Retry` and
`report<Op>Error` operations.
Note that this means that _during_ some operation, lastError will be cleared. The UI

View File

@ -57,7 +57,7 @@ import {
checkLogicInvariant,
} from "../../util/invariants.js";
import { Logger } from "@gnu-taler/taler-util";
import { initRetryInfo } from "../../util/retries.js";
import { resetRetryInfo } from "../../util/retries.js";
import { InternalWalletState } from "../../internal-wallet-state.js";
import { provideBackupState } from "./state.js";
import { makeEventId, TombstoneTag } from "../transactions.js";
@ -276,7 +276,7 @@ export async function importBackup(
protocolVersionRange: backupExchange.protocol_version_range,
},
permanent: true,
retryInfo: initRetryInfo(),
retryInfo: resetRetryInfo(),
lastUpdate: undefined,
nextUpdate: TalerProtocolTimestamp.now(),
nextRefreshCheck: TalerProtocolTimestamp.now(),
@ -464,7 +464,7 @@ export async function importBackup(
timestampReserveInfoPosted:
backupReserve.bank_info?.timestamp_reserve_info_posted,
senderWire: backupReserve.sender_wire,
retryInfo: initRetryInfo(),
retryInfo: resetRetryInfo(),
lastError: undefined,
initialWithdrawalGroupId:
backupReserve.initial_withdrawal_group_id,
@ -505,7 +505,7 @@ export async function importBackup(
backupWg.raw_withdrawal_amount,
),
reservePub,
retryInfo: initRetryInfo(),
retryInfo: resetRetryInfo(),
secretSeed: backupWg.secret_seed,
timestampStart: backupWg.timestamp_created,
timestampFinish: backupWg.timestamp_finish,
@ -618,7 +618,7 @@ export async function importBackup(
cryptoComp.proposalNoncePrivToPub[backupProposal.nonce_priv],
proposalId: backupProposal.proposal_id,
repurchaseProposalId: backupProposal.repurchase_proposal_id,
retryInfo: initRetryInfo(),
retryInfo: resetRetryInfo(),
download,
proposalStatus,
});
@ -753,7 +753,7 @@ export async function importBackup(
cryptoComp.proposalNoncePrivToPub[backupPurchase.nonce_priv],
lastPayError: undefined,
autoRefundDeadline: TalerProtocolTimestamp.never(),
refundStatusRetryInfo: initRetryInfo(),
refundStatusRetryInfo: resetRetryInfo(),
lastRefundStatusError: undefined,
timestampAccept: backupPurchase.timestamp_accept,
timestampFirstSuccessfulPay:
@ -763,7 +763,7 @@ export async function importBackup(
lastSessionId: undefined,
abortStatus,
// FIXME!
payRetryInfo: initRetryInfo(),
payRetryInfo: resetRetryInfo(),
download,
paymentSubmitPending:
!backupPurchase.timestamp_first_successful_pay,
@ -864,7 +864,7 @@ export async function importBackup(
Amounts.parseOrThrow(x.estimated_output_amount),
),
refreshSessionPerCoin,
retryInfo: initRetryInfo(),
retryInfo: resetRetryInfo(),
});
}
}
@ -890,7 +890,7 @@ export async function importBackup(
merchantBaseUrl: backupTip.exchange_base_url,
merchantTipId: backupTip.merchant_tip_id,
pickedUpTimestamp: backupTip.timestamp_finished,
retryInfo: initRetryInfo(),
retryInfo: resetRetryInfo(),
secretSeed: backupTip.secret_seed,
tipAmountEffective: denomsSel.totalCoinValue,
tipAmountRaw: Amounts.parseOrThrow(backupTip.tip_amount_raw),

View File

@ -89,7 +89,7 @@ import {
checkLogicInvariant,
} from "../../util/invariants.js";
import { GetReadWriteAccess } from "../../util/query.js";
import { initRetryInfo, updateRetryInfoTimeout } from "../../util/retries.js";
import { resetRetryInfo, updateRetryInfoTimeout } from "../../util/retries.js";
import {
checkPaymentByProposalId,
confirmPay,
@ -434,7 +434,7 @@ async function runBackupCycleForProvider(
// FIXME: Allocate error code for this situation?
prov.state = {
tag: BackupProviderStateTag.Retrying,
retryInfo: initRetryInfo(),
retryInfo: resetRetryInfo(),
};
await tx.backupProvider.put(prov);
});
@ -478,7 +478,7 @@ async function incrementBackupRetryInTx(
} else if (pr.state.tag === BackupProviderStateTag.Ready) {
pr.state = {
tag: BackupProviderStateTag.Retrying,
retryInfo: initRetryInfo(),
retryInfo: resetRetryInfo(),
lastError: err,
};
}

View File

@ -47,7 +47,7 @@ export async function guardOperationException<T>(
throw TalerError.fromDetail(
TalerErrorCode.WALLET_PENDING_OPERATION_FAILED,
{
innerError: e.errorDetail,
innerError: opErr,
},
);
}

View File

@ -41,11 +41,11 @@ import {
TrackDepositGroupResponse,
URL,
} from "@gnu-taler/taler-util";
import { DepositGroupRecord, OperationStatus } from "../db.js";
import { DepositGroupRecord, OperationStatus, WireFee } from "../db.js";
import { InternalWalletState } from "../internal-wallet-state.js";
import { PayCoinSelection, selectPayCoins } from "../util/coinSelection.js";
import { readSuccessResponseJsonOrThrow } from "../util/http.js";
import { initRetryInfo, RetryInfo } from "../util/retries.js";
import { resetRetryInfo, RetryInfo } from "../util/retries.js";
import { guardOperationException } from "./common.js";
import { getExchangeDetails } from "./exchanges.js";
import {
@ -63,9 +63,15 @@ import { getTotalRefreshCost } from "./refresh.js";
*/
const logger = new Logger("deposits.ts");
async function resetDepositGroupRetry(
/**
* Set up the retry timeout for a deposit group.
*/
async function setupDepositGroupRetry(
ws: InternalWalletState,
depositGroupId: string,
options: {
resetRetry: boolean;
},
): Promise<void> {
await ws.db
.mktx((x) => ({
@ -76,29 +82,19 @@ async function resetDepositGroupRetry(
if (!x) {
return;
}
x.retryInfo = initRetryInfo();
if (options.resetRetry) {
x.retryInfo = resetRetryInfo();
} else {
x.retryInfo = RetryInfo.increment(x.retryInfo);
}
delete x.lastError;
await tx.depositGroups.put(x);
});
}
async function incrementDepositGroupRetry(
ws: InternalWalletState,
depositGroupId: string,
): Promise<void> {
await ws.db
.mktx((x) => ({ depositGroups: x.depositGroups }))
.runReadWrite(async (tx) => {
const r = await tx.depositGroups.get(depositGroupId);
if (!r) {
return;
}
r.retryInfo = RetryInfo.increment(r.retryInfo);
delete r.lastError;
await tx.depositGroups.put(r);
});
}
/**
* Report an error that occurred while processing the deposit group.
*/
async function reportDepositGroupError(
ws: InternalWalletState,
depositGroupId: string,
@ -131,9 +127,6 @@ export async function processDepositGroup(
cancellationToken?: CancellationToken;
} = {},
): Promise<void> {
if (ws.taskCancellationSourceForDeposit) {
ws.taskCancellationSourceForDeposit.cancel();
}
const onOpErr = (err: TalerErrorDetail): Promise<void> =>
reportDepositGroupError(ws, depositGroupId, err);
return await guardOperationException(
@ -170,11 +163,7 @@ async function processDepositGroupImpl(
return;
}
if (forceNow) {
await resetDepositGroupRetry(ws, depositGroupId);
} else {
await incrementDepositGroupRetry(ws, depositGroupId);
}
await setupDepositGroupRetry(ws, depositGroupId, { resetRetry: forceNow });
const contractData = extractContractData(
depositGroup.contractTermsRaw,
@ -315,7 +304,7 @@ export async function trackDepositGroup(
export async function getFeeForDeposit(
ws: InternalWalletState,
req: GetFeeForDepositRequest,
): Promise<DepositFee> {
): Promise<DepositGroupFees> {
const p = parsePaytoUri(req.depositPaytoUri);
if (!p) {
throw Error("invalid payto URI");
@ -370,7 +359,7 @@ export async function getFeeForDeposit(
throw Error("insufficient funds");
}
return await getTotalFeeForDepositAmount(
return await getTotalFeesForDepositAmount(
ws,
p.targetType,
amount,
@ -429,14 +418,12 @@ export async function createDepositGroup(
nonce: noncePair.pub,
wire_transfer_deadline: nowRounded,
order_id: "",
// This is always the v2 wire hash, as we're the "merchant" and support v2.
h_wire: wireHash,
// Required for older exchanges.
pay_deadline: AbsoluteTime.toTimestamp(
AbsoluteTime.addDuration(now, durationFromSpec({ hours: 1 })),
),
merchant: {
name: "",
name: "(wallet)",
},
merchant_pub: merchantPair.pub,
refund_deadline: TalerProtocolTimestamp.zero(),
@ -505,7 +492,7 @@ export async function createDepositGroup(
payto_uri: req.depositPaytoUri,
salt: wireSalt,
},
retryInfo: initRetryInfo(),
retryInfo: resetRetryInfo(),
operationStatus: OperationStatus.Pending,
lastError: undefined,
};
@ -594,8 +581,7 @@ export async function getEffectiveDepositAmount(
return Amounts.sub(Amounts.sum(amt).amount, Amounts.sum(fees).amount).amount;
}
// FIXME: rename to DepositGroupFee
export interface DepositFee {
export interface DepositGroupFees {
coin: AmountJson;
wire: AmountJson;
refresh: AmountJson;
@ -605,12 +591,12 @@ export interface DepositFee {
* Get the fee amount that will be charged when trying to deposit the
* specified amount using the selected coins and the wire method.
*/
export async function getTotalFeeForDepositAmount(
export async function getTotalFeesForDepositAmount(
ws: InternalWalletState,
wireType: string,
total: AmountJson,
pcs: PayCoinSelection,
): Promise<DepositFee> {
): Promise<DepositGroupFees> {
const wireFee: AmountJson[] = [];
const coinFee: AmountJson[] = [];
const refreshFee: AmountJson[] = [];
@ -638,8 +624,6 @@ export async function getTotalFeeForDepositAmount(
if (!denom) {
throw Error("can't find denomination to calculate deposit amount");
}
// const cc = pcs.coinContributions[i]
// acc = Amounts.add(acc, cc).amount
coinFee.push(denom.feeDeposit);
exchangeSet.add(coin.exchangeBaseUrl);
@ -661,16 +645,15 @@ export async function getTotalFeeForDepositAmount(
if (!exchangeDetails) {
continue;
}
// FIXME/NOTE: the line below _likely_ throws exception
// about "find method not found on undefined" when the wireType
// is not supported by the Exchange.
const fee = exchangeDetails.wireInfo.feesForType[wireType].find((x) => {
return AbsoluteTime.isBetween(
AbsoluteTime.now(),
AbsoluteTime.fromTimestamp(x.startStamp),
AbsoluteTime.fromTimestamp(x.endStamp),
);
})?.wireFee;
const fee = exchangeDetails.wireInfo.feesForType[wireType]?.find(
(x) => {
return AbsoluteTime.isBetween(
AbsoluteTime.now(),
AbsoluteTime.fromTimestamp(x.startStamp),
AbsoluteTime.fromTimestamp(x.endStamp),
);
},
)?.wireFee;
if (fee) {
wireFee.push(fee);
}

View File

@ -20,6 +20,7 @@
import {
AbsoluteTime,
Amounts,
CancellationToken,
canonicalizeBaseUrl,
codecForExchangeKeysJson,
codecForExchangeWireJson,
@ -61,11 +62,7 @@ import {
readSuccessResponseTextOrThrow,
} from "../util/http.js";
import { DbAccess, GetReadOnlyAccess } from "../util/query.js";
import {
initRetryInfo,
RetryInfo,
updateRetryInfoTimeout,
} from "../util/retries.js";
import { resetRetryInfo, RetryInfo } from "../util/retries.js";
import {
WALLET_CACHE_BREAKER_CLIENT_VERSION,
WALLET_EXCHANGE_PROTOCOL_VERSION,
@ -124,9 +121,12 @@ async function reportExchangeUpdateError(
ws.notify({ type: NotificationType.ExchangeOperationError, error: err });
}
async function resetExchangeUpdateRetry(
async function setupExchangeUpdateRetry(
ws: InternalWalletState,
baseUrl: string,
options: {
reset: boolean;
},
): Promise<void> {
await ws.db
.mktx((x) => ({ exchanges: x.exchanges }))
@ -135,25 +135,12 @@ async function resetExchangeUpdateRetry(
if (!exchange) {
return;
}
delete exchange.lastError;
exchange.retryInfo = initRetryInfo();
await tx.exchanges.put(exchange);
});
}
async function incrementExchangeUpdateRetry(
ws: InternalWalletState,
baseUrl: string,
): Promise<void> {
await ws.db
.mktx((x) => ({ exchanges: x.exchanges }))
.runReadWrite(async (tx) => {
const exchange = await tx.exchanges.get(baseUrl);
if (!exchange) {
return;
if (options.reset) {
exchange.retryInfo = resetRetryInfo();
} else {
exchange.retryInfo = RetryInfo.increment(exchange.retryInfo);
}
delete exchange.lastError;
exchange.retryInfo = RetryInfo.increment(exchange.retryInfo);
await tx.exchanges.put(exchange);
});
}
@ -376,8 +363,10 @@ async function downloadExchangeWireInfo(
export async function updateExchangeFromUrl(
ws: InternalWalletState,
baseUrl: string,
acceptedFormat?: string[],
forceNow = false,
options: {
forceNow?: boolean;
cancellationToken?: CancellationToken;
} = {},
): Promise<{
exchange: ExchangeRecord;
exchangeDetails: ExchangeDetailsRecord;
@ -385,7 +374,7 @@ export async function updateExchangeFromUrl(
const onOpErr = (e: TalerErrorDetail): Promise<void> =>
reportExchangeUpdateError(ws, baseUrl, e);
return await guardOperationException(
() => updateExchangeFromUrlImpl(ws, baseUrl, acceptedFormat, forceNow),
() => updateExchangeFromUrlImpl(ws, baseUrl, options),
onOpErr,
);
}
@ -409,7 +398,7 @@ async function provideExchangeRecord(
const r: ExchangeRecord = {
permanent: true,
baseUrl: baseUrl,
retryInfo: initRetryInfo(),
retryInfo: resetRetryInfo(),
detailsPointer: undefined,
lastUpdate: undefined,
nextUpdate: AbsoluteTime.toTimestamp(now),
@ -552,12 +541,15 @@ export async function downloadTosFromAcceptedFormat(
async function updateExchangeFromUrlImpl(
ws: InternalWalletState,
baseUrl: string,
acceptedFormat?: string[],
forceNow = false,
options: {
forceNow?: boolean;
cancellationToken?: CancellationToken;
} = {},
): Promise<{
exchange: ExchangeRecord;
exchangeDetails: ExchangeDetailsRecord;
}> {
const forceNow = options.forceNow ?? false;
logger.info(`updating exchange info for ${baseUrl}, forced: ${forceNow}`);
const now = AbsoluteTime.now();
baseUrl = canonicalizeBaseUrl(baseUrl);
@ -577,11 +569,7 @@ async function updateExchangeFromUrlImpl(
return { exchange, exchangeDetails };
}
if (forceNow) {
await resetExchangeUpdateRetry(ws, baseUrl);
} else {
await incrementExchangeUpdateRetry(ws, baseUrl);
}
await setupExchangeUpdateRetry(ws, baseUrl, { reset: forceNow });
logger.info("updating exchange /keys info");
@ -617,7 +605,7 @@ async function updateExchangeFromUrlImpl(
ws,
baseUrl,
timeout,
acceptedFormat,
["text/plain"],
);
const tosHasBeenAccepted =
exchangeDetails?.termsOfServiceAcceptedEtag === tosDownload.tosEtag;

View File

@ -97,7 +97,7 @@ import {
import { GetReadWriteAccess } from "../util/query.js";
import {
getRetryDuration,
initRetryInfo,
resetRetryInfo,
RetryInfo,
updateRetryInfoTimeout,
} from "../util/retries.js";
@ -428,8 +428,8 @@ async function recordConfirmPay(
proposalId: proposal.proposalId,
lastPayError: undefined,
lastRefundStatusError: undefined,
payRetryInfo: initRetryInfo(),
refundStatusRetryInfo: initRetryInfo(),
payRetryInfo: resetRetryInfo(),
refundStatusRetryInfo: resetRetryInfo(),
refundQueryRequested: false,
timestampFirstSuccessfulPay: undefined,
autoRefundDeadline: undefined,
@ -453,7 +453,7 @@ async function recordConfirmPay(
if (p) {
p.proposalStatus = ProposalStatus.Accepted;
delete p.lastError;
p.retryInfo = initRetryInfo();
delete p.retryInfo;
await tx.proposals.put(p);
}
await tx.purchases.put(t);
@ -491,9 +491,12 @@ async function reportProposalError(
ws.notify({ type: NotificationType.ProposalOperationError, error: err });
}
async function incrementProposalRetry(
async function setupProposalRetry(
ws: InternalWalletState,
proposalId: string,
options: {
reset: boolean;
},
): Promise<void> {
await ws.db
.mktx((x) => ({ proposals: x.proposals }))
@ -502,47 +505,37 @@ async function incrementProposalRetry(
if (!pr) {
return;
}
if (!pr.retryInfo) {
return;
if (options.reset) {
pr.retryInfo = resetRetryInfo();
} else {
pr.retryInfo.retryCounter++;
updateRetryInfoTimeout(pr.retryInfo);
pr.retryInfo = RetryInfo.increment(pr.retryInfo);
}
delete pr.lastError;
await tx.proposals.put(pr);
});
}
async function resetPurchasePayRetry(
async function setupPurchasePayRetry(
ws: InternalWalletState,
proposalId: string,
options: {
reset: boolean;
},
): Promise<void> {
await ws.db
.mktx((x) => ({ purchases: x.purchases }))
.runReadWrite(async (tx) => {
const p = await tx.purchases.get(proposalId);
if (p) {
p.payRetryInfo = initRetryInfo();
delete p.lastPayError;
await tx.purchases.put(p);
}
});
}
async function incrementPurchasePayRetry(
ws: InternalWalletState,
proposalId: string,
): Promise<void> {
await ws.db
.mktx((x) => ({ purchases: x.purchases }))
.runReadWrite(async (tx) => {
const pr = await tx.purchases.get(proposalId);
if (!pr) {
if (!p) {
return;
}
pr.payRetryInfo = RetryInfo.increment(pr.payRetryInfo);
delete pr.lastPayError;
await tx.purchases.put(pr);
if (options.reset) {
p.payRetryInfo = resetRetryInfo();
} else {
p.payRetryInfo = RetryInfo.increment(p.payRetryInfo);
}
delete p.lastPayError;
await tx.purchases.put(p);
});
}
@ -572,32 +565,18 @@ async function reportPurchasePayError(
export async function processDownloadProposal(
ws: InternalWalletState,
proposalId: string,
forceNow = false,
options: {
forceNow?: boolean;
} = {},
): Promise<void> {
const onOpErr = (err: TalerErrorDetail): Promise<void> =>
reportProposalError(ws, proposalId, err);
await guardOperationException(
() => processDownloadProposalImpl(ws, proposalId, forceNow),
() => processDownloadProposalImpl(ws, proposalId, options),
onOpErr,
);
}
async function resetDownloadProposalRetry(
ws: InternalWalletState,
proposalId: string,
): Promise<void> {
await ws.db
.mktx((x) => ({ proposals: x.proposals }))
.runReadWrite(async (tx) => {
const p = await tx.proposals.get(proposalId);
if (p) {
p.retryInfo = initRetryInfo();
delete p.lastError;
await tx.proposals.put(p);
}
});
}
async function failProposalPermanently(
ws: InternalWalletState,
proposalId: string,
@ -678,8 +657,11 @@ export function extractContractData(
async function processDownloadProposalImpl(
ws: InternalWalletState,
proposalId: string,
forceNow: boolean,
options: {
forceNow?: boolean;
} = {},
): Promise<void> {
const forceNow = options.forceNow ?? false;
const proposal = await ws.db
.mktx((x) => ({ proposals: x.proposals }))
.runReadOnly(async (tx) => {
@ -694,11 +676,7 @@ async function processDownloadProposalImpl(
return;
}
if (forceNow) {
await resetDownloadProposalRetry(ws, proposalId);
} else {
await incrementProposalRetry(ws, proposalId);
}
await setupProposalRetry(ws, proposalId, { reset: forceNow });
const orderClaimUrl = new URL(
`orders/${proposal.orderId}/claim`,
@ -946,7 +924,7 @@ async function startDownloadProposal(
proposalId: proposalId,
proposalStatus: ProposalStatus.Downloading,
repurchaseProposalId: undefined,
retryInfo: initRetryInfo(),
retryInfo: resetRetryInfo(),
lastError: undefined,
downloadSessionId: sessionId,
};
@ -994,7 +972,7 @@ async function storeFirstPaySuccess(
purchase.paymentSubmitPending = false;
purchase.lastPayError = undefined;
purchase.lastSessionId = sessionId;
purchase.payRetryInfo = initRetryInfo();
purchase.payRetryInfo = resetRetryInfo();
purchase.merchantPaySig = paySig;
if (isFirst) {
const protoAr = purchase.download.contractData.autoRefund;
@ -1002,7 +980,7 @@ async function storeFirstPaySuccess(
const ar = Duration.fromTalerProtocolDuration(protoAr);
logger.info("auto_refund present");
purchase.refundQueryRequested = true;
purchase.refundStatusRetryInfo = initRetryInfo();
purchase.refundStatusRetryInfo = resetRetryInfo();
purchase.lastRefundStatusError = undefined;
purchase.autoRefundDeadline = AbsoluteTime.toTimestamp(
AbsoluteTime.addDuration(AbsoluteTime.now(), ar),
@ -1033,7 +1011,7 @@ async function storePayReplaySuccess(
}
purchase.paymentSubmitPending = false;
purchase.lastPayError = undefined;
purchase.payRetryInfo = initRetryInfo();
purchase.payRetryInfo = resetRetryInfo();
purchase.lastSessionId = sessionId;
await tx.purchases.put(purchase);
});
@ -1289,7 +1267,7 @@ export async function checkPaymentByProposalId(
p.paymentSubmitPending = true;
await tx.purchases.put(p);
});
const r = await processPurchasePay(ws, proposalId, true);
const r = await processPurchasePay(ws, proposalId, { forceNow: true });
if (r.type !== ConfirmPayResultType.Done) {
throw Error("submitting pay failed");
}
@ -1466,7 +1444,7 @@ export async function confirmPay(
if (existingPurchase) {
logger.trace("confirmPay: submitting payment for existing purchase");
return await processPurchasePay(ws, proposalId, true);
return await processPurchasePay(ws, proposalId, { forceNow: true });
}
logger.trace("confirmPay: purchase record does not exist yet");
@ -1516,18 +1494,20 @@ export async function confirmPay(
sessionIdOverride,
);
return await processPurchasePay(ws, proposalId, true);
return await processPurchasePay(ws, proposalId, { forceNow: true });
}
export async function processPurchasePay(
ws: InternalWalletState,
proposalId: string,
forceNow = false,
options: {
forceNow?: boolean;
} = {},
): Promise<ConfirmPayResult> {
const onOpErr = (e: TalerErrorDetail): Promise<void> =>
reportPurchasePayError(ws, proposalId, e);
return await guardOperationException(
() => processPurchasePayImpl(ws, proposalId, forceNow),
() => processPurchasePayImpl(ws, proposalId, options),
onOpErr,
);
}
@ -1535,8 +1515,11 @@ export async function processPurchasePay(
async function processPurchasePayImpl(
ws: InternalWalletState,
proposalId: string,
forceNow: boolean,
options: {
forceNow?: boolean;
} = {},
): Promise<ConfirmPayResult> {
const forceNow = options.forceNow ?? false;
const purchase = await ws.db
.mktx((x) => ({ purchases: x.purchases }))
.runReadOnly(async (tx) => {
@ -1559,11 +1542,7 @@ async function processPurchasePayImpl(
lastError: purchase.lastPayError,
};
}
if (forceNow) {
await resetPurchasePayRetry(ws, proposalId);
} else {
await incrementPurchasePayRetry(ws, proposalId);
}
await setupPurchasePayRetry(ws, proposalId, { reset: forceNow });
logger.trace(`processing purchase pay ${proposalId}`);
const sessionId = purchase.lastSessionId;

View File

@ -51,18 +51,17 @@ async function gatherExchangePending(
resp.pendingOperations.push({
type: PendingTaskType.ExchangeUpdate,
givesLifeness: false,
timestampDue: e.lastError
? e.retryInfo.nextRetry
: AbsoluteTime.fromTimestamp(e.nextUpdate),
timestampDue:
e.retryInfo?.nextRetry ?? AbsoluteTime.fromTimestamp(e.nextUpdate),
exchangeBaseUrl: e.baseUrl,
lastError: e.lastError,
});
resp.pendingOperations.push({
type: PendingTaskType.ExchangeCheckRefresh,
timestampDue: e.lastError
? e.retryInfo.nextRetry
: AbsoluteTime.fromTimestamp(e.nextRefreshCheck),
timestampDue:
e.retryInfo?.nextRetry ??
AbsoluteTime.fromTimestamp(e.nextRefreshCheck),
givesLifeness: false,
exchangeBaseUrl: e.baseUrl,
});

View File

@ -48,7 +48,11 @@ import {
import { readSuccessResponseJsonOrThrow } from "../util/http.js";
import { Logger, URL } from "@gnu-taler/taler-util";
import { initRetryInfo, updateRetryInfoTimeout } from "../util/retries.js";
import {
resetRetryInfo,
RetryInfo,
updateRetryInfoTimeout,
} from "../util/retries.js";
import { createRefreshGroup, processRefreshGroup } from "./refresh.js";
import { getReserveRequestTimeout, processReserve } from "./reserves.js";
import { InternalWalletState } from "../internal-wallet-state.js";
@ -57,10 +61,36 @@ import { guardOperationException } from "./common.js";
const logger = new Logger("operations/recoup.ts");
async function incrementRecoupRetry(
async function setupRecoupRetry(
ws: InternalWalletState,
recoupGroupId: string,
err: TalerErrorDetail | undefined,
options: {
reset: boolean;
},
): Promise<void> {
await ws.db
.mktx((x) => ({
recoupGroups: x.recoupGroups,
}))
.runReadWrite(async (tx) => {
const r = await tx.recoupGroups.get(recoupGroupId);
if (!r) {
return;
}
if (options.reset) {
r.retryInfo = resetRetryInfo();
} else {
r.retryInfo = RetryInfo.increment(r.retryInfo);
}
delete r.lastError;
await tx.recoupGroups.put(r);
});
}
async function reportRecoupError(
ws: InternalWalletState,
recoupGroupId: string,
err: TalerErrorDetail,
): Promise<void> {
await ws.db
.mktx((x) => ({
@ -72,16 +102,14 @@ async function incrementRecoupRetry(
return;
}
if (!r.retryInfo) {
return;
logger.error(
"reporting error for inactive recoup group (no retry info)",
);
}
r.retryInfo.retryCounter++;
updateRetryInfoTimeout(r.retryInfo);
r.lastError = err;
await tx.recoupGroups.put(r);
});
if (err) {
ws.notify({ type: NotificationType.RecoupOperationError, error: err });
}
ws.notify({ type: NotificationType.RecoupOperationError, error: err });
}
async function putGroupAsFinished(
@ -111,7 +139,7 @@ async function putGroupAsFinished(
if (allFinished) {
logger.info("all recoups of recoup group are finished");
recoupGroup.timestampFinished = TalerProtocolTimestamp.now();
recoupGroup.retryInfo = initRetryInfo();
recoupGroup.retryInfo = resetRetryInfo();
recoupGroup.lastError = undefined;
if (recoupGroup.scheduleRefreshCoins.length > 0) {
const refreshGroupId = await createRefreshGroup(
@ -250,7 +278,7 @@ async function recoupWithdrawCoin(
const currency = updatedCoin.currentAmount.currency;
updatedCoin.currentAmount = Amounts.getZero(currency);
updatedReserve.reserveStatus = ReserveRecordStatus.QueryingStatus;
updatedReserve.retryInfo = initRetryInfo();
updatedReserve.retryInfo = resetRetryInfo();
updatedReserve.operationStatus = OperationStatus.Pending;
await tx.coins.put(updatedCoin);
await tx.reserves.put(updatedReserve);
@ -361,33 +389,18 @@ async function recoupRefreshCoin(
});
}
async function resetRecoupGroupRetry(
ws: InternalWalletState,
recoupGroupId: string,
): Promise<void> {
await ws.db
.mktx((x) => ({
recoupGroups: x.recoupGroups,
}))
.runReadWrite(async (tx) => {
const x = await tx.recoupGroups.get(recoupGroupId);
if (x) {
x.retryInfo = initRetryInfo();
await tx.recoupGroups.put(x);
}
});
}
export async function processRecoupGroup(
ws: InternalWalletState,
recoupGroupId: string,
forceNow = false,
options: {
forceNow?: boolean;
} = {},
): Promise<void> {
await ws.memoProcessRecoup.memo(recoupGroupId, async () => {
const onOpErr = (e: TalerErrorDetail): Promise<void> =>
incrementRecoupRetry(ws, recoupGroupId, e);
reportRecoupError(ws, recoupGroupId, e);
return await guardOperationException(
async () => await processRecoupGroupImpl(ws, recoupGroupId, forceNow),
async () => await processRecoupGroupImpl(ws, recoupGroupId, options),
onOpErr,
);
});
@ -396,11 +409,12 @@ export async function processRecoupGroup(
async function processRecoupGroupImpl(
ws: InternalWalletState,
recoupGroupId: string,
forceNow = false,
options: {
forceNow?: boolean;
} = {},
): Promise<void> {
if (forceNow) {
await resetRecoupGroupRetry(ws, recoupGroupId);
}
const forceNow = options.forceNow ?? false;
await setupRecoupRetry(ws, recoupGroupId, { reset: forceNow });
const recoupGroup = await ws.db
.mktx((x) => ({
recoupGroups: x.recoupGroups,
@ -444,7 +458,7 @@ async function processRecoupGroupImpl(
}
for (const r of reserveSet.values()) {
processReserve(ws, r, true).catch((e) => {
processReserve(ws, r, { forceNow: true }).catch((e) => {
logger.error(`processing reserve ${r} after recoup failed`);
});
}
@ -468,7 +482,7 @@ export async function createRecoupGroup(
lastError: undefined,
timestampFinished: undefined,
timestampStarted: TalerProtocolTimestamp.now(),
retryInfo: initRetryInfo(),
retryInfo: resetRetryInfo(),
recoupFinishedPerCoin: coinPubs.map(() => false),
// Will be populated later
oldAmountPerCoin: [],

View File

@ -53,7 +53,11 @@ import {
} from "../util/http.js";
import { checkDbInvariant } from "../util/invariants.js";
import { Logger } from "@gnu-taler/taler-util";
import { initRetryInfo, updateRetryInfoTimeout } from "../util/retries.js";
import {
resetRetryInfo,
RetryInfo,
updateRetryInfoTimeout,
} from "../util/retries.js";
import {
Duration,
durationFromSpec,
@ -130,11 +134,11 @@ function updateGroupStatus(rg: RefreshGroupRecord): void {
if (allDone) {
if (anyFrozen) {
rg.frozen = true;
rg.retryInfo = initRetryInfo();
rg.retryInfo = resetRetryInfo();
} else {
rg.timestampFinished = AbsoluteTime.toTimestamp(AbsoluteTime.now());
rg.operationStatus = OperationStatus.Finished;
rg.retryInfo = initRetryInfo();
rg.retryInfo = resetRetryInfo();
}
}
}
@ -712,7 +716,33 @@ async function refreshReveal(
});
}
async function incrementRefreshRetry(
async function setupRefreshRetry(
ws: InternalWalletState,
refreshGroupId: string,
options: {
reset: boolean;
},
): Promise<void> {
await ws.db
.mktx((x) => ({
refreshGroups: x.refreshGroups,
}))
.runReadWrite(async (tx) => {
const r = await tx.refreshGroups.get(refreshGroupId);
if (!r) {
return;
}
if (options.reset) {
r.retryInfo = resetRetryInfo();
} else {
r.retryInfo = RetryInfo.increment(r.retryInfo);
}
delete r.lastError;
await tx.refreshGroups.put(r);
});
}
async function reportRefreshError(
ws: InternalWalletState,
refreshGroupId: string,
err: TalerErrorDetail | undefined,
@ -727,10 +757,10 @@ async function incrementRefreshRetry(
return;
}
if (!r.retryInfo) {
return;
logger.error(
"reported error for inactive refresh group (no retry info)",
);
}
r.retryInfo.retryCounter++;
updateRetryInfoTimeout(r.retryInfo);
r.lastError = err;
await tx.refreshGroups.put(r);
});
@ -745,44 +775,31 @@ async function incrementRefreshRetry(
export async function processRefreshGroup(
ws: InternalWalletState,
refreshGroupId: string,
forceNow = false,
options: {
forceNow?: boolean;
} = {},
): Promise<void> {
await ws.memoProcessRefresh.memo(refreshGroupId, async () => {
const onOpErr = (e: TalerErrorDetail): Promise<void> =>
incrementRefreshRetry(ws, refreshGroupId, e);
reportRefreshError(ws, refreshGroupId, e);
return await guardOperationException(
async () => await processRefreshGroupImpl(ws, refreshGroupId, forceNow),
async () => await processRefreshGroupImpl(ws, refreshGroupId, options),
onOpErr,
);
});
}
async function resetRefreshGroupRetry(
ws: InternalWalletState,
refreshGroupId: string,
): Promise<void> {
await ws.db
.mktx((x) => ({
refreshGroups: x.refreshGroups,
}))
.runReadWrite(async (tx) => {
const x = await tx.refreshGroups.get(refreshGroupId);
if (x) {
x.retryInfo = initRetryInfo();
await tx.refreshGroups.put(x);
}
});
}
async function processRefreshGroupImpl(
ws: InternalWalletState,
refreshGroupId: string,
forceNow: boolean,
options: {
forceNow?: boolean;
} = {},
): Promise<void> {
const forceNow = options.forceNow ?? false;
logger.info(`processing refresh group ${refreshGroupId}`);
if (forceNow) {
await resetRefreshGroupRetry(ws, refreshGroupId);
}
await setupRefreshRetry(ws, refreshGroupId, { reset: forceNow });
const refreshGroup = await ws.db
.mktx((x) => ({
refreshGroups: x.refreshGroups,
@ -939,7 +956,7 @@ export async function createRefreshGroup(
reason,
refreshGroupId,
refreshSessionPerCoin: oldCoinPubs.map(() => undefined),
retryInfo: initRetryInfo(),
retryInfo: resetRetryInfo(),
inputPerCoin,
estimatedOutputPerCoin,
timestampCreated: TalerProtocolTimestamp.now(),
@ -994,7 +1011,9 @@ export async function autoRefresh(
exchangeBaseUrl: string,
): Promise<void> {
logger.info(`doing auto-refresh check for '${exchangeBaseUrl}'`);
await updateExchangeFromUrl(ws, exchangeBaseUrl, undefined, true);
await updateExchangeFromUrl(ws, exchangeBaseUrl, {
forceNow: true,
});
let minCheckThreshold = AbsoluteTime.addDuration(
AbsoluteTime.now(),
durationFromSpec({ days: 1 }),

View File

@ -58,37 +58,54 @@ import {
import { readSuccessResponseJsonOrThrow } from "../util/http.js";
import { checkDbInvariant } from "../util/invariants.js";
import { GetReadWriteAccess } from "../util/query.js";
import { initRetryInfo, updateRetryInfoTimeout } from "../util/retries.js";
import {
resetRetryInfo,
RetryInfo,
updateRetryInfoTimeout,
} from "../util/retries.js";
import { createRefreshGroup, getTotalRefreshCost } from "./refresh.js";
import { InternalWalletState } from "../internal-wallet-state.js";
import { guardOperationException } from "./common.js";
const logger = new Logger("refund.ts");
async function resetPurchaseQueryRefundRetry(
/**
* Retry querying and applying refunds for an order later.
*/
async function setupPurchaseQueryRefundRetry(
ws: InternalWalletState,
proposalId: string,
options: {
reset: boolean;
},
): Promise<void> {
await ws.db
.mktx((x) => ({
purchases: x.purchases,
}))
.runReadWrite(async (tx) => {
const x = await tx.purchases.get(proposalId);
if (x) {
x.refundStatusRetryInfo = initRetryInfo();
await tx.purchases.put(x);
const pr = await tx.purchases.get(proposalId);
if (!pr) {
return;
}
if (options.reset) {
pr.refundStatusRetryInfo = resetRetryInfo();
} else {
pr.refundStatusRetryInfo = RetryInfo.increment(
pr.refundStatusRetryInfo,
);
}
await tx.purchases.put(pr);
});
}
/**
* Retry querying and applying refunds for an order later.
* Report an error that happending when querying for a purchase's refund.
*/
async function incrementPurchaseQueryRefundRetry(
async function reportPurchaseQueryRefundError(
ws: InternalWalletState,
proposalId: string,
err: TalerErrorDetail | undefined,
err: TalerErrorDetail,
): Promise<void> {
await ws.db
.mktx((x) => ({
@ -100,10 +117,10 @@ async function incrementPurchaseQueryRefundRetry(
return;
}
if (!pr.refundStatusRetryInfo) {
return;
logger.error(
"reported error on an inactive purchase (no refund status retry info)",
);
}
pr.refundStatusRetryInfo.retryCounter++;
updateRetryInfoTimeout(pr.refundStatusRetryInfo);
pr.lastRefundStatusError = err;
await tx.purchases.put(pr);
});
@ -425,7 +442,7 @@ async function acceptRefunds(
if (queryDone) {
p.timestampLastRefundStatus = now;
p.lastRefundStatusError = undefined;
p.refundStatusRetryInfo = initRetryInfo();
p.refundStatusRetryInfo = resetRetryInfo();
p.refundQueryRequested = false;
if (p.abortStatus === AbortStatus.AbortRefund) {
p.abortStatus = AbortStatus.AbortFinished;
@ -506,7 +523,7 @@ export async function applyRefund(
}
p.refundQueryRequested = true;
p.lastRefundStatusError = undefined;
p.refundStatusRetryInfo = initRetryInfo();
p.refundStatusRetryInfo = resetRetryInfo();
await tx.purchases.put(p);
return true;
});
@ -515,7 +532,10 @@ export async function applyRefund(
ws.notify({
type: NotificationType.RefundStarted,
});
await processPurchaseQueryRefundImpl(ws, proposalId, true, false);
await processPurchaseQueryRefundImpl(ws, proposalId, {
forceNow: true,
waitForAutoRefund: false,
});
}
purchase = await ws.db
@ -590,12 +610,15 @@ export async function applyRefund(
export async function processPurchaseQueryRefund(
ws: InternalWalletState,
proposalId: string,
forceNow = false,
options: {
forceNow?: boolean;
waitForAutoRefund?: boolean;
} = {},
): Promise<void> {
const onOpErr = (e: TalerErrorDetail): Promise<void> =>
incrementPurchaseQueryRefundRetry(ws, proposalId, e);
reportPurchaseQueryRefundError(ws, proposalId, e);
await guardOperationException(
() => processPurchaseQueryRefundImpl(ws, proposalId, forceNow, true),
() => processPurchaseQueryRefundImpl(ws, proposalId, options),
onOpErr,
);
}
@ -603,12 +626,14 @@ export async function processPurchaseQueryRefund(
async function processPurchaseQueryRefundImpl(
ws: InternalWalletState,
proposalId: string,
forceNow: boolean,
waitForAutoRefund: boolean,
options: {
forceNow?: boolean;
waitForAutoRefund?: boolean;
} = {},
): Promise<void> {
if (forceNow) {
await resetPurchaseQueryRefundRetry(ws, proposalId);
}
const forceNow = options.forceNow ?? false;
const waitForAutoRefund = options.waitForAutoRefund ?? false;
await setupPurchaseQueryRefundRetry(ws, proposalId, { reset: forceNow });
const purchase = await ws.db
.mktx((x) => ({
purchases: x.purchases,
@ -650,7 +675,7 @@ async function processPurchaseQueryRefundImpl(
codecForMerchantOrderStatusPaid(),
);
if (!orderStatus.refunded) {
incrementPurchaseQueryRefundRetry(ws, proposalId, undefined);
// Wait for retry ...
return;
}
}
@ -666,11 +691,6 @@ async function processPurchaseQueryRefundImpl(
h_contract: purchase.download.contractData.contractTermsHash,
});
logger.trace(
"got json",
JSON.stringify(await request.json(), undefined, 2),
);
const refundResponse = await readSuccessResponseJsonOrThrow(
request,
codecForMerchantOrderRefundPickupResponse(),
@ -777,10 +797,12 @@ export async function abortFailedPayWithRefund(
purchase.paymentSubmitPending = false;
purchase.abortStatus = AbortStatus.AbortRefund;
purchase.lastPayError = undefined;
purchase.payRetryInfo = initRetryInfo();
purchase.payRetryInfo = resetRetryInfo();
await tx.purchases.put(purchase);
});
processPurchaseQueryRefund(ws, proposalId, true).catch((e) => {
processPurchaseQueryRefund(ws, proposalId, {
forceNow: true,
}).catch((e) => {
logger.trace(`error during refund processing after abort pay: ${e}`);
});
}

View File

@ -57,7 +57,8 @@ import {
import { GetReadOnlyAccess } from "../util/query.js";
import {
getRetryDuration,
initRetryInfo,
resetRetryInfo,
RetryInfo,
updateRetryInfoTimeout,
} from "../util/retries.js";
import {
@ -79,34 +80,15 @@ import { guardOperationException } from "./common.js";
const logger = new Logger("taler-wallet-core:reserves.ts");
/**
* Reset the retry counter for the reserve
* and reset the last error.
* Set up the reserve's retry timeout in preparation for
* processing the reserve.
*/
async function resetReserveRetry(
ws: InternalWalletState,
reservePub: string,
): Promise<void> {
await ws.db
.mktx((x) => ({
reserves: x.reserves,
}))
.runReadWrite(async (tx) => {
const x = await tx.reserves.get(reservePub);
if (x) {
x.retryInfo = initRetryInfo();
delete x.lastError;
await tx.reserves.put(x);
}
});
}
/**
* Increment the retry counter for the reserve and
* reset the last eror.
*/
async function incrementReserveRetry(
async function setupReserveRetry(
ws: InternalWalletState,
reservePub: string,
options: {
reset: boolean;
},
): Promise<void> {
await ws.db
.mktx((x) => ({
@ -117,11 +99,10 @@ async function incrementReserveRetry(
if (!r) {
return;
}
if (!r.retryInfo) {
r.retryInfo = initRetryInfo();
if (options.reset) {
r.retryInfo = resetRetryInfo();
} else {
r.retryInfo.retryCounter++;
updateRetryInfoTimeout(r.retryInfo);
r.retryInfo = RetryInfo.increment(r.retryInfo);
}
delete r.lastError;
await tx.reserves.put(r);
@ -216,7 +197,7 @@ export async function createReserve(
timestampReserveInfoPosted: undefined,
bankInfo,
reserveStatus,
retryInfo: initRetryInfo(),
retryInfo: resetRetryInfo(),
lastError: undefined,
currency: req.amount.currency,
operationStatus: OperationStatus.Pending,
@ -288,7 +269,7 @@ export async function createReserve(
// Asynchronously process the reserve, but return
// to the caller already.
processReserve(ws, resp.reservePub, true).catch((e) => {
processReserve(ws, resp.reservePub, { forceNow: true }).catch((e) => {
logger.error("Processing reserve (after createReserve) failed:", e);
});
@ -316,14 +297,14 @@ export async function forceQueryReserve(
case ReserveRecordStatus.Dormant:
reserve.reserveStatus = ReserveRecordStatus.QueryingStatus;
reserve.operationStatus = OperationStatus.Pending;
reserve.retryInfo = initRetryInfo();
reserve.retryInfo = resetRetryInfo();
break;
default:
break;
}
await tx.reserves.put(reserve);
});
await processReserve(ws, reservePub, true);
await processReserve(ws, reservePub, { forceNow: true });
}
/**
@ -336,13 +317,15 @@ export async function forceQueryReserve(
export async function processReserve(
ws: InternalWalletState,
reservePub: string,
forceNow = false,
options: {
forceNow?: boolean;
} = {},
): Promise<void> {
return ws.memoProcessReserve.memo(reservePub, async () => {
const onOpError = (err: TalerErrorDetail): Promise<void> =>
reportReserveError(ws, reservePub, err);
await guardOperationException(
() => processReserveImpl(ws, reservePub, forceNow),
() => processReserveImpl(ws, reservePub, options),
onOpError,
);
});
@ -409,7 +392,7 @@ async function registerReserveWithBank(
if (!r.bankInfo) {
throw Error("invariant failed");
}
r.retryInfo = initRetryInfo();
r.retryInfo = resetRetryInfo();
await tx.reserves.put(r);
});
ws.notify({ type: NotificationType.ReserveRegisteredWithBank });
@ -476,7 +459,7 @@ async function processReserveBankStatus(
r.timestampBankConfirmed = now;
r.reserveStatus = ReserveRecordStatus.BankAborted;
r.operationStatus = OperationStatus.Finished;
r.retryInfo = initRetryInfo();
r.retryInfo = resetRetryInfo();
await tx.reserves.put(r);
});
return;
@ -513,7 +496,7 @@ async function processReserveBankStatus(
r.timestampBankConfirmed = now;
r.reserveStatus = ReserveRecordStatus.QueryingStatus;
r.operationStatus = OperationStatus.Pending;
r.retryInfo = initRetryInfo();
r.retryInfo = resetRetryInfo();
} else {
switch (r.reserveStatus) {
case ReserveRecordStatus.WaitConfirmBank:
@ -684,7 +667,7 @@ async function updateReserve(
reservePub: reserve.reservePub,
rawWithdrawalAmount: remainingAmount,
timestampStart: AbsoluteTime.toTimestamp(AbsoluteTime.now()),
retryInfo: initRetryInfo(),
retryInfo: resetRetryInfo(),
lastError: undefined,
denomsSel: denomSelectionInfoToState(denomSelInfo),
secretSeed: encodeCrock(getRandomBytes(64)),
@ -717,8 +700,12 @@ async function updateReserve(
async function processReserveImpl(
ws: InternalWalletState,
reservePub: string,
forceNow = false,
options: {
forceNow?: boolean;
} = {},
): Promise<void> {
const forceNow = options.forceNow ?? false;
await setupReserveRetry(ws, reservePub, { reset: forceNow });
const reserve = await ws.db
.mktx((x) => ({
reserves: x.reserves,
@ -732,27 +719,17 @@ async function processReserveImpl(
);
return;
}
if (forceNow) {
await resetReserveRetry(ws, reservePub);
} else if (
reserve.retryInfo &&
!AbsoluteTime.isExpired(reserve.retryInfo.nextRetry)
) {
logger.trace("processReserve retry not due yet");
return;
}
await incrementReserveRetry(ws, reservePub);
logger.trace(
`Processing reserve ${reservePub} with status ${reserve.reserveStatus}`,
);
switch (reserve.reserveStatus) {
case ReserveRecordStatus.RegisteringBank:
await processReserveBankStatus(ws, reservePub);
return await processReserveImpl(ws, reservePub, true);
return await processReserveImpl(ws, reservePub, { forceNow: true });
case ReserveRecordStatus.QueryingStatus:
const res = await updateReserve(ws, reservePub);
if (res.ready) {
return await processReserveImpl(ws, reservePub, true);
return await processReserveImpl(ws, reservePub, { forceNow: true });
}
break;
case ReserveRecordStatus.Dormant:

View File

@ -43,7 +43,11 @@ import {
} from "../db.js";
import { j2s } from "@gnu-taler/taler-util";
import { checkDbInvariant, checkLogicInvariant } from "../util/invariants.js";
import { initRetryInfo, updateRetryInfoTimeout } from "../util/retries.js";
import {
resetRetryInfo,
RetryInfo,
updateRetryInfoTimeout,
} from "../util/retries.js";
import { makeErrorDetail } from "../errors.js";
import { updateExchangeFromUrl } from "./exchanges.js";
import { InternalWalletState } from "../internal-wallet-state.js";
@ -127,7 +131,7 @@ export async function prepareTip(
createdTimestamp: TalerProtocolTimestamp.now(),
merchantTipId: res.merchantTipId,
tipAmountEffective: selectedDenoms.totalCoinValue,
retryInfo: initRetryInfo(),
retryInfo: resetRetryInfo(),
lastError: undefined,
denomsSel: denomSelectionInfoToState(selectedDenoms),
pickedUpTimestamp: undefined,
@ -157,10 +161,10 @@ export async function prepareTip(
return tipStatus;
}
async function incrementTipRetry(
async function reportTipError(
ws: InternalWalletState,
walletTipId: string,
err: TalerErrorDetail | undefined,
err: TalerErrorDetail,
): Promise<void> {
await ws.db
.mktx((x) => ({
@ -172,10 +176,8 @@ async function incrementTipRetry(
return;
}
if (!t.retryInfo) {
return;
logger.reportBreak();
}
t.retryInfo.retryCounter++;
updateRetryInfoTimeout(t.retryInfo);
t.lastError = err;
await tx.tips.put(t);
});
@ -184,15 +186,43 @@ async function incrementTipRetry(
}
}
async function setupTipRetry(
ws: InternalWalletState,
walletTipId: string,
options: {
reset: boolean;
},
): Promise<void> {
await ws.db
.mktx((x) => ({
tips: x.tips,
}))
.runReadWrite(async (tx) => {
const t = await tx.tips.get(walletTipId);
if (!t) {
return;
}
if (options.reset) {
t.retryInfo = resetRetryInfo();
} else {
t.retryInfo = RetryInfo.increment(t.retryInfo);
}
delete t.lastError;
await tx.tips.put(t);
});
}
export async function processTip(
ws: InternalWalletState,
tipId: string,
forceNow = false,
options: {
forceNow?: boolean;
} = {},
): Promise<void> {
const onOpErr = (e: TalerErrorDetail): Promise<void> =>
incrementTipRetry(ws, tipId, e);
reportTipError(ws, tipId, e);
await guardOperationException(
() => processTipImpl(ws, tipId, forceNow),
() => processTipImpl(ws, tipId, options),
onOpErr,
);
}
@ -208,7 +238,7 @@ async function resetTipRetry(
.runReadWrite(async (tx) => {
const x = await tx.tips.get(tipId);
if (x) {
x.retryInfo = initRetryInfo();
x.retryInfo = resetRetryInfo();
await tx.tips.put(x);
}
});
@ -217,8 +247,11 @@ async function resetTipRetry(
async function processTipImpl(
ws: InternalWalletState,
walletTipId: string,
forceNow: boolean,
options: {
forceNow?: boolean;
} = {},
): Promise<void> {
const forceNow = options.forceNow ?? false;
if (forceNow) {
await resetTipRetry(ws, walletTipId);
}
@ -293,12 +326,13 @@ async function processTipImpl(
merchantResp.status === 424)
) {
logger.trace(`got transient tip error`);
// FIXME: wrap in another error code that indicates a transient error
const err = makeErrorDetail(
TalerErrorCode.WALLET_UNEXPECTED_REQUEST_ERROR,
getHttpResponseErrorDetails(merchantResp),
"tip pickup failed (transient)",
);
await incrementTipRetry(ws, tipRecord.walletTipId, err);
await reportTipError(ws, tipRecord.walletTipId, err);
// FIXME: Maybe we want to signal to the caller that the transient error happened?
return;
}
@ -397,7 +431,7 @@ async function processTipImpl(
}
tr.pickedUpTimestamp = TalerProtocolTimestamp.now();
tr.lastError = undefined;
tr.retryInfo = initRetryInfo();
tr.retryInfo = resetRetryInfo();
await tx.tips.put(tr);
for (const cr of newCoinRecords) {
await tx.coins.put(cr);

View File

@ -68,7 +68,11 @@ import {
HttpRequestLibrary,
readSuccessResponseJsonOrThrow,
} from "../util/http.js";
import { initRetryInfo, updateRetryInfoTimeout } from "../util/retries.js";
import {
resetRetryInfo,
RetryInfo,
updateRetryInfoTimeout,
} from "../util/retries.js";
import {
WALLET_BANK_INTEGRATION_PROTOCOL_VERSION,
WALLET_EXCHANGE_PROTOCOL_VERSION,
@ -792,10 +796,12 @@ export async function updateWithdrawalDenoms(
}
}
async function incrementWithdrawalRetry(
async function setupWithdrawalRetry(
ws: InternalWalletState,
withdrawalGroupId: string,
err: TalerErrorDetail | undefined,
options: {
reset: boolean;
},
): Promise<void> {
await ws.db
.mktx((x) => ({ withdrawalGroups: x.withdrawalGroups }))
@ -804,56 +810,61 @@ async function incrementWithdrawalRetry(
if (!wsr) {
return;
}
wsr.retryInfo.retryCounter++;
updateRetryInfoTimeout(wsr.retryInfo);
if (options.reset) {
wsr.retryInfo = resetRetryInfo();
} else {
wsr.retryInfo = RetryInfo.increment(wsr.retryInfo);
}
await tx.withdrawalGroups.put(wsr);
});
}
async function reportWithdrawalError(
ws: InternalWalletState,
withdrawalGroupId: string,
err: TalerErrorDetail,
): Promise<void> {
await ws.db
.mktx((x) => ({ withdrawalGroups: x.withdrawalGroups }))
.runReadWrite(async (tx) => {
const wsr = await tx.withdrawalGroups.get(withdrawalGroupId);
if (!wsr) {
return;
}
if (!wsr.retryInfo) {
logger.reportBreak();
}
wsr.lastError = err;
await tx.withdrawalGroups.put(wsr);
});
if (err) {
ws.notify({ type: NotificationType.WithdrawOperationError, error: err });
}
ws.notify({ type: NotificationType.WithdrawOperationError, error: err });
}
export async function processWithdrawGroup(
ws: InternalWalletState,
withdrawalGroupId: string,
forceNow = false,
options: {
forceNow?: boolean;
} = {},
): Promise<void> {
const onOpErr = (e: TalerErrorDetail): Promise<void> =>
incrementWithdrawalRetry(ws, withdrawalGroupId, e);
reportWithdrawalError(ws, withdrawalGroupId, e);
await guardOperationException(
() => processWithdrawGroupImpl(ws, withdrawalGroupId, forceNow),
() => processWithdrawGroupImpl(ws, withdrawalGroupId, options),
onOpErr,
);
}
async function resetWithdrawalGroupRetry(
ws: InternalWalletState,
withdrawalGroupId: string,
): Promise<void> {
await ws.db
.mktx((x) => ({
withdrawalGroups: x.withdrawalGroups,
reserves: x.reserves,
}))
.runReadWrite(async (tx) => {
const x = await tx.withdrawalGroups.get(withdrawalGroupId);
if (x) {
x.retryInfo = initRetryInfo();
await tx.withdrawalGroups.put(x);
}
});
}
async function processWithdrawGroupImpl(
ws: InternalWalletState,
withdrawalGroupId: string,
forceNow: boolean,
options: {
forceNow?: boolean;
} = {},
): Promise<void> {
const forceNow = options.forceNow ?? false;
logger.trace("processing withdraw group", withdrawalGroupId);
if (forceNow) {
await resetWithdrawalGroupRetry(ws, withdrawalGroupId);
}
await setupWithdrawalRetry(ws, withdrawalGroupId, { reset: forceNow });
const withdrawalGroup = await ws.db
.mktx((x) => ({ withdrawalGroups: x.withdrawalGroups }))
.runReadOnly(async (tx) => {
@ -876,7 +887,7 @@ async function processWithdrawGroupImpl(
);
return;
}
return await ws.reserveOps.processReserve(ws, reservePub, forceNow);
return await ws.reserveOps.processReserve(ws, reservePub, { forceNow });
}
await ws.exchangeOps.updateExchangeFromUrl(
@ -948,7 +959,7 @@ async function processWithdrawGroupImpl(
wg.timestampFinish = TalerProtocolTimestamp.now();
wg.operationStatus = OperationStatus.Finished;
delete wg.lastError;
wg.retryInfo = initRetryInfo();
wg.retryInfo = resetRetryInfo();
}
await tx.withdrawalGroups.put(wg);

View File

@ -51,6 +51,8 @@ export interface HttpResponse {
bytes(): Promise<ArrayBuffer>;
}
export const DEFAULT_REQUEST_TIMEOUT_MS = 60000;
export interface HttpRequestOptions {
method?: "POST" | "PUT" | "GET";
headers?: { [name: string]: string };

View File

@ -82,7 +82,7 @@ export function getRetryDuration(
};
}
export function initRetryInfo(p: RetryPolicy = defaultRetryPolicy): RetryInfo {
export function resetRetryInfo(p: RetryPolicy = defaultRetryPolicy): RetryInfo {
const now = AbsoluteTime.now();
const info = {
firstTry: now,
@ -99,7 +99,7 @@ export namespace RetryInfo {
p: RetryPolicy = defaultRetryPolicy,
) {
if (!r) {
return initRetryInfo(p);
return resetRetryInfo(p);
}
const r2 = { ...r };
r2.retryCounter++;

View File

@ -238,51 +238,41 @@ async function processOnePendingOperation(
logger.trace(`running pending ${JSON.stringify(pending, undefined, 2)}`);
switch (pending.type) {
case PendingTaskType.ExchangeUpdate:
await updateExchangeFromUrl(
ws,
pending.exchangeBaseUrl,
undefined,
await updateExchangeFromUrl(ws, pending.exchangeBaseUrl, {
forceNow,
);
});
break;
case PendingTaskType.Refresh:
await processRefreshGroup(ws, pending.refreshGroupId, forceNow);
await processRefreshGroup(ws, pending.refreshGroupId, { forceNow });
break;
case PendingTaskType.Reserve:
await processReserve(ws, pending.reservePub, forceNow);
await processReserve(ws, pending.reservePub, { forceNow });
break;
case PendingTaskType.Withdraw:
await processWithdrawGroup(ws, pending.withdrawalGroupId, forceNow);
await processWithdrawGroup(ws, pending.withdrawalGroupId, { forceNow });
break;
case PendingTaskType.ProposalDownload:
await processDownloadProposal(ws, pending.proposalId, forceNow);
await processDownloadProposal(ws, pending.proposalId, { forceNow });
break;
case PendingTaskType.TipPickup:
await processTip(ws, pending.tipId, forceNow);
await processTip(ws, pending.tipId, { forceNow });
break;
case PendingTaskType.Pay:
await processPurchasePay(ws, pending.proposalId, forceNow);
await processPurchasePay(ws, pending.proposalId, { forceNow });
break;
case PendingTaskType.RefundQuery:
await processPurchaseQueryRefund(ws, pending.proposalId, forceNow);
await processPurchaseQueryRefund(ws, pending.proposalId, { forceNow });
break;
case PendingTaskType.Recoup:
await processRecoupGroup(ws, pending.recoupGroupId, forceNow);
await processRecoupGroup(ws, pending.recoupGroupId, { forceNow });
break;
case PendingTaskType.ExchangeCheckRefresh:
await autoRefresh(ws, pending.exchangeBaseUrl);
break;
case PendingTaskType.Deposit: {
const cts = CancellationToken.create();
ws.taskCancellationSourceForDeposit = cts;
try {
await processDepositGroup(ws, pending.depositGroupId, {
cancellationToken: cts.token,
});
} finally {
cts.dispose();
delete ws.taskCancellationSourceForDeposit;
}
await processDepositGroup(ws, pending.depositGroupId, {
forceNow,
});
break;
}
case PendingTaskType.Backup:
@ -497,11 +487,8 @@ async function getExchangeTos(
exchangeBaseUrl: string,
acceptedFormat?: string[],
): Promise<GetExchangeTosResult> {
const { exchangeDetails } = await updateExchangeFromUrl(
ws,
exchangeBaseUrl,
acceptedFormat,
);
// FIXME: download ToS in acceptable format if passed!
const { exchangeDetails } = await updateExchangeFromUrl(ws, exchangeBaseUrl);
const content = exchangeDetails.termsOfServiceText;
const currentEtag = exchangeDetails.termsOfServiceLastEtag;
const contentType = exchangeDetails.termsOfServiceContentType;
@ -802,12 +789,9 @@ async function dispatchRequestInternal(
}
case "addExchange": {
const req = codecForAddExchangeRequest().decode(payload);
await updateExchangeFromUrl(
ws,
req.exchangeBaseUrl,
undefined,
req.forceUpdate,
);
await updateExchangeFromUrl(ws, req.exchangeBaseUrl, {
forceNow: req.forceUpdate,
});
return {};
}
case "listExchanges": {
@ -919,11 +903,11 @@ async function dispatchRequestInternal(
RefreshReason.Manual,
);
});
processRefreshGroup(ws, refreshGroupId.refreshGroupId, true).catch(
(x) => {
logger.error(x);
},
);
processRefreshGroup(ws, refreshGroupId.refreshGroupId, {
forceNow: true,
}).catch((x) => {
logger.error(x);
});
return {
refreshGroupId,
};
@ -1170,7 +1154,7 @@ class InternalWalletStateImpl implements InternalWalletState {
memoGetBalance: AsyncOpMemoSingle<BalancesResponse> = new AsyncOpMemoSingle();
memoProcessRefresh: AsyncOpMemoMap<void> = new AsyncOpMemoMap();
memoProcessRecoup: AsyncOpMemoMap<void> = new AsyncOpMemoMap();
memoProcessDeposit: AsyncOpMemoMap<void> = new AsyncOpMemoMap();
cryptoApi: TalerCryptoInterface;
cryptoDispatcher: CryptoDispatcher;

View File

@ -20,7 +20,7 @@
*/
import { Balance, parsePaytoUri } from "@gnu-taler/taler-util";
import { DepositFee } from "@gnu-taler/taler-wallet-core/src/operations/deposits";
import type { DepositGroupFees } from "@gnu-taler/taler-wallet-core/src/operations/deposits.js";
import { createExample } from "../test-utils.js";
import { View as TestedComponent } from "./DepositPage.js";
@ -30,7 +30,7 @@ export default {
argTypes: {},
};
async function alwaysReturnFeeToOne(): Promise<DepositFee> {
async function alwaysReturnFeeToOne(): Promise<DepositGroupFees> {
const fee = {
currency: "EUR",
value: 1,

View File

@ -21,7 +21,7 @@ import {
Balance,
PaytoUri,
} from "@gnu-taler/taler-util";
import { DepositFee } from "@gnu-taler/taler-wallet-core/src/operations/deposits";
import { DepositGroupFees } from "@gnu-taler/taler-wallet-core/src/operations/deposits";
import { Fragment, h, VNode } from "preact";
import { useEffect, useState } from "preact/hooks";
import { Loading } from "../components/Loading.js";
@ -68,7 +68,7 @@ export function DepositPage({ currency, onCancel, onSuccess }: Props): VNode {
async function getFeeForAmount(
p: PaytoUri,
a: AmountJson,
): Promise<DepositFee> {
): Promise<DepositGroupFees> {
const account = `payto://${p.targetType}/${p.targetPath}`;
const amount = Amounts.stringify(a);
return await wxApi.getFeeForDeposit(account, amount);
@ -106,7 +106,7 @@ interface ViewProps {
onCalculateFee: (
account: PaytoUri,
amount: AmountJson,
) => Promise<DepositFee>;
) => Promise<DepositGroupFees>;
}
type State = NoBalanceState | NoAccountsState | DepositState;
@ -135,12 +135,12 @@ export function useComponentState(
onCalculateFee: (
account: PaytoUri,
amount: AmountJson,
) => Promise<DepositFee>,
) => Promise<DepositGroupFees>,
): State {
const accountMap = createLabelsForBankAccount(accounts);
const [accountIdx, setAccountIdx] = useState(0);
const [amount, setAmount] = useState<number | undefined>(undefined);
const [fee, setFee] = useState<DepositFee | undefined>(undefined);
const [fee, setFee] = useState<DepositGroupFees | undefined>(undefined);
function updateAmount(num: number | undefined) {
setAmount(num);
setFee(undefined);

View File

@ -59,7 +59,7 @@ import {
RemoveBackupProviderRequest,
TalerError,
} from "@gnu-taler/taler-wallet-core";
import type { DepositFee } from "@gnu-taler/taler-wallet-core/src/operations/deposits";
import type { DepositGroupFees } from "@gnu-taler/taler-wallet-core/src/operations/deposits";
import type { ExchangeWithdrawDetails } from "@gnu-taler/taler-wallet-core/src/operations/withdraw";
import { platform, MessageFromBackend } from "./platform/api.js";
@ -143,7 +143,7 @@ export function resetDb(): Promise<void> {
export function getFeeForDeposit(
depositPaytoUri: string,
amount: AmountString,
): Promise<DepositFee> {
): Promise<DepositGroupFees> {
return callBackend("getFeeForDeposit", {
depositPaytoUri,
amount,