wallet-core: fix retryTransaction, improve tx/op identifier parsing/construction

This commit is contained in:
Florian Dold 2023-02-20 20:14:37 +01:00
parent 7bb81a008b
commit 3daa4dbb3f
No known key found for this signature in database
GPG Key ID: D2E4F00F29D02A4B
14 changed files with 335 additions and 131 deletions

View File

@ -69,7 +69,7 @@ import {
StoreDescriptor,
StoreWithIndexes,
} from "./util/query.js";
import { RetryInfo, RetryTags } from "./util/retries.js";
import { RetryInfo, TaskIdentifiers } from "./util/retries.js";
/**
* This file contains the database schema of the Taler wallet together
@ -1945,7 +1945,7 @@ export interface OperationRetryRecord {
* Unique identifier for the operation. Typically of
* the format `${opType}-${opUniqueKey}`
*
* @see {@link RetryTags}
* @see {@link TaskIdentifiers}
*/
id: string;

View File

@ -99,7 +99,7 @@ import {
import {
OperationAttemptResult,
OperationAttemptResultType,
RetryTags,
TaskIdentifiers,
scheduleRetryInTx,
} from "../../util/retries.js";
import { addAttentionRequest, removeAttentionRequest } from "../attention.js";
@ -379,7 +379,7 @@ async function runBackupCycleForProvider(
logger.warn("backup provider not found anymore");
return;
}
const opId = RetryTags.forBackup(prov);
const opId = TaskIdentifiers.forBackup(prov);
await scheduleRetryInTx(ws, tx, opId);
prov.shouldRetryFreshProposal = true;
prov.state = {
@ -405,7 +405,7 @@ async function runBackupCycleForProvider(
logger.warn("backup provider not found anymore");
return;
}
const opId = RetryTags.forBackup(prov);
const opId = TaskIdentifiers.forBackup(prov);
await scheduleRetryInTx(ws, tx, opId);
prov.currentPaymentProposalId = result.proposalId;
prov.shouldRetryFreshProposal = false;
@ -479,7 +479,7 @@ async function runBackupCycleForProvider(
prov.lastBackupHash = encodeCrock(hash(backupEnc));
// FIXME: Allocate error code for this situation?
// FIXME: Add operation retry record!
const opId = RetryTags.forBackup(prov);
const opId = TaskIdentifiers.forBackup(prov);
await scheduleRetryInTx(ws, tx, opId);
prov.state = {
tag: BackupProviderStateTag.Retrying,
@ -920,7 +920,7 @@ export async function getBackupInfo(
.mktx((x) => [x.backupProviders, x.operationRetries])
.runReadOnly(async (tx) => {
return await tx.backupProviders.iter().mapAsync(async (bp) => {
const opId = RetryTags.forBackup(bp);
const opId = TaskIdentifiers.forBackup(bp);
const retryRecord = await tx.operationRetries.get(opId);
return {
provider: bp,

View File

@ -218,6 +218,23 @@ export async function storeOperationError(
});
}
export async function resetOperationTimeout(
ws: InternalWalletState,
pendingTaskId: string,
): Promise<void> {
await ws.db
.mktx((x) => [x.operationRetries])
.runReadWrite(async (tx) => {
let retryRecord = await tx.operationRetries.get(pendingTaskId);
if (retryRecord) {
// Note that we don't reset the lastError, it should still be visible
// while the retry runs.
retryRecord.retryInfo = RetryInfo.increment(retryRecord.retryInfo);
await tx.operationRetries.put(retryRecord);
}
});
}
export async function storeOperationPending(
ws: InternalWalletState,
pendingTaskId: string,

View File

@ -122,7 +122,6 @@ export async function processDepositGroup(
ws: InternalWalletState,
depositGroupId: string,
options: {
forceNow?: boolean;
cancellationToken?: CancellationToken;
} = {},
): Promise<OperationAttemptResult> {

View File

@ -73,7 +73,7 @@ import {
import {
OperationAttemptResult,
OperationAttemptResultType,
RetryTags,
TaskIdentifiers,
unwrapOperationHandlerResultOrThrow,
} from "../util/retries.js";
import { WALLET_EXCHANGE_PROTOCOL_VERSION } from "../versions.js";
@ -552,7 +552,7 @@ export async function updateExchangeFromUrl(
return unwrapOperationHandlerResultOrThrow(
await runOperationWithErrorReporting(
ws,
RetryTags.forExchangeUpdateFromUrl(canonUrl),
TaskIdentifiers.forExchangeUpdateFromUrl(canonUrl),
() => updateExchangeFromUrlHandler(ws, canonUrl, options),
),
);

View File

@ -95,7 +95,7 @@ import {
TalerError,
TalerProtocolViolationError,
} from "@gnu-taler/taler-util";
import { GetReadWriteAccess } from "../index.js";
import { GetReadWriteAccess, PendingTaskType } from "../index.js";
import {
EXCHANGE_COINS_LOCK,
InternalWalletState,
@ -119,8 +119,9 @@ import {
OperationAttemptResult,
OperationAttemptResultType,
RetryInfo,
RetryTags,
TaskIdentifiers,
scheduleRetry,
constructTaskIdentifier,
} from "../util/retries.js";
import {
makeTransactionId,
@ -360,7 +361,7 @@ export async function processDownloadProposal(
requestBody.token = proposal.claimToken;
}
const opId = RetryTags.forPay(proposal);
const opId = TaskIdentifiers.forPay(proposal);
const retryRecord = await ws.db
.mktx((x) => [x.operationRetries])
.runReadOnly(async (tx) => {
@ -1598,8 +1599,11 @@ export async function runPayForConfirmPay(
proposalId: string,
): Promise<ConfirmPayResult> {
logger.trace("processing proposal for confirmPay");
const opId = RetryTags.byPaymentProposalId(proposalId);
const res = await runOperationWithErrorReporting(ws, opId, async () => {
const taskId = constructTaskIdentifier({
tag: PendingTaskType.Purchase,
proposalId,
});
const res = await runOperationWithErrorReporting(ws, taskId, async () => {
return await processPurchasePay(ws, proposalId, { forceNow: true });
});
logger.trace(`processPurchasePay response type ${res.type}`);
@ -1624,9 +1628,7 @@ export async function runPayForConfirmPay(
// We hide transient errors from the caller.
const opRetry = await ws.db
.mktx((x) => [x.operationRetries])
.runReadOnly(async (tx) =>
tx.operationRetries.get(RetryTags.byPaymentProposalId(proposalId)),
);
.runReadOnly(async (tx) => tx.operationRetries.get(taskId));
return {
type: ConfirmPayResultType.Pending,
lastError: opRetry?.lastError,
@ -1792,9 +1794,7 @@ export async function confirmPay(
export async function processPurchase(
ws: InternalWalletState,
proposalId: string,
options: {
forceNow?: boolean;
} = {},
options: Record<any, never> = {},
): Promise<OperationAttemptResult> {
const purchase = await ws.db
.mktx((x) => [x.purchases])
@ -1843,9 +1843,7 @@ export async function processPurchase(
export async function processPurchasePay(
ws: InternalWalletState,
proposalId: string,
options: {
forceNow?: boolean;
} = {},
options: unknown = {},
): Promise<OperationAttemptResult> {
const purchase = await ws.db
.mktx((x) => [x.purchases])
@ -1935,7 +1933,7 @@ export async function processPurchasePay(
handleInsufficientFunds(ws, proposalId, err).catch(async (e) => {
console.log("handling insufficient funds failed");
await scheduleRetry(ws, RetryTags.forPay(purchase), {
await scheduleRetry(ws, TaskIdentifiers.forPay(purchase), {
code: TalerErrorCode.WALLET_UNEXPECTED_EXCEPTION,
when: AbsoluteTime.now(),
message: "unexpected exception",
@ -2830,7 +2828,10 @@ export async function abortPay(
proposalId: string,
cancelImmediately?: boolean,
): Promise<void> {
const opId = RetryTags.byPaymentProposalId(proposalId);
const opId = constructTaskIdentifier({
tag: PendingTaskType.Purchase,
proposalId,
});
await ws.db
.mktx((x) => [
x.purchases,

View File

@ -87,15 +87,17 @@ import { TalerError } from "@gnu-taler/taler-util";
import { InternalWalletState } from "../internal-wallet-state.js";
import {
makeTransactionId,
resetOperationTimeout,
runOperationWithErrorReporting,
spendCoins,
} from "../operations/common.js";
import { readSuccessResponseJsonOrThrow } from "@gnu-taler/taler-util/http";
import { checkDbInvariant } from "../util/invariants.js";
import {
constructTaskIdentifier,
OperationAttemptResult,
OperationAttemptResultType,
RetryTags,
TaskIdentifiers,
} from "../util/retries.js";
import { getPeerPaymentBalanceDetailsInTx } from "./balance.js";
import { updateExchangeFromUrl } from "./exchanges.js";
@ -103,7 +105,10 @@ import { getTotalRefreshCost } from "./refresh.js";
import {
getExchangeWithdrawalInfo,
internalCreateWithdrawalGroup,
processWithdrawalGroup,
} from "./withdraw.js";
import { PendingTaskType } from "../pending-types.js";
import { stopLongpolling } from "./transactions.js";
const logger = new Logger("operations/peer-to-peer.ts");
@ -590,13 +595,14 @@ export async function initiatePeerPushPayment(
});
});
await runOperationWithErrorReporting(
ws,
RetryTags.byPeerPushPaymentInitiationPursePub(pursePair.pub),
async () => {
return await processPeerPushInitiation(ws, pursePair.pub);
},
);
const taskId = constructTaskIdentifier({
tag: PendingTaskType.PeerPushInitiation,
pursePub: pursePair.pub,
});
await runOperationWithErrorReporting(ws, taskId, async () => {
return await processPeerPushInitiation(ws, pursePair.pub);
});
return {
contractPriv: contractKeyPair.priv,
@ -951,7 +957,7 @@ export async function confirmPeerPushPayment(
await updateExchangeFromUrl(ws, peerInc.exchangeBaseUrl);
const retryTag = RetryTags.forPeerPushCredit(peerInc);
const retryTag = TaskIdentifiers.forPeerPushCredit(peerInc);
await runOperationWithErrorReporting(ws, retryTag, () =>
processPeerPushCredit(ws, req.peerPushPaymentIncomingId),
@ -1113,7 +1119,7 @@ export async function acceptIncomingPeerPullPayment(
await runOperationWithErrorReporting(
ws,
RetryTags.forPeerPullPaymentDebit(ppi),
TaskIdentifiers.forPeerPullPaymentDebit(ppi),
async () => {
return processPeerPullDebit(ws, ppi.peerPullPaymentIncomingId);
},
@ -1263,7 +1269,23 @@ export async function processPeerPullCredit(
}
if (pullIni.status === OperationStatus.Finished) {
logger.warn("peer pull payment initiation is already finished");
logger.warn(
"peer pull payment initiation is already finished, retrying withdrawal",
);
const withdrawalGroupId = pullIni.withdrawalGroupId;
if (withdrawalGroupId) {
const taskId = constructTaskIdentifier({
tag: PendingTaskType.Withdraw,
withdrawalGroupId,
});
stopLongpolling(ws, taskId);
await resetOperationTimeout(ws, taskId);
await runOperationWithErrorReporting(ws, taskId, () =>
processWithdrawalGroup(ws, withdrawalGroupId),
);
}
return {
type: OperationAttemptResultType.Finished,
result: undefined,
@ -1514,19 +1536,19 @@ export async function initiatePeerPullPayment(
// whether purse creation has failed, or does the client/
// check this asynchronously from the transaction status?
await runOperationWithErrorReporting(
ws,
RetryTags.byPeerPullPaymentInitiationPursePub(pursePair.pub),
async () => {
return processPeerPullCredit(ws, pursePair.pub);
},
);
const taskId = constructTaskIdentifier({
tag: PendingTaskType.PeerPullInitiation,
pursePub: pursePair.pub,
});
await runOperationWithErrorReporting(ws, taskId, async () => {
return processPeerPullCredit(ws, pursePair.pub);
});
// FIXME: Why do we create this only here?
// What if the previous operation didn't succeed?
// FIXME: Use a pre-computed withdrawal group ID
// so we don't create it multiple times.
// We actually should create it once we know the
// money arrived (via long-polling).
await internalCreateWithdrawalGroup(ws, {
amount: instructedAmount,

View File

@ -39,7 +39,7 @@ import {
import { AbsoluteTime } from "@gnu-taler/taler-util";
import { InternalWalletState } from "../internal-wallet-state.js";
import { GetReadOnlyAccess } from "../util/query.js";
import { RetryTags } from "../util/retries.js";
import { TaskIdentifiers } from "../util/retries.js";
import { GlobalIDB } from "@gnu-taler/idb-bridge";
function getPendingCommon(
@ -74,7 +74,7 @@ async function gatherExchangePending(
): Promise<void> {
// FIXME: We should do a range query here based on the update time.
await tx.exchanges.iter().forEachAsync(async (exch) => {
const opTag = RetryTags.forExchangeUpdate(exch);
const opTag = TaskIdentifiers.forExchangeUpdate(exch);
let opr = await tx.operationRetries.get(opTag);
const timestampDue =
opr?.retryInfo.nextRetry ?? AbsoluteTime.fromTimestamp(exch.nextUpdate);
@ -120,7 +120,7 @@ async function gatherRefreshPending(
if (r.timestampFinished) {
return;
}
const opId = RetryTags.forRefresh(r);
const opId = TaskIdentifiers.forRefresh(r);
const retryRecord = await tx.operationRetries.get(opId);
const timestampDue = retryRecord?.retryInfo.nextRetry ?? AbsoluteTime.now();
@ -158,7 +158,7 @@ async function gatherWithdrawalPending(
if (wsr.timestampFinish) {
return;
}
const opTag = RetryTags.forWithdrawal(wsr);
const opTag = TaskIdentifiers.forWithdrawal(wsr);
let opr = await tx.operationRetries.get(opTag);
const now = AbsoluteTime.now();
if (!opr) {
@ -208,7 +208,7 @@ async function gatherDepositPending(
deposited = false;
}
}
const opId = RetryTags.forDeposit(dg);
const opId = TaskIdentifiers.forDeposit(dg);
const retryRecord = await tx.operationRetries.get(opId);
const timestampDue = retryRecord?.retryInfo.nextRetry ?? AbsoluteTime.now();
resp.pendingOperations.push({
@ -239,7 +239,7 @@ async function gatherTipPending(
if (tip.pickedUpTimestamp) {
return;
}
const opId = RetryTags.forTipPickup(tip);
const opId = TaskIdentifiers.forTipPickup(tip);
const retryRecord = await tx.operationRetries.get(opId);
const timestampDue = retryRecord?.retryInfo.nextRetry ?? AbsoluteTime.now();
if (tip.acceptedTimestamp) {
@ -272,7 +272,7 @@ async function gatherPurchasePending(
await tx.purchases.indexes.byStatus
.iter(keyRange)
.forEachAsync(async (pr) => {
const opId = RetryTags.forPay(pr);
const opId = TaskIdentifiers.forPay(pr);
const retryRecord = await tx.operationRetries.get(opId);
const timestampDue =
retryRecord?.retryInfo.nextRetry ?? AbsoluteTime.now();
@ -301,7 +301,7 @@ async function gatherRecoupPending(
if (rg.timestampFinished) {
return;
}
const opId = RetryTags.forRecoup(rg);
const opId = TaskIdentifiers.forRecoup(rg);
const retryRecord = await tx.operationRetries.get(opId);
const timestampDue = retryRecord?.retryInfo.nextRetry ?? AbsoluteTime.now();
resp.pendingOperations.push({
@ -325,7 +325,7 @@ async function gatherBackupPending(
resp: PendingOperationsResponse,
): Promise<void> {
await tx.backupProviders.iter().forEachAsync(async (bp) => {
const opId = RetryTags.forBackup(bp);
const opId = TaskIdentifiers.forBackup(bp);
const retryRecord = await tx.operationRetries.get(opId);
if (bp.state.tag === BackupProviderStateTag.Ready) {
const timestampDue = AbsoluteTime.fromTimestamp(
@ -366,7 +366,7 @@ async function gatherPeerPullInitiationPending(
if (pi.status === OperationStatus.Finished) {
return;
}
const opId = RetryTags.forPeerPullPaymentInitiation(pi);
const opId = TaskIdentifiers.forPeerPullPaymentInitiation(pi);
const retryRecord = await tx.operationRetries.get(opId);
const timestampDue = retryRecord?.retryInfo.nextRetry ?? AbsoluteTime.now();
resp.pendingOperations.push({
@ -392,7 +392,7 @@ async function gatherPeerPullDebitPending(
if (pi.status === PeerPullPaymentIncomingStatus.Paid) {
return;
}
const opId = RetryTags.forPeerPullPaymentDebit(pi);
const opId = TaskIdentifiers.forPeerPullPaymentDebit(pi);
const retryRecord = await tx.operationRetries.get(opId);
const timestampDue = retryRecord?.retryInfo.nextRetry ?? AbsoluteTime.now();
resp.pendingOperations.push({
@ -418,7 +418,7 @@ async function gatherPeerPushInitiationPending(
if (pi.status === PeerPushPaymentInitiationStatus.PurseCreated) {
return;
}
const opId = RetryTags.forPeerPushPaymentInitiation(pi);
const opId = TaskIdentifiers.forPeerPushPaymentInitiation(pi);
const retryRecord = await tx.operationRetries.get(opId);
const timestampDue = retryRecord?.retryInfo.nextRetry ?? AbsoluteTime.now();
resp.pendingOperations.push({
@ -447,7 +447,7 @@ async function gatherPeerPushCreditPending(
case PeerPushPaymentIncomingStatus.WithdrawalCreated:
return;
}
const opId = RetryTags.forPeerPushCredit(pi);
const opId = TaskIdentifiers.forPeerPushCredit(pi);
const retryRecord = await tx.operationRetries.get(opId);
const timestampDue = retryRecord?.retryInfo.nextRetry ?? AbsoluteTime.now();
resp.pendingOperations.push({

View File

@ -734,9 +734,7 @@ async function refreshReveal(
export async function processRefreshGroup(
ws: InternalWalletState,
refreshGroupId: string,
options: {
forceNow?: boolean;
} = {},
options: Record<string, never> = {},
): Promise<OperationAttemptResult> {
logger.info(`processing refresh group ${refreshGroupId}`);

View File

@ -164,9 +164,7 @@ export async function prepareTip(
export async function processTip(
ws: InternalWalletState,
walletTipId: string,
options: {
forceNow?: boolean;
} = {},
options: Record<string, never> = {},
): Promise<OperationAttemptResult> {
const tipRecord = await ws.db
.mktx((x) => [x.tips])

View File

@ -63,12 +63,15 @@ import {
PeerPullPaymentInitiationRecord,
} from "../db.js";
import { InternalWalletState } from "../internal-wallet-state.js";
import { PendingTaskType } from "../pending-types.js";
import { checkDbInvariant } from "../util/invariants.js";
import { RetryTags } from "../util/retries.js";
import { constructTaskIdentifier, TaskIdentifiers } from "../util/retries.js";
import {
makeTombstoneId,
makeTransactionId,
parseId,
resetOperationTimeout,
runOperationWithErrorReporting,
TombstoneTag,
} from "./common.js";
import { processDepositGroup } from "./deposits.js";
@ -79,6 +82,7 @@ import {
extractContractData,
processPurchasePay,
} from "./pay-merchant.js";
import { processPeerPullCredit } from "./pay-peer.js";
import { processRefreshGroup } from "./refresh.js";
import { processTip } from "./tip.js";
import {
@ -152,7 +156,7 @@ export async function getTransactionById(
if (!withdrawalGroupRecord) throw Error("not found");
const opId = RetryTags.forWithdrawal(withdrawalGroupRecord);
const opId = TaskIdentifiers.forWithdrawal(withdrawalGroupRecord);
const ort = await tx.operationRetries.get(opId);
if (
@ -215,7 +219,7 @@ export async function getTransactionById(
Amounts.zeroOfAmount(contractData.amount),
);
const payOpId = RetryTags.forPay(purchase);
const payOpId = TaskIdentifiers.forPay(purchase);
const payRetryRecord = await tx.operationRetries.get(payOpId);
return buildTransactionForPurchase(
@ -237,7 +241,7 @@ export async function getTransactionById(
if (!tipRecord) throw Error("not found");
const retries = await tx.operationRetries.get(
RetryTags.forTipPickup(tipRecord),
TaskIdentifiers.forTipPickup(tipRecord),
);
return buildTransactionForTip(tipRecord, retries);
});
@ -250,7 +254,7 @@ export async function getTransactionById(
if (!depositRecord) throw Error("not found");
const retries = await tx.operationRetries.get(
RetryTags.forDeposit(depositRecord),
TaskIdentifiers.forDeposit(depositRecord),
);
return buildTransactionForDeposit(depositRecord, retries);
});
@ -359,11 +363,11 @@ export async function getTransactionById(
if (pushInc.withdrawalGroupId) {
wg = await tx.withdrawalGroups.get(pushInc.withdrawalGroupId);
if (wg) {
const withdrawalOpId = RetryTags.forWithdrawal(wg);
const withdrawalOpId = TaskIdentifiers.forWithdrawal(wg);
wgOrt = await tx.operationRetries.get(withdrawalOpId);
}
}
const pushIncOpId = RetryTags.forPeerPushCredit(pushInc);
const pushIncOpId = TaskIdentifiers.forPeerPushCredit(pushInc);
let pushIncOrt = await tx.operationRetries.get(pushIncOpId);
return buildTransactionForPeerPushCredit(
@ -394,11 +398,12 @@ export async function getTransactionById(
if (pushInc.withdrawalGroupId) {
wg = await tx.withdrawalGroups.get(pushInc.withdrawalGroupId);
if (wg) {
const withdrawalOpId = RetryTags.forWithdrawal(wg);
const withdrawalOpId = TaskIdentifiers.forWithdrawal(wg);
wgOrt = await tx.operationRetries.get(withdrawalOpId);
}
}
const pushIncOpId = RetryTags.forPeerPullPaymentInitiation(pushInc);
const pushIncOpId =
TaskIdentifiers.forPeerPullPaymentInitiation(pushInc);
let pushIncOrt = await tx.operationRetries.get(pushIncOpId);
return buildTransactionForPeerPullCredit(
@ -1109,11 +1114,11 @@ export async function getTransactions(
if (pi.withdrawalGroupId) {
wg = await tx.withdrawalGroups.get(pi.withdrawalGroupId);
if (wg) {
const withdrawalOpId = RetryTags.forWithdrawal(wg);
const withdrawalOpId = TaskIdentifiers.forWithdrawal(wg);
wgOrt = await tx.operationRetries.get(withdrawalOpId);
}
}
const pushIncOpId = RetryTags.forPeerPushCredit(pi);
const pushIncOpId = TaskIdentifiers.forPeerPushCredit(pi);
let pushIncOrt = await tx.operationRetries.get(pushIncOpId);
checkDbInvariant(!!ct);
@ -1142,11 +1147,11 @@ export async function getTransactions(
if (pi.withdrawalGroupId) {
wg = await tx.withdrawalGroups.get(pi.withdrawalGroupId);
if (wg) {
const withdrawalOpId = RetryTags.forWithdrawal(wg);
const withdrawalOpId = TaskIdentifiers.forWithdrawal(wg);
wgOrt = await tx.operationRetries.get(withdrawalOpId);
}
}
const pushIncOpId = RetryTags.forPeerPullPaymentInitiation(pi);
const pushIncOpId = TaskIdentifiers.forPeerPullPaymentInitiation(pi);
let pushIncOrt = await tx.operationRetries.get(pushIncOpId);
checkDbInvariant(!!ct);
@ -1166,7 +1171,7 @@ export async function getTransactions(
return;
}
let required = false;
const opId = RetryTags.forRefresh(rg);
const opId = TaskIdentifiers.forRefresh(rg);
if (transactionsRequest?.includeRefreshes) {
required = true;
} else if (rg.operationStatus !== RefreshOperationStatus.Finished) {
@ -1195,7 +1200,7 @@ export async function getTransactions(
return;
}
const opId = RetryTags.forWithdrawal(wsr);
const opId = TaskIdentifiers.forWithdrawal(wsr);
const ort = await tx.operationRetries.get(opId);
switch (wsr.wgInfo.withdrawalType) {
@ -1238,7 +1243,7 @@ export async function getTransactions(
if (shouldSkipCurrency(transactionsRequest, amount.currency)) {
return;
}
const opId = RetryTags.forDeposit(dg);
const opId = TaskIdentifiers.forDeposit(dg);
const retryRecord = await tx.operationRetries.get(opId);
transactions.push(buildTransactionForDeposit(dg, retryRecord));
@ -1309,7 +1314,7 @@ export async function getTransactions(
);
});
const payOpId = RetryTags.forPay(purchase);
const payOpId = TaskIdentifiers.forPay(purchase);
const payRetryRecord = await tx.operationRetries.get(payOpId);
transactions.push(
await buildTransactionForPurchase(
@ -1333,7 +1338,7 @@ export async function getTransactions(
if (!tipRecord.acceptedTimestamp) {
return;
}
const opId = RetryTags.forTipPickup(tipRecord);
const opId = TaskIdentifiers.forTipPickup(tipRecord);
const retryRecord = await tx.operationRetries.get(opId);
transactions.push(buildTransactionForTip(tipRecord, retryRecord));
});
@ -1359,6 +1364,77 @@ export async function getTransactions(
return { transactions: [...txNotPending, ...txPending] };
}
export type ParsedTransactionIdentifier =
| { tag: TransactionType.Deposit; depositGroupId: string }
| { tag: TransactionType.Payment; proposalId: string }
| { tag: TransactionType.PeerPullDebit; peerPullPaymentIncomingId: string }
| { tag: TransactionType.PeerPullCredit; pursePub: string }
| { tag: TransactionType.PeerPushCredit; peerPushPaymentIncomingId: string }
| { tag: TransactionType.PeerPushDebit; pursePub: string }
| { tag: TransactionType.Refresh; refreshGroupId: string }
| { tag: TransactionType.Refund; proposalId: string; executionTime: string }
| { tag: TransactionType.Tip; walletTipId: string }
| { tag: TransactionType.Withdrawal; withdrawalGroupId: string };
/**
* Parse a transaction identifier string into a typed, structured representation.
*/
export function parseTransactionIdentifier(
transactionId: string,
): ParsedTransactionIdentifier | undefined {
const { type, args: rest } = parseId("any", transactionId);
switch (type) {
case TransactionType.Deposit:
return { tag: TransactionType.Deposit, depositGroupId: rest[0] };
case TransactionType.Payment:
return { tag: TransactionType.Payment, proposalId: rest[0] };
case TransactionType.PeerPullCredit:
return { tag: TransactionType.PeerPullCredit, pursePub: rest[0] };
case TransactionType.PeerPullDebit:
return {
tag: TransactionType.PeerPullDebit,
peerPullPaymentIncomingId: rest[0],
};
case TransactionType.PeerPushCredit:
return {
tag: TransactionType.PeerPushCredit,
peerPushPaymentIncomingId: rest[0],
};
case TransactionType.PeerPushDebit:
return { tag: TransactionType.PeerPushDebit, pursePub: rest[0] };
case TransactionType.Refresh:
return { tag: TransactionType.Refresh, refreshGroupId: rest[0] };
case TransactionType.Refund:
return {
tag: TransactionType.Refund,
proposalId: rest[0],
executionTime: rest[1],
};
case TransactionType.Tip:
return {
tag: TransactionType.Tip,
walletTipId: rest[0],
};
case TransactionType.Withdrawal:
return {
tag: TransactionType.Withdrawal,
withdrawalGroupId: rest[0],
};
default:
return undefined;
}
}
export function stopLongpolling(ws: InternalWalletState, taskId: string) {
const longpoll = ws.activeLongpoll[taskId];
if (longpoll) {
logger.info(`cancelling long-polling for ${taskId}`);
longpoll.cancel();
delete ws.activeLongpoll[taskId];
}
}
/**
* Immediately retry the underlying operation
* of a transaction.
@ -1369,34 +1445,86 @@ export async function retryTransaction(
): Promise<void> {
logger.info(`retrying transaction ${transactionId}`);
const { type, args: rest } = parseId("any", transactionId);
const parsedTx = parseTransactionIdentifier(transactionId);
switch (type) {
case TransactionType.Deposit: {
const depositGroupId = rest[0];
processDepositGroup(ws, depositGroupId, {
forceNow: true,
if (!parsedTx) {
throw Error("invalid transaction identifier");
}
// FIXME: We currently don't cancel active long-polling tasks here.
switch (parsedTx.tag) {
case TransactionType.PeerPullCredit: {
const taskId = constructTaskIdentifier({
tag: PendingTaskType.PeerPullInitiation,
pursePub: parsedTx.pursePub,
});
await resetOperationTimeout(ws, taskId);
stopLongpolling(ws, taskId);
await runOperationWithErrorReporting(ws, taskId, () =>
processPeerPullCredit(ws, parsedTx.pursePub),
);
break;
}
case TransactionType.Deposit: {
const taskId = constructTaskIdentifier({
tag: PendingTaskType.Deposit,
depositGroupId: parsedTx.depositGroupId,
});
await resetOperationTimeout(ws, taskId);
stopLongpolling(ws, taskId);
await runOperationWithErrorReporting(ws, taskId, () =>
processDepositGroup(ws, parsedTx.depositGroupId),
);
break;
}
case TransactionType.Withdrawal: {
const withdrawalGroupId = rest[0];
await processWithdrawalGroup(ws, withdrawalGroupId, { forceNow: true });
// FIXME: Abort current long-poller!
const taskId = constructTaskIdentifier({
tag: PendingTaskType.Withdraw,
withdrawalGroupId: parsedTx.withdrawalGroupId,
});
await resetOperationTimeout(ws, taskId);
stopLongpolling(ws, taskId);
await runOperationWithErrorReporting(ws, taskId, () =>
processWithdrawalGroup(ws, parsedTx.withdrawalGroupId),
);
break;
}
case TransactionType.Payment: {
const proposalId = rest[0];
await processPurchasePay(ws, proposalId, { forceNow: true });
const taskId = constructTaskIdentifier({
tag: PendingTaskType.Purchase,
proposalId: parsedTx.proposalId,
});
await resetOperationTimeout(ws, taskId);
stopLongpolling(ws, taskId);
await runOperationWithErrorReporting(ws, taskId, () =>
processPurchasePay(ws, parsedTx.proposalId),
);
break;
}
case TransactionType.Tip: {
const walletTipId = rest[0];
await processTip(ws, walletTipId, { forceNow: true });
const taskId = constructTaskIdentifier({
tag: PendingTaskType.TipPickup,
walletTipId: parsedTx.walletTipId,
});
await resetOperationTimeout(ws, taskId);
stopLongpolling(ws, taskId);
await runOperationWithErrorReporting(ws, taskId, () =>
processTip(ws, parsedTx.walletTipId),
);
break;
}
case TransactionType.Refresh: {
const refreshGroupId = rest[0];
await processRefreshGroup(ws, refreshGroupId, { forceNow: true });
const taskId = constructTaskIdentifier({
tag: PendingTaskType.Refresh,
refreshGroupId: parsedTx.refreshGroupId,
});
await resetOperationTimeout(ws, taskId);
stopLongpolling(ws, taskId);
await runOperationWithErrorReporting(ws, taskId, () =>
processRefreshGroup(ws, parsedTx.refreshGroupId),
);
break;
}
default:

View File

@ -109,7 +109,7 @@ import { DbAccess, GetReadOnlyAccess } from "../util/query.js";
import {
OperationAttemptResult,
OperationAttemptResultType,
RetryTags,
TaskIdentifiers,
} from "../util/retries.js";
import {
WALLET_BANK_INTEGRATION_PROTOCOL_VERSION,
@ -1023,7 +1023,6 @@ export async function processWithdrawalGroup(
ws: InternalWalletState,
withdrawalGroupId: string,
options: {
forceNow?: boolean;
} = {},
): Promise<OperationAttemptResult> {
logger.trace("processing withdrawal group", withdrawalGroupId);
@ -1037,10 +1036,10 @@ export async function processWithdrawalGroup(
throw Error(`withdrawal group ${withdrawalGroupId} not found`);
}
const retryTag = RetryTags.forWithdrawal(withdrawalGroup);
const retryTag = TaskIdentifiers.forWithdrawal(withdrawalGroup);
// We're already running!
if (ws.activeLongpoll[retryTag] && !options.forceNow) {
if (ws.activeLongpoll[retryTag]) {
logger.info("withdrawal group already in long-polling, returning!");
return {
type: OperationAttemptResultType.Longpoll,
@ -1532,7 +1531,7 @@ export async function getWithdrawalDetailsForUri(
.iter(r.baseUrl)
.toArray();
const retryRecord = await tx.operationRetries.get(
RetryTags.forExchangeUpdate(r),
TaskIdentifiers.forExchangeUpdate(r),
);
if (exchangeDetails && denominations) {
exchanges.push(
@ -2087,7 +2086,7 @@ export async function createManualWithdrawal(
// rely on retry handling to re-process the withdrawal group.
runOperationWithErrorReporting(
ws,
RetryTags.forWithdrawal(withdrawalGroup),
TaskIdentifiers.forWithdrawal(withdrawalGroup),
async () => {
return await processWithdrawalGroup(ws, withdrawalGroupId, {
forceNow: true,

View File

@ -46,6 +46,7 @@ import { TalerError } from "@gnu-taler/taler-util";
import { InternalWalletState } from "../internal-wallet-state.js";
import { PendingTaskType } from "../pending-types.js";
import { GetReadWriteAccess } from "./query.js";
import { assertUnreachable } from "./assertUnreachable.js";
const logger = new Logger("util/retries.ts");
@ -176,7 +177,66 @@ export namespace RetryInfo {
}
}
export namespace RetryTags {
/**
* Parsed representation of task identifiers.
*/
export type ParsedTaskIdentifier =
| {
tag: PendingTaskType.Withdraw;
withdrawalGroupId: string;
}
| { tag: PendingTaskType.ExchangeUpdate; exchangeBaseUrl: string }
| { tag: PendingTaskType.Backup; backupProviderBaseUrl: string }
| { tag: PendingTaskType.Deposit; depositGroupId: string }
| { tag: PendingTaskType.ExchangeCheckRefresh; exchangeBaseUrl: string }
| { tag: PendingTaskType.ExchangeUpdate; exchangeBaseUrl: string }
| { tag: PendingTaskType.PeerPullDebit; peerPullPaymentIncomingId: string }
| { tag: PendingTaskType.PeerPullInitiation; pursePub: string }
| { tag: PendingTaskType.PeerPushCredit; peerPushPaymentIncomingId: string }
| { tag: PendingTaskType.PeerPushInitiation; pursePub: string }
| { tag: PendingTaskType.Purchase; proposalId: string }
| { tag: PendingTaskType.Recoup; recoupGroupId: string }
| { tag: PendingTaskType.TipPickup; walletTipId: string }
| { tag: PendingTaskType.Refresh; refreshGroupId: string };
export function parseTaskIdentifier(x: string): ParsedTaskIdentifier {
throw Error("not yet implemented");
}
export function constructTaskIdentifier(p: ParsedTaskIdentifier): string {
switch (p.tag) {
case PendingTaskType.Backup:
return `${p.tag}:${p.backupProviderBaseUrl}`;
case PendingTaskType.Deposit:
return `${p.tag}:${p.depositGroupId}`;
case PendingTaskType.ExchangeCheckRefresh:
return `${p.tag}:${p.exchangeBaseUrl}`;
case PendingTaskType.ExchangeUpdate:
return `${p.tag}:${p.exchangeBaseUrl}`;
case PendingTaskType.PeerPullDebit:
return `${p.tag}:${p.peerPullPaymentIncomingId}`;
case PendingTaskType.PeerPushCredit:
return `${p.tag}:${p.peerPushPaymentIncomingId}`;
case PendingTaskType.PeerPullInitiation:
return `${p.tag}:${p.pursePub}`;
case PendingTaskType.PeerPushInitiation:
return `${p.tag}:${p.pursePub}`;
case PendingTaskType.Purchase:
return `${p.tag}:${p.proposalId}`;
case PendingTaskType.Recoup:
return `${p.tag}:${p.recoupGroupId}`;
case PendingTaskType.Refresh:
return `${p.tag}:${p.refreshGroupId}`;
case PendingTaskType.TipPickup:
return `${p.tag}:${p.walletTipId}`;
case PendingTaskType.Withdraw:
return `${p.tag}:${p.withdrawalGroupId}`;
default:
assertUnreachable(p);
}
}
export namespace TaskIdentifiers {
export function forWithdrawal(wg: WithdrawalGroupRecord): string {
return `${PendingTaskType.Withdraw}:${wg.withdrawalGroupId}`;
}
@ -227,19 +287,6 @@ export namespace RetryTags {
): string {
return `${PendingTaskType.PeerPushCredit}:${ppi.peerPushPaymentIncomingId}`;
}
export function byPaymentProposalId(proposalId: string): string {
return `${PendingTaskType.Purchase}:${proposalId}`;
}
export function byPeerPushPaymentInitiationPursePub(
pursePub: string,
): string {
return `${PendingTaskType.PeerPushInitiation}:${pursePub}`;
}
export function byPeerPullPaymentInitiationPursePub(
pursePub: string,
): string {
return `${PendingTaskType.PeerPullInitiation}:${pursePub}`;
}
}
export async function scheduleRetryInTx(

View File

@ -265,7 +265,7 @@ import {
GetReadOnlyAccess,
GetReadWriteAccess,
} from "./util/query.js";
import { OperationAttemptResult, RetryTags } from "./util/retries.js";
import { OperationAttemptResult, TaskIdentifiers } from "./util/retries.js";
import { TimerAPI, TimerGroup } from "./util/timer.js";
import {
WALLET_BANK_INTEGRATION_PROTOCOL_VERSION,
@ -306,17 +306,15 @@ async function callOperationHandler(
forceNow,
});
case PendingTaskType.Refresh:
return await processRefreshGroup(ws, pending.refreshGroupId, {
forceNow,
});
return await processRefreshGroup(ws, pending.refreshGroupId);
case PendingTaskType.Withdraw:
return await processWithdrawalGroup(ws, pending.withdrawalGroupId, {
forceNow,
});
case PendingTaskType.TipPickup:
return await processTip(ws, pending.tipId, { forceNow });
return await processTip(ws, pending.tipId);
case PendingTaskType.Purchase:
return await processPurchase(ws, pending.proposalId, { forceNow });
return await processPurchase(ws, pending.proposalId);
case PendingTaskType.Recoup:
return await processRecoupGroupHandler(ws, pending.recoupGroupId, {
forceNow,
@ -324,9 +322,7 @@ async function callOperationHandler(
case PendingTaskType.ExchangeCheckRefresh:
return await autoRefresh(ws, pending.exchangeBaseUrl);
case PendingTaskType.Deposit: {
return await processDepositGroup(ws, pending.depositGroupId, {
forceNow,
});
return await processDepositGroup(ws, pending.depositGroupId);
}
case PendingTaskType.Backup:
return await processBackupForProvider(ws, pending.backupProviderBaseUrl);
@ -691,7 +687,7 @@ async function getExchanges(
for (const r of exchangeRecords) {
const exchangeDetails = await getExchangeDetails(tx, r.baseUrl);
const opRetryRecord = await tx.operationRetries.get(
RetryTags.forExchangeUpdate(r),
TaskIdentifiers.forExchangeUpdate(r),
);
exchanges.push(
makeExchangeListItem(r, exchangeDetails, opRetryRecord?.lastError),
@ -1285,9 +1281,7 @@ async function dispatchRequestInternal<Op extends WalletApiOperation>(
RefreshReason.Manual,
);
});
processRefreshGroup(ws, refreshGroupId.refreshGroupId, {
forceNow: true,
}).catch((x) => {
processRefreshGroup(ws, refreshGroupId.refreshGroupId).catch((x) => {
logger.error(x);
});
return {
@ -1753,6 +1747,7 @@ class InternalWalletStateImpl implements InternalWalletState {
for (const key of Object.keys(this.activeLongpoll)) {
logger.trace(`cancelling active longpoll ${key}`);
this.activeLongpoll[key].cancel();
delete this.activeLongpoll[key];
}
}