wallet-core: uniform retry handling

This commit is contained in:
Florian Dold 2022-09-05 18:12:30 +02:00
parent f9f2911c76
commit 13e7a67477
No known key found for this signature in database
GPG Key ID: D2E4F00F29D02A4B
19 changed files with 1029 additions and 1285 deletions

View File

@ -100,6 +100,10 @@ export namespace Duration {
return durationMin(d1, d2);
}
export function multiply(d1: Duration, n: number): Duration {
return durationMul(d1, n);
}
export function toIntegerYears(d: Duration): number {
if (typeof d.d_ms !== "number") {
throw Error("infinite duration");
@ -357,7 +361,6 @@ export const codecForAbsoluteTime: Codec<AbsoluteTime> = {
},
};
export const codecForTimestamp: Codec<TalerProtocolTimestamp> = {
decode(x: any, c?: Context): TalerProtocolTimestamp {
// Compatibility, should be removed soon.

View File

@ -32,7 +32,12 @@ import {
codecForAmountJson,
codecForAmountString,
} from "./amounts.js";
import { AbsoluteTime, codecForAbsoluteTime, codecForTimestamp, TalerProtocolTimestamp } from "./time.js";
import {
AbsoluteTime,
codecForAbsoluteTime,
codecForTimestamp,
TalerProtocolTimestamp,
} from "./time.js";
import {
buildCodecForObject,
codecForString,
@ -797,46 +802,43 @@ const codecForExchangeTos = (): Codec<ExchangeTos> =>
.property("content", codecOptional(codecForString()))
.build("ExchangeTos");
export const codecForFeeDescriptionPair =
(): Codec<FeeDescriptionPair> =>
buildCodecForObject<FeeDescriptionPair>()
.property("value", codecForAmountJson())
.property("from", codecForAbsoluteTime)
.property("until", codecForAbsoluteTime)
.property("left", codecOptional(codecForAmountJson()))
.property("right", codecOptional(codecForAmountJson()))
.build("FeeDescriptionPair");
export const codecForFeeDescriptionPair = (): Codec<FeeDescriptionPair> =>
buildCodecForObject<FeeDescriptionPair>()
.property("value", codecForAmountJson())
.property("from", codecForAbsoluteTime)
.property("until", codecForAbsoluteTime)
.property("left", codecOptional(codecForAmountJson()))
.property("right", codecOptional(codecForAmountJson()))
.build("FeeDescriptionPair");
export const codecForFeeDescription =
(): Codec<FeeDescription> =>
buildCodecForObject<FeeDescription>()
.property("value", codecForAmountJson())
.property("from", codecForAbsoluteTime)
.property("until", codecForAbsoluteTime)
.property("fee", codecOptional(codecForAmountJson()))
.build("FeeDescription");
export const codecForFeeDescription = (): Codec<FeeDescription> =>
buildCodecForObject<FeeDescription>()
.property("value", codecForAmountJson())
.property("from", codecForAbsoluteTime)
.property("until", codecForAbsoluteTime)
.property("fee", codecOptional(codecForAmountJson()))
.build("FeeDescription");
export const codecForFeesByOperations = (): Codec<
OperationMap<FeeDescription[]>
> =>
buildCodecForObject<OperationMap<FeeDescription[]>>()
.property("deposit", codecForList(codecForFeeDescription()))
.property("withdraw", codecForList(codecForFeeDescription()))
.property("refresh", codecForList(codecForFeeDescription()))
.property("refund", codecForList(codecForFeeDescription()))
.build("FeesByOperations");
export const codecForFeesByOperations =
(): Codec<OperationMap<FeeDescription[]>> =>
buildCodecForObject<OperationMap<FeeDescription[]>>()
.property("deposit", codecForList(codecForFeeDescription()))
.property("withdraw", codecForList(codecForFeeDescription()))
.property("refresh", codecForList(codecForFeeDescription()))
.property("refund", codecForList(codecForFeeDescription()))
.build("FeesByOperations");
export const codecForExchangeFullDetails =
(): Codec<ExchangeFullDetails> =>
buildCodecForObject<ExchangeFullDetails>()
.property("currency", codecForString())
.property("exchangeBaseUrl", codecForString())
.property("paytoUris", codecForList(codecForString()))
.property("tos", codecForExchangeTos())
.property("auditors", codecForList(codecForExchangeAuditor()))
.property("wireInfo", codecForWireInfo())
.property("feesDescription", codecForFeesByOperations())
.build("ExchangeFullDetails");
export const codecForExchangeFullDetails = (): Codec<ExchangeFullDetails> =>
buildCodecForObject<ExchangeFullDetails>()
.property("currency", codecForString())
.property("exchangeBaseUrl", codecForString())
.property("paytoUris", codecForList(codecForString()))
.property("tos", codecForExchangeTos())
.property("auditors", codecForList(codecForExchangeAuditor()))
.property("wireInfo", codecForWireInfo())
.property("feesDescription", codecForFeesByOperations())
.build("ExchangeFullDetails");
export const codecForExchangeListItem = (): Codec<ExchangeListItem> =>
buildCodecForObject<ExchangeListItem>()

View File

@ -361,14 +361,14 @@ export interface ExchangeDetailsRecord {
* Terms of service text or undefined if not downloaded yet.
*
* This is just used as a cache of the last downloaded ToS.
*
*
* FIXME: Put in separate object store!
*/
termsOfServiceText: string | undefined;
/**
* content-type of the last downloaded termsOfServiceText.
*
*
* * FIXME: Put in separate object store!
*/
termsOfServiceContentType: string | undefined;
@ -454,17 +454,6 @@ export interface ExchangeRecord {
*/
nextRefreshCheck: TalerProtocolTimestamp;
/**
* Last error (if any) for fetching updated information about the
* exchange.
*/
lastError?: TalerErrorDetail;
/**
* Retry status for fetching updated information about the exchange.
*/
retryInfo?: RetryInfo;
/**
* Public key of the reserve that we're currently using for
* receiving P2P payments.
@ -734,24 +723,12 @@ export interface ProposalRecord {
* Session ID we got when downloading the contract.
*/
downloadSessionId?: string;
/**
* Retry info, even present when the operation isn't active to allow indexing
* on the next retry timestamp.
*
* FIXME: Clarify what we even retry.
*/
retryInfo?: RetryInfo;
lastError: TalerErrorDetail | undefined;
}
/**
* Status of a tip we got from a merchant.
*/
export interface TipRecord {
lastError: TalerErrorDetail | undefined;
/**
* Has the user accepted the tip? Only after the tip has been accepted coins
* withdrawn from the tip may be used.
@ -810,12 +787,6 @@ export interface TipRecord {
* from the merchant.
*/
pickedUpTimestamp: TalerProtocolTimestamp | undefined;
/**
* Retry info, even present when the operation isn't active to allow indexing
* on the next retry timestamp.
*/
retryInfo: RetryInfo;
}
export enum RefreshCoinStatus {
@ -837,16 +808,7 @@ export enum OperationStatus {
export interface RefreshGroupRecord {
operationStatus: OperationStatus;
/**
* Retry info, even present when the operation isn't active to allow indexing
* on the next retry timestamp.
*
* FIXME: No, this can be optional, indexing is still possible
*/
retryInfo: RetryInfo;
lastError: TalerErrorDetail | undefined;
// FIXME: Put this into a different object store?
lastErrorPerCoin: { [coinIndex: number]: TalerErrorDetail };
/**
@ -1117,6 +1079,8 @@ export interface PurchaseRecord {
/**
* Pending refunds for the purchase. A refund is pending
* when the merchant reports a transient error from the exchange.
*
* FIXME: Put this into a separate object store?
*/
refunds: { [refundKey: string]: WalletRefundItem };
@ -1132,6 +1096,7 @@ export interface PurchaseRecord {
lastSessionId: string | undefined;
/**
* Do we still need to post the deposit permissions to the merchant?
* Set for the first payment, or on re-plays.
*/
paymentSubmitPending: boolean;
@ -1142,22 +1107,6 @@ export interface PurchaseRecord {
*/
refundQueryRequested: boolean;
abortStatus: AbortStatus;
payRetryInfo?: RetryInfo;
lastPayError: TalerErrorDetail | undefined;
/**
* Retry information for querying the refund status with the merchant.
*/
refundStatusRetryInfo: RetryInfo;
/**
* Last error (or undefined) for querying the refund status with the merchant.
*/
lastRefundStatusError: TalerErrorDetail | undefined;
/**
* Continue querying the refund status until this deadline has expired.
*/
@ -1174,6 +1123,11 @@ export interface PurchaseRecord {
* an error where it doesn't make sense to retry.
*/
payFrozen?: boolean;
/**
* FIXME: How does this interact with payFrozen?
*/
abortStatus: AbortStatus;
}
export const WALLET_BACKUP_STATE_KEY = "walletBackupState";
@ -1184,9 +1138,9 @@ export const WALLET_BACKUP_STATE_KEY = "walletBackupState";
*/
export type ConfigRecord =
| {
key: typeof WALLET_BACKUP_STATE_KEY;
value: WalletBackupConfState;
}
key: typeof WALLET_BACKUP_STATE_KEY;
value: WalletBackupConfState;
}
| { key: "currencyDefaultsApplied"; value: boolean };
export interface WalletBackupConfState {
@ -1368,13 +1322,6 @@ export interface WithdrawalGroupRecord {
* FIXME: Should this not also include a timestamp for more logical merging?
*/
denomSelUid: string;
/**
* Retry info.
*/
retryInfo?: RetryInfo;
lastError: TalerErrorDetail | undefined;
}
export interface BankWithdrawUriRecord {
@ -1432,16 +1379,6 @@ export interface RecoupGroupRecord {
* after all individual recoups are done.
*/
scheduleRefreshCoins: string[];
/**
* Retry info.
*/
retryInfo: RetryInfo;
/**
* Last error that occurred, if any.
*/
lastError: TalerErrorDetail | undefined;
}
export enum BackupProviderStateTag {
@ -1452,17 +1389,15 @@ export enum BackupProviderStateTag {
export type BackupProviderState =
| {
tag: BackupProviderStateTag.Provisional;
}
tag: BackupProviderStateTag.Provisional;
}
| {
tag: BackupProviderStateTag.Ready;
nextBackupTimestamp: TalerProtocolTimestamp;
}
tag: BackupProviderStateTag.Ready;
nextBackupTimestamp: TalerProtocolTimestamp;
}
| {
tag: BackupProviderStateTag.Retrying;
retryInfo: RetryInfo;
lastError?: TalerErrorDetail;
};
tag: BackupProviderStateTag.Retrying;
};
export interface BackupProviderTerms {
supportedProtocolVersion: string;
@ -1573,13 +1508,6 @@ export interface DepositGroupRecord {
timestampFinished: TalerProtocolTimestamp | undefined;
operationStatus: OperationStatus;
lastError: TalerErrorDetail | undefined;
/**
* Retry info.
*/
retryInfo?: RetryInfo;
}
/**
@ -1749,6 +1677,60 @@ export interface ReserveRecord {
reservePriv: string;
}
export interface OperationRetryRecord {
/**
* Unique identifier for the operation. Typically of
* the format `${opType}-${opUniqueKey}`
*/
id: string;
lastError?: TalerErrorDetail;
retryInfo: RetryInfo;
}
export enum OperationAttemptResultType {
Finished = "finished",
Pending = "pending",
Error = "error",
Longpoll = "longpoll",
}
// FIXME: not part of DB!
export type OperationAttemptResult<TSuccess = unknown, TPending = unknown> =
| OperationAttemptFinishedResult<TSuccess>
| OperationAttemptErrorResult
| OperationAttemptLongpollResult
| OperationAttemptPendingResult<TPending>;
export namespace OperationAttemptResult {
export function finishedEmpty(): OperationAttemptResult<unknown, unknown> {
return {
type: OperationAttemptResultType.Finished,
result: undefined,
};
}
}
export interface OperationAttemptFinishedResult<T> {
type: OperationAttemptResultType.Finished;
result: T;
}
export interface OperationAttemptPendingResult<T> {
type: OperationAttemptResultType.Pending;
result: T;
}
export interface OperationAttemptErrorResult {
type: OperationAttemptResultType.Error;
errorDetail: TalerErrorDetail;
}
export interface OperationAttemptLongpollResult {
type: OperationAttemptResultType.Longpoll;
}
export const WalletStoresV1 = {
coins: describeStore(
describeContents<CoinRecord>("coins", {
@ -1913,6 +1895,12 @@ export const WalletStoresV1 = {
describeContents<TombstoneRecord>("tombstones", { keyPath: "id" }),
{},
),
operationRetries: describeStore(
describeContents<OperationRetryRecord>("operationRetries", {
keyPath: "id",
}),
{},
),
ghostDepositGroups: describeStore(
describeContents<GhostDepositGroupRecord>("ghostDepositGroups", {
keyPath: "contractTermsHash",

View File

@ -274,7 +274,6 @@ export async function importBackup(
protocolVersionRange: backupExchange.protocol_version_range,
},
permanent: true,
retryInfo: RetryInfo.reset(),
lastUpdate: undefined,
nextUpdate: TalerProtocolTimestamp.now(),
nextRefreshCheck: TalerProtocolTimestamp.now(),
@ -341,7 +340,7 @@ export async function importBackup(
}
const denomPubHash =
cryptoComp.rsaDenomPubToHash[
backupDenomination.denom_pub.rsa_public_key
backupDenomination.denom_pub.rsa_public_key
];
checkLogicInvariant(!!denomPubHash);
const existingDenom = await tx.denominations.get([
@ -426,7 +425,6 @@ export async function importBackup(
}
}
// FIXME: import reserves with new schema
// for (const backupReserve of backupExchangeDetails.reserves) {
@ -517,7 +515,6 @@ export async function importBackup(
// }
// }
// }
}
for (const backupProposal of backupBlob.proposals) {
@ -560,7 +557,7 @@ export async function importBackup(
const amount = Amounts.parseOrThrow(parsedContractTerms.amount);
const contractTermsHash =
cryptoComp.proposalIdToContractTermsHash[
backupProposal.proposal_id
backupProposal.proposal_id
];
let maxWireFee: AmountJson;
if (parsedContractTerms.max_wire_fee) {
@ -611,7 +608,6 @@ export async function importBackup(
}
await tx.proposals.put({
claimToken: backupProposal.claim_token,
lastError: undefined,
merchantBaseUrl: backupProposal.merchant_base_url,
timestamp: backupProposal.timestamp,
orderId: backupProposal.order_id,
@ -620,7 +616,6 @@ export async function importBackup(
cryptoComp.proposalNoncePrivToPub[backupProposal.nonce_priv],
proposalId: backupProposal.proposal_id,
repurchaseProposalId: backupProposal.repurchase_proposal_id,
retryInfo: RetryInfo.reset(),
download,
proposalStatus,
});
@ -706,7 +701,7 @@ export async function importBackup(
const amount = Amounts.parseOrThrow(parsedContractTerms.amount);
const contractTermsHash =
cryptoComp.proposalIdToContractTermsHash[
backupPurchase.proposal_id
backupPurchase.proposal_id
];
let maxWireFee: AmountJson;
if (parsedContractTerms.max_wire_fee) {
@ -755,10 +750,7 @@ export async function importBackup(
noncePriv: backupPurchase.nonce_priv,
noncePub:
cryptoComp.proposalNoncePrivToPub[backupPurchase.nonce_priv],
lastPayError: undefined,
autoRefundDeadline: TalerProtocolTimestamp.never(),
refundStatusRetryInfo: RetryInfo.reset(),
lastRefundStatusError: undefined,
refundAwaiting: undefined,
timestampAccept: backupPurchase.timestamp_accept,
timestampFirstSuccessfulPay:
@ -767,8 +759,6 @@ export async function importBackup(
merchantPaySig: backupPurchase.merchant_pay_sig,
lastSessionId: undefined,
abortStatus,
// FIXME!
payRetryInfo: RetryInfo.reset(),
download,
paymentSubmitPending:
!backupPurchase.timestamp_first_successful_pay,
@ -851,7 +841,6 @@ export async function importBackup(
timestampCreated: backupRefreshGroup.timestamp_created,
refreshGroupId: backupRefreshGroup.refresh_group_id,
reason,
lastError: undefined,
lastErrorPerCoin: {},
oldCoinPubs: backupRefreshGroup.old_coins.map((x) => x.coin_pub),
statusPerCoin: backupRefreshGroup.old_coins.map((x) =>
@ -869,7 +858,6 @@ export async function importBackup(
Amounts.parseOrThrow(x.estimated_output_amount),
),
refreshSessionPerCoin,
retryInfo: RetryInfo.reset(),
});
}
}
@ -891,11 +879,9 @@ export async function importBackup(
createdTimestamp: backupTip.timestamp_created,
denomsSel,
exchangeBaseUrl: backupTip.exchange_base_url,
lastError: undefined,
merchantBaseUrl: backupTip.exchange_base_url,
merchantTipId: backupTip.merchant_tip_id,
pickedUpTimestamp: backupTip.timestamp_finished,
retryInfo: RetryInfo.reset(),
secretSeed: backupTip.secret_seed,
tipAmountEffective: denomsSel.totalCoinValue,
tipAmountRaw: Amounts.parseOrThrow(backupTip.tip_amount_raw),

View File

@ -25,9 +25,12 @@
* Imports.
*/
import {
AbsoluteTime, AmountString,
AbsoluteTime,
AmountString,
BackupRecovery,
buildCodecForObject, bytesToString, canonicalizeBaseUrl,
buildCodecForObject,
bytesToString,
canonicalizeBaseUrl,
canonicalJson,
Codec,
codecForAmountString,
@ -36,19 +39,32 @@ import {
codecForNumber,
codecForString,
codecOptional,
ConfirmPayResultType, decodeCrock, DenomKeyType,
durationFromSpec, eddsaGetPublic,
ConfirmPayResultType,
decodeCrock,
DenomKeyType,
durationFromSpec,
eddsaGetPublic,
EddsaKeyPair,
encodeCrock,
getRandomBytes,
hash, hashDenomPub,
hash,
hashDenomPub,
HttpStatusCode,
j2s, kdf, Logger,
j2s,
kdf,
Logger,
notEmpty,
PreparePayResultType,
RecoveryLoadRequest,
RecoveryMergeStrategy, rsaBlind, secretbox, secretbox_open, stringToBytes, TalerErrorDetail, TalerProtocolTimestamp, URL,
WalletBackupContentV1
RecoveryMergeStrategy,
rsaBlind,
secretbox,
secretbox_open,
stringToBytes,
TalerErrorDetail,
TalerProtocolTimestamp,
URL,
WalletBackupContentV1,
} from "@gnu-taler/taler-util";
import { gunzipSync, gzipSync } from "fflate";
import { TalerCryptoInterface } from "../../crypto/cryptoImplementation.js";
@ -58,26 +74,28 @@ import {
BackupProviderStateTag,
BackupProviderTerms,
ConfigRecord,
OperationAttemptResult,
OperationAttemptResultType,
WalletBackupConfState,
WalletStoresV1,
WALLET_BACKUP_STATE_KEY
WALLET_BACKUP_STATE_KEY,
} from "../../db.js";
import { InternalWalletState } from "../../internal-wallet-state.js";
import {
readSuccessResponseJsonOrThrow,
readTalerErrorResponse
readTalerErrorResponse,
} from "../../util/http.js";
import {
checkDbInvariant,
checkLogicInvariant
checkLogicInvariant,
} from "../../util/invariants.js";
import { GetReadWriteAccess } from "../../util/query.js";
import { RetryInfo } from "../../util/retries.js";
import { RetryInfo, RetryTags, scheduleRetryInTx } from "../../util/retries.js";
import { guardOperationException } from "../common.js";
import {
checkPaymentByProposalId,
confirmPay,
preparePayForUri
preparePayForUri,
} from "../pay.js";
import { exportBackup } from "./export.js";
import { BackupCryptoPrecomputedData, importBackup } from "./import.js";
@ -244,8 +262,7 @@ function getNextBackupTimestamp(): TalerProtocolTimestamp {
async function runBackupCycleForProvider(
ws: InternalWalletState,
args: BackupForProviderArgs,
): Promise<void> {
): Promise<OperationAttemptResult> {
const provider = await ws.db
.mktx((x) => ({ backupProviders: x.backupProviders }))
.runReadOnly(async (tx) => {
@ -254,7 +271,10 @@ async function runBackupCycleForProvider(
if (!provider) {
logger.warn("provider disappeared");
return;
return {
type: OperationAttemptResultType.Finished,
result: undefined,
};
}
const backupJson = await exportBackup(ws);
@ -292,8 +312,8 @@ async function runBackupCycleForProvider(
"if-none-match": newHash,
...(provider.lastBackupHash
? {
"if-match": provider.lastBackupHash,
}
"if-match": provider.lastBackupHash,
}
: {}),
},
});
@ -315,7 +335,10 @@ async function runBackupCycleForProvider(
};
await tx.backupProvider.put(prov);
});
return;
return {
type: OperationAttemptResultType.Finished,
result: undefined,
};
}
if (resp.status === HttpStatusCode.PaymentRequired) {
@ -344,7 +367,10 @@ async function runBackupCycleForProvider(
// FIXME: check if the provider is overcharging us!
await ws.db
.mktx((x) => ({ backupProviders: x.backupProviders }))
.mktx((x) => ({
backupProviders: x.backupProviders,
operationRetries: x.operationRetries,
}))
.runReadWrite(async (tx) => {
const provRec = await tx.backupProviders.get(provider.baseUrl);
checkDbInvariant(!!provRec);
@ -354,11 +380,8 @@ async function runBackupCycleForProvider(
provRec.currentPaymentProposalId = proposalId;
// FIXME: allocate error code for this!
await tx.backupProviders.put(provRec);
await incrementBackupRetryInTx(
tx,
args.backupProviderBaseUrl,
undefined,
);
const opId = RetryTags.forBackup(provRec);
await scheduleRetryInTx(ws, tx, opId);
});
if (doPay) {
@ -371,12 +394,15 @@ async function runBackupCycleForProvider(
}
if (args.retryAfterPayment) {
await runBackupCycleForProvider(ws, {
return await runBackupCycleForProvider(ws, {
...args,
retryAfterPayment: false,
});
}
return;
return {
type: OperationAttemptResultType.Pending,
result: undefined,
};
}
if (resp.status === HttpStatusCode.NoContent) {
@ -395,7 +421,10 @@ async function runBackupCycleForProvider(
};
await tx.backupProviders.put(prov);
});
return;
return {
type: OperationAttemptResultType.Finished,
result: undefined,
};
}
if (resp.status === HttpStatusCode.Conflict) {
@ -406,7 +435,10 @@ async function runBackupCycleForProvider(
const cryptoData = await computeBackupCryptoData(ws.cryptoApi, blob);
await importBackup(ws, blob, cryptoData);
await ws.db
.mktx((x) => ({ backupProvider: x.backupProviders }))
.mktx((x) => ({
backupProvider: x.backupProviders,
operationRetries: x.operationRetries,
}))
.runReadWrite(async (tx) => {
const prov = await tx.backupProvider.get(provider.baseUrl);
if (!prov) {
@ -414,20 +446,21 @@ async function runBackupCycleForProvider(
return;
}
prov.lastBackupHash = encodeCrock(hash(backupEnc));
// FIXME: Allocate error code for this situation?
// FIXME: Allocate error code for this situation?
// FIXME: Add operation retry record!
const opId = RetryTags.forBackup(prov);
await scheduleRetryInTx(ws, tx, opId);
prov.state = {
tag: BackupProviderStateTag.Retrying,
retryInfo: RetryInfo.reset(),
};
await tx.backupProvider.put(prov);
});
logger.info("processed existing backup");
// Now upload our own, merged backup.
await runBackupCycleForProvider(ws, {
return await runBackupCycleForProvider(ws, {
...args,
retryAfterPayment: false,
});
return;
}
// Some other response that we did not expect!
@ -436,53 +469,16 @@ async function runBackupCycleForProvider(
const err = await readTalerErrorResponse(resp);
logger.error(`got error response from backup provider: ${j2s(err)}`);
await ws.db
.mktx((x) => ({ backupProviders: x.backupProviders }))
.runReadWrite(async (tx) => {
incrementBackupRetryInTx(tx, args.backupProviderBaseUrl, err);
});
}
async function incrementBackupRetryInTx(
tx: GetReadWriteAccess<{
backupProviders: typeof WalletStoresV1.backupProviders;
}>,
backupProviderBaseUrl: string,
err: TalerErrorDetail | undefined,
): Promise<void> {
const pr = await tx.backupProviders.get(backupProviderBaseUrl);
if (!pr) {
return;
}
if (pr.state.tag === BackupProviderStateTag.Retrying) {
pr.state.lastError = err;
pr.state.retryInfo = RetryInfo.increment(pr.state.retryInfo);
} else if (pr.state.tag === BackupProviderStateTag.Ready) {
pr.state = {
tag: BackupProviderStateTag.Retrying,
retryInfo: RetryInfo.reset(),
lastError: err,
};
}
await tx.backupProviders.put(pr);
}
async function incrementBackupRetry(
ws: InternalWalletState,
backupProviderBaseUrl: string,
err: TalerErrorDetail | undefined,
): Promise<void> {
await ws.db
.mktx((x) => ({ backupProviders: x.backupProviders }))
.runReadWrite(async (tx) =>
incrementBackupRetryInTx(tx, backupProviderBaseUrl, err),
);
return {
type: OperationAttemptResultType.Error,
errorDetail: err,
};
}
export async function processBackupForProvider(
ws: InternalWalletState,
backupProviderBaseUrl: string,
): Promise<void> {
): Promise<OperationAttemptResult> {
const provider = await ws.db
.mktx((x) => ({ backupProviders: x.backupProviders }))
.runReadOnly(async (tx) => {
@ -492,17 +488,10 @@ export async function processBackupForProvider(
throw Error("unknown backup provider");
}
const onOpErr = (err: TalerErrorDetail): Promise<void> =>
incrementBackupRetry(ws, backupProviderBaseUrl, err);
const run = async () => {
await runBackupCycleForProvider(ws, {
backupProviderBaseUrl: provider.baseUrl,
retryAfterPayment: true,
});
};
await guardOperationException(run, onOpErr);
return await runBackupCycleForProvider(ws, {
backupProviderBaseUrl: provider.baseUrl,
retryAfterPayment: true,
});
}
export interface RemoveBackupProviderRequest {
@ -818,24 +807,34 @@ export async function getBackupInfo(
): Promise<BackupInfo> {
const backupConfig = await provideBackupState(ws);
const providerRecords = await ws.db
.mktx((x) => ({ backupProviders: x.backupProviders }))
.mktx((x) => ({
backupProviders: x.backupProviders,
operationRetries: x.operationRetries,
}))
.runReadOnly(async (tx) => {
return await tx.backupProviders.iter().toArray();
return await tx.backupProviders.iter().mapAsync(async (bp) => {
const opId = RetryTags.forBackup(bp);
const retryRecord = await tx.operationRetries.get(opId);
return {
provider: bp,
retryRecord,
};
});
});
const providers: ProviderInfo[] = [];
for (const x of providerRecords) {
providers.push({
active: x.state.tag !== BackupProviderStateTag.Provisional,
syncProviderBaseUrl: x.baseUrl,
lastSuccessfulBackupTimestamp: x.lastBackupCycleTimestamp,
paymentProposalIds: x.paymentProposalIds,
active: x.provider.state.tag !== BackupProviderStateTag.Provisional,
syncProviderBaseUrl: x.provider.baseUrl,
lastSuccessfulBackupTimestamp: x.provider.lastBackupCycleTimestamp,
paymentProposalIds: x.provider.paymentProposalIds,
lastError:
x.state.tag === BackupProviderStateTag.Retrying
? x.state.lastError
x.provider.state.tag === BackupProviderStateTag.Retrying
? x.retryRecord?.lastError
: undefined,
paymentStatus: await getProviderPaymentInfo(ws, x),
terms: x.terms,
name: x.name,
paymentStatus: await getProviderPaymentInfo(ws, x.provider),
terms: x.provider.terms,
name: x.provider.name,
});
}
return {

View File

@ -44,7 +44,12 @@ import {
TrackDepositGroupResponse,
URL,
} from "@gnu-taler/taler-util";
import { DepositGroupRecord, OperationStatus } from "../db.js";
import {
DepositGroupRecord,
OperationAttemptErrorResult,
OperationAttemptResult,
OperationStatus,
} from "../db.js";
import { InternalWalletState } from "../internal-wallet-state.js";
import { selectPayCoins } from "../util/coinSelection.js";
import { readSuccessResponseJsonOrThrow } from "../util/http.js";
@ -67,61 +72,8 @@ import { getTotalRefreshCost } from "./refresh.js";
const logger = new Logger("deposits.ts");
/**
* Set up the retry timeout for a deposit group.
* @see {processDepositGroup}
*/
async function setupDepositGroupRetry(
ws: InternalWalletState,
depositGroupId: string,
options: {
resetRetry: boolean;
},
): Promise<void> {
await ws.db
.mktx((x) => ({
depositGroups: x.depositGroups,
}))
.runReadWrite(async (tx) => {
const x = await tx.depositGroups.get(depositGroupId);
if (!x) {
return;
}
if (options.resetRetry) {
x.retryInfo = RetryInfo.reset();
} else {
x.retryInfo = RetryInfo.increment(x.retryInfo);
}
delete x.lastError;
await tx.depositGroups.put(x);
});
}
/**
* Report an error that occurred while processing the deposit group.
*/
async function reportDepositGroupError(
ws: InternalWalletState,
depositGroupId: string,
err: TalerErrorDetail,
): Promise<void> {
await ws.db
.mktx((x) => ({ depositGroups: x.depositGroups }))
.runReadWrite(async (tx) => {
const r = await tx.depositGroups.get(depositGroupId);
if (!r) {
return;
}
if (!r.retryInfo) {
logger.error(
`deposit group record (${depositGroupId}) reports error, but no retry active`,
);
return;
}
r.lastError = err;
await tx.depositGroups.put(r);
});
ws.notify({ type: NotificationType.DepositOperationError, error: err });
}
export async function processDepositGroup(
ws: InternalWalletState,
depositGroupId: string,
@ -129,29 +81,7 @@ export async function processDepositGroup(
forceNow?: boolean;
cancellationToken?: CancellationToken;
} = {},
): Promise<void> {
const onOpErr = (err: TalerErrorDetail): Promise<void> =>
reportDepositGroupError(ws, depositGroupId, err);
return await guardOperationException(
async () => await processDepositGroupImpl(ws, depositGroupId, options),
onOpErr,
);
}
/**
* @see {processDepositGroup}
*/
async function processDepositGroupImpl(
ws: InternalWalletState,
depositGroupId: string,
options: {
forceNow?: boolean;
cancellationToken?: CancellationToken;
} = {},
): Promise<void> {
const forceNow = options.forceNow ?? false;
await setupDepositGroupRetry(ws, depositGroupId, { resetRetry: forceNow });
): Promise<OperationAttemptResult> {
const depositGroup = await ws.db
.mktx((x) => ({
depositGroups: x.depositGroups,
@ -161,11 +91,11 @@ async function processDepositGroupImpl(
});
if (!depositGroup) {
logger.warn(`deposit group ${depositGroupId} not found`);
return;
return OperationAttemptResult.finishedEmpty();
}
if (depositGroup.timestampFinished) {
logger.trace(`deposit group ${depositGroupId} already finished`);
return;
return OperationAttemptResult.finishedEmpty();
}
const contractData = extractContractData(
@ -240,11 +170,10 @@ async function processDepositGroupImpl(
if (allDeposited) {
dg.timestampFinished = TalerProtocolTimestamp.now();
dg.operationStatus = OperationStatus.Finished;
delete dg.lastError;
delete dg.retryInfo;
await tx.depositGroups.put(dg);
}
});
return OperationAttemptResult.finishedEmpty();
}
export async function trackDepositGroup(
@ -338,9 +267,9 @@ export async function getFeeForDeposit(
const csr: CoinSelectionRequest = {
allowedAuditors: [],
allowedExchanges: Object.values(exchangeInfos).map(v => ({
allowedExchanges: Object.values(exchangeInfos).map((v) => ({
exchangeBaseUrl: v.url,
exchangePub: v.master_pub
exchangePub: v.master_pub,
})),
amount: Amounts.parseOrThrow(req.amount),
maxDepositFee: Amounts.parseOrThrow(req.amount),
@ -383,7 +312,6 @@ export async function prepareDepositGroup(
}
const amount = Amounts.parseOrThrow(req.amount);
const exchangeInfos: { url: string; master_pub: string }[] = [];
await ws.db
@ -473,7 +401,7 @@ export async function prepareDepositGroup(
payCoinSel,
);
return { totalDepositCost, effectiveDepositAmount }
return { totalDepositCost, effectiveDepositAmount };
}
export async function createDepositGroup(
ws: InternalWalletState,
@ -600,9 +528,7 @@ export async function createDepositGroup(
payto_uri: req.depositPaytoUri,
salt: wireSalt,
},
retryInfo: RetryInfo.reset(),
operationStatus: OperationStatus.Pending,
lastError: undefined,
};
await ws.db

View File

@ -53,6 +53,8 @@ import {
DenominationVerificationStatus,
ExchangeDetailsRecord,
ExchangeRecord,
OperationAttemptResult,
OperationAttemptResultType,
WalletStoresV1,
} from "../db.js";
import { TalerError } from "../errors.js";
@ -64,7 +66,7 @@ import {
readSuccessResponseTextOrThrow,
} from "../util/http.js";
import { DbAccess, GetReadOnlyAccess } from "../util/query.js";
import { RetryInfo } from "../util/retries.js";
import { RetryInfo, runOperationHandlerForResult } from "../util/retries.js";
import { WALLET_EXCHANGE_PROTOCOL_VERSION } from "../versions.js";
import { guardOperationException } from "./common.js";
@ -102,51 +104,6 @@ function denominationRecordFromKeys(
return d;
}
async function reportExchangeUpdateError(
ws: InternalWalletState,
baseUrl: string,
err: TalerErrorDetail,
): Promise<void> {
await ws.db
.mktx((x) => ({ exchanges: x.exchanges }))
.runReadWrite(async (tx) => {
const exchange = await tx.exchanges.get(baseUrl);
if (!exchange) {
return;
}
if (!exchange.retryInfo) {
logger.reportBreak();
}
exchange.lastError = err;
await tx.exchanges.put(exchange);
});
ws.notify({ type: NotificationType.ExchangeOperationError, error: err });
}
async function setupExchangeUpdateRetry(
ws: InternalWalletState,
baseUrl: string,
options: {
reset: boolean;
},
): Promise<void> {
await ws.db
.mktx((x) => ({ exchanges: x.exchanges }))
.runReadWrite(async (tx) => {
const exchange = await tx.exchanges.get(baseUrl);
if (!exchange) {
return;
}
if (options.reset) {
exchange.retryInfo = RetryInfo.reset();
} else {
exchange.retryInfo = RetryInfo.increment(exchange.retryInfo);
}
delete exchange.lastError;
await tx.exchanges.put(exchange);
});
}
export function getExchangeRequestTimeout(): Duration {
return Duration.fromSpec({
seconds: 5,
@ -360,25 +317,6 @@ async function downloadExchangeWireInfo(
return wireInfo;
}
export async function updateExchangeFromUrl(
ws: InternalWalletState,
baseUrl: string,
options: {
forceNow?: boolean;
cancellationToken?: CancellationToken;
} = {},
): Promise<{
exchange: ExchangeRecord;
exchangeDetails: ExchangeDetailsRecord;
}> {
const onOpErr = (e: TalerErrorDetail): Promise<void> =>
reportExchangeUpdateError(ws, baseUrl, e);
return await guardOperationException(
() => updateExchangeFromUrlImpl(ws, baseUrl, options),
onOpErr,
);
}
async function provideExchangeRecord(
ws: InternalWalletState,
baseUrl: string,
@ -398,7 +336,6 @@ async function provideExchangeRecord(
const r: ExchangeRecord = {
permanent: true,
baseUrl: baseUrl,
retryInfo: RetryInfo.reset(),
detailsPointer: undefined,
lastUpdate: undefined,
nextUpdate: AbsoluteTime.toTimestamp(now),
@ -530,12 +467,7 @@ export async function downloadTosFromAcceptedFormat(
);
}
/**
* Update or add exchange DB entry by fetching the /keys and /wire information.
* Optionally link the reserve entry to the new or existing
* exchange entry in then DB.
*/
async function updateExchangeFromUrlImpl(
export async function updateExchangeFromUrl(
ws: InternalWalletState,
baseUrl: string,
options: {
@ -546,9 +478,31 @@ async function updateExchangeFromUrlImpl(
exchange: ExchangeRecord;
exchangeDetails: ExchangeDetailsRecord;
}> {
return runOperationHandlerForResult(
await updateExchangeFromUrlHandler(ws, baseUrl, options),
);
}
/**
* Update or add exchange DB entry by fetching the /keys and /wire information.
* Optionally link the reserve entry to the new or existing
* exchange entry in then DB.
*/
export async function updateExchangeFromUrlHandler(
ws: InternalWalletState,
baseUrl: string,
options: {
forceNow?: boolean;
cancellationToken?: CancellationToken;
} = {},
): Promise<
OperationAttemptResult<{
exchange: ExchangeRecord;
exchangeDetails: ExchangeDetailsRecord;
}>
> {
const forceNow = options.forceNow ?? false;
logger.info(`updating exchange info for ${baseUrl}, forced: ${forceNow}`);
await setupExchangeUpdateRetry(ws, baseUrl, { reset: forceNow });
const now = AbsoluteTime.now();
baseUrl = canonicalizeBaseUrl(baseUrl);
@ -565,7 +519,10 @@ async function updateExchangeFromUrlImpl(
!AbsoluteTime.isExpired(AbsoluteTime.fromTimestamp(exchange.nextUpdate))
) {
logger.info("using existing exchange info");
return { exchange, exchangeDetails };
return {
type: OperationAttemptResultType.Finished,
result: { exchange, exchangeDetails },
};
}
logger.info("updating exchange /keys info");
@ -649,8 +606,6 @@ async function updateExchangeFromUrlImpl(
termsOfServiceAcceptedTimestamp: TalerProtocolTimestamp.now(),
};
// FIXME: only update if pointer got updated
delete r.lastError;
delete r.retryInfo;
r.lastUpdate = TalerProtocolTimestamp.now();
r.nextUpdate = keysInfo.expiry;
// New denominations might be available.
@ -771,8 +726,11 @@ async function updateExchangeFromUrlImpl(
type: NotificationType.ExchangeAdded,
});
return {
exchange: updated.exchange,
exchangeDetails: updated.exchangeDetails,
type: OperationAttemptResultType.Finished,
result: {
exchange: updated.exchange,
exchangeDetails: updated.exchangeDetails,
},
};
}

View File

@ -37,9 +37,6 @@ import {
ContractTerms,
ContractTermsUtil,
Duration,
durationMax,
durationMin,
durationMul,
encodeCrock,
ForcedCoinSel,
getRandomBytes,
@ -59,10 +56,7 @@ import {
TransactionType,
URL,
} from "@gnu-taler/taler-util";
import {
EXCHANGE_COINS_LOCK,
InternalWalletState,
} from "../internal-wallet-state.js";
import { EddsaKeypair } from "../crypto/cryptoImplementation.js";
import {
AbortStatus,
AllowedAuditorInfo,
@ -71,6 +65,8 @@ import {
CoinRecord,
CoinStatus,
DenominationRecord,
OperationAttemptResult,
OperationAttemptResultType,
ProposalRecord,
ProposalStatus,
PurchaseRecord,
@ -82,6 +78,11 @@ import {
makePendingOperationFailedError,
TalerError,
} from "../errors.js";
import {
EXCHANGE_COINS_LOCK,
InternalWalletState,
} from "../internal-wallet-state.js";
import { assertUnreachable } from "../util/assertUnreachable.js";
import {
AvailableCoinInfo,
CoinCandidateSelection,
@ -98,11 +99,9 @@ import {
throwUnexpectedRequestError,
} from "../util/http.js";
import { GetReadWriteAccess } from "../util/query.js";
import { RetryInfo } from "../util/retries.js";
import { RetryInfo, RetryTags, scheduleRetry } from "../util/retries.js";
import { getExchangeDetails } from "./exchanges.js";
import { createRefreshGroup, getTotalRefreshCost } from "./refresh.js";
import { guardOperationException } from "./common.js";
import { EddsaKeypair } from "../crypto/cryptoImplementation.js";
/**
* Logger.
@ -448,10 +447,6 @@ async function recordConfirmPay(
timestampAccept: AbsoluteTime.toTimestamp(AbsoluteTime.now()),
timestampLastRefundStatus: undefined,
proposalId: proposal.proposalId,
lastPayError: undefined,
lastRefundStatusError: undefined,
payRetryInfo: RetryInfo.reset(),
refundStatusRetryInfo: RetryInfo.reset(),
refundQueryRequested: false,
timestampFirstSuccessfulPay: undefined,
autoRefundDeadline: undefined,
@ -475,8 +470,6 @@ async function recordConfirmPay(
const p = await tx.proposals.get(proposal.proposalId);
if (p) {
p.proposalStatus = ProposalStatus.Accepted;
delete p.lastError;
delete p.retryInfo;
await tx.proposals.put(p);
}
await tx.purchases.put(t);
@ -490,117 +483,6 @@ async function recordConfirmPay(
return t;
}
async function reportProposalError(
ws: InternalWalletState,
proposalId: string,
err: TalerErrorDetail,
): Promise<void> {
await ws.db
.mktx((x) => ({ proposals: x.proposals }))
.runReadWrite(async (tx) => {
const pr = await tx.proposals.get(proposalId);
if (!pr) {
return;
}
if (!pr.retryInfo) {
logger.error(
`Asked to report an error for a proposal (${proposalId}) that is not active (no retryInfo)`,
);
logger.reportBreak();
return;
}
pr.lastError = err;
await tx.proposals.put(pr);
});
ws.notify({ type: NotificationType.ProposalOperationError, error: err });
}
async function setupProposalRetry(
ws: InternalWalletState,
proposalId: string,
options: {
reset: boolean;
},
): Promise<void> {
await ws.db
.mktx((x) => ({ proposals: x.proposals }))
.runReadWrite(async (tx) => {
const pr = await tx.proposals.get(proposalId);
if (!pr) {
return;
}
if (options.reset) {
pr.retryInfo = RetryInfo.reset();
} else {
pr.retryInfo = RetryInfo.increment(pr.retryInfo);
}
delete pr.lastError;
await tx.proposals.put(pr);
});
}
async function setupPurchasePayRetry(
ws: InternalWalletState,
proposalId: string,
options: {
reset: boolean;
},
): Promise<void> {
await ws.db
.mktx((x) => ({ purchases: x.purchases }))
.runReadWrite(async (tx) => {
const p = await tx.purchases.get(proposalId);
if (!p) {
return;
}
if (options.reset) {
p.payRetryInfo = RetryInfo.reset();
} else {
p.payRetryInfo = RetryInfo.increment(p.payRetryInfo);
}
delete p.lastPayError;
await tx.purchases.put(p);
});
}
async function reportPurchasePayError(
ws: InternalWalletState,
proposalId: string,
err: TalerErrorDetail,
): Promise<void> {
await ws.db
.mktx((x) => ({ purchases: x.purchases }))
.runReadWrite(async (tx) => {
const pr = await tx.purchases.get(proposalId);
if (!pr) {
return;
}
if (!pr.payRetryInfo) {
logger.error(
`purchase record (${proposalId}) reports error, but no retry active`,
);
}
pr.lastPayError = err;
await tx.purchases.put(pr);
});
ws.notify({ type: NotificationType.PayOperationError, error: err });
}
export async function processDownloadProposal(
ws: InternalWalletState,
proposalId: string,
options: {
forceNow?: boolean;
} = {},
): Promise<void> {
const onOpErr = (err: TalerErrorDetail): Promise<void> =>
reportProposalError(ws, proposalId, err);
await guardOperationException(
() => processDownloadProposalImpl(ws, proposalId, options),
onOpErr,
);
}
async function failProposalPermanently(
ws: InternalWalletState,
proposalId: string,
@ -613,23 +495,21 @@ async function failProposalPermanently(
if (!p) {
return;
}
delete p.retryInfo;
p.lastError = err;
p.proposalStatus = ProposalStatus.PermanentlyFailed;
await tx.proposals.put(p);
});
}
function getProposalRequestTimeout(proposal: ProposalRecord): Duration {
function getProposalRequestTimeout(retryInfo?: RetryInfo): Duration {
return Duration.clamp({
lower: Duration.fromSpec({ seconds: 1 }),
upper: Duration.fromSpec({ seconds: 60 }),
value: RetryInfo.getDuration(proposal.retryInfo),
value: retryInfo ? RetryInfo.getDuration(retryInfo) : Duration.fromSpec({}),
});
}
function getPayRequestTimeout(purchase: PurchaseRecord): Duration {
return durationMul(
return Duration.multiply(
{ d_ms: 15000 },
1 + purchase.payCoinSelection.coinPubs.length / 5,
);
@ -682,15 +562,13 @@ export function extractContractData(
};
}
async function processDownloadProposalImpl(
export async function processDownloadProposal(
ws: InternalWalletState,
proposalId: string,
options: {
forceNow?: boolean;
} = {},
): Promise<void> {
const forceNow = options.forceNow ?? false;
await setupProposalRetry(ws, proposalId, { reset: forceNow });
options: {} = {},
): Promise<OperationAttemptResult> {
const res = ws.db.mktx2((x) => [x.auditorTrust, x.coins])
const proposal = await ws.db
.mktx((x) => ({ proposals: x.proposals }))
@ -699,11 +577,17 @@ async function processDownloadProposalImpl(
});
if (!proposal) {
return;
return {
type: OperationAttemptResultType.Finished,
result: undefined,
};
}
if (proposal.proposalStatus != ProposalStatus.Downloading) {
return;
return {
type: OperationAttemptResultType.Finished,
result: undefined,
};
}
const orderClaimUrl = new URL(
@ -722,8 +606,16 @@ async function processDownloadProposalImpl(
requestBody.token = proposal.claimToken;
}
const opId = RetryTags.forProposalClaim(proposal);
const retryRecord = await ws.db
.mktx((x) => ({ operationRetries: x.operationRetries }))
.runReadOnly(async (tx) => {
return tx.operationRetries.get(opId);
});
// FIXME: Do this in the background using the new return value
const httpResponse = await ws.http.postJson(orderClaimUrl, requestBody, {
timeout: getProposalRequestTimeout(proposal),
timeout: getProposalRequestTimeout(retryRecord?.retryInfo),
});
const r = await readSuccessResponseJsonOrErrorCode(
httpResponse,
@ -892,6 +784,11 @@ async function processDownloadProposalImpl(
type: NotificationType.ProposalDownloaded,
proposalId: proposal.proposalId,
});
return {
type: OperationAttemptResultType.Finished,
result: undefined,
};
}
/**
@ -954,8 +851,6 @@ async function startDownloadProposal(
proposalId: proposalId,
proposalStatus: ProposalStatus.Downloading,
repurchaseProposalId: undefined,
retryInfo: RetryInfo.reset(),
lastError: undefined,
downloadSessionId: sessionId,
};
@ -1000,17 +895,13 @@ async function storeFirstPaySuccess(
}
purchase.timestampFirstSuccessfulPay = now;
purchase.paymentSubmitPending = false;
purchase.lastPayError = undefined;
purchase.lastSessionId = sessionId;
purchase.payRetryInfo = RetryInfo.reset();
purchase.merchantPaySig = paySig;
const protoAr = purchase.download.contractData.autoRefund;
if (protoAr) {
const ar = Duration.fromTalerProtocolDuration(protoAr);
logger.info("auto_refund present");
purchase.refundQueryRequested = true;
purchase.refundStatusRetryInfo = RetryInfo.reset();
purchase.lastRefundStatusError = undefined;
purchase.autoRefundDeadline = AbsoluteTime.toTimestamp(
AbsoluteTime.addDuration(AbsoluteTime.now(), ar),
);
@ -1038,8 +929,6 @@ async function storePayReplaySuccess(
throw Error("invalid payment state");
}
purchase.paymentSubmitPending = false;
purchase.lastPayError = undefined;
purchase.payRetryInfo = RetryInfo.reset();
purchase.lastSessionId = sessionId;
await tx.purchases.put(purchase);
});
@ -1298,7 +1187,8 @@ export async function checkPaymentByProposalId(
await tx.purchases.put(p);
});
const r = await processPurchasePay(ws, proposalId, { forceNow: true });
if (r.type !== ConfirmPayResultType.Done) {
if (r.type !== OperationAttemptResultType.Finished) {
// FIXME: This does not surface the original error
throw Error("submitting pay failed");
}
return {
@ -1457,6 +1347,45 @@ export async function generateDepositPermissions(
return depositPermissions;
}
/**
* Run the operation handler for a payment
* and return the result as a {@link ConfirmPayResult}.
*/
export async function runPayForConfirmPay(
ws: InternalWalletState,
proposalId: string,
): Promise<ConfirmPayResult> {
const res = await processPurchasePay(ws, proposalId, { forceNow: true });
switch (res.type) {
case OperationAttemptResultType.Finished: {
const purchase = await ws.db
.mktx((x) => ({ purchases: x.purchases }))
.runReadOnly(async (tx) => {
return tx.purchases.get(proposalId);
});
if (!purchase?.download) {
throw Error("purchase record not available anymore");
}
return {
type: ConfirmPayResultType.Done,
contractTerms: purchase.download.contractTermsRaw,
};
}
case OperationAttemptResultType.Error:
// FIXME: allocate error code!
throw Error("payment failed");
case OperationAttemptResultType.Pending:
return {
type: ConfirmPayResultType.Pending,
lastError: undefined,
};
case OperationAttemptResultType.Longpoll:
throw Error("unexpected processPurchasePay result (longpoll)");
default:
assertUnreachable(res);
}
}
/**
* Add a contract to the wallet and sign coins, and send them.
*/
@ -1503,7 +1432,7 @@ export async function confirmPay(
if (existingPurchase) {
logger.trace("confirmPay: submitting payment for existing purchase");
return await processPurchasePay(ws, proposalId, { forceNow: true });
return runPayForConfirmPay(ws, proposalId);
}
logger.trace("confirmPay: purchase record does not exist yet");
@ -1559,6 +1488,7 @@ export async function confirmPay(
res,
d.contractData,
);
await recordConfirmPay(
ws,
proposal,
@ -1567,7 +1497,7 @@ export async function confirmPay(
sessionIdOverride,
);
return await processPurchasePay(ws, proposalId, { forceNow: true });
return runPayForConfirmPay(ws, proposalId);
}
export async function processPurchasePay(
@ -1576,24 +1506,7 @@ export async function processPurchasePay(
options: {
forceNow?: boolean;
} = {},
): Promise<ConfirmPayResult> {
const onOpErr = (e: TalerErrorDetail): Promise<void> =>
reportPurchasePayError(ws, proposalId, e);
return await guardOperationException(
() => processPurchasePayImpl(ws, proposalId, options),
onOpErr,
);
}
async function processPurchasePayImpl(
ws: InternalWalletState,
proposalId: string,
options: {
forceNow?: boolean;
} = {},
): Promise<ConfirmPayResult> {
const forceNow = options.forceNow ?? false;
await setupPurchasePayRetry(ws, proposalId, { reset: forceNow });
): Promise<OperationAttemptResult> {
const purchase = await ws.db
.mktx((x) => ({ purchases: x.purchases }))
.runReadOnly(async (tx) => {
@ -1601,8 +1514,8 @@ async function processPurchasePayImpl(
});
if (!purchase) {
return {
type: ConfirmPayResultType.Pending,
lastError: {
type: OperationAttemptResultType.Error,
errorDetail: {
// FIXME: allocate more specific error code
code: TalerErrorCode.WALLET_UNEXPECTED_EXCEPTION,
hint: `trying to pay for purchase that is not in the database`,
@ -1611,10 +1524,7 @@ async function processPurchasePayImpl(
};
}
if (!purchase.paymentSubmitPending) {
return {
type: ConfirmPayResultType.Pending,
lastError: purchase.lastPayError,
};
OperationAttemptResult.finishedEmpty();
}
logger.trace(`processing purchase pay ${proposalId}`);
@ -1659,23 +1569,12 @@ async function processPurchasePayImpl(
logger.trace(`got resp ${JSON.stringify(resp)}`);
// Hide transient errors.
if (
(purchase.payRetryInfo?.retryCounter ?? 0) <= 5 &&
resp.status >= 500 &&
resp.status <= 599
) {
logger.trace("treating /pay error as transient");
const err = makeErrorDetail(
TalerErrorCode.WALLET_UNEXPECTED_REQUEST_ERROR,
getHttpResponseErrorDetails(resp),
"/pay failed",
);
return {
type: ConfirmPayResultType.Pending,
lastError: err,
};
}
const payOpId = RetryTags.forPay(purchase);
const payRetryRecord = await ws.db
.mktx((x) => ({ operationRetries: x.operationRetries }))
.runReadOnly(async (tx) => {
return await tx.operationRetries.get(payOpId);
});
if (resp.status === HttpStatusCode.BadRequest) {
const errDetails = await readUnexpectedResponseDetails(resp);
@ -1689,8 +1588,6 @@ async function processPurchasePayImpl(
return;
}
purch.payFrozen = true;
purch.lastPayError = errDetails;
delete purch.payRetryInfo;
await tx.purchases.put(purch);
});
throw makePendingOperationFailedError(
@ -1708,7 +1605,9 @@ async function processPurchasePayImpl(
) {
// Do this in the background, as it might take some time
handleInsufficientFunds(ws, proposalId, err).catch(async (e) => {
reportPurchasePayError(ws, proposalId, {
console.log("handling insufficient funds failed");
await scheduleRetry(ws, RetryTags.forPay(purchase), {
code: TalerErrorCode.WALLET_UNEXPECTED_EXCEPTION,
message: "unexpected exception",
hint: "unexpected exception",
@ -1719,9 +1618,8 @@ async function processPurchasePayImpl(
});
return {
type: ConfirmPayResultType.Pending,
// FIXME: should we return something better here?
lastError: err,
type: OperationAttemptResultType.Pending,
result: undefined,
};
}
}
@ -1761,22 +1659,6 @@ async function processPurchasePayImpl(
const resp = await ws.runSequentialized([EXCHANGE_COINS_LOCK], () =>
ws.http.postJson(payAgainUrl, reqBody),
);
// Hide transient errors.
if (
(purchase.payRetryInfo?.retryCounter ?? 0) <= 5 &&
resp.status >= 500 &&
resp.status <= 599
) {
const err = makeErrorDetail(
TalerErrorCode.WALLET_UNEXPECTED_REQUEST_ERROR,
getHttpResponseErrorDetails(resp),
"/paid failed",
);
return {
type: ConfirmPayResultType.Pending,
lastError: err,
};
}
if (resp.status !== 204) {
throw TalerError.fromDetail(
TalerErrorCode.WALLET_UNEXPECTED_REQUEST_ERROR,
@ -1793,10 +1675,7 @@ async function processPurchasePayImpl(
proposalId: purchase.proposalId,
});
return {
type: ConfirmPayResultType.Done,
contractTerms: purchase.download.contractTermsRaw,
};
return OperationAttemptResult.finishedEmpty();
}
export async function refuseProposal(

View File

@ -36,40 +36,50 @@ import {
import { AbsoluteTime } from "@gnu-taler/taler-util";
import { InternalWalletState } from "../internal-wallet-state.js";
import { GetReadOnlyAccess } from "../util/query.js";
import { RetryTags } from "../util/retries.js";
import { Wallet } from "../wallet.js";
async function gatherExchangePending(
tx: GetReadOnlyAccess<{
exchanges: typeof WalletStoresV1.exchanges;
exchangeDetails: typeof WalletStoresV1.exchangeDetails;
operationRetries: typeof WalletStoresV1.operationRetries;
}>,
now: AbsoluteTime,
resp: PendingOperationsResponse,
): Promise<void> {
await tx.exchanges.iter().forEachAsync(async (e) => {
await tx.exchanges.iter().forEachAsync(async (exch) => {
const opTag = RetryTags.forExchangeUpdate(exch);
let opr = await tx.operationRetries.get(opTag);
resp.pendingOperations.push({
type: PendingTaskType.ExchangeUpdate,
id: opTag,
givesLifeness: false,
timestampDue:
e.retryInfo?.nextRetry ?? AbsoluteTime.fromTimestamp(e.nextUpdate),
exchangeBaseUrl: e.baseUrl,
lastError: e.lastError,
opr?.retryInfo.nextRetry ?? AbsoluteTime.fromTimestamp(exch.nextUpdate),
exchangeBaseUrl: exch.baseUrl,
lastError: opr?.lastError,
});
// We only schedule a check for auto-refresh if the exchange update
// was successful.
if (!e.lastError) {
if (!opr?.lastError) {
resp.pendingOperations.push({
type: PendingTaskType.ExchangeCheckRefresh,
timestampDue: AbsoluteTime.fromTimestamp(e.nextRefreshCheck),
id: RetryTags.forExchangeCheckRefresh(exch),
timestampDue: AbsoluteTime.fromTimestamp(exch.nextRefreshCheck),
givesLifeness: false,
exchangeBaseUrl: e.baseUrl,
exchangeBaseUrl: exch.baseUrl,
});
}
});
}
async function gatherRefreshPending(
tx: GetReadOnlyAccess<{ refreshGroups: typeof WalletStoresV1.refreshGroups }>,
tx: GetReadOnlyAccess<{
refreshGroups: typeof WalletStoresV1.refreshGroups;
operationRetries: typeof WalletStoresV1.operationRetries;
}>,
now: AbsoluteTime,
resp: PendingOperationsResponse,
): Promise<void> {
@ -83,15 +93,19 @@ async function gatherRefreshPending(
if (r.frozen) {
return;
}
const opId = RetryTags.forRefresh(r);
const retryRecord = await tx.operationRetries.get(opId);
resp.pendingOperations.push({
type: PendingTaskType.Refresh,
id: opId,
givesLifeness: true,
timestampDue: r.retryInfo.nextRetry,
timestampDue: retryRecord?.retryInfo.nextRetry ?? AbsoluteTime.now(),
refreshGroupId: r.refreshGroupId,
finishedPerCoin: r.statusPerCoin.map(
(x) => x === RefreshCoinStatus.Finished,
),
retryInfo: r.retryInfo,
retryInfo: retryRecord?.retryInfo,
});
}
}
@ -100,6 +114,7 @@ async function gatherWithdrawalPending(
tx: GetReadOnlyAccess<{
withdrawalGroups: typeof WalletStoresV1.withdrawalGroups;
planchets: typeof WalletStoresV1.planchets;
operationRetries: typeof WalletStoresV1.operationRetries;
}>,
now: AbsoluteTime,
resp: PendingOperationsResponse,
@ -111,54 +126,68 @@ async function gatherWithdrawalPending(
if (wsr.timestampFinish) {
return;
}
let numCoinsWithdrawn = 0;
let numCoinsTotal = 0;
await tx.planchets.indexes.byGroup
.iter(wsr.withdrawalGroupId)
.forEach((x) => {
numCoinsTotal++;
if (x.withdrawalDone) {
numCoinsWithdrawn++;
}
});
const opTag = RetryTags.forWithdrawal(wsr);
let opr = await tx.operationRetries.get(opTag);
const now = AbsoluteTime.now();
if (!opr) {
opr = {
id: opTag,
retryInfo: {
firstTry: now,
nextRetry: now,
retryCounter: 0,
},
};
}
resp.pendingOperations.push({
type: PendingTaskType.Withdraw,
id: opTag,
givesLifeness: true,
timestampDue: wsr.retryInfo?.nextRetry ?? AbsoluteTime.now(),
timestampDue: opr.retryInfo?.nextRetry ?? AbsoluteTime.now(),
withdrawalGroupId: wsr.withdrawalGroupId,
lastError: wsr.lastError,
retryInfo: wsr.retryInfo,
lastError: opr.lastError,
retryInfo: opr.retryInfo,
});
}
}
async function gatherProposalPending(
tx: GetReadOnlyAccess<{ proposals: typeof WalletStoresV1.proposals }>,
tx: GetReadOnlyAccess<{
proposals: typeof WalletStoresV1.proposals;
operationRetries: typeof WalletStoresV1.operationRetries;
}>,
now: AbsoluteTime,
resp: PendingOperationsResponse,
): Promise<void> {
await tx.proposals.iter().forEach((proposal) => {
await tx.proposals.iter().forEachAsync(async (proposal) => {
if (proposal.proposalStatus == ProposalStatus.Proposed) {
// Nothing to do, user needs to choose.
} else if (proposal.proposalStatus == ProposalStatus.Downloading) {
const timestampDue = proposal.retryInfo?.nextRetry ?? AbsoluteTime.now();
const opId = RetryTags.forProposalClaim(proposal);
const retryRecord = await tx.operationRetries.get(opId);
const timestampDue =
retryRecord?.retryInfo?.nextRetry ?? AbsoluteTime.now();
resp.pendingOperations.push({
type: PendingTaskType.ProposalDownload,
id: opId,
givesLifeness: true,
timestampDue,
merchantBaseUrl: proposal.merchantBaseUrl,
orderId: proposal.orderId,
proposalId: proposal.proposalId,
proposalTimestamp: proposal.timestamp,
lastError: proposal.lastError,
retryInfo: proposal.retryInfo,
lastError: retryRecord?.lastError,
retryInfo: retryRecord?.retryInfo,
});
}
});
}
async function gatherDepositPending(
tx: GetReadOnlyAccess<{ depositGroups: typeof WalletStoresV1.depositGroups }>,
tx: GetReadOnlyAccess<{
depositGroups: typeof WalletStoresV1.depositGroups;
operationRetries: typeof WalletStoresV1.operationRetries;
}>,
now: AbsoluteTime,
resp: PendingOperationsResponse,
): Promise<void> {
@ -169,32 +198,42 @@ async function gatherDepositPending(
if (dg.timestampFinished) {
return;
}
const timestampDue = dg.retryInfo?.nextRetry ?? AbsoluteTime.now();
const opId = RetryTags.forDeposit(dg);
const retryRecord = await tx.operationRetries.get(opId);
const timestampDue = retryRecord?.retryInfo.nextRetry ?? AbsoluteTime.now();
resp.pendingOperations.push({
type: PendingTaskType.Deposit,
id: opId,
givesLifeness: true,
timestampDue,
depositGroupId: dg.depositGroupId,
lastError: dg.lastError,
retryInfo: dg.retryInfo,
lastError: retryRecord?.lastError,
retryInfo: retryRecord?.retryInfo,
});
}
}
async function gatherTipPending(
tx: GetReadOnlyAccess<{ tips: typeof WalletStoresV1.tips }>,
tx: GetReadOnlyAccess<{
tips: typeof WalletStoresV1.tips;
operationRetries: typeof WalletStoresV1.operationRetries;
}>,
now: AbsoluteTime,
resp: PendingOperationsResponse,
): Promise<void> {
await tx.tips.iter().forEach((tip) => {
await tx.tips.iter().forEachAsync(async (tip) => {
// FIXME: The tip record needs a proper status field!
if (tip.pickedUpTimestamp) {
return;
}
const opId = RetryTags.forTipPickup(tip);
const retryRecord = await tx.operationRetries.get(opId);
if (tip.acceptedTimestamp) {
resp.pendingOperations.push({
type: PendingTaskType.TipPickup,
id: opId,
givesLifeness: true,
timestampDue: tip.retryInfo.nextRetry,
timestampDue: retryRecord?.retryInfo.nextRetry ?? AbsoluteTime.now(),
merchantBaseUrl: tip.merchantBaseUrl,
tipId: tip.walletTipId,
merchantTipId: tip.merchantTipId,
@ -204,56 +243,77 @@ async function gatherTipPending(
}
async function gatherPurchasePending(
tx: GetReadOnlyAccess<{ purchases: typeof WalletStoresV1.purchases }>,
tx: GetReadOnlyAccess<{
purchases: typeof WalletStoresV1.purchases;
operationRetries: typeof WalletStoresV1.operationRetries;
}>,
now: AbsoluteTime,
resp: PendingOperationsResponse,
): Promise<void> {
await tx.purchases.iter().forEach((pr) => {
// FIXME: Only iter purchases with some "active" flag!
await tx.purchases.iter().forEachAsync(async (pr) => {
if (
pr.paymentSubmitPending &&
pr.abortStatus === AbortStatus.None &&
!pr.payFrozen
) {
const timestampDue = pr.payRetryInfo?.nextRetry ?? AbsoluteTime.now();
const payOpId = RetryTags.forPay(pr);
const payRetryRecord = await tx.operationRetries.get(payOpId);
const timestampDue =
payRetryRecord?.retryInfo.nextRetry ?? AbsoluteTime.now();
resp.pendingOperations.push({
type: PendingTaskType.Pay,
id: payOpId,
givesLifeness: true,
timestampDue,
isReplay: false,
proposalId: pr.proposalId,
retryInfo: pr.payRetryInfo,
lastError: pr.lastPayError,
retryInfo: payRetryRecord?.retryInfo,
lastError: payRetryRecord?.lastError,
});
}
if (pr.refundQueryRequested) {
const refundQueryOpId = RetryTags.forRefundQuery(pr);
const refundQueryRetryRecord = await tx.operationRetries.get(
refundQueryOpId,
);
resp.pendingOperations.push({
type: PendingTaskType.RefundQuery,
id: refundQueryOpId,
givesLifeness: true,
timestampDue: pr.refundStatusRetryInfo.nextRetry,
timestampDue:
refundQueryRetryRecord?.retryInfo.nextRetry ?? AbsoluteTime.now(),
proposalId: pr.proposalId,
retryInfo: pr.refundStatusRetryInfo,
lastError: pr.lastRefundStatusError,
retryInfo: refundQueryRetryRecord?.retryInfo,
lastError: refundQueryRetryRecord?.lastError,
});
}
});
}
async function gatherRecoupPending(
tx: GetReadOnlyAccess<{ recoupGroups: typeof WalletStoresV1.recoupGroups }>,
tx: GetReadOnlyAccess<{
recoupGroups: typeof WalletStoresV1.recoupGroups;
operationRetries: typeof WalletStoresV1.operationRetries;
}>,
now: AbsoluteTime,
resp: PendingOperationsResponse,
): Promise<void> {
await tx.recoupGroups.iter().forEach((rg) => {
await tx.recoupGroups.iter().forEachAsync(async (rg) => {
if (rg.timestampFinished) {
return;
}
const opId = RetryTags.forRecoup(rg);
const retryRecord = await tx.operationRetries.get(opId);
resp.pendingOperations.push({
type: PendingTaskType.Recoup,
id: opId,
givesLifeness: true,
timestampDue: rg.retryInfo.nextRetry,
timestampDue: retryRecord?.retryInfo.nextRetry ?? AbsoluteTime.now(),
recoupGroupId: rg.recoupGroupId,
retryInfo: rg.retryInfo,
lastError: rg.lastError,
retryInfo: retryRecord?.retryInfo,
lastError: retryRecord?.lastError,
});
});
}
@ -261,14 +321,18 @@ async function gatherRecoupPending(
async function gatherBackupPending(
tx: GetReadOnlyAccess<{
backupProviders: typeof WalletStoresV1.backupProviders;
operationRetries: typeof WalletStoresV1.operationRetries;
}>,
now: AbsoluteTime,
resp: PendingOperationsResponse,
): Promise<void> {
await tx.backupProviders.iter().forEach((bp) => {
await tx.backupProviders.iter().forEachAsync(async (bp) => {
const opId = RetryTags.forBackup(bp);
const retryRecord = await tx.operationRetries.get(opId);
if (bp.state.tag === BackupProviderStateTag.Ready) {
resp.pendingOperations.push({
type: PendingTaskType.Backup,
id: opId,
givesLifeness: false,
timestampDue: AbsoluteTime.fromTimestamp(bp.state.nextBackupTimestamp),
backupProviderBaseUrl: bp.baseUrl,
@ -277,11 +341,12 @@ async function gatherBackupPending(
} else if (bp.state.tag === BackupProviderStateTag.Retrying) {
resp.pendingOperations.push({
type: PendingTaskType.Backup,
id: opId,
givesLifeness: false,
timestampDue: bp.state.retryInfo.nextRetry,
timestampDue: retryRecord?.retryInfo?.nextRetry ?? AbsoluteTime.now(),
backupProviderBaseUrl: bp.baseUrl,
retryInfo: bp.state.retryInfo,
lastError: bp.state.lastError,
retryInfo: retryRecord?.retryInfo,
lastError: retryRecord?.lastError,
});
}
});
@ -305,6 +370,7 @@ export async function getPendingOperations(
planchets: x.planchets,
depositGroups: x.depositGroups,
recoupGroups: x.recoupGroups,
operationRetries: x.operationRetries,
}))
.runReadWrite(async (tx) => {
const resp: PendingOperationsResponse = {

View File

@ -42,6 +42,8 @@ import {
CoinRecord,
CoinSourceType,
CoinStatus,
OperationAttemptResult,
OperationAttemptResultType,
RecoupGroupRecord,
RefreshCoinSource,
ReserveRecordStatus,
@ -52,64 +54,13 @@ import {
import { InternalWalletState } from "../internal-wallet-state.js";
import { readSuccessResponseJsonOrThrow } from "../util/http.js";
import { GetReadWriteAccess } from "../util/query.js";
import { RetryInfo } from "../util/retries.js";
import { RetryInfo, runOperationHandlerForResult } from "../util/retries.js";
import { guardOperationException } from "./common.js";
import { createRefreshGroup, processRefreshGroup } from "./refresh.js";
import { internalCreateWithdrawalGroup } from "./withdraw.js";
const logger = new Logger("operations/recoup.ts");
async function setupRecoupRetry(
ws: InternalWalletState,
recoupGroupId: string,
options: {
reset: boolean;
},
): Promise<void> {
await ws.db
.mktx((x) => ({
recoupGroups: x.recoupGroups,
}))
.runReadWrite(async (tx) => {
const r = await tx.recoupGroups.get(recoupGroupId);
if (!r) {
return;
}
if (options.reset) {
r.retryInfo = RetryInfo.reset();
} else {
r.retryInfo = RetryInfo.increment(r.retryInfo);
}
delete r.lastError;
await tx.recoupGroups.put(r);
});
}
async function reportRecoupError(
ws: InternalWalletState,
recoupGroupId: string,
err: TalerErrorDetail,
): Promise<void> {
await ws.db
.mktx((x) => ({
recoupGroups: x.recoupGroups,
}))
.runReadWrite(async (tx) => {
const r = await tx.recoupGroups.get(recoupGroupId);
if (!r) {
return;
}
if (!r.retryInfo) {
logger.error(
"reporting error for inactive recoup group (no retry info)",
);
}
r.lastError = err;
await tx.recoupGroups.put(r);
});
ws.notify({ type: NotificationType.RecoupOperationError, error: err });
}
/**
* Store a recoup group record in the database after marking
* a coin in the group as finished.
@ -353,25 +304,20 @@ export async function processRecoupGroup(
forceNow?: boolean;
} = {},
): Promise<void> {
await ws.memoProcessRecoup.memo(recoupGroupId, async () => {
const onOpErr = (e: TalerErrorDetail): Promise<void> =>
reportRecoupError(ws, recoupGroupId, e);
return await guardOperationException(
async () => await processRecoupGroupImpl(ws, recoupGroupId, options),
onOpErr,
);
});
await runOperationHandlerForResult(
await processRecoupGroupHandler(ws, recoupGroupId, options),
);
return;
}
async function processRecoupGroupImpl(
export async function processRecoupGroupHandler(
ws: InternalWalletState,
recoupGroupId: string,
options: {
forceNow?: boolean;
} = {},
): Promise<void> {
): Promise<OperationAttemptResult> {
const forceNow = options.forceNow ?? false;
await setupRecoupRetry(ws, recoupGroupId, { reset: forceNow });
let recoupGroup = await ws.db
.mktx((x) => ({
recoupGroups: x.recoupGroups,
@ -380,11 +326,11 @@ async function processRecoupGroupImpl(
return tx.recoupGroups.get(recoupGroupId);
});
if (!recoupGroup) {
return;
return OperationAttemptResult.finishedEmpty();
}
if (recoupGroup.timestampFinished) {
logger.trace("recoup group finished");
return;
return OperationAttemptResult.finishedEmpty();
}
const ps = recoupGroup.coinPubs.map(async (x, i) => {
try {
@ -404,12 +350,12 @@ async function processRecoupGroupImpl(
return tx.recoupGroups.get(recoupGroupId);
});
if (!recoupGroup) {
return;
return OperationAttemptResult.finishedEmpty();
}
for (const b of recoupGroup.recoupFinishedPerCoin) {
if (!b) {
return;
return OperationAttemptResult.finishedEmpty();
}
}
@ -480,8 +426,6 @@ async function processRecoupGroupImpl(
return;
}
rg2.timestampFinished = TalerProtocolTimestamp.now();
rg2.retryInfo = RetryInfo.reset();
rg2.lastError = undefined;
if (rg2.scheduleRefreshCoins.length > 0) {
const refreshGroupId = await createRefreshGroup(
ws,
@ -495,6 +439,7 @@ async function processRecoupGroupImpl(
}
await tx.recoupGroups.put(rg2);
});
return OperationAttemptResult.finishedEmpty();
}
export async function createRecoupGroup(
@ -514,10 +459,8 @@ export async function createRecoupGroup(
recoupGroupId,
exchangeBaseUrl: exchangeBaseUrl,
coinPubs: coinPubs,
lastError: undefined,
timestampFinished: undefined,
timestampStarted: TalerProtocolTimestamp.now(),
retryInfo: RetryInfo.reset(),
recoupFinishedPerCoin: coinPubs.map(() => false),
// Will be populated later
oldAmountPerCoin: [],

View File

@ -57,6 +57,8 @@ import {
CoinSourceType,
CoinStatus,
DenominationRecord,
OperationAttemptResult,
OperationAttemptResultType,
OperationStatus,
RefreshCoinStatus,
RefreshGroupRecord,
@ -74,7 +76,7 @@ import {
} from "../util/http.js";
import { checkDbInvariant } from "../util/invariants.js";
import { GetReadWriteAccess } from "../util/query.js";
import { RetryInfo } from "../util/retries.js";
import { RetryInfo, runOperationHandlerForResult } from "../util/retries.js";
import { guardOperationException } from "./common.js";
import { updateExchangeFromUrl } from "./exchanges.js";
import {
@ -133,11 +135,9 @@ function updateGroupStatus(rg: RefreshGroupRecord): void {
if (allDone) {
if (anyFrozen) {
rg.frozen = true;
rg.retryInfo = RetryInfo.reset();
} else {
rg.timestampFinished = AbsoluteTime.toTimestamp(AbsoluteTime.now());
rg.operationStatus = OperationStatus.Finished;
rg.retryInfo = RetryInfo.reset();
}
}
}
@ -730,89 +730,14 @@ async function refreshReveal(
});
}
async function setupRefreshRetry(
ws: InternalWalletState,
refreshGroupId: string,
options: {
reset: boolean;
},
): Promise<void> {
await ws.db
.mktx((x) => ({
refreshGroups: x.refreshGroups,
}))
.runReadWrite(async (tx) => {
const r = await tx.refreshGroups.get(refreshGroupId);
if (!r) {
return;
}
if (options.reset) {
r.retryInfo = RetryInfo.reset();
} else {
r.retryInfo = RetryInfo.increment(r.retryInfo);
}
delete r.lastError;
await tx.refreshGroups.put(r);
});
}
async function reportRefreshError(
ws: InternalWalletState,
refreshGroupId: string,
err: TalerErrorDetail | undefined,
): Promise<void> {
await ws.db
.mktx((x) => ({
refreshGroups: x.refreshGroups,
}))
.runReadWrite(async (tx) => {
const r = await tx.refreshGroups.get(refreshGroupId);
if (!r) {
return;
}
if (!r.retryInfo) {
logger.error(
"reported error for inactive refresh group (no retry info)",
);
}
r.lastError = err;
await tx.refreshGroups.put(r);
});
if (err) {
ws.notify({ type: NotificationType.RefreshOperationError, error: err });
}
}
/**
* Actually process a refresh group that has been created.
*/
export async function processRefreshGroup(
ws: InternalWalletState,
refreshGroupId: string,
options: {
forceNow?: boolean;
} = {},
): Promise<void> {
await ws.memoProcessRefresh.memo(refreshGroupId, async () => {
const onOpErr = (e: TalerErrorDetail): Promise<void> =>
reportRefreshError(ws, refreshGroupId, e);
return await guardOperationException(
async () => await processRefreshGroupImpl(ws, refreshGroupId, options),
onOpErr,
);
});
}
async function processRefreshGroupImpl(
ws: InternalWalletState,
refreshGroupId: string,
options: {
forceNow?: boolean;
} = {},
): Promise<void> {
const forceNow = options.forceNow ?? false;
): Promise<OperationAttemptResult> {
logger.info(`processing refresh group ${refreshGroupId}`);
await setupRefreshRetry(ws, refreshGroupId, { reset: forceNow });
const refreshGroup = await ws.db
.mktx((x) => ({
@ -822,10 +747,16 @@ async function processRefreshGroupImpl(
return tx.refreshGroups.get(refreshGroupId);
});
if (!refreshGroup) {
return;
return {
type: OperationAttemptResultType.Finished,
result: undefined,
};
}
if (refreshGroup.timestampFinished) {
return;
return {
type: OperationAttemptResultType.Finished,
result: undefined,
};
}
// Process refresh sessions of the group in parallel.
logger.trace("processing refresh sessions for old coins");
@ -855,6 +786,10 @@ async function processRefreshGroupImpl(
logger.warn("process refresh sessions got exception");
logger.warn(`exception: ${e}`);
}
return {
type: OperationAttemptResultType.Finished,
result: undefined,
};
}
async function processRefreshSession(
@ -975,13 +910,11 @@ export async function createRefreshGroup(
operationStatus: OperationStatus.Pending,
timestampFinished: undefined,
statusPerCoin: oldCoinPubs.map(() => RefreshCoinStatus.Pending),
lastError: undefined,
lastErrorPerCoin: {},
oldCoinPubs: oldCoinPubs.map((x) => x.coinPub),
reason,
refreshGroupId,
refreshSessionPerCoin: oldCoinPubs.map(() => undefined),
retryInfo: RetryInfo.reset(),
inputPerCoin,
estimatedOutputPerCoin,
timestampCreated: TalerProtocolTimestamp.now(),
@ -1034,7 +967,7 @@ function getAutoRefreshExecuteThreshold(d: DenominationRecord): AbsoluteTime {
export async function autoRefresh(
ws: InternalWalletState,
exchangeBaseUrl: string,
): Promise<void> {
): Promise<OperationAttemptResult> {
logger.info(`doing auto-refresh check for '${exchangeBaseUrl}'`);
// We must make sure that the exchange is up-to-date so that
@ -1109,4 +1042,5 @@ export async function autoRefresh(
exchange.nextRefreshCheck = AbsoluteTime.toTimestamp(minCheckThreshold);
await tx.exchanges.put(exchange);
});
return OperationAttemptResult.finishedEmpty();
}

View File

@ -51,6 +51,7 @@ import {
import {
AbortStatus,
CoinStatus,
OperationAttemptResult,
PurchaseRecord,
RefundReason,
RefundState,
@ -60,8 +61,6 @@ import { InternalWalletState } from "../internal-wallet-state.js";
import { readSuccessResponseJsonOrThrow } from "../util/http.js";
import { checkDbInvariant } from "../util/invariants.js";
import { GetReadWriteAccess } from "../util/query.js";
import { RetryInfo } from "../util/retries.js";
import { guardOperationException } from "./common.js";
import { createRefreshGroup, getTotalRefreshCost } from "./refresh.js";
const logger = new Logger("refund.ts");
@ -120,68 +119,6 @@ export async function prepareRefund(
},
};
}
/**
* Retry querying and applying refunds for an order later.
*/
async function setupPurchaseQueryRefundRetry(
ws: InternalWalletState,
proposalId: string,
options: {
reset: boolean;
},
): Promise<void> {
await ws.db
.mktx((x) => ({
purchases: x.purchases,
}))
.runReadWrite(async (tx) => {
const pr = await tx.purchases.get(proposalId);
if (!pr) {
return;
}
if (options.reset) {
pr.refundStatusRetryInfo = RetryInfo.reset();
} else {
pr.refundStatusRetryInfo = RetryInfo.increment(
pr.refundStatusRetryInfo,
);
}
await tx.purchases.put(pr);
});
}
/**
* Report an error that happending when querying for a purchase's refund.
*/
async function reportPurchaseQueryRefundError(
ws: InternalWalletState,
proposalId: string,
err: TalerErrorDetail,
): Promise<void> {
await ws.db
.mktx((x) => ({
purchases: x.purchases,
}))
.runReadWrite(async (tx) => {
const pr = await tx.purchases.get(proposalId);
if (!pr) {
return;
}
if (!pr.refundStatusRetryInfo) {
logger.error(
"reported error on an inactive purchase (no refund status retry info)",
);
}
pr.lastRefundStatusError = err;
await tx.purchases.put(pr);
});
if (err) {
ws.notify({
type: NotificationType.RefundStatusOperationError,
error: err,
});
}
}
function getRefundKey(d: MerchantCoinRefundStatus): string {
return `${d.coin_pub}-${d.rtransaction_id}`;
@ -492,8 +429,6 @@ async function acceptRefunds(
if (queryDone) {
p.timestampLastRefundStatus = now;
p.lastRefundStatusError = undefined;
p.refundStatusRetryInfo = RetryInfo.reset();
p.refundQueryRequested = false;
if (p.abortStatus === AbortStatus.AbortRefund) {
p.abortStatus = AbortStatus.AbortFinished;
@ -502,8 +437,6 @@ async function acceptRefunds(
} else {
// No error, but we need to try again!
p.timestampLastRefundStatus = now;
p.refundStatusRetryInfo = RetryInfo.increment(p.refundStatusRetryInfo);
p.lastRefundStatusError = undefined;
logger.trace("refund query not done");
}
@ -621,8 +554,6 @@ export async function applyRefundFromPurchaseId(
return false;
}
p.refundQueryRequested = true;
p.lastRefundStatusError = undefined;
p.refundStatusRetryInfo = RetryInfo.reset();
await tx.purchases.put(p);
return true;
});
@ -631,7 +562,7 @@ export async function applyRefundFromPurchaseId(
ws.notify({
type: NotificationType.RefundStarted,
});
await processPurchaseQueryRefundImpl(ws, proposalId, {
await processPurchaseQueryRefund(ws, proposalId, {
forceNow: true,
waitForAutoRefund: false,
});
@ -672,22 +603,6 @@ export async function applyRefundFromPurchaseId(
};
}
export async function processPurchaseQueryRefund(
ws: InternalWalletState,
proposalId: string,
options: {
forceNow?: boolean;
waitForAutoRefund?: boolean;
} = {},
): Promise<void> {
const onOpErr = (e: TalerErrorDetail): Promise<void> =>
reportPurchaseQueryRefundError(ws, proposalId, e);
await guardOperationException(
() => processPurchaseQueryRefundImpl(ws, proposalId, options),
onOpErr,
);
}
async function queryAndSaveAwaitingRefund(
ws: InternalWalletState,
purchase: PurchaseRecord,
@ -742,17 +657,15 @@ async function queryAndSaveAwaitingRefund(
return refundAwaiting;
}
async function processPurchaseQueryRefundImpl(
export async function processPurchaseQueryRefund(
ws: InternalWalletState,
proposalId: string,
options: {
forceNow?: boolean;
waitForAutoRefund?: boolean;
} = {},
): Promise<void> {
const forceNow = options.forceNow ?? false;
): Promise<OperationAttemptResult> {
const waitForAutoRefund = options.waitForAutoRefund ?? false;
await setupPurchaseQueryRefundRetry(ws, proposalId, { reset: forceNow });
const purchase = await ws.db
.mktx((x) => ({
purchases: x.purchases,
@ -761,11 +674,11 @@ async function processPurchaseQueryRefundImpl(
return tx.purchases.get(proposalId);
});
if (!purchase) {
return;
return OperationAttemptResult.finishedEmpty();
}
if (!purchase.refundQueryRequested) {
return;
return OperationAttemptResult.finishedEmpty();
}
if (purchase.timestampFirstSuccessfulPay) {
@ -780,7 +693,9 @@ async function processPurchaseQueryRefundImpl(
purchase,
waitForAutoRefund,
);
if (Amounts.isZero(awaitingAmount)) return;
if (Amounts.isZero(awaitingAmount)) {
return OperationAttemptResult.finishedEmpty();
}
}
const requestUrl = new URL(
@ -873,6 +788,7 @@ async function processPurchaseQueryRefundImpl(
}
await acceptRefunds(ws, proposalId, refunds, RefundReason.AbortRefund);
}
return OperationAttemptResult.finishedEmpty();
}
export async function abortFailedPayWithRefund(
@ -899,8 +815,6 @@ export async function abortFailedPayWithRefund(
purchase.refundQueryRequested = true;
purchase.paymentSubmitPending = false;
purchase.abortStatus = AbortStatus.AbortRefund;
purchase.lastPayError = undefined;
purchase.payRetryInfo = RetryInfo.reset();
await tx.purchases.put(purchase);
});
processPurchaseQueryRefund(ws, proposalId, {

View File

@ -18,29 +18,45 @@
* Imports.
*/
import {
Amounts, BlindedDenominationSignature,
codecForMerchantTipResponseV2, codecForTipPickupGetResponse, DenomKeyType, encodeCrock, getRandomBytes, j2s, Logger, NotificationType, parseTipUri, PrepareTipResult, TalerErrorCode, TalerErrorDetail, TalerProtocolTimestamp, TipPlanchetDetail, URL
Amounts,
BlindedDenominationSignature,
codecForMerchantTipResponseV2,
codecForTipPickupGetResponse,
DenomKeyType,
encodeCrock,
getRandomBytes,
j2s,
Logger,
parseTipUri,
PrepareTipResult,
TalerErrorCode,
TalerProtocolTimestamp,
TipPlanchetDetail,
URL,
} from "@gnu-taler/taler-util";
import { DerivedTipPlanchet } from "../crypto/cryptoTypes.js";
import {
CoinRecord,
CoinSourceType,
CoinStatus, DenominationRecord, TipRecord
CoinStatus,
DenominationRecord,
OperationAttemptResult,
OperationAttemptResultType,
TipRecord,
} from "../db.js";
import { makeErrorDetail } from "../errors.js";
import { InternalWalletState } from "../internal-wallet-state.js";
import {
getHttpResponseErrorDetails,
readSuccessResponseJsonOrThrow
readSuccessResponseJsonOrThrow,
} from "../util/http.js";
import { checkDbInvariant, checkLogicInvariant } from "../util/invariants.js";
import {
RetryInfo
} from "../util/retries.js";
import { guardOperationException } from "./common.js";
import { updateExchangeFromUrl } from "./exchanges.js";
import {
getCandidateWithdrawalDenoms, getExchangeWithdrawalInfo, selectWithdrawalDenominations, updateWithdrawalDenoms
getCandidateWithdrawalDenoms,
getExchangeWithdrawalInfo,
selectWithdrawalDenominations,
updateWithdrawalDenoms,
} from "./withdraw.js";
const logger = new Logger("operations/tip.ts");
@ -114,8 +130,6 @@ export async function prepareTip(
createdTimestamp: TalerProtocolTimestamp.now(),
merchantTipId: res.merchantTipId,
tipAmountEffective: selectedDenoms.totalCoinValue,
retryInfo: RetryInfo.reset(),
lastError: undefined,
denomsSel: selectedDenoms,
pickedUpTimestamp: undefined,
secretSeed,
@ -144,82 +158,13 @@ export async function prepareTip(
return tipStatus;
}
async function reportTipError(
ws: InternalWalletState,
walletTipId: string,
err: TalerErrorDetail,
): Promise<void> {
await ws.db
.mktx((x) => ({
tips: x.tips,
}))
.runReadWrite(async (tx) => {
const t = await tx.tips.get(walletTipId);
if (!t) {
return;
}
if (!t.retryInfo) {
logger.reportBreak();
}
t.lastError = err;
await tx.tips.put(t);
});
if (err) {
ws.notify({ type: NotificationType.TipOperationError, error: err });
}
}
async function setupTipRetry(
ws: InternalWalletState,
walletTipId: string,
options: {
reset: boolean;
},
): Promise<void> {
await ws.db
.mktx((x) => ({
tips: x.tips,
}))
.runReadWrite(async (tx) => {
const t = await tx.tips.get(walletTipId);
if (!t) {
return;
}
if (options.reset) {
t.retryInfo = RetryInfo.reset();
} else {
t.retryInfo = RetryInfo.increment(t.retryInfo);
}
delete t.lastError;
await tx.tips.put(t);
});
}
export async function processTip(
ws: InternalWalletState,
tipId: string,
options: {
forceNow?: boolean;
} = {},
): Promise<void> {
const onOpErr = (e: TalerErrorDetail): Promise<void> =>
reportTipError(ws, tipId, e);
await guardOperationException(
() => processTipImpl(ws, tipId, options),
onOpErr,
);
}
async function processTipImpl(
ws: InternalWalletState,
walletTipId: string,
options: {
forceNow?: boolean;
} = {},
): Promise<void> {
const forceNow = options.forceNow ?? false;
await setupTipRetry(ws, walletTipId, { reset: forceNow });
): Promise<OperationAttemptResult> {
const tipRecord = await ws.db
.mktx((x) => ({
tips: x.tips,
@ -228,12 +173,18 @@ async function processTipImpl(
return tx.tips.get(walletTipId);
});
if (!tipRecord) {
return;
return {
type: OperationAttemptResultType.Finished,
result: undefined,
};
}
if (tipRecord.pickedUpTimestamp) {
logger.warn("tip already picked up");
return;
return {
type: OperationAttemptResultType.Finished,
result: undefined,
};
}
const denomsForWithdraw = tipRecord.denomsSel;
@ -284,22 +235,21 @@ async function processTipImpl(
logger.trace(`got tip response, status ${merchantResp.status}`);
// Hide transient errors.
// FIXME: Why do we do this?
if (
tipRecord.retryInfo.retryCounter < 5 &&
((merchantResp.status >= 500 && merchantResp.status <= 599) ||
merchantResp.status === 424)
(merchantResp.status >= 500 && merchantResp.status <= 599) ||
merchantResp.status === 424
) {
logger.trace(`got transient tip error`);
// FIXME: wrap in another error code that indicates a transient error
const err = makeErrorDetail(
TalerErrorCode.WALLET_UNEXPECTED_REQUEST_ERROR,
getHttpResponseErrorDetails(merchantResp),
"tip pickup failed (transient)",
);
await reportTipError(ws, tipRecord.walletTipId, err);
// FIXME: Maybe we want to signal to the caller that the transient error happened?
return;
return {
type: OperationAttemptResultType.Error,
errorDetail: makeErrorDetail(
TalerErrorCode.WALLET_UNEXPECTED_REQUEST_ERROR,
getHttpResponseErrorDetails(merchantResp),
"tip pickup failed (transient)",
),
};
}
let blindedSigs: BlindedDenominationSignature[] = [];
@ -344,21 +294,14 @@ async function processTipImpl(
});
if (!isValid) {
await ws.db
.mktx((x) => ({ tips: x.tips }))
.runReadWrite(async (tx) => {
const tipRecord = await tx.tips.get(walletTipId);
if (!tipRecord) {
return;
}
tipRecord.lastError = makeErrorDetail(
TalerErrorCode.WALLET_TIPPING_COIN_SIGNATURE_INVALID,
{},
"invalid signature from the exchange (via merchant tip) after unblinding",
);
await tx.tips.put(tipRecord);
});
return;
return {
type: OperationAttemptResultType.Error,
errorDetail: makeErrorDetail(
TalerErrorCode.WALLET_TIPPING_COIN_SIGNATURE_INVALID,
{},
"invalid signature from the exchange (via merchant tip) after unblinding",
),
};
}
newCoinRecords.push({
@ -395,13 +338,16 @@ async function processTipImpl(
return;
}
tr.pickedUpTimestamp = TalerProtocolTimestamp.now();
tr.lastError = undefined;
tr.retryInfo = RetryInfo.reset();
await tx.tips.put(tr);
for (const cr of newCoinRecords) {
await tx.coins.put(cr);
}
});
return {
type: OperationAttemptResultType.Finished,
result: undefined,
};
}
export async function acceptTip(

View File

@ -38,7 +38,6 @@ import { InternalWalletState } from "../internal-wallet-state.js";
import {
AbortStatus,
RefundState,
ReserveRecordStatus,
WalletRefundItem,
WithdrawalRecordType,
} from "../db.js";
@ -48,6 +47,7 @@ import { processPurchasePay } from "./pay.js";
import { processRefreshGroup } from "./refresh.js";
import { processTip } from "./tip.js";
import { processWithdrawalGroup } from "./withdraw.js";
import { RetryTags } from "../util/retries.js";
const logger = new Logger("taler-wallet-core:transactions.ts");
@ -142,6 +142,7 @@ export async function getTransactions(
tombstones: x.tombstones,
peerPushPaymentInitiations: x.peerPushPaymentInitiations,
peerPullPaymentIncoming: x.peerPullPaymentIncoming,
operationRetries: x.operationRetries,
}))
.runReadOnly(async (tx) => {
tx.peerPushPaymentInitiations.iter().forEachAsync(async (pi) => {
@ -220,6 +221,10 @@ export async function getTransactions(
if (shouldSkipSearch(transactionsRequest, [])) {
return;
}
const opId = RetryTags.forWithdrawal(wsr);
const ort = await tx.operationRetries.get(opId);
let withdrawalDetails: WithdrawalDetails;
if (wsr.wgInfo.withdrawalType === WithdrawalRecordType.PeerPullCredit) {
transactions.push({
@ -242,7 +247,7 @@ export async function getTransactions(
wsr.withdrawalGroupId,
),
frozen: false,
...(wsr.lastError ? { error: wsr.lastError } : {}),
...(ort?.lastError ? { error: ort.lastError } : {}),
});
return;
} else if (
@ -264,7 +269,7 @@ export async function getTransactions(
wsr.withdrawalGroupId,
),
frozen: false,
...(wsr.lastError ? { error: wsr.lastError } : {}),
...(ort?.lastError ? { error: ort.lastError } : {}),
});
return;
} else if (
@ -310,7 +315,7 @@ export async function getTransactions(
wsr.withdrawalGroupId,
),
frozen: false,
...(wsr.lastError ? { error: wsr.lastError } : {}),
...(ort?.lastError ? { error: ort.lastError } : {}),
});
});
@ -319,7 +324,8 @@ export async function getTransactions(
if (shouldSkipCurrency(transactionsRequest, amount.currency)) {
return;
}
const opId = RetryTags.forDeposit(dg);
const retryRecord = await tx.operationRetries.get(opId);
transactions.push({
type: TransactionType.Deposit,
amountRaw: Amounts.stringify(dg.effectiveDepositAmount),
@ -333,7 +339,7 @@ export async function getTransactions(
dg.depositGroupId,
),
depositGroupId: dg.depositGroupId,
...(dg.lastError ? { error: dg.lastError } : {}),
...(retryRecord?.lastError ? { error: retryRecord.lastError } : {}),
});
});
@ -456,7 +462,15 @@ export async function getTransactions(
});
}
const err = pr.lastPayError ?? pr.lastRefundStatusError;
const payOpId = RetryTags.forPay(pr);
const refundQueryOpId = RetryTags.forRefundQuery(pr);
const payRetryRecord = await tx.operationRetries.get(payOpId);
const refundQueryRetryRecord = await tx.operationRetries.get(
refundQueryOpId,
);
const err =
refundQueryRetryRecord?.lastError ?? payRetryRecord?.lastError;
transactions.push({
type: TransactionType.Payment,
amountRaw: Amounts.stringify(contractData.amount),
@ -495,6 +509,8 @@ export async function getTransactions(
if (!tipRecord.acceptedTimestamp) {
return;
}
const opId = RetryTags.forTipPickup(tipRecord);
const retryRecord = await tx.operationRetries.get(opId);
transactions.push({
type: TransactionType.Tip,
amountEffective: Amounts.stringify(tipRecord.tipAmountEffective),
@ -507,10 +523,7 @@ export async function getTransactions(
tipRecord.walletTipId,
),
merchantBaseUrl: tipRecord.merchantBaseUrl,
// merchant: {
// name: tipRecord.merchantBaseUrl,
// },
error: tipRecord.lastError,
error: retryRecord?.lastError,
});
});
});
@ -589,7 +602,11 @@ export async function deleteTransaction(
): Promise<void> {
const [typeStr, ...rest] = transactionId.split(":");
const type = typeStr as TransactionType;
if (type === TransactionType.Withdrawal || type === TransactionType.PeerPullCredit || type === TransactionType.PeerPushCredit) {
if (
type === TransactionType.Withdrawal ||
type === TransactionType.PeerPullCredit ||
type === TransactionType.PeerPushCredit
) {
const withdrawalGroupId = rest[0];
await ws.db
.mktx((x) => ({
@ -714,7 +731,9 @@ export async function deleteTransaction(
tombstones: x.tombstones,
}))
.runReadWrite(async (tx) => {
const debit = await tx.peerPullPaymentIncoming.get(peerPullPaymentIncomingId);
const debit = await tx.peerPullPaymentIncoming.get(
peerPullPaymentIncomingId,
);
if (debit) {
await tx.peerPullPaymentIncoming.delete(peerPullPaymentIncomingId);
await tx.tombstones.put({
@ -737,10 +756,7 @@ export async function deleteTransaction(
if (debit) {
await tx.peerPushPaymentInitiations.delete(pursePub);
await tx.tombstones.put({
id: makeEventId(
TombstoneTag.DeletePeerPushDebit,
pursePub,
),
id: makeEventId(TombstoneTag.DeletePeerPushDebit, pursePub),
});
}
});

View File

@ -56,7 +56,6 @@ import {
WithdrawBatchResponse,
WithdrawResponse,
WithdrawUriInfoResponse,
WithdrawUriResult,
} from "@gnu-taler/taler-util";
import { EddsaKeypair } from "../crypto/cryptoImplementation.js";
import {
@ -68,9 +67,10 @@ import {
DenomSelectionState,
ExchangeDetailsRecord,
ExchangeRecord,
OperationAttemptResult,
OperationAttemptResultType,
OperationStatus,
PlanchetRecord,
ReserveBankInfo,
ReserveRecordStatus,
WalletStoresV1,
WgInfo,
@ -98,7 +98,6 @@ import {
WALLET_BANK_INTEGRATION_PROTOCOL_VERSION,
WALLET_EXCHANGE_PROTOCOL_VERSION,
} from "../versions.js";
import { guardOperationException } from "./common.js";
import {
getExchangeDetails,
getExchangePaytoUri,
@ -691,31 +690,12 @@ async function processPlanchetExchangeBatchRequest(
withdrawalGroup.exchangeBaseUrl,
).href;
try {
const resp = await ws.http.postJson(reqUrl, d);
const r = await readSuccessResponseJsonOrThrow(
resp,
codecForWithdrawBatchResponse(),
);
return r;
} catch (e) {
const errDetail = getErrorDetailFromException(e);
logger.trace("withdrawal batch request failed", e);
logger.trace(e);
await ws.db
.mktx((x) => ({ withdrawalGroups: x.withdrawalGroups }))
.runReadWrite(async (tx) => {
let wg = await tx.withdrawalGroups.get(
withdrawalGroup.withdrawalGroupId,
);
if (!wg) {
return;
}
wg.lastError = errDetail;
await tx.withdrawalGroups.put(wg);
});
return;
}
const resp = await ws.http.postJson(reqUrl, d);
const r = await readSuccessResponseJsonOrThrow(
resp,
codecForWithdrawBatchResponse(),
);
return r;
}
async function processPlanchetVerifyAndStoreCoin(
@ -951,50 +931,6 @@ export async function updateWithdrawalDenoms(
}
}
async function setupWithdrawalRetry(
ws: InternalWalletState,
withdrawalGroupId: string,
options: {
reset: boolean;
},
): Promise<void> {
await ws.db
.mktx((x) => ({ withdrawalGroups: x.withdrawalGroups }))
.runReadWrite(async (tx) => {
const wsr = await tx.withdrawalGroups.get(withdrawalGroupId);
if (!wsr) {
return;
}
if (options.reset) {
wsr.retryInfo = RetryInfo.reset();
} else {
wsr.retryInfo = RetryInfo.increment(wsr.retryInfo);
}
await tx.withdrawalGroups.put(wsr);
});
}
async function reportWithdrawalError(
ws: InternalWalletState,
withdrawalGroupId: string,
err: TalerErrorDetail,
): Promise<void> {
await ws.db
.mktx((x) => ({ withdrawalGroups: x.withdrawalGroups }))
.runReadWrite(async (tx) => {
const wsr = await tx.withdrawalGroups.get(withdrawalGroupId);
if (!wsr) {
return;
}
if (!wsr.retryInfo) {
logger.reportBreak();
}
wsr.lastError = err;
await tx.withdrawalGroups.put(wsr);
});
ws.notify({ type: NotificationType.WithdrawOperationError, error: err });
}
/**
* Update the information about a reserve that is stored in the wallet
* by querying the reserve's exchange.
@ -1071,28 +1007,9 @@ async function queryReserve(
export async function processWithdrawalGroup(
ws: InternalWalletState,
withdrawalGroupId: string,
options: {
forceNow?: boolean;
} = {},
): Promise<void> {
const onOpErr = (e: TalerErrorDetail): Promise<void> =>
reportWithdrawalError(ws, withdrawalGroupId, e);
await guardOperationException(
() => processWithdrawGroupImpl(ws, withdrawalGroupId, options),
onOpErr,
);
}
async function processWithdrawGroupImpl(
ws: InternalWalletState,
withdrawalGroupId: string,
options: {
forceNow?: boolean;
} = {},
): Promise<void> {
const forceNow = options.forceNow ?? false;
logger.trace("processing withdraw group", withdrawalGroupId);
await setupWithdrawalRetry(ws, withdrawalGroupId, { reset: forceNow });
options: {} = {},
): Promise<OperationAttemptResult> {
logger.trace("processing withdrawal group", withdrawalGroupId);
const withdrawalGroup = await ws.db
.mktx((x) => ({ withdrawalGroups: x.withdrawalGroups }))
.runReadOnly(async (tx) => {
@ -1106,24 +1023,44 @@ async function processWithdrawGroupImpl(
switch (withdrawalGroup.reserveStatus) {
case ReserveRecordStatus.RegisteringBank:
await processReserveBankStatus(ws, withdrawalGroupId);
return await processWithdrawGroupImpl(ws, withdrawalGroupId, {
return await processWithdrawalGroup(ws, withdrawalGroupId, {
forceNow: true,
});
case ReserveRecordStatus.QueryingStatus: {
const res = await queryReserve(ws, withdrawalGroupId);
if (res.ready) {
return await processWithdrawGroupImpl(ws, withdrawalGroupId, {
return await processWithdrawalGroup(ws, withdrawalGroupId, {
forceNow: true,
});
}
return;
return {
type: OperationAttemptResultType.Pending,
result: undefined,
};
}
case ReserveRecordStatus.WaitConfirmBank: {
const res = await processReserveBankStatus(ws, withdrawalGroupId);
switch (res.status) {
case BankStatusResultCode.Aborted:
case BankStatusResultCode.Done:
return {
type: OperationAttemptResultType.Finished,
result: undefined,
};
case BankStatusResultCode.Waiting: {
return {
type: OperationAttemptResultType.Pending,
result: undefined,
};
}
}
}
case ReserveRecordStatus.WaitConfirmBank:
await processReserveBankStatus(ws, withdrawalGroupId);
return;
case ReserveRecordStatus.BankAborted:
// FIXME
return;
return {
type: OperationAttemptResultType.Pending,
result: undefined,
};
case ReserveRecordStatus.Dormant:
// We can try to withdraw, nothing needs to be done with the reserve.
break;
@ -1150,11 +1087,12 @@ async function processWithdrawGroupImpl(
return;
}
wg.operationStatus = OperationStatus.Finished;
delete wg.lastError;
delete wg.retryInfo;
await tx.withdrawalGroups.put(wg);
});
return;
return {
type: OperationAttemptResultType.Finished,
result: undefined,
};
}
const numTotalCoins = withdrawalGroup.denomsSel.selectedDenoms
@ -1175,7 +1113,7 @@ async function processWithdrawGroupImpl(
if (ws.batchWithdrawal) {
const resp = await processPlanchetExchangeBatchRequest(ws, withdrawalGroup);
if (!resp) {
return;
throw Error("unable to do batch withdrawal");
}
for (let coinIdx = 0; coinIdx < numTotalCoins; coinIdx++) {
work.push(
@ -1236,8 +1174,6 @@ async function processWithdrawGroupImpl(
finishedForFirstTime = true;
wg.timestampFinish = TalerProtocolTimestamp.now();
wg.operationStatus = OperationStatus.Finished;
delete wg.lastError;
wg.retryInfo = RetryInfo.reset();
}
await tx.withdrawalGroups.put(wg);
@ -1259,6 +1195,11 @@ async function processWithdrawGroupImpl(
reservePub: withdrawalGroup.reservePub,
});
}
return {
type: OperationAttemptResultType.Finished,
result: undefined,
};
}
const AGE_MASK_GROUPS = "8:10:12:14:16:18".split(":").map(n => parseInt(n, 10))
@ -1529,10 +1470,7 @@ async function getWithdrawalGroupRecordTx(
}
export function getReserveRequestTimeout(r: WithdrawalGroupRecord): Duration {
return Duration.max(
{ d_ms: 60000 },
Duration.min({ d_ms: 5000 }, RetryInfo.getDuration(r.retryInfo)),
);
return { d_ms: 60000 };
}
export function getBankStatusUrl(talerWithdrawUri: string): string {
@ -1611,17 +1549,25 @@ async function registerReserveWithBank(
);
r.reserveStatus = ReserveRecordStatus.WaitConfirmBank;
r.operationStatus = OperationStatus.Pending;
r.retryInfo = RetryInfo.reset();
await tx.withdrawalGroups.put(r);
});
ws.notify({ type: NotificationType.ReserveRegisteredWithBank });
return processReserveBankStatus(ws, withdrawalGroupId);
}
enum BankStatusResultCode {
Done = "done",
Waiting = "waiting",
Aborted = "aborted",
}
interface BankStatusResult {
status: BankStatusResultCode;
}
async function processReserveBankStatus(
ws: InternalWalletState,
withdrawalGroupId: string,
): Promise<void> {
): Promise<BankStatusResult> {
const withdrawalGroup = await getWithdrawalGroupRecordTx(ws.db, {
withdrawalGroupId,
});
@ -1630,17 +1576,21 @@ async function processReserveBankStatus(
case ReserveRecordStatus.RegisteringBank:
break;
default:
return;
return {
status: BankStatusResultCode.Done,
};
}
if (
withdrawalGroup.wgInfo.withdrawalType != WithdrawalRecordType.BankIntegrated
) {
throw Error();
throw Error("wrong withdrawal record type");
}
const bankInfo = withdrawalGroup.wgInfo.bankInfo;
if (!bankInfo) {
return;
return {
status: BankStatusResultCode.Done,
};
}
const bankStatusUrl = getBankStatusUrl(bankInfo.talerWithdrawUri);
@ -1678,10 +1628,11 @@ async function processReserveBankStatus(
r.wgInfo.bankInfo.timestampBankConfirmed = now;
r.reserveStatus = ReserveRecordStatus.BankAborted;
r.operationStatus = OperationStatus.Finished;
r.retryInfo = RetryInfo.reset();
await tx.withdrawalGroups.put(r);
});
return;
return {
status: BankStatusResultCode.Aborted,
};
}
// Bank still needs to know our reserve info
@ -1722,15 +1673,17 @@ async function processReserveBankStatus(
r.wgInfo.bankInfo.timestampBankConfirmed = now;
r.reserveStatus = ReserveRecordStatus.QueryingStatus;
r.operationStatus = OperationStatus.Pending;
r.retryInfo = RetryInfo.reset();
} else {
logger.info("withdrawal: transfer not yet confirmed by bank");
r.wgInfo.bankInfo.confirmUrl = status.confirm_transfer_url;
r.senderWire = status.sender_wire;
r.retryInfo = RetryInfo.increment(r.retryInfo);
}
await tx.withdrawalGroups.put(r);
});
return {
status: BankStatusResultCode.Done,
};
}
export async function internalCreateWithdrawalGroup(
@ -1775,14 +1728,12 @@ export async function internalCreateWithdrawalGroup(
exchangeBaseUrl: canonExchange,
instructedAmount: amount,
timestampStart: now,
lastError: undefined,
operationStatus: OperationStatus.Pending,
rawWithdrawalAmount: initialDenomSel.totalWithdrawCost,
secretSeed,
reservePriv: reserveKeyPair.priv,
reservePub: reserveKeyPair.pub,
reserveStatus: args.reserveStatus,
retryInfo: RetryInfo.reset(),
withdrawalGroupId,
restrictAge: args.restrictAge,
senderWire: undefined,

View File

@ -30,14 +30,12 @@ import {
AbsoluteTime,
TalerProtocolTimestamp,
} from "@gnu-taler/taler-util";
import { ReserveRecordStatus } from "./db.js";
import { RetryInfo } from "./util/retries.js";
export enum PendingTaskType {
ExchangeUpdate = "exchange-update",
ExchangeCheckRefresh = "exchange-check-refresh",
Pay = "pay",
ProposalChoice = "proposal-choice",
ProposalDownload = "proposal-download",
Refresh = "refresh",
Recoup = "recoup",
@ -109,7 +107,7 @@ export interface PendingRefreshTask {
lastError?: TalerErrorDetail;
refreshGroupId: string;
finishedPerCoin: boolean[];
retryInfo: RetryInfo;
retryInfo?: RetryInfo;
}
/**
@ -125,17 +123,6 @@ export interface PendingProposalDownloadTask {
retryInfo?: RetryInfo;
}
/**
* User must choose whether to accept or reject the merchant's
* proposed contract terms.
*/
export interface PendingProposalChoiceOperation {
type: PendingTaskType.ProposalChoice;
merchantBaseUrl: string;
proposalTimestamp: AbsoluteTime;
proposalId: string;
}
/**
* The wallet is picking up a tip that the user has accepted.
*/
@ -165,14 +152,14 @@ export interface PendingPayTask {
export interface PendingRefundQueryTask {
type: PendingTaskType.RefundQuery;
proposalId: string;
retryInfo: RetryInfo;
retryInfo?: RetryInfo;
lastError: TalerErrorDetail | undefined;
}
export interface PendingRecoupTask {
type: PendingTaskType.Recoup;
recoupGroupId: string;
retryInfo: RetryInfo;
retryInfo?: RetryInfo;
lastError: TalerErrorDetail | undefined;
}
@ -205,6 +192,11 @@ export interface PendingTaskInfoCommon {
*/
type: PendingTaskType;
/**
* Unique identifier for the pending task.
*/
id: string;
/**
* Set to true if the operation indicates that something is really in progress,
* as opposed to some regular scheduled operation that can be tried later.

View File

@ -152,6 +152,19 @@ class ResultStream<T> {
return arr;
}
async mapAsync<R>(f: (x: T) => Promise<R>): Promise<R[]> {
const arr: R[] = [];
while (true) {
const x = await this.next();
if (x.hasValue) {
arr.push(await f(x.value));
} else {
break;
}
}
return arr;
}
async forEachAsync(f: (x: T) => Promise<void>): Promise<void> {
while (true) {
const x = await this.next();
@ -572,6 +585,26 @@ function makeWriteContext(
return ctx;
}
const storeList = [
{ name: "foo" as const, value: 1 as const },
{ name: "bar" as const, value: 2 as const },
];
// => { foo: { value: 1}, bar: {value: 2} }
type StoreList = typeof storeList;
type StoreNames = StoreList[number] extends { name: infer I } ? I : never;
type H = StoreList[number] & { name: "foo"};
type Cleanup<V> = V extends { name: infer N, value: infer X} ? {name: N, value: X} : never;
type G = {
[X in StoreNames]: {
X: StoreList[number] & { name: X };
};
};
/**
* Type-safe access to a database with a particular store map.
*
@ -584,6 +617,14 @@ export class DbAccess<StoreMap> {
return this.db;
}
mktx2<
StoreNames extends keyof StoreMap,
Stores extends StoreMap[StoreNames],
StoreList extends Stores[],
>(namePicker: (x: StoreMap) => StoreList): StoreList {
return namePicker(this.stores);
}
mktx<
PickerType extends (x: StoreMap) => unknown,
BoundStores extends GetPickerType<PickerType, StoreMap>,

View File

@ -21,7 +21,29 @@
/**
* Imports.
*/
import { AbsoluteTime, Duration } from "@gnu-taler/taler-util";
import {
AbsoluteTime,
Duration,
TalerErrorDetail,
} from "@gnu-taler/taler-util";
import {
BackupProviderRecord,
DepositGroupRecord,
ExchangeRecord,
OperationAttemptResult,
OperationAttemptResultType,
ProposalRecord,
PurchaseRecord,
RecoupGroupRecord,
RefreshGroupRecord,
TipRecord,
WalletStoresV1,
WithdrawalGroupRecord,
} from "../db.js";
import { TalerError } from "../errors.js";
import { InternalWalletState } from "../internal-wallet-state.js";
import { PendingTaskType } from "../pending-types.js";
import { GetReadWriteAccess } from "./query.js";
export interface RetryInfo {
firstTry: AbsoluteTime;
@ -108,3 +130,95 @@ export namespace RetryInfo {
return r2;
}
}
export namespace RetryTags {
export function forWithdrawal(wg: WithdrawalGroupRecord): string {
return `${PendingTaskType.Withdraw}:${wg.withdrawalGroupId}`;
}
export function forExchangeUpdate(exch: ExchangeRecord): string {
return `${PendingTaskType.ExchangeUpdate}:${exch.baseUrl}`;
}
export function forExchangeCheckRefresh(exch: ExchangeRecord): string {
return `${PendingTaskType.ExchangeCheckRefresh}:${exch.baseUrl}`;
}
export function forProposalClaim(pr: ProposalRecord): string {
return `${PendingTaskType.ProposalDownload}:${pr.proposalId}`;
}
export function forTipPickup(tipRecord: TipRecord): string {
return `${PendingTaskType.TipPickup}:${tipRecord.walletTipId}`;
}
export function forRefresh(refreshGroupRecord: RefreshGroupRecord): string {
return `${PendingTaskType.TipPickup}:${refreshGroupRecord.refreshGroupId}`;
}
export function forPay(purchaseRecord: PurchaseRecord): string {
return `${PendingTaskType.Pay}:${purchaseRecord.proposalId}`;
}
export function forRefundQuery(purchaseRecord: PurchaseRecord): string {
return `${PendingTaskType.RefundQuery}:${purchaseRecord.proposalId}`;
}
export function forRecoup(recoupRecord: RecoupGroupRecord): string {
return `${PendingTaskType.Recoup}:${recoupRecord.recoupGroupId}`;
}
export function forDeposit(depositRecord: DepositGroupRecord): string {
return `${PendingTaskType.Deposit}:${depositRecord.depositGroupId}`;
}
export function forBackup(backupRecord: BackupProviderRecord): string {
return `${PendingTaskType.Backup}:${backupRecord.baseUrl}`;
}
}
export async function scheduleRetryInTx(
ws: InternalWalletState,
tx: GetReadWriteAccess<{
operationRetries: typeof WalletStoresV1.operationRetries;
}>,
opId: string,
errorDetail?: TalerErrorDetail,
): Promise<void> {
let retryRecord = await tx.operationRetries.get(opId);
if (!retryRecord) {
retryRecord = {
id: opId,
retryInfo: RetryInfo.reset(),
};
if (errorDetail) {
retryRecord.lastError = errorDetail;
}
} else {
retryRecord.retryInfo = RetryInfo.increment(retryRecord.retryInfo);
if (errorDetail) {
retryRecord.lastError = errorDetail;
} else {
delete retryRecord.lastError;
}
}
await tx.operationRetries.put(retryRecord);
}
export async function scheduleRetry(
ws: InternalWalletState,
opId: string,
errorDetail?: TalerErrorDetail,
): Promise<void> {
return await ws.db
.mktx((x) => ({ operationRetries: x.operationRetries }))
.runReadWrite(async (tx) => {
scheduleRetryInTx(ws, tx, opId, errorDetail);
});
}
/**
* Run an operation handler, expect a success result and extract the success value.
*/
export async function runOperationHandlerForResult<T>(
res: OperationAttemptResult<T>,
): Promise<T> {
switch (res.type) {
case OperationAttemptResultType.Finished:
return res.result;
case OperationAttemptResultType.Error:
throw TalerError.fromUncheckedDetail(res.errorDetail);
default:
throw Error(`unexpected operation result (${res.type})`);
}
}

View File

@ -90,6 +90,7 @@ import {
ExchangeListItem,
OperationMap,
FeeDescription,
TalerErrorDetail,
} from "@gnu-taler/taler-util";
import { TalerCryptoInterface } from "./crypto/cryptoImplementation.js";
import {
@ -101,9 +102,15 @@ import {
CoinSourceType,
exportDb,
importDb,
OperationAttemptResult,
OperationAttemptResultType,
WalletStoresV1,
} from "./db.js";
import { getErrorDetailFromException, TalerError } from "./errors.js";
import {
getErrorDetailFromException,
makeErrorDetail,
TalerError,
} from "./errors.js";
import { createDenominationTimeline } from "./index.browser.js";
import {
DenomInfo,
@ -143,6 +150,7 @@ import {
getExchangeRequestTimeout,
getExchangeTrust,
updateExchangeFromUrl,
updateExchangeFromUrlHandler,
updateExchangeTermsOfService,
} from "./operations/exchanges.js";
import { getMerchantInfo } from "./operations/merchants.js";
@ -162,7 +170,11 @@ import {
initiatePeerToPeerPush,
} from "./operations/peer-to-peer.js";
import { getPendingOperations } from "./operations/pending.js";
import { createRecoupGroup, processRecoupGroup } from "./operations/recoup.js";
import {
createRecoupGroup,
processRecoupGroup,
processRecoupGroupHandler,
} from "./operations/recoup.js";
import {
autoRefresh,
createRefreshGroup,
@ -210,6 +222,7 @@ import {
openPromise,
} from "./util/promiseUtils.js";
import { DbAccess, GetReadWriteAccess } from "./util/query.js";
import { RetryInfo, runOperationHandlerForResult } from "./util/retries.js";
import { TimerAPI, TimerGroup } from "./util/timer.js";
import {
WALLET_BANK_INTEGRATION_PROTOCOL_VERSION,
@ -237,7 +250,12 @@ async function getWithdrawalDetailsForAmount(
amount: AmountJson,
restrictAge: number | undefined,
): Promise<ManualWithdrawalDetails> {
const wi = await getExchangeWithdrawalInfo(ws, exchangeBaseUrl, amount, restrictAge);
const wi = await getExchangeWithdrawalInfo(
ws,
exchangeBaseUrl,
amount,
restrictAge,
);
const paytoUris = wi.exchangeDetails.wireInfo.accounts.map(
(x) => x.payto_uri,
);
@ -252,8 +270,108 @@ async function getWithdrawalDetailsForAmount(
};
}
/**
* Call the right handler for a pending operation without doing
* any special error handling.
*/
async function callOperationHandler(
ws: InternalWalletState,
pending: PendingTaskInfo,
forceNow = false,
): Promise<OperationAttemptResult<unknown, unknown>> {
switch (pending.type) {
case PendingTaskType.ExchangeUpdate:
return await updateExchangeFromUrlHandler(ws, pending.exchangeBaseUrl, {
forceNow,
});
case PendingTaskType.Refresh:
return await processRefreshGroup(ws, pending.refreshGroupId, {
forceNow,
});
case PendingTaskType.Withdraw:
await processWithdrawalGroup(ws, pending.withdrawalGroupId, { forceNow });
break;
case PendingTaskType.ProposalDownload:
return await processDownloadProposal(ws, pending.proposalId, {
forceNow,
});
case PendingTaskType.TipPickup:
return await processTip(ws, pending.tipId, { forceNow });
case PendingTaskType.Pay:
return await processPurchasePay(ws, pending.proposalId, { forceNow });
case PendingTaskType.RefundQuery:
return await processPurchaseQueryRefund(ws, pending.proposalId, {
forceNow,
});
case PendingTaskType.Recoup:
return await processRecoupGroupHandler(ws, pending.recoupGroupId, {
forceNow,
});
case PendingTaskType.ExchangeCheckRefresh:
return await autoRefresh(ws, pending.exchangeBaseUrl);
case PendingTaskType.Deposit: {
return await processDepositGroup(ws, pending.depositGroupId, {
forceNow,
});
}
case PendingTaskType.Backup:
return await processBackupForProvider(ws, pending.backupProviderBaseUrl);
default:
return assertUnreachable(pending);
}
throw Error("not reached");
}
export async function storeOperationError(
ws: InternalWalletState,
pendingTaskId: string,
e: TalerErrorDetail,
): Promise<void> {
await ws.db
.mktx((x) => ({ operationRetries: x.operationRetries }))
.runReadWrite(async (tx) => {
const retryRecord = await tx.operationRetries.get(pendingTaskId);
if (!retryRecord) {
return;
}
retryRecord.lastError = e;
retryRecord.retryInfo = RetryInfo.increment(retryRecord.retryInfo);
await tx.operationRetries.put(retryRecord);
});
}
export async function storeOperationFinished(
ws: InternalWalletState,
pendingTaskId: string,
): Promise<void> {
await ws.db
.mktx((x) => ({ operationRetries: x.operationRetries }))
.runReadWrite(async (tx) => {
await tx.operationRetries.delete(pendingTaskId);
});
}
export async function storeOperationPending(
ws: InternalWalletState,
pendingTaskId: string,
): Promise<void> {
await ws.db
.mktx((x) => ({ operationRetries: x.operationRetries }))
.runReadWrite(async (tx) => {
const retryRecord = await tx.operationRetries.get(pendingTaskId);
if (!retryRecord) {
return;
}
delete retryRecord.lastError;
retryRecord.retryInfo = RetryInfo.increment(retryRecord.retryInfo);
await tx.operationRetries.put(retryRecord);
});
}
/**
* Execute one operation based on the pending operation info record.
*
* Store success/failure result in the database.
*/
async function processOnePendingOperation(
ws: InternalWalletState,
@ -261,47 +379,45 @@ async function processOnePendingOperation(
forceNow = false,
): Promise<void> {
logger.trace(`running pending ${JSON.stringify(pending, undefined, 2)}`);
switch (pending.type) {
case PendingTaskType.ExchangeUpdate:
await updateExchangeFromUrl(ws, pending.exchangeBaseUrl, {
forceNow,
});
break;
case PendingTaskType.Refresh:
await processRefreshGroup(ws, pending.refreshGroupId, { forceNow });
break;
case PendingTaskType.Withdraw:
await processWithdrawalGroup(ws, pending.withdrawalGroupId, { forceNow });
break;
case PendingTaskType.ProposalDownload:
await processDownloadProposal(ws, pending.proposalId, { forceNow });
break;
case PendingTaskType.TipPickup:
await processTip(ws, pending.tipId, { forceNow });
break;
case PendingTaskType.Pay:
await processPurchasePay(ws, pending.proposalId, { forceNow });
break;
case PendingTaskType.RefundQuery:
await processPurchaseQueryRefund(ws, pending.proposalId, { forceNow });
break;
case PendingTaskType.Recoup:
await processRecoupGroup(ws, pending.recoupGroupId, { forceNow });
break;
case PendingTaskType.ExchangeCheckRefresh:
await autoRefresh(ws, pending.exchangeBaseUrl);
break;
case PendingTaskType.Deposit: {
await processDepositGroup(ws, pending.depositGroupId, {
forceNow,
});
break;
let maybeError: TalerErrorDetail | undefined;
try {
const resp = await callOperationHandler(ws, pending, forceNow);
switch (resp.type) {
case OperationAttemptResultType.Error:
return await storeOperationError(ws, pending.id, resp.errorDetail);
case OperationAttemptResultType.Finished:
return await storeOperationFinished(ws, pending.id);
case OperationAttemptResultType.Pending:
return await storeOperationPending(ws, pending.id);
case OperationAttemptResultType.Longpoll:
break;
}
} catch (e: any) {
if (
e instanceof TalerError &&
e.hasErrorCode(TalerErrorCode.WALLET_PENDING_OPERATION_FAILED)
) {
logger.warn("operation processed resulted in error");
logger.warn(`error was: ${j2s(e.errorDetail)}`);
maybeError = e.errorDetail;
} else {
// This is a bug, as we expect pending operations to always
// do their own error handling and only throw WALLET_PENDING_OPERATION_FAILED
// or return something.
logger.error("Uncaught exception", e);
ws.notify({
type: NotificationType.InternalError,
message: "uncaught exception",
exception: e,
});
maybeError = makeErrorDetail(
TalerErrorCode.WALLET_UNEXPECTED_EXCEPTION,
{
stack: e.stack,
},
`unexpected exception (message: ${e.message})`,
);
}
case PendingTaskType.Backup:
await processBackupForProvider(ws, pending.backupProviderBaseUrl);
break;
default:
assertUnreachable(pending);
}
}
@ -317,18 +433,7 @@ export async function runPending(
if (!forceNow && !AbsoluteTime.isExpired(p.timestampDue)) {
continue;
}
try {
await processOnePendingOperation(ws, p, forceNow);
} catch (e) {
if (e instanceof TalerError) {
console.error(
"Pending operation failed:",
JSON.stringify(e.errorDetail, undefined, 2),
);
} else {
console.error(e);
}
}
await processOnePendingOperation(ws, p, forceNow);
}
}
@ -420,27 +525,7 @@ async function runTaskLoop(
if (!AbsoluteTime.isExpired(p.timestampDue)) {
continue;
}
try {
await processOnePendingOperation(ws, p);
} catch (e) {
if (
e instanceof TalerError &&
e.hasErrorCode(TalerErrorCode.WALLET_PENDING_OPERATION_FAILED)
) {
logger.warn("operation processed resulted in error");
logger.warn(`error was: ${j2s(e.errorDetail)}`);
} else {
// This is a bug, as we expect pending operations to always
// do their own error handling and only throw WALLET_PENDING_OPERATION_FAILED
// or return something.
logger.error("Uncaught exception", e);
ws.notify({
type: NotificationType.InternalError,
message: "uncaught exception",
exception: e,
});
}
}
await processOnePendingOperation(ws, p);
ws.notify({
type: NotificationType.PendingOperationProcessed,
});
@ -629,7 +714,7 @@ async function getExchangeDetailedInfo(
denominations: x.denominations,
}))
.runReadOnly(async (tx) => {
const ex = await tx.exchanges.get(exchangeBaseurl)
const ex = await tx.exchanges.get(exchangeBaseurl);
const dp = ex?.detailsPointer;
if (!dp) {
return;
@ -663,11 +748,11 @@ async function getExchangeDetailedInfo(
wireInfo: exchangeDetails.wireInfo,
},
denominations: denominations,
}
};
});
if (!exchange) {
throw Error(`exchange with base url "${exchangeBaseurl}" not found`)
throw Error(`exchange with base url "${exchangeBaseurl}" not found`);
}
const feesDescription: OperationMap<FeeDescription[]> = {
@ -809,6 +894,7 @@ declare const __GIT_HASH__: string;
const VERSION = typeof __VERSION__ !== "undefined" ? __VERSION__ : "dev";
const GIT_HASH = typeof __GIT_HASH__ !== "undefined" ? __GIT_HASH__ : undefined;
/**
* Implementation of the "wallet-core" API.
*/
@ -908,7 +994,7 @@ async function dispatchRequestInternal(
ws,
req.exchangeBaseUrl,
Amounts.parseOrThrow(req.amount),
req.restrictAge
req.restrictAge,
);
}
case "getBalances": {
@ -1106,7 +1192,7 @@ async function dispatchRequestInternal(
ws,
req.exchange,
amount,
undefined
undefined,
);
const wres = await createManualWithdrawal(ws, {
amount: amount,