diff options
| author | Florian Dold <florian@dold.me> | 2022-09-05 18:12:30 +0200 | 
|---|---|---|
| committer | Florian Dold <florian@dold.me> | 2022-09-13 16:10:41 +0200 | 
| commit | 13e7a674778754c0ed641dfd428e3d6b2b71ab2d (patch) | |
| tree | f2a0e5029305a9b818416fd94908ef77cdd7446f /packages/taler-wallet-core/src | |
| parent | f9f2911c761af1c8ed1c323dcd414cbaa9eeae7c (diff) | |
wallet-core: uniform retry handling
Diffstat (limited to 'packages/taler-wallet-core/src')
17 files changed, 969 insertions, 1230 deletions
| diff --git a/packages/taler-wallet-core/src/db.ts b/packages/taler-wallet-core/src/db.ts index 9d41f2114..1052e302d 100644 --- a/packages/taler-wallet-core/src/db.ts +++ b/packages/taler-wallet-core/src/db.ts @@ -361,14 +361,14 @@ export interface ExchangeDetailsRecord {     * Terms of service text or undefined if not downloaded yet.     *     * This is just used as a cache of the last downloaded ToS. -   *  +   *     * FIXME:  Put in separate object store!     */    termsOfServiceText: string | undefined;    /**     * content-type of the last downloaded termsOfServiceText. -   *  +   *     * * FIXME:  Put in separate object store!     */    termsOfServiceContentType: string | undefined; @@ -455,17 +455,6 @@ export interface ExchangeRecord {    nextRefreshCheck: TalerProtocolTimestamp;    /** -   * Last error (if any) for fetching updated information about the -   * exchange. -   */ -  lastError?: TalerErrorDetail; - -  /** -   * Retry status for fetching updated information about the exchange. -   */ -  retryInfo?: RetryInfo; - -  /**     * Public key of the reserve that we're currently using for     * receiving P2P payments.     */ @@ -734,24 +723,12 @@ export interface ProposalRecord {     * Session ID we got when downloading the contract.     */    downloadSessionId?: string; - -  /** -   * Retry info, even present when the operation isn't active to allow indexing -   * on the next retry timestamp. -   * -   * FIXME: Clarify what we even retry. -   */ -  retryInfo?: RetryInfo; - -  lastError: TalerErrorDetail | undefined;  }  /**   * Status of a tip we got from a merchant.   */  export interface TipRecord { -  lastError: TalerErrorDetail | undefined; -    /**     * Has the user accepted the tip?  Only after the tip has been accepted coins     * withdrawn from the tip may be used. @@ -810,12 +787,6 @@ export interface TipRecord {     * from the merchant.     */    pickedUpTimestamp: TalerProtocolTimestamp | undefined; - -  /** -   * Retry info, even present when the operation isn't active to allow indexing -   * on the next retry timestamp. -   */ -  retryInfo: RetryInfo;  }  export enum RefreshCoinStatus { @@ -837,16 +808,7 @@ export enum OperationStatus {  export interface RefreshGroupRecord {    operationStatus: OperationStatus; -  /** -   * Retry info, even present when the operation isn't active to allow indexing -   * on the next retry timestamp. -   * -   * FIXME: No, this can be optional, indexing is still possible -   */ -  retryInfo: RetryInfo; - -  lastError: TalerErrorDetail | undefined; - +  // FIXME: Put this into a different object store?    lastErrorPerCoin: { [coinIndex: number]: TalerErrorDetail };    /** @@ -1117,6 +1079,8 @@ export interface PurchaseRecord {    /**     * Pending refunds for the purchase.  A refund is pending     * when the merchant reports a transient error from the exchange. +   *  +   * FIXME: Put this into a separate object store?     */    refunds: { [refundKey: string]: WalletRefundItem }; @@ -1132,6 +1096,7 @@ export interface PurchaseRecord {    lastSessionId: string | undefined;    /** +   * Do we still need to post the deposit permissions to the merchant?     * Set for the first payment, or on re-plays.     */    paymentSubmitPending: boolean; @@ -1142,22 +1107,6 @@ export interface PurchaseRecord {     */    refundQueryRequested: boolean; -  abortStatus: AbortStatus; - -  payRetryInfo?: RetryInfo; - -  lastPayError: TalerErrorDetail | undefined; - -  /** -   * Retry information for querying the refund status with the merchant. -   */ -  refundStatusRetryInfo: RetryInfo; - -  /** -   * Last error (or undefined) for querying the refund status with the merchant. -   */ -  lastRefundStatusError: TalerErrorDetail | undefined; -    /**     * Continue querying the refund status until this deadline has expired.     */ @@ -1174,6 +1123,11 @@ export interface PurchaseRecord {     * an error where it doesn't make sense to retry.     */    payFrozen?: boolean; + +  /** +   * FIXME: How does this interact with payFrozen? +   */ +  abortStatus: AbortStatus;  }  export const WALLET_BACKUP_STATE_KEY = "walletBackupState"; @@ -1184,9 +1138,9 @@ export const WALLET_BACKUP_STATE_KEY = "walletBackupState";   */  export type ConfigRecord =    | { -    key: typeof WALLET_BACKUP_STATE_KEY; -    value: WalletBackupConfState; -  } +      key: typeof WALLET_BACKUP_STATE_KEY; +      value: WalletBackupConfState; +    }    | { key: "currencyDefaultsApplied"; value: boolean };  export interface WalletBackupConfState { @@ -1368,13 +1322,6 @@ export interface WithdrawalGroupRecord {     * FIXME: Should this not also include a timestamp for more logical merging?     */    denomSelUid: string; - -  /** -   * Retry info. -   */ -  retryInfo?: RetryInfo; - -  lastError: TalerErrorDetail | undefined;  }  export interface BankWithdrawUriRecord { @@ -1432,16 +1379,6 @@ export interface RecoupGroupRecord {     * after all individual recoups are done.     */    scheduleRefreshCoins: string[]; - -  /** -   * Retry info. -   */ -  retryInfo: RetryInfo; - -  /** -   * Last error that occurred, if any. -   */ -  lastError: TalerErrorDetail | undefined;  }  export enum BackupProviderStateTag { @@ -1452,17 +1389,15 @@ export enum BackupProviderStateTag {  export type BackupProviderState =    | { -    tag: BackupProviderStateTag.Provisional; -  } +      tag: BackupProviderStateTag.Provisional; +    }    | { -    tag: BackupProviderStateTag.Ready; -    nextBackupTimestamp: TalerProtocolTimestamp; -  } +      tag: BackupProviderStateTag.Ready; +      nextBackupTimestamp: TalerProtocolTimestamp; +    }    | { -    tag: BackupProviderStateTag.Retrying; -    retryInfo: RetryInfo; -    lastError?: TalerErrorDetail; -  }; +      tag: BackupProviderStateTag.Retrying; +    };  export interface BackupProviderTerms {    supportedProtocolVersion: string; @@ -1573,13 +1508,6 @@ export interface DepositGroupRecord {    timestampFinished: TalerProtocolTimestamp | undefined;    operationStatus: OperationStatus; - -  lastError: TalerErrorDetail | undefined; - -  /** -   * Retry info. -   */ -  retryInfo?: RetryInfo;  }  /** @@ -1749,6 +1677,60 @@ export interface ReserveRecord {    reservePriv: string;  } +export interface OperationRetryRecord { +  /** +   * Unique identifier for the operation.  Typically of +   * the format `${opType}-${opUniqueKey}` +   */ +  id: string; + +  lastError?: TalerErrorDetail; + +  retryInfo: RetryInfo; +} + +export enum OperationAttemptResultType { +  Finished = "finished", +  Pending = "pending", +  Error = "error", +  Longpoll = "longpoll", +} + +// FIXME: not part of DB! +export type OperationAttemptResult<TSuccess = unknown, TPending = unknown> = +  | OperationAttemptFinishedResult<TSuccess> +  | OperationAttemptErrorResult +  | OperationAttemptLongpollResult +  | OperationAttemptPendingResult<TPending>; + +export namespace OperationAttemptResult { +  export function finishedEmpty(): OperationAttemptResult<unknown, unknown> { +    return { +      type: OperationAttemptResultType.Finished, +      result: undefined, +    }; +  } +} + +export interface OperationAttemptFinishedResult<T> { +  type: OperationAttemptResultType.Finished; +  result: T; +} + +export interface OperationAttemptPendingResult<T> { +  type: OperationAttemptResultType.Pending; +  result: T; +} + +export interface OperationAttemptErrorResult { +  type: OperationAttemptResultType.Error; +  errorDetail: TalerErrorDetail; +} + +export interface OperationAttemptLongpollResult { +  type: OperationAttemptResultType.Longpoll; +} +  export const WalletStoresV1 = {    coins: describeStore(      describeContents<CoinRecord>("coins", { @@ -1913,6 +1895,12 @@ export const WalletStoresV1 = {      describeContents<TombstoneRecord>("tombstones", { keyPath: "id" }),      {},    ), +  operationRetries: describeStore( +    describeContents<OperationRetryRecord>("operationRetries", { +      keyPath: "id", +    }), +    {}, +  ),    ghostDepositGroups: describeStore(      describeContents<GhostDepositGroupRecord>("ghostDepositGroups", {        keyPath: "contractTermsHash", diff --git a/packages/taler-wallet-core/src/operations/backup/import.ts b/packages/taler-wallet-core/src/operations/backup/import.ts index ff7ff0d03..e8683265b 100644 --- a/packages/taler-wallet-core/src/operations/backup/import.ts +++ b/packages/taler-wallet-core/src/operations/backup/import.ts @@ -274,7 +274,6 @@ export async function importBackup(              protocolVersionRange: backupExchange.protocol_version_range,            },            permanent: true, -          retryInfo: RetryInfo.reset(),            lastUpdate: undefined,            nextUpdate: TalerProtocolTimestamp.now(),            nextRefreshCheck: TalerProtocolTimestamp.now(), @@ -341,7 +340,7 @@ export async function importBackup(            }            const denomPubHash =              cryptoComp.rsaDenomPubToHash[ -            backupDenomination.denom_pub.rsa_public_key +              backupDenomination.denom_pub.rsa_public_key              ];            checkLogicInvariant(!!denomPubHash);            const existingDenom = await tx.denominations.get([ @@ -426,7 +425,6 @@ export async function importBackup(            }          } -          // FIXME: import reserves with new schema          // for (const backupReserve of backupExchangeDetails.reserves) { @@ -517,7 +515,6 @@ export async function importBackup(          //     }          //   }          // } -        }        for (const backupProposal of backupBlob.proposals) { @@ -560,7 +557,7 @@ export async function importBackup(              const amount = Amounts.parseOrThrow(parsedContractTerms.amount);              const contractTermsHash =                cryptoComp.proposalIdToContractTermsHash[ -              backupProposal.proposal_id +                backupProposal.proposal_id                ];              let maxWireFee: AmountJson;              if (parsedContractTerms.max_wire_fee) { @@ -611,7 +608,6 @@ export async function importBackup(            }            await tx.proposals.put({              claimToken: backupProposal.claim_token, -            lastError: undefined,              merchantBaseUrl: backupProposal.merchant_base_url,              timestamp: backupProposal.timestamp,              orderId: backupProposal.order_id, @@ -620,7 +616,6 @@ export async function importBackup(                cryptoComp.proposalNoncePrivToPub[backupProposal.nonce_priv],              proposalId: backupProposal.proposal_id,              repurchaseProposalId: backupProposal.repurchase_proposal_id, -            retryInfo: RetryInfo.reset(),              download,              proposalStatus,            }); @@ -706,7 +701,7 @@ export async function importBackup(            const amount = Amounts.parseOrThrow(parsedContractTerms.amount);            const contractTermsHash =              cryptoComp.proposalIdToContractTermsHash[ -            backupPurchase.proposal_id +              backupPurchase.proposal_id              ];            let maxWireFee: AmountJson;            if (parsedContractTerms.max_wire_fee) { @@ -755,10 +750,7 @@ export async function importBackup(              noncePriv: backupPurchase.nonce_priv,              noncePub:                cryptoComp.proposalNoncePrivToPub[backupPurchase.nonce_priv], -            lastPayError: undefined,              autoRefundDeadline: TalerProtocolTimestamp.never(), -            refundStatusRetryInfo: RetryInfo.reset(), -            lastRefundStatusError: undefined,              refundAwaiting: undefined,              timestampAccept: backupPurchase.timestamp_accept,              timestampFirstSuccessfulPay: @@ -767,8 +759,6 @@ export async function importBackup(              merchantPaySig: backupPurchase.merchant_pay_sig,              lastSessionId: undefined,              abortStatus, -            // FIXME! -            payRetryInfo: RetryInfo.reset(),              download,              paymentSubmitPending:                !backupPurchase.timestamp_first_successful_pay, @@ -851,7 +841,6 @@ export async function importBackup(              timestampCreated: backupRefreshGroup.timestamp_created,              refreshGroupId: backupRefreshGroup.refresh_group_id,              reason, -            lastError: undefined,              lastErrorPerCoin: {},              oldCoinPubs: backupRefreshGroup.old_coins.map((x) => x.coin_pub),              statusPerCoin: backupRefreshGroup.old_coins.map((x) => @@ -869,7 +858,6 @@ export async function importBackup(                Amounts.parseOrThrow(x.estimated_output_amount),              ),              refreshSessionPerCoin, -            retryInfo: RetryInfo.reset(),            });          }        } @@ -891,11 +879,9 @@ export async function importBackup(              createdTimestamp: backupTip.timestamp_created,              denomsSel,              exchangeBaseUrl: backupTip.exchange_base_url, -            lastError: undefined,              merchantBaseUrl: backupTip.exchange_base_url,              merchantTipId: backupTip.merchant_tip_id,              pickedUpTimestamp: backupTip.timestamp_finished, -            retryInfo: RetryInfo.reset(),              secretSeed: backupTip.secret_seed,              tipAmountEffective: denomsSel.totalCoinValue,              tipAmountRaw: Amounts.parseOrThrow(backupTip.tip_amount_raw), diff --git a/packages/taler-wallet-core/src/operations/backup/index.ts b/packages/taler-wallet-core/src/operations/backup/index.ts index 45b8491df..56871104c 100644 --- a/packages/taler-wallet-core/src/operations/backup/index.ts +++ b/packages/taler-wallet-core/src/operations/backup/index.ts @@ -25,9 +25,12 @@   * Imports.   */  import { -  AbsoluteTime, AmountString, +  AbsoluteTime, +  AmountString,    BackupRecovery, -  buildCodecForObject, bytesToString, canonicalizeBaseUrl, +  buildCodecForObject, +  bytesToString, +  canonicalizeBaseUrl,    canonicalJson,    Codec,    codecForAmountString, @@ -36,19 +39,32 @@ import {    codecForNumber,    codecForString,    codecOptional, -  ConfirmPayResultType, decodeCrock, DenomKeyType, -  durationFromSpec, eddsaGetPublic, +  ConfirmPayResultType, +  decodeCrock, +  DenomKeyType, +  durationFromSpec, +  eddsaGetPublic,    EddsaKeyPair,    encodeCrock,    getRandomBytes, -  hash, hashDenomPub, +  hash, +  hashDenomPub,    HttpStatusCode, -  j2s, kdf, Logger, +  j2s, +  kdf, +  Logger,    notEmpty,    PreparePayResultType,    RecoveryLoadRequest, -  RecoveryMergeStrategy, rsaBlind, secretbox, secretbox_open, stringToBytes, TalerErrorDetail, TalerProtocolTimestamp, URL, -  WalletBackupContentV1 +  RecoveryMergeStrategy, +  rsaBlind, +  secretbox, +  secretbox_open, +  stringToBytes, +  TalerErrorDetail, +  TalerProtocolTimestamp, +  URL, +  WalletBackupContentV1,  } from "@gnu-taler/taler-util";  import { gunzipSync, gzipSync } from "fflate";  import { TalerCryptoInterface } from "../../crypto/cryptoImplementation.js"; @@ -58,26 +74,28 @@ import {    BackupProviderStateTag,    BackupProviderTerms,    ConfigRecord, +  OperationAttemptResult, +  OperationAttemptResultType,    WalletBackupConfState,    WalletStoresV1, -  WALLET_BACKUP_STATE_KEY +  WALLET_BACKUP_STATE_KEY,  } from "../../db.js";  import { InternalWalletState } from "../../internal-wallet-state.js";  import {    readSuccessResponseJsonOrThrow, -  readTalerErrorResponse +  readTalerErrorResponse,  } from "../../util/http.js";  import {    checkDbInvariant, -  checkLogicInvariant +  checkLogicInvariant,  } from "../../util/invariants.js";  import { GetReadWriteAccess } from "../../util/query.js"; -import { RetryInfo } from "../../util/retries.js"; +import { RetryInfo, RetryTags, scheduleRetryInTx } from "../../util/retries.js";  import { guardOperationException } from "../common.js";  import {    checkPaymentByProposalId,    confirmPay, -  preparePayForUri +  preparePayForUri,  } from "../pay.js";  import { exportBackup } from "./export.js";  import { BackupCryptoPrecomputedData, importBackup } from "./import.js"; @@ -244,8 +262,7 @@ function getNextBackupTimestamp(): TalerProtocolTimestamp {  async function runBackupCycleForProvider(    ws: InternalWalletState,    args: BackupForProviderArgs, -): Promise<void> { - +): Promise<OperationAttemptResult> {    const provider = await ws.db      .mktx((x) => ({ backupProviders: x.backupProviders }))      .runReadOnly(async (tx) => { @@ -254,7 +271,10 @@ async function runBackupCycleForProvider(    if (!provider) {      logger.warn("provider disappeared"); -    return; +    return { +      type: OperationAttemptResultType.Finished, +      result: undefined, +    };    }    const backupJson = await exportBackup(ws); @@ -292,8 +312,8 @@ async function runBackupCycleForProvider(        "if-none-match": newHash,        ...(provider.lastBackupHash          ? { -          "if-match": provider.lastBackupHash, -        } +            "if-match": provider.lastBackupHash, +          }          : {}),      },    }); @@ -315,7 +335,10 @@ async function runBackupCycleForProvider(          };          await tx.backupProvider.put(prov);        }); -    return; +    return { +      type: OperationAttemptResultType.Finished, +      result: undefined, +    };    }    if (resp.status === HttpStatusCode.PaymentRequired) { @@ -344,7 +367,10 @@ async function runBackupCycleForProvider(      // FIXME: check if the provider is overcharging us!      await ws.db -      .mktx((x) => ({ backupProviders: x.backupProviders })) +      .mktx((x) => ({ +        backupProviders: x.backupProviders, +        operationRetries: x.operationRetries, +      }))        .runReadWrite(async (tx) => {          const provRec = await tx.backupProviders.get(provider.baseUrl);          checkDbInvariant(!!provRec); @@ -354,11 +380,8 @@ async function runBackupCycleForProvider(          provRec.currentPaymentProposalId = proposalId;          // FIXME: allocate error code for this!          await tx.backupProviders.put(provRec); -        await incrementBackupRetryInTx( -          tx, -          args.backupProviderBaseUrl, -          undefined, -        ); +        const opId = RetryTags.forBackup(provRec); +        await scheduleRetryInTx(ws, tx, opId);        });      if (doPay) { @@ -371,12 +394,15 @@ async function runBackupCycleForProvider(      }      if (args.retryAfterPayment) { -      await runBackupCycleForProvider(ws, { +      return await runBackupCycleForProvider(ws, {          ...args,          retryAfterPayment: false,        });      } -    return; +    return { +      type: OperationAttemptResultType.Pending, +      result: undefined, +    };    }    if (resp.status === HttpStatusCode.NoContent) { @@ -395,7 +421,10 @@ async function runBackupCycleForProvider(          };          await tx.backupProviders.put(prov);        }); -    return; +    return { +      type: OperationAttemptResultType.Finished, +      result: undefined, +    };    }    if (resp.status === HttpStatusCode.Conflict) { @@ -406,7 +435,10 @@ async function runBackupCycleForProvider(      const cryptoData = await computeBackupCryptoData(ws.cryptoApi, blob);      await importBackup(ws, blob, cryptoData);      await ws.db -      .mktx((x) => ({ backupProvider: x.backupProviders })) +      .mktx((x) => ({ +        backupProvider: x.backupProviders, +        operationRetries: x.operationRetries, +      }))        .runReadWrite(async (tx) => {          const prov = await tx.backupProvider.get(provider.baseUrl);          if (!prov) { @@ -414,20 +446,21 @@ async function runBackupCycleForProvider(            return;          }          prov.lastBackupHash = encodeCrock(hash(backupEnc)); -        // FIXME:  Allocate error code for this situation? +        // FIXME: Allocate error code for this situation? +        // FIXME: Add operation retry record! +        const opId = RetryTags.forBackup(prov); +        await scheduleRetryInTx(ws, tx, opId);          prov.state = {            tag: BackupProviderStateTag.Retrying, -          retryInfo: RetryInfo.reset(),          };          await tx.backupProvider.put(prov);        });      logger.info("processed existing backup");      // Now upload our own, merged backup. -    await runBackupCycleForProvider(ws, { +    return await runBackupCycleForProvider(ws, {        ...args,        retryAfterPayment: false,      }); -    return;    }    // Some other response that we did not expect! @@ -436,53 +469,16 @@ async function runBackupCycleForProvider(    const err = await readTalerErrorResponse(resp);    logger.error(`got error response from backup provider: ${j2s(err)}`); -  await ws.db -    .mktx((x) => ({ backupProviders: x.backupProviders })) -    .runReadWrite(async (tx) => { -      incrementBackupRetryInTx(tx, args.backupProviderBaseUrl, err); -    }); -} - -async function incrementBackupRetryInTx( -  tx: GetReadWriteAccess<{ -    backupProviders: typeof WalletStoresV1.backupProviders; -  }>, -  backupProviderBaseUrl: string, -  err: TalerErrorDetail | undefined, -): Promise<void> { -  const pr = await tx.backupProviders.get(backupProviderBaseUrl); -  if (!pr) { -    return; -  } -  if (pr.state.tag === BackupProviderStateTag.Retrying) { -    pr.state.lastError = err; -    pr.state.retryInfo = RetryInfo.increment(pr.state.retryInfo); -  } else if (pr.state.tag === BackupProviderStateTag.Ready) { -    pr.state = { -      tag: BackupProviderStateTag.Retrying, -      retryInfo: RetryInfo.reset(), -      lastError: err, -    }; -  } -  await tx.backupProviders.put(pr); -} - -async function incrementBackupRetry( -  ws: InternalWalletState, -  backupProviderBaseUrl: string, -  err: TalerErrorDetail | undefined, -): Promise<void> { -  await ws.db -    .mktx((x) => ({ backupProviders: x.backupProviders })) -    .runReadWrite(async (tx) => -      incrementBackupRetryInTx(tx, backupProviderBaseUrl, err), -    ); +  return { +    type: OperationAttemptResultType.Error, +    errorDetail: err, +  };  }  export async function processBackupForProvider(    ws: InternalWalletState,    backupProviderBaseUrl: string, -): Promise<void> { +): Promise<OperationAttemptResult> {    const provider = await ws.db      .mktx((x) => ({ backupProviders: x.backupProviders }))      .runReadOnly(async (tx) => { @@ -492,17 +488,10 @@ export async function processBackupForProvider(      throw Error("unknown backup provider");    } -  const onOpErr = (err: TalerErrorDetail): Promise<void> => -    incrementBackupRetry(ws, backupProviderBaseUrl, err); - -  const run = async () => { -    await runBackupCycleForProvider(ws, { -      backupProviderBaseUrl: provider.baseUrl, -      retryAfterPayment: true, -    }); -  }; - -  await guardOperationException(run, onOpErr); +  return await runBackupCycleForProvider(ws, { +    backupProviderBaseUrl: provider.baseUrl, +    retryAfterPayment: true, +  });  }  export interface RemoveBackupProviderRequest { @@ -818,24 +807,34 @@ export async function getBackupInfo(  ): Promise<BackupInfo> {    const backupConfig = await provideBackupState(ws);    const providerRecords = await ws.db -    .mktx((x) => ({ backupProviders: x.backupProviders })) +    .mktx((x) => ({ +      backupProviders: x.backupProviders, +      operationRetries: x.operationRetries, +    }))      .runReadOnly(async (tx) => { -      return await tx.backupProviders.iter().toArray(); +      return await tx.backupProviders.iter().mapAsync(async (bp) => { +        const opId = RetryTags.forBackup(bp); +        const retryRecord = await tx.operationRetries.get(opId); +        return { +          provider: bp, +          retryRecord, +        }; +      });      });    const providers: ProviderInfo[] = [];    for (const x of providerRecords) {      providers.push({ -      active: x.state.tag !== BackupProviderStateTag.Provisional, -      syncProviderBaseUrl: x.baseUrl, -      lastSuccessfulBackupTimestamp: x.lastBackupCycleTimestamp, -      paymentProposalIds: x.paymentProposalIds, +      active: x.provider.state.tag !== BackupProviderStateTag.Provisional, +      syncProviderBaseUrl: x.provider.baseUrl, +      lastSuccessfulBackupTimestamp: x.provider.lastBackupCycleTimestamp, +      paymentProposalIds: x.provider.paymentProposalIds,        lastError: -        x.state.tag === BackupProviderStateTag.Retrying -          ? x.state.lastError +        x.provider.state.tag === BackupProviderStateTag.Retrying +          ? x.retryRecord?.lastError            : undefined, -      paymentStatus: await getProviderPaymentInfo(ws, x), -      terms: x.terms, -      name: x.name, +      paymentStatus: await getProviderPaymentInfo(ws, x.provider), +      terms: x.provider.terms, +      name: x.provider.name,      });    }    return { diff --git a/packages/taler-wallet-core/src/operations/deposits.ts b/packages/taler-wallet-core/src/operations/deposits.ts index 734bc4c2b..6eed12a38 100644 --- a/packages/taler-wallet-core/src/operations/deposits.ts +++ b/packages/taler-wallet-core/src/operations/deposits.ts @@ -44,7 +44,12 @@ import {    TrackDepositGroupResponse,    URL,  } from "@gnu-taler/taler-util"; -import { DepositGroupRecord, OperationStatus } from "../db.js"; +import { +  DepositGroupRecord, +  OperationAttemptErrorResult, +  OperationAttemptResult, +  OperationStatus, +} from "../db.js";  import { InternalWalletState } from "../internal-wallet-state.js";  import { selectPayCoins } from "../util/coinSelection.js";  import { readSuccessResponseJsonOrThrow } from "../util/http.js"; @@ -67,91 +72,16 @@ import { getTotalRefreshCost } from "./refresh.js";  const logger = new Logger("deposits.ts");  /** - * Set up the retry timeout for a deposit group. - */ -async function setupDepositGroupRetry( -  ws: InternalWalletState, -  depositGroupId: string, -  options: { -    resetRetry: boolean; -  }, -): Promise<void> { -  await ws.db -    .mktx((x) => ({ -      depositGroups: x.depositGroups, -    })) -    .runReadWrite(async (tx) => { -      const x = await tx.depositGroups.get(depositGroupId); -      if (!x) { -        return; -      } -      if (options.resetRetry) { -        x.retryInfo = RetryInfo.reset(); -      } else { -        x.retryInfo = RetryInfo.increment(x.retryInfo); -      } -      delete x.lastError; -      await tx.depositGroups.put(x); -    }); -} - -/** - * Report an error that occurred while processing the deposit group. - */ -async function reportDepositGroupError( -  ws: InternalWalletState, -  depositGroupId: string, -  err: TalerErrorDetail, -): Promise<void> { -  await ws.db -    .mktx((x) => ({ depositGroups: x.depositGroups })) -    .runReadWrite(async (tx) => { -      const r = await tx.depositGroups.get(depositGroupId); -      if (!r) { -        return; -      } -      if (!r.retryInfo) { -        logger.error( -          `deposit group record (${depositGroupId}) reports error, but no retry active`, -        ); -        return; -      } -      r.lastError = err; -      await tx.depositGroups.put(r); -    }); -  ws.notify({ type: NotificationType.DepositOperationError, error: err }); -} - -export async function processDepositGroup( -  ws: InternalWalletState, -  depositGroupId: string, -  options: { -    forceNow?: boolean; -    cancellationToken?: CancellationToken; -  } = {}, -): Promise<void> { -  const onOpErr = (err: TalerErrorDetail): Promise<void> => -    reportDepositGroupError(ws, depositGroupId, err); -  return await guardOperationException( -    async () => await processDepositGroupImpl(ws, depositGroupId, options), -    onOpErr, -  ); -} - -/**   * @see {processDepositGroup}   */ -async function processDepositGroupImpl( +export async function processDepositGroup(    ws: InternalWalletState,    depositGroupId: string,    options: {      forceNow?: boolean;      cancellationToken?: CancellationToken;    } = {}, -): Promise<void> { -  const forceNow = options.forceNow ?? false; -  await setupDepositGroupRetry(ws, depositGroupId, { resetRetry: forceNow }); - +): Promise<OperationAttemptResult> {    const depositGroup = await ws.db      .mktx((x) => ({        depositGroups: x.depositGroups, @@ -161,11 +91,11 @@ async function processDepositGroupImpl(      });    if (!depositGroup) {      logger.warn(`deposit group ${depositGroupId} not found`); -    return; +    return OperationAttemptResult.finishedEmpty();    }    if (depositGroup.timestampFinished) {      logger.trace(`deposit group ${depositGroupId} already finished`); -    return; +    return OperationAttemptResult.finishedEmpty();    }    const contractData = extractContractData( @@ -240,11 +170,10 @@ async function processDepositGroupImpl(        if (allDeposited) {          dg.timestampFinished = TalerProtocolTimestamp.now();          dg.operationStatus = OperationStatus.Finished; -        delete dg.lastError; -        delete dg.retryInfo;          await tx.depositGroups.put(dg);        }      }); +  return OperationAttemptResult.finishedEmpty();  }  export async function trackDepositGroup( @@ -338,9 +267,9 @@ export async function getFeeForDeposit(    const csr: CoinSelectionRequest = {      allowedAuditors: [], -    allowedExchanges: Object.values(exchangeInfos).map(v => ({ +    allowedExchanges: Object.values(exchangeInfos).map((v) => ({        exchangeBaseUrl: v.url, -      exchangePub: v.master_pub +      exchangePub: v.master_pub,      })),      amount: Amounts.parseOrThrow(req.amount),      maxDepositFee: Amounts.parseOrThrow(req.amount), @@ -383,7 +312,6 @@ export async function prepareDepositGroup(    }    const amount = Amounts.parseOrThrow(req.amount); -    const exchangeInfos: { url: string; master_pub: string }[] = [];    await ws.db @@ -473,7 +401,7 @@ export async function prepareDepositGroup(      payCoinSel,    ); -  return { totalDepositCost, effectiveDepositAmount } +  return { totalDepositCost, effectiveDepositAmount };  }  export async function createDepositGroup(    ws: InternalWalletState, @@ -600,9 +528,7 @@ export async function createDepositGroup(        payto_uri: req.depositPaytoUri,        salt: wireSalt,      }, -    retryInfo: RetryInfo.reset(),      operationStatus: OperationStatus.Pending, -    lastError: undefined,    };    await ws.db diff --git a/packages/taler-wallet-core/src/operations/exchanges.ts b/packages/taler-wallet-core/src/operations/exchanges.ts index b75bdfd74..1021da6b6 100644 --- a/packages/taler-wallet-core/src/operations/exchanges.ts +++ b/packages/taler-wallet-core/src/operations/exchanges.ts @@ -53,6 +53,8 @@ import {    DenominationVerificationStatus,    ExchangeDetailsRecord,    ExchangeRecord, +  OperationAttemptResult, +  OperationAttemptResultType,    WalletStoresV1,  } from "../db.js";  import { TalerError } from "../errors.js"; @@ -64,7 +66,7 @@ import {    readSuccessResponseTextOrThrow,  } from "../util/http.js";  import { DbAccess, GetReadOnlyAccess } from "../util/query.js"; -import { RetryInfo } from "../util/retries.js"; +import { RetryInfo, runOperationHandlerForResult } from "../util/retries.js";  import { WALLET_EXCHANGE_PROTOCOL_VERSION } from "../versions.js";  import { guardOperationException } from "./common.js"; @@ -102,51 +104,6 @@ function denominationRecordFromKeys(    return d;  } -async function reportExchangeUpdateError( -  ws: InternalWalletState, -  baseUrl: string, -  err: TalerErrorDetail, -): Promise<void> { -  await ws.db -    .mktx((x) => ({ exchanges: x.exchanges })) -    .runReadWrite(async (tx) => { -      const exchange = await tx.exchanges.get(baseUrl); -      if (!exchange) { -        return; -      } -      if (!exchange.retryInfo) { -        logger.reportBreak(); -      } -      exchange.lastError = err; -      await tx.exchanges.put(exchange); -    }); -  ws.notify({ type: NotificationType.ExchangeOperationError, error: err }); -} - -async function setupExchangeUpdateRetry( -  ws: InternalWalletState, -  baseUrl: string, -  options: { -    reset: boolean; -  }, -): Promise<void> { -  await ws.db -    .mktx((x) => ({ exchanges: x.exchanges })) -    .runReadWrite(async (tx) => { -      const exchange = await tx.exchanges.get(baseUrl); -      if (!exchange) { -        return; -      } -      if (options.reset) { -        exchange.retryInfo = RetryInfo.reset(); -      } else { -        exchange.retryInfo = RetryInfo.increment(exchange.retryInfo); -      } -      delete exchange.lastError; -      await tx.exchanges.put(exchange); -    }); -} -  export function getExchangeRequestTimeout(): Duration {    return Duration.fromSpec({      seconds: 5, @@ -360,25 +317,6 @@ async function downloadExchangeWireInfo(    return wireInfo;  } -export async function updateExchangeFromUrl( -  ws: InternalWalletState, -  baseUrl: string, -  options: { -    forceNow?: boolean; -    cancellationToken?: CancellationToken; -  } = {}, -): Promise<{ -  exchange: ExchangeRecord; -  exchangeDetails: ExchangeDetailsRecord; -}> { -  const onOpErr = (e: TalerErrorDetail): Promise<void> => -    reportExchangeUpdateError(ws, baseUrl, e); -  return await guardOperationException( -    () => updateExchangeFromUrlImpl(ws, baseUrl, options), -    onOpErr, -  ); -} -  async function provideExchangeRecord(    ws: InternalWalletState,    baseUrl: string, @@ -398,7 +336,6 @@ async function provideExchangeRecord(          const r: ExchangeRecord = {            permanent: true,            baseUrl: baseUrl, -          retryInfo: RetryInfo.reset(),            detailsPointer: undefined,            lastUpdate: undefined,            nextUpdate: AbsoluteTime.toTimestamp(now), @@ -530,25 +467,42 @@ export async function downloadTosFromAcceptedFormat(    );  } +export async function updateExchangeFromUrl( +  ws: InternalWalletState, +  baseUrl: string, +  options: { +    forceNow?: boolean; +    cancellationToken?: CancellationToken; +  } = {}, +): Promise<{ +  exchange: ExchangeRecord; +  exchangeDetails: ExchangeDetailsRecord; +}> { +  return runOperationHandlerForResult( +    await updateExchangeFromUrlHandler(ws, baseUrl, options), +  ); +} +  /**   * Update or add exchange DB entry by fetching the /keys and /wire information.   * Optionally link the reserve entry to the new or existing   * exchange entry in then DB.   */ -async function updateExchangeFromUrlImpl( +export async function updateExchangeFromUrlHandler(    ws: InternalWalletState,    baseUrl: string,    options: {      forceNow?: boolean;      cancellationToken?: CancellationToken;    } = {}, -): Promise<{ -  exchange: ExchangeRecord; -  exchangeDetails: ExchangeDetailsRecord; -}> { +): Promise< +  OperationAttemptResult<{ +    exchange: ExchangeRecord; +    exchangeDetails: ExchangeDetailsRecord; +  }> +> {    const forceNow = options.forceNow ?? false;    logger.info(`updating exchange info for ${baseUrl}, forced: ${forceNow}`); -  await setupExchangeUpdateRetry(ws, baseUrl, { reset: forceNow });    const now = AbsoluteTime.now();    baseUrl = canonicalizeBaseUrl(baseUrl); @@ -565,7 +519,10 @@ async function updateExchangeFromUrlImpl(      !AbsoluteTime.isExpired(AbsoluteTime.fromTimestamp(exchange.nextUpdate))    ) {      logger.info("using existing exchange info"); -    return { exchange, exchangeDetails }; +    return { +      type: OperationAttemptResultType.Finished, +      result: { exchange, exchangeDetails }, +    };    }    logger.info("updating exchange /keys info"); @@ -649,8 +606,6 @@ async function updateExchangeFromUrlImpl(          termsOfServiceAcceptedTimestamp: TalerProtocolTimestamp.now(),        };        // FIXME: only update if pointer got updated -      delete r.lastError; -      delete r.retryInfo;        r.lastUpdate = TalerProtocolTimestamp.now();        r.nextUpdate = keysInfo.expiry;        // New denominations might be available. @@ -771,8 +726,11 @@ async function updateExchangeFromUrlImpl(      type: NotificationType.ExchangeAdded,    });    return { -    exchange: updated.exchange, -    exchangeDetails: updated.exchangeDetails, +    type: OperationAttemptResultType.Finished, +    result: { +      exchange: updated.exchange, +      exchangeDetails: updated.exchangeDetails, +    },    };  } diff --git a/packages/taler-wallet-core/src/operations/pay.ts b/packages/taler-wallet-core/src/operations/pay.ts index 3d4d2b5a0..9e63cd516 100644 --- a/packages/taler-wallet-core/src/operations/pay.ts +++ b/packages/taler-wallet-core/src/operations/pay.ts @@ -37,9 +37,6 @@ import {    ContractTerms,    ContractTermsUtil,    Duration, -  durationMax, -  durationMin, -  durationMul,    encodeCrock,    ForcedCoinSel,    getRandomBytes, @@ -59,10 +56,7 @@ import {    TransactionType,    URL,  } from "@gnu-taler/taler-util"; -import { -  EXCHANGE_COINS_LOCK, -  InternalWalletState, -} from "../internal-wallet-state.js"; +import { EddsaKeypair } from "../crypto/cryptoImplementation.js";  import {    AbortStatus,    AllowedAuditorInfo, @@ -71,6 +65,8 @@ import {    CoinRecord,    CoinStatus,    DenominationRecord, +  OperationAttemptResult, +  OperationAttemptResultType,    ProposalRecord,    ProposalStatus,    PurchaseRecord, @@ -83,6 +79,11 @@ import {    TalerError,  } from "../errors.js";  import { +  EXCHANGE_COINS_LOCK, +  InternalWalletState, +} from "../internal-wallet-state.js"; +import { assertUnreachable } from "../util/assertUnreachable.js"; +import {    AvailableCoinInfo,    CoinCandidateSelection,    PreviousPayCoins, @@ -98,11 +99,9 @@ import {    throwUnexpectedRequestError,  } from "../util/http.js";  import { GetReadWriteAccess } from "../util/query.js"; -import { RetryInfo } from "../util/retries.js"; +import { RetryInfo, RetryTags, scheduleRetry } from "../util/retries.js";  import { getExchangeDetails } from "./exchanges.js";  import { createRefreshGroup, getTotalRefreshCost } from "./refresh.js"; -import { guardOperationException } from "./common.js"; -import { EddsaKeypair } from "../crypto/cryptoImplementation.js";  /**   * Logger. @@ -448,10 +447,6 @@ async function recordConfirmPay(      timestampAccept: AbsoluteTime.toTimestamp(AbsoluteTime.now()),      timestampLastRefundStatus: undefined,      proposalId: proposal.proposalId, -    lastPayError: undefined, -    lastRefundStatusError: undefined, -    payRetryInfo: RetryInfo.reset(), -    refundStatusRetryInfo: RetryInfo.reset(),      refundQueryRequested: false,      timestampFirstSuccessfulPay: undefined,      autoRefundDeadline: undefined, @@ -475,8 +470,6 @@ async function recordConfirmPay(        const p = await tx.proposals.get(proposal.proposalId);        if (p) {          p.proposalStatus = ProposalStatus.Accepted; -        delete p.lastError; -        delete p.retryInfo;          await tx.proposals.put(p);        }        await tx.purchases.put(t); @@ -490,117 +483,6 @@ async function recordConfirmPay(    return t;  } -async function reportProposalError( -  ws: InternalWalletState, -  proposalId: string, -  err: TalerErrorDetail, -): Promise<void> { -  await ws.db -    .mktx((x) => ({ proposals: x.proposals })) -    .runReadWrite(async (tx) => { -      const pr = await tx.proposals.get(proposalId); -      if (!pr) { -        return; -      } -      if (!pr.retryInfo) { -        logger.error( -          `Asked to report an error for a proposal (${proposalId}) that is not active (no retryInfo)`, -        ); -        logger.reportBreak(); -        return; -      } -      pr.lastError = err; -      await tx.proposals.put(pr); -    }); -  ws.notify({ type: NotificationType.ProposalOperationError, error: err }); -} - -async function setupProposalRetry( -  ws: InternalWalletState, -  proposalId: string, -  options: { -    reset: boolean; -  }, -): Promise<void> { -  await ws.db -    .mktx((x) => ({ proposals: x.proposals })) -    .runReadWrite(async (tx) => { -      const pr = await tx.proposals.get(proposalId); -      if (!pr) { -        return; -      } -      if (options.reset) { -        pr.retryInfo = RetryInfo.reset(); -      } else { -        pr.retryInfo = RetryInfo.increment(pr.retryInfo); -      } -      delete pr.lastError; -      await tx.proposals.put(pr); -    }); -} - -async function setupPurchasePayRetry( -  ws: InternalWalletState, -  proposalId: string, -  options: { -    reset: boolean; -  }, -): Promise<void> { -  await ws.db -    .mktx((x) => ({ purchases: x.purchases })) -    .runReadWrite(async (tx) => { -      const p = await tx.purchases.get(proposalId); -      if (!p) { -        return; -      } -      if (options.reset) { -        p.payRetryInfo = RetryInfo.reset(); -      } else { -        p.payRetryInfo = RetryInfo.increment(p.payRetryInfo); -      } -      delete p.lastPayError; -      await tx.purchases.put(p); -    }); -} - -async function reportPurchasePayError( -  ws: InternalWalletState, -  proposalId: string, -  err: TalerErrorDetail, -): Promise<void> { -  await ws.db -    .mktx((x) => ({ purchases: x.purchases })) -    .runReadWrite(async (tx) => { -      const pr = await tx.purchases.get(proposalId); -      if (!pr) { -        return; -      } -      if (!pr.payRetryInfo) { -        logger.error( -          `purchase record (${proposalId}) reports error, but no retry active`, -        ); -      } -      pr.lastPayError = err; -      await tx.purchases.put(pr); -    }); -  ws.notify({ type: NotificationType.PayOperationError, error: err }); -} - -export async function processDownloadProposal( -  ws: InternalWalletState, -  proposalId: string, -  options: { -    forceNow?: boolean; -  } = {}, -): Promise<void> { -  const onOpErr = (err: TalerErrorDetail): Promise<void> => -    reportProposalError(ws, proposalId, err); -  await guardOperationException( -    () => processDownloadProposalImpl(ws, proposalId, options), -    onOpErr, -  ); -} -  async function failProposalPermanently(    ws: InternalWalletState,    proposalId: string, @@ -613,23 +495,21 @@ async function failProposalPermanently(        if (!p) {          return;        } -      delete p.retryInfo; -      p.lastError = err;        p.proposalStatus = ProposalStatus.PermanentlyFailed;        await tx.proposals.put(p);      });  } -function getProposalRequestTimeout(proposal: ProposalRecord): Duration { +function getProposalRequestTimeout(retryInfo?: RetryInfo): Duration {    return Duration.clamp({      lower: Duration.fromSpec({ seconds: 1 }),      upper: Duration.fromSpec({ seconds: 60 }), -    value: RetryInfo.getDuration(proposal.retryInfo), +    value: retryInfo ? RetryInfo.getDuration(retryInfo) : Duration.fromSpec({}),    });  }  function getPayRequestTimeout(purchase: PurchaseRecord): Duration { -  return durationMul( +  return Duration.multiply(      { d_ms: 15000 },      1 + purchase.payCoinSelection.coinPubs.length / 5,    ); @@ -682,15 +562,13 @@ export function extractContractData(    };  } -async function processDownloadProposalImpl( +export async function processDownloadProposal(    ws: InternalWalletState,    proposalId: string, -  options: { -    forceNow?: boolean; -  } = {}, -): Promise<void> { -  const forceNow = options.forceNow ?? false; -  await setupProposalRetry(ws, proposalId, { reset: forceNow }); +  options: {} = {}, +): Promise<OperationAttemptResult> { + +  const res = ws.db.mktx2((x) => [x.auditorTrust, x.coins])    const proposal = await ws.db      .mktx((x) => ({ proposals: x.proposals })) @@ -699,11 +577,17 @@ async function processDownloadProposalImpl(      });    if (!proposal) { -    return; +    return { +      type: OperationAttemptResultType.Finished, +      result: undefined, +    };    }    if (proposal.proposalStatus != ProposalStatus.Downloading) { -    return; +    return { +      type: OperationAttemptResultType.Finished, +      result: undefined, +    };    }    const orderClaimUrl = new URL( @@ -722,8 +606,16 @@ async function processDownloadProposalImpl(      requestBody.token = proposal.claimToken;    } +  const opId = RetryTags.forProposalClaim(proposal); +  const retryRecord = await ws.db +    .mktx((x) => ({ operationRetries: x.operationRetries })) +    .runReadOnly(async (tx) => { +      return tx.operationRetries.get(opId); +    }); + +  // FIXME: Do this in the background using the new return value    const httpResponse = await ws.http.postJson(orderClaimUrl, requestBody, { -    timeout: getProposalRequestTimeout(proposal), +    timeout: getProposalRequestTimeout(retryRecord?.retryInfo),    });    const r = await readSuccessResponseJsonOrErrorCode(      httpResponse, @@ -892,6 +784,11 @@ async function processDownloadProposalImpl(      type: NotificationType.ProposalDownloaded,      proposalId: proposal.proposalId,    }); + +  return { +    type: OperationAttemptResultType.Finished, +    result: undefined, +  };  }  /** @@ -954,8 +851,6 @@ async function startDownloadProposal(      proposalId: proposalId,      proposalStatus: ProposalStatus.Downloading,      repurchaseProposalId: undefined, -    retryInfo: RetryInfo.reset(), -    lastError: undefined,      downloadSessionId: sessionId,    }; @@ -1000,17 +895,13 @@ async function storeFirstPaySuccess(        }        purchase.timestampFirstSuccessfulPay = now;        purchase.paymentSubmitPending = false; -      purchase.lastPayError = undefined;        purchase.lastSessionId = sessionId; -      purchase.payRetryInfo = RetryInfo.reset();        purchase.merchantPaySig = paySig;        const protoAr = purchase.download.contractData.autoRefund;        if (protoAr) {          const ar = Duration.fromTalerProtocolDuration(protoAr);          logger.info("auto_refund present");          purchase.refundQueryRequested = true; -        purchase.refundStatusRetryInfo = RetryInfo.reset(); -        purchase.lastRefundStatusError = undefined;          purchase.autoRefundDeadline = AbsoluteTime.toTimestamp(            AbsoluteTime.addDuration(AbsoluteTime.now(), ar),          ); @@ -1038,8 +929,6 @@ async function storePayReplaySuccess(          throw Error("invalid payment state");        }        purchase.paymentSubmitPending = false; -      purchase.lastPayError = undefined; -      purchase.payRetryInfo = RetryInfo.reset();        purchase.lastSessionId = sessionId;        await tx.purchases.put(purchase);      }); @@ -1298,7 +1187,8 @@ export async function checkPaymentByProposalId(          await tx.purchases.put(p);        });      const r = await processPurchasePay(ws, proposalId, { forceNow: true }); -    if (r.type !== ConfirmPayResultType.Done) { +    if (r.type !== OperationAttemptResultType.Finished) { +      // FIXME: This does not surface the original error        throw Error("submitting pay failed");      }      return { @@ -1458,6 +1348,45 @@ export async function generateDepositPermissions(  }  /** + * Run the operation handler for a payment + * and return the result as a {@link ConfirmPayResult}. + */ +export async function runPayForConfirmPay( +  ws: InternalWalletState, +  proposalId: string, +): Promise<ConfirmPayResult> { +  const res = await processPurchasePay(ws, proposalId, { forceNow: true }); +  switch (res.type) { +    case OperationAttemptResultType.Finished: { +      const purchase = await ws.db +        .mktx((x) => ({ purchases: x.purchases })) +        .runReadOnly(async (tx) => { +          return tx.purchases.get(proposalId); +        }); +      if (!purchase?.download) { +        throw Error("purchase record not available anymore"); +      } +      return { +        type: ConfirmPayResultType.Done, +        contractTerms: purchase.download.contractTermsRaw, +      }; +    } +    case OperationAttemptResultType.Error: +      // FIXME: allocate error code! +      throw Error("payment failed"); +    case OperationAttemptResultType.Pending: +      return { +        type: ConfirmPayResultType.Pending, +        lastError: undefined, +      }; +    case OperationAttemptResultType.Longpoll: +      throw Error("unexpected processPurchasePay result (longpoll)"); +    default: +      assertUnreachable(res); +  } +} + +/**   * Add a contract to the wallet and sign coins, and send them.   */  export async function confirmPay( @@ -1503,7 +1432,7 @@ export async function confirmPay(    if (existingPurchase) {      logger.trace("confirmPay: submitting payment for existing purchase"); -    return await processPurchasePay(ws, proposalId, { forceNow: true }); +    return runPayForConfirmPay(ws, proposalId);    }    logger.trace("confirmPay: purchase record does not exist yet"); @@ -1559,6 +1488,7 @@ export async function confirmPay(      res,      d.contractData,    ); +    await recordConfirmPay(      ws,      proposal, @@ -1567,7 +1497,7 @@ export async function confirmPay(      sessionIdOverride,    ); -  return await processPurchasePay(ws, proposalId, { forceNow: true }); +  return runPayForConfirmPay(ws, proposalId);  }  export async function processPurchasePay( @@ -1576,24 +1506,7 @@ export async function processPurchasePay(    options: {      forceNow?: boolean;    } = {}, -): Promise<ConfirmPayResult> { -  const onOpErr = (e: TalerErrorDetail): Promise<void> => -    reportPurchasePayError(ws, proposalId, e); -  return await guardOperationException( -    () => processPurchasePayImpl(ws, proposalId, options), -    onOpErr, -  ); -} - -async function processPurchasePayImpl( -  ws: InternalWalletState, -  proposalId: string, -  options: { -    forceNow?: boolean; -  } = {}, -): Promise<ConfirmPayResult> { -  const forceNow = options.forceNow ?? false; -  await setupPurchasePayRetry(ws, proposalId, { reset: forceNow }); +): Promise<OperationAttemptResult> {    const purchase = await ws.db      .mktx((x) => ({ purchases: x.purchases }))      .runReadOnly(async (tx) => { @@ -1601,8 +1514,8 @@ async function processPurchasePayImpl(      });    if (!purchase) {      return { -      type: ConfirmPayResultType.Pending, -      lastError: { +      type: OperationAttemptResultType.Error, +      errorDetail: {          // FIXME: allocate more specific error code          code: TalerErrorCode.WALLET_UNEXPECTED_EXCEPTION,          hint: `trying to pay for purchase that is not in the database`, @@ -1611,10 +1524,7 @@ async function processPurchasePayImpl(      };    }    if (!purchase.paymentSubmitPending) { -    return { -      type: ConfirmPayResultType.Pending, -      lastError: purchase.lastPayError, -    }; +    OperationAttemptResult.finishedEmpty();    }    logger.trace(`processing purchase pay ${proposalId}`); @@ -1659,23 +1569,12 @@ async function processPurchasePayImpl(      logger.trace(`got resp ${JSON.stringify(resp)}`); -    // Hide transient errors. -    if ( -      (purchase.payRetryInfo?.retryCounter ?? 0) <= 5 && -      resp.status >= 500 && -      resp.status <= 599 -    ) { -      logger.trace("treating /pay error as transient"); -      const err = makeErrorDetail( -        TalerErrorCode.WALLET_UNEXPECTED_REQUEST_ERROR, -        getHttpResponseErrorDetails(resp), -        "/pay failed", -      ); -      return { -        type: ConfirmPayResultType.Pending, -        lastError: err, -      }; -    } +    const payOpId = RetryTags.forPay(purchase); +    const payRetryRecord = await ws.db +      .mktx((x) => ({ operationRetries: x.operationRetries })) +      .runReadOnly(async (tx) => { +        return await tx.operationRetries.get(payOpId); +      });      if (resp.status === HttpStatusCode.BadRequest) {        const errDetails = await readUnexpectedResponseDetails(resp); @@ -1689,8 +1588,6 @@ async function processPurchasePayImpl(              return;            }            purch.payFrozen = true; -          purch.lastPayError = errDetails; -          delete purch.payRetryInfo;            await tx.purchases.put(purch);          });        throw makePendingOperationFailedError( @@ -1708,7 +1605,9 @@ async function processPurchasePayImpl(        ) {          // Do this in the background, as it might take some time          handleInsufficientFunds(ws, proposalId, err).catch(async (e) => { -          reportPurchasePayError(ws, proposalId, { +          console.log("handling insufficient funds failed"); + +          await scheduleRetry(ws, RetryTags.forPay(purchase), {              code: TalerErrorCode.WALLET_UNEXPECTED_EXCEPTION,              message: "unexpected exception",              hint: "unexpected exception", @@ -1719,9 +1618,8 @@ async function processPurchasePayImpl(          });          return { -          type: ConfirmPayResultType.Pending, -          // FIXME: should we return something better here? -          lastError: err, +          type: OperationAttemptResultType.Pending, +          result: undefined,          };        }      } @@ -1761,22 +1659,6 @@ async function processPurchasePayImpl(      const resp = await ws.runSequentialized([EXCHANGE_COINS_LOCK], () =>        ws.http.postJson(payAgainUrl, reqBody),      ); -    // Hide transient errors. -    if ( -      (purchase.payRetryInfo?.retryCounter ?? 0) <= 5 && -      resp.status >= 500 && -      resp.status <= 599 -    ) { -      const err = makeErrorDetail( -        TalerErrorCode.WALLET_UNEXPECTED_REQUEST_ERROR, -        getHttpResponseErrorDetails(resp), -        "/paid failed", -      ); -      return { -        type: ConfirmPayResultType.Pending, -        lastError: err, -      }; -    }      if (resp.status !== 204) {        throw TalerError.fromDetail(          TalerErrorCode.WALLET_UNEXPECTED_REQUEST_ERROR, @@ -1793,10 +1675,7 @@ async function processPurchasePayImpl(      proposalId: purchase.proposalId,    }); -  return { -    type: ConfirmPayResultType.Done, -    contractTerms: purchase.download.contractTermsRaw, -  }; +  return OperationAttemptResult.finishedEmpty();  }  export async function refuseProposal( diff --git a/packages/taler-wallet-core/src/operations/pending.ts b/packages/taler-wallet-core/src/operations/pending.ts index 5cf3afd4d..7d5a5bfd9 100644 --- a/packages/taler-wallet-core/src/operations/pending.ts +++ b/packages/taler-wallet-core/src/operations/pending.ts @@ -36,40 +36,50 @@ import {  import { AbsoluteTime } from "@gnu-taler/taler-util";  import { InternalWalletState } from "../internal-wallet-state.js";  import { GetReadOnlyAccess } from "../util/query.js"; +import { RetryTags } from "../util/retries.js"; +import { Wallet } from "../wallet.js";  async function gatherExchangePending(    tx: GetReadOnlyAccess<{      exchanges: typeof WalletStoresV1.exchanges;      exchangeDetails: typeof WalletStoresV1.exchangeDetails; +    operationRetries: typeof WalletStoresV1.operationRetries;    }>,    now: AbsoluteTime,    resp: PendingOperationsResponse,  ): Promise<void> { -  await tx.exchanges.iter().forEachAsync(async (e) => { +  await tx.exchanges.iter().forEachAsync(async (exch) => { +    const opTag = RetryTags.forExchangeUpdate(exch); +    let opr = await tx.operationRetries.get(opTag);      resp.pendingOperations.push({        type: PendingTaskType.ExchangeUpdate, +      id: opTag,        givesLifeness: false,        timestampDue: -        e.retryInfo?.nextRetry ?? AbsoluteTime.fromTimestamp(e.nextUpdate), -      exchangeBaseUrl: e.baseUrl, -      lastError: e.lastError, +        opr?.retryInfo.nextRetry ?? AbsoluteTime.fromTimestamp(exch.nextUpdate), +      exchangeBaseUrl: exch.baseUrl, +      lastError: opr?.lastError,      });      // We only schedule a check for auto-refresh if the exchange update      // was successful. -    if (!e.lastError) { +    if (!opr?.lastError) {        resp.pendingOperations.push({          type: PendingTaskType.ExchangeCheckRefresh, -        timestampDue: AbsoluteTime.fromTimestamp(e.nextRefreshCheck), +        id: RetryTags.forExchangeCheckRefresh(exch), +        timestampDue: AbsoluteTime.fromTimestamp(exch.nextRefreshCheck),          givesLifeness: false, -        exchangeBaseUrl: e.baseUrl, +        exchangeBaseUrl: exch.baseUrl,        });      }    });  }  async function gatherRefreshPending( -  tx: GetReadOnlyAccess<{ refreshGroups: typeof WalletStoresV1.refreshGroups }>, +  tx: GetReadOnlyAccess<{ +    refreshGroups: typeof WalletStoresV1.refreshGroups; +    operationRetries: typeof WalletStoresV1.operationRetries; +  }>,    now: AbsoluteTime,    resp: PendingOperationsResponse,  ): Promise<void> { @@ -83,15 +93,19 @@ async function gatherRefreshPending(      if (r.frozen) {        return;      } +    const opId = RetryTags.forRefresh(r); +    const retryRecord = await tx.operationRetries.get(opId); +      resp.pendingOperations.push({        type: PendingTaskType.Refresh, +      id: opId,        givesLifeness: true, -      timestampDue: r.retryInfo.nextRetry, +      timestampDue: retryRecord?.retryInfo.nextRetry ?? AbsoluteTime.now(),        refreshGroupId: r.refreshGroupId,        finishedPerCoin: r.statusPerCoin.map(          (x) => x === RefreshCoinStatus.Finished,        ), -      retryInfo: r.retryInfo, +      retryInfo: retryRecord?.retryInfo,      });    }  } @@ -100,6 +114,7 @@ async function gatherWithdrawalPending(    tx: GetReadOnlyAccess<{      withdrawalGroups: typeof WalletStoresV1.withdrawalGroups;      planchets: typeof WalletStoresV1.planchets; +    operationRetries: typeof WalletStoresV1.operationRetries;    }>,    now: AbsoluteTime,    resp: PendingOperationsResponse, @@ -111,54 +126,68 @@ async function gatherWithdrawalPending(      if (wsr.timestampFinish) {        return;      } -    let numCoinsWithdrawn = 0; -    let numCoinsTotal = 0; -    await tx.planchets.indexes.byGroup -      .iter(wsr.withdrawalGroupId) -      .forEach((x) => { -        numCoinsTotal++; -        if (x.withdrawalDone) { -          numCoinsWithdrawn++; -        } -      }); +    const opTag = RetryTags.forWithdrawal(wsr); +    let opr = await tx.operationRetries.get(opTag); +    const now = AbsoluteTime.now(); +    if (!opr) { +      opr = { +        id: opTag, +        retryInfo: { +          firstTry: now, +          nextRetry: now, +          retryCounter: 0, +        }, +      }; +    }      resp.pendingOperations.push({        type: PendingTaskType.Withdraw, +      id: opTag,        givesLifeness: true, -      timestampDue: wsr.retryInfo?.nextRetry ?? AbsoluteTime.now(), +      timestampDue: opr.retryInfo?.nextRetry ?? AbsoluteTime.now(),        withdrawalGroupId: wsr.withdrawalGroupId, -      lastError: wsr.lastError, -      retryInfo: wsr.retryInfo, +      lastError: opr.lastError, +      retryInfo: opr.retryInfo,      });    }  }  async function gatherProposalPending( -  tx: GetReadOnlyAccess<{ proposals: typeof WalletStoresV1.proposals }>, +  tx: GetReadOnlyAccess<{ +    proposals: typeof WalletStoresV1.proposals; +    operationRetries: typeof WalletStoresV1.operationRetries; +  }>,    now: AbsoluteTime,    resp: PendingOperationsResponse,  ): Promise<void> { -  await tx.proposals.iter().forEach((proposal) => { +  await tx.proposals.iter().forEachAsync(async (proposal) => {      if (proposal.proposalStatus == ProposalStatus.Proposed) {        // Nothing to do, user needs to choose.      } else if (proposal.proposalStatus == ProposalStatus.Downloading) { -      const timestampDue = proposal.retryInfo?.nextRetry ?? AbsoluteTime.now(); +      const opId = RetryTags.forProposalClaim(proposal); +      const retryRecord = await tx.operationRetries.get(opId); +      const timestampDue = +        retryRecord?.retryInfo?.nextRetry ?? AbsoluteTime.now();        resp.pendingOperations.push({          type: PendingTaskType.ProposalDownload, +        id: opId,          givesLifeness: true,          timestampDue,          merchantBaseUrl: proposal.merchantBaseUrl,          orderId: proposal.orderId,          proposalId: proposal.proposalId,          proposalTimestamp: proposal.timestamp, -        lastError: proposal.lastError, -        retryInfo: proposal.retryInfo, +        lastError: retryRecord?.lastError, +        retryInfo: retryRecord?.retryInfo,        });      }    });  }  async function gatherDepositPending( -  tx: GetReadOnlyAccess<{ depositGroups: typeof WalletStoresV1.depositGroups }>, +  tx: GetReadOnlyAccess<{ +    depositGroups: typeof WalletStoresV1.depositGroups; +    operationRetries: typeof WalletStoresV1.operationRetries; +  }>,    now: AbsoluteTime,    resp: PendingOperationsResponse,  ): Promise<void> { @@ -169,32 +198,42 @@ async function gatherDepositPending(      if (dg.timestampFinished) {        return;      } -    const timestampDue = dg.retryInfo?.nextRetry ?? AbsoluteTime.now(); +    const opId = RetryTags.forDeposit(dg); +    const retryRecord = await tx.operationRetries.get(opId); +    const timestampDue = retryRecord?.retryInfo.nextRetry ?? AbsoluteTime.now();      resp.pendingOperations.push({        type: PendingTaskType.Deposit, +      id: opId,        givesLifeness: true,        timestampDue,        depositGroupId: dg.depositGroupId, -      lastError: dg.lastError, -      retryInfo: dg.retryInfo, +      lastError: retryRecord?.lastError, +      retryInfo: retryRecord?.retryInfo,      });    }  }  async function gatherTipPending( -  tx: GetReadOnlyAccess<{ tips: typeof WalletStoresV1.tips }>, +  tx: GetReadOnlyAccess<{ +    tips: typeof WalletStoresV1.tips; +    operationRetries: typeof WalletStoresV1.operationRetries; +  }>,    now: AbsoluteTime,    resp: PendingOperationsResponse,  ): Promise<void> { -  await tx.tips.iter().forEach((tip) => { +  await tx.tips.iter().forEachAsync(async (tip) => { +    // FIXME: The tip record needs a proper status field!      if (tip.pickedUpTimestamp) {        return;      } +    const opId = RetryTags.forTipPickup(tip); +    const retryRecord = await tx.operationRetries.get(opId);      if (tip.acceptedTimestamp) {        resp.pendingOperations.push({          type: PendingTaskType.TipPickup, +        id: opId,          givesLifeness: true, -        timestampDue: tip.retryInfo.nextRetry, +        timestampDue: retryRecord?.retryInfo.nextRetry ?? AbsoluteTime.now(),          merchantBaseUrl: tip.merchantBaseUrl,          tipId: tip.walletTipId,          merchantTipId: tip.merchantTipId, @@ -204,56 +243,77 @@ async function gatherTipPending(  }  async function gatherPurchasePending( -  tx: GetReadOnlyAccess<{ purchases: typeof WalletStoresV1.purchases }>, +  tx: GetReadOnlyAccess<{ +    purchases: typeof WalletStoresV1.purchases; +    operationRetries: typeof WalletStoresV1.operationRetries; +  }>,    now: AbsoluteTime,    resp: PendingOperationsResponse,  ): Promise<void> { -  await tx.purchases.iter().forEach((pr) => { +  // FIXME: Only iter purchases with some "active" flag! +  await tx.purchases.iter().forEachAsync(async (pr) => {      if (        pr.paymentSubmitPending &&        pr.abortStatus === AbortStatus.None &&        !pr.payFrozen      ) { -      const timestampDue = pr.payRetryInfo?.nextRetry ?? AbsoluteTime.now(); +      const payOpId = RetryTags.forPay(pr); +      const payRetryRecord = await tx.operationRetries.get(payOpId); + +      const timestampDue = +        payRetryRecord?.retryInfo.nextRetry ?? AbsoluteTime.now();        resp.pendingOperations.push({          type: PendingTaskType.Pay, +        id: payOpId,          givesLifeness: true,          timestampDue,          isReplay: false,          proposalId: pr.proposalId, -        retryInfo: pr.payRetryInfo, -        lastError: pr.lastPayError, +        retryInfo: payRetryRecord?.retryInfo, +        lastError: payRetryRecord?.lastError,        });      }      if (pr.refundQueryRequested) { +      const refundQueryOpId = RetryTags.forRefundQuery(pr); +      const refundQueryRetryRecord = await tx.operationRetries.get( +        refundQueryOpId, +      );        resp.pendingOperations.push({          type: PendingTaskType.RefundQuery, +        id: refundQueryOpId,          givesLifeness: true, -        timestampDue: pr.refundStatusRetryInfo.nextRetry, +        timestampDue: +          refundQueryRetryRecord?.retryInfo.nextRetry ?? AbsoluteTime.now(),          proposalId: pr.proposalId, -        retryInfo: pr.refundStatusRetryInfo, -        lastError: pr.lastRefundStatusError, +        retryInfo: refundQueryRetryRecord?.retryInfo, +        lastError: refundQueryRetryRecord?.lastError,        });      }    });  }  async function gatherRecoupPending( -  tx: GetReadOnlyAccess<{ recoupGroups: typeof WalletStoresV1.recoupGroups }>, +  tx: GetReadOnlyAccess<{ +    recoupGroups: typeof WalletStoresV1.recoupGroups; +    operationRetries: typeof WalletStoresV1.operationRetries; +  }>,    now: AbsoluteTime,    resp: PendingOperationsResponse,  ): Promise<void> { -  await tx.recoupGroups.iter().forEach((rg) => { +  await tx.recoupGroups.iter().forEachAsync(async (rg) => {      if (rg.timestampFinished) {        return;      } +    const opId = RetryTags.forRecoup(rg); +    const retryRecord = await tx.operationRetries.get(opId);      resp.pendingOperations.push({        type: PendingTaskType.Recoup, +      id: opId,        givesLifeness: true, -      timestampDue: rg.retryInfo.nextRetry, +      timestampDue: retryRecord?.retryInfo.nextRetry ?? AbsoluteTime.now(),        recoupGroupId: rg.recoupGroupId, -      retryInfo: rg.retryInfo, -      lastError: rg.lastError, +      retryInfo: retryRecord?.retryInfo, +      lastError: retryRecord?.lastError,      });    });  } @@ -261,14 +321,18 @@ async function gatherRecoupPending(  async function gatherBackupPending(    tx: GetReadOnlyAccess<{      backupProviders: typeof WalletStoresV1.backupProviders; +    operationRetries: typeof WalletStoresV1.operationRetries;    }>,    now: AbsoluteTime,    resp: PendingOperationsResponse,  ): Promise<void> { -  await tx.backupProviders.iter().forEach((bp) => { +  await tx.backupProviders.iter().forEachAsync(async (bp) => { +    const opId = RetryTags.forBackup(bp); +    const retryRecord = await tx.operationRetries.get(opId);      if (bp.state.tag === BackupProviderStateTag.Ready) {        resp.pendingOperations.push({          type: PendingTaskType.Backup, +        id: opId,          givesLifeness: false,          timestampDue: AbsoluteTime.fromTimestamp(bp.state.nextBackupTimestamp),          backupProviderBaseUrl: bp.baseUrl, @@ -277,11 +341,12 @@ async function gatherBackupPending(      } else if (bp.state.tag === BackupProviderStateTag.Retrying) {        resp.pendingOperations.push({          type: PendingTaskType.Backup, +        id: opId,          givesLifeness: false, -        timestampDue: bp.state.retryInfo.nextRetry, +        timestampDue: retryRecord?.retryInfo?.nextRetry ?? AbsoluteTime.now(),          backupProviderBaseUrl: bp.baseUrl, -        retryInfo: bp.state.retryInfo, -        lastError: bp.state.lastError, +        retryInfo: retryRecord?.retryInfo, +        lastError: retryRecord?.lastError,        });      }    }); @@ -305,6 +370,7 @@ export async function getPendingOperations(        planchets: x.planchets,        depositGroups: x.depositGroups,        recoupGroups: x.recoupGroups, +      operationRetries: x.operationRetries,      }))      .runReadWrite(async (tx) => {        const resp: PendingOperationsResponse = { diff --git a/packages/taler-wallet-core/src/operations/recoup.ts b/packages/taler-wallet-core/src/operations/recoup.ts index 283707947..387c23f41 100644 --- a/packages/taler-wallet-core/src/operations/recoup.ts +++ b/packages/taler-wallet-core/src/operations/recoup.ts @@ -42,6 +42,8 @@ import {    CoinRecord,    CoinSourceType,    CoinStatus, +  OperationAttemptResult, +  OperationAttemptResultType,    RecoupGroupRecord,    RefreshCoinSource,    ReserveRecordStatus, @@ -52,64 +54,13 @@ import {  import { InternalWalletState } from "../internal-wallet-state.js";  import { readSuccessResponseJsonOrThrow } from "../util/http.js";  import { GetReadWriteAccess } from "../util/query.js"; -import { RetryInfo } from "../util/retries.js"; +import { RetryInfo, runOperationHandlerForResult } from "../util/retries.js";  import { guardOperationException } from "./common.js";  import { createRefreshGroup, processRefreshGroup } from "./refresh.js";  import { internalCreateWithdrawalGroup } from "./withdraw.js";  const logger = new Logger("operations/recoup.ts"); -async function setupRecoupRetry( -  ws: InternalWalletState, -  recoupGroupId: string, -  options: { -    reset: boolean; -  }, -): Promise<void> { -  await ws.db -    .mktx((x) => ({ -      recoupGroups: x.recoupGroups, -    })) -    .runReadWrite(async (tx) => { -      const r = await tx.recoupGroups.get(recoupGroupId); -      if (!r) { -        return; -      } -      if (options.reset) { -        r.retryInfo = RetryInfo.reset(); -      } else { -        r.retryInfo = RetryInfo.increment(r.retryInfo); -      } -      delete r.lastError; -      await tx.recoupGroups.put(r); -    }); -} - -async function reportRecoupError( -  ws: InternalWalletState, -  recoupGroupId: string, -  err: TalerErrorDetail, -): Promise<void> { -  await ws.db -    .mktx((x) => ({ -      recoupGroups: x.recoupGroups, -    })) -    .runReadWrite(async (tx) => { -      const r = await tx.recoupGroups.get(recoupGroupId); -      if (!r) { -        return; -      } -      if (!r.retryInfo) { -        logger.error( -          "reporting error for inactive recoup group (no retry info)", -        ); -      } -      r.lastError = err; -      await tx.recoupGroups.put(r); -    }); -  ws.notify({ type: NotificationType.RecoupOperationError, error: err }); -} -  /**   * Store a recoup group record in the database after marking   * a coin in the group as finished. @@ -353,25 +304,20 @@ export async function processRecoupGroup(      forceNow?: boolean;    } = {},  ): Promise<void> { -  await ws.memoProcessRecoup.memo(recoupGroupId, async () => { -    const onOpErr = (e: TalerErrorDetail): Promise<void> => -      reportRecoupError(ws, recoupGroupId, e); -    return await guardOperationException( -      async () => await processRecoupGroupImpl(ws, recoupGroupId, options), -      onOpErr, -    ); -  }); +  await runOperationHandlerForResult( +    await processRecoupGroupHandler(ws, recoupGroupId, options), +  ); +  return;  } -async function processRecoupGroupImpl( +export async function processRecoupGroupHandler(    ws: InternalWalletState,    recoupGroupId: string,    options: {      forceNow?: boolean;    } = {}, -): Promise<void> { +): Promise<OperationAttemptResult> {    const forceNow = options.forceNow ?? false; -  await setupRecoupRetry(ws, recoupGroupId, { reset: forceNow });    let recoupGroup = await ws.db      .mktx((x) => ({        recoupGroups: x.recoupGroups, @@ -380,11 +326,11 @@ async function processRecoupGroupImpl(        return tx.recoupGroups.get(recoupGroupId);      });    if (!recoupGroup) { -    return; +    return OperationAttemptResult.finishedEmpty();    }    if (recoupGroup.timestampFinished) {      logger.trace("recoup group finished"); -    return; +    return OperationAttemptResult.finishedEmpty();    }    const ps = recoupGroup.coinPubs.map(async (x, i) => {      try { @@ -404,12 +350,12 @@ async function processRecoupGroupImpl(        return tx.recoupGroups.get(recoupGroupId);      });    if (!recoupGroup) { -    return; +    return OperationAttemptResult.finishedEmpty();    }    for (const b of recoupGroup.recoupFinishedPerCoin) {      if (!b) { -      return; +      return OperationAttemptResult.finishedEmpty();      }    } @@ -480,8 +426,6 @@ async function processRecoupGroupImpl(          return;        }        rg2.timestampFinished = TalerProtocolTimestamp.now(); -      rg2.retryInfo = RetryInfo.reset(); -      rg2.lastError = undefined;        if (rg2.scheduleRefreshCoins.length > 0) {          const refreshGroupId = await createRefreshGroup(            ws, @@ -495,6 +439,7 @@ async function processRecoupGroupImpl(        }        await tx.recoupGroups.put(rg2);      }); +  return OperationAttemptResult.finishedEmpty();  }  export async function createRecoupGroup( @@ -514,10 +459,8 @@ export async function createRecoupGroup(      recoupGroupId,      exchangeBaseUrl: exchangeBaseUrl,      coinPubs: coinPubs, -    lastError: undefined,      timestampFinished: undefined,      timestampStarted: TalerProtocolTimestamp.now(), -    retryInfo: RetryInfo.reset(),      recoupFinishedPerCoin: coinPubs.map(() => false),      // Will be populated later      oldAmountPerCoin: [], diff --git a/packages/taler-wallet-core/src/operations/refresh.ts b/packages/taler-wallet-core/src/operations/refresh.ts index 64a734bb3..64b85a040 100644 --- a/packages/taler-wallet-core/src/operations/refresh.ts +++ b/packages/taler-wallet-core/src/operations/refresh.ts @@ -57,6 +57,8 @@ import {    CoinSourceType,    CoinStatus,    DenominationRecord, +  OperationAttemptResult, +  OperationAttemptResultType,    OperationStatus,    RefreshCoinStatus,    RefreshGroupRecord, @@ -74,7 +76,7 @@ import {  } from "../util/http.js";  import { checkDbInvariant } from "../util/invariants.js";  import { GetReadWriteAccess } from "../util/query.js"; -import { RetryInfo } from "../util/retries.js"; +import { RetryInfo, runOperationHandlerForResult } from "../util/retries.js";  import { guardOperationException } from "./common.js";  import { updateExchangeFromUrl } from "./exchanges.js";  import { @@ -133,11 +135,9 @@ function updateGroupStatus(rg: RefreshGroupRecord): void {    if (allDone) {      if (anyFrozen) {        rg.frozen = true; -      rg.retryInfo = RetryInfo.reset();      } else {        rg.timestampFinished = AbsoluteTime.toTimestamp(AbsoluteTime.now());        rg.operationStatus = OperationStatus.Finished; -      rg.retryInfo = RetryInfo.reset();      }    }  } @@ -730,89 +730,14 @@ async function refreshReveal(    });  } -async function setupRefreshRetry( -  ws: InternalWalletState, -  refreshGroupId: string, -  options: { -    reset: boolean; -  }, -): Promise<void> { -  await ws.db -    .mktx((x) => ({ -      refreshGroups: x.refreshGroups, -    })) -    .runReadWrite(async (tx) => { -      const r = await tx.refreshGroups.get(refreshGroupId); -      if (!r) { -        return; -      } -      if (options.reset) { -        r.retryInfo = RetryInfo.reset(); -      } else { -        r.retryInfo = RetryInfo.increment(r.retryInfo); -      } -      delete r.lastError; -      await tx.refreshGroups.put(r); -    }); -} - -async function reportRefreshError( -  ws: InternalWalletState, -  refreshGroupId: string, -  err: TalerErrorDetail | undefined, -): Promise<void> { -  await ws.db -    .mktx((x) => ({ -      refreshGroups: x.refreshGroups, -    })) -    .runReadWrite(async (tx) => { -      const r = await tx.refreshGroups.get(refreshGroupId); -      if (!r) { -        return; -      } -      if (!r.retryInfo) { -        logger.error( -          "reported error for inactive refresh group (no retry info)", -        ); -      } -      r.lastError = err; -      await tx.refreshGroups.put(r); -    }); -  if (err) { -    ws.notify({ type: NotificationType.RefreshOperationError, error: err }); -  } -} - -/** - * Actually process a refresh group that has been created. - */  export async function processRefreshGroup(    ws: InternalWalletState,    refreshGroupId: string,    options: {      forceNow?: boolean;    } = {}, -): Promise<void> { -  await ws.memoProcessRefresh.memo(refreshGroupId, async () => { -    const onOpErr = (e: TalerErrorDetail): Promise<void> => -      reportRefreshError(ws, refreshGroupId, e); -    return await guardOperationException( -      async () => await processRefreshGroupImpl(ws, refreshGroupId, options), -      onOpErr, -    ); -  }); -} - -async function processRefreshGroupImpl( -  ws: InternalWalletState, -  refreshGroupId: string, -  options: { -    forceNow?: boolean; -  } = {}, -): Promise<void> { -  const forceNow = options.forceNow ?? false; +): Promise<OperationAttemptResult> {    logger.info(`processing refresh group ${refreshGroupId}`); -  await setupRefreshRetry(ws, refreshGroupId, { reset: forceNow });    const refreshGroup = await ws.db      .mktx((x) => ({ @@ -822,10 +747,16 @@ async function processRefreshGroupImpl(        return tx.refreshGroups.get(refreshGroupId);      });    if (!refreshGroup) { -    return; +    return { +      type: OperationAttemptResultType.Finished, +      result: undefined, +    };    }    if (refreshGroup.timestampFinished) { -    return; +    return { +      type: OperationAttemptResultType.Finished, +      result: undefined, +    };    }    // Process refresh sessions of the group in parallel.    logger.trace("processing refresh sessions for old coins"); @@ -855,6 +786,10 @@ async function processRefreshGroupImpl(      logger.warn("process refresh sessions got exception");      logger.warn(`exception: ${e}`);    } +  return { +    type: OperationAttemptResultType.Finished, +    result: undefined, +  };  }  async function processRefreshSession( @@ -975,13 +910,11 @@ export async function createRefreshGroup(      operationStatus: OperationStatus.Pending,      timestampFinished: undefined,      statusPerCoin: oldCoinPubs.map(() => RefreshCoinStatus.Pending), -    lastError: undefined,      lastErrorPerCoin: {},      oldCoinPubs: oldCoinPubs.map((x) => x.coinPub),      reason,      refreshGroupId,      refreshSessionPerCoin: oldCoinPubs.map(() => undefined), -    retryInfo: RetryInfo.reset(),      inputPerCoin,      estimatedOutputPerCoin,      timestampCreated: TalerProtocolTimestamp.now(), @@ -1034,7 +967,7 @@ function getAutoRefreshExecuteThreshold(d: DenominationRecord): AbsoluteTime {  export async function autoRefresh(    ws: InternalWalletState,    exchangeBaseUrl: string, -): Promise<void> { +): Promise<OperationAttemptResult> {    logger.info(`doing auto-refresh check for '${exchangeBaseUrl}'`);    // We must make sure that the exchange is up-to-date so that @@ -1109,4 +1042,5 @@ export async function autoRefresh(        exchange.nextRefreshCheck = AbsoluteTime.toTimestamp(minCheckThreshold);        await tx.exchanges.put(exchange);      }); +  return OperationAttemptResult.finishedEmpty();  } diff --git a/packages/taler-wallet-core/src/operations/refund.ts b/packages/taler-wallet-core/src/operations/refund.ts index 8f5c1143a..bc8c185db 100644 --- a/packages/taler-wallet-core/src/operations/refund.ts +++ b/packages/taler-wallet-core/src/operations/refund.ts @@ -51,6 +51,7 @@ import {  import {    AbortStatus,    CoinStatus, +  OperationAttemptResult,    PurchaseRecord,    RefundReason,    RefundState, @@ -60,8 +61,6 @@ import { InternalWalletState } from "../internal-wallet-state.js";  import { readSuccessResponseJsonOrThrow } from "../util/http.js";  import { checkDbInvariant } from "../util/invariants.js";  import { GetReadWriteAccess } from "../util/query.js"; -import { RetryInfo } from "../util/retries.js"; -import { guardOperationException } from "./common.js";  import { createRefreshGroup, getTotalRefreshCost } from "./refresh.js";  const logger = new Logger("refund.ts"); @@ -120,68 +119,6 @@ export async function prepareRefund(      },    };  } -/** - * Retry querying and applying refunds for an order later. - */ -async function setupPurchaseQueryRefundRetry( -  ws: InternalWalletState, -  proposalId: string, -  options: { -    reset: boolean; -  }, -): Promise<void> { -  await ws.db -    .mktx((x) => ({ -      purchases: x.purchases, -    })) -    .runReadWrite(async (tx) => { -      const pr = await tx.purchases.get(proposalId); -      if (!pr) { -        return; -      } -      if (options.reset) { -        pr.refundStatusRetryInfo = RetryInfo.reset(); -      } else { -        pr.refundStatusRetryInfo = RetryInfo.increment( -          pr.refundStatusRetryInfo, -        ); -      } -      await tx.purchases.put(pr); -    }); -} - -/** - * Report an error that happending when querying for a purchase's refund. - */ -async function reportPurchaseQueryRefundError( -  ws: InternalWalletState, -  proposalId: string, -  err: TalerErrorDetail, -): Promise<void> { -  await ws.db -    .mktx((x) => ({ -      purchases: x.purchases, -    })) -    .runReadWrite(async (tx) => { -      const pr = await tx.purchases.get(proposalId); -      if (!pr) { -        return; -      } -      if (!pr.refundStatusRetryInfo) { -        logger.error( -          "reported error on an inactive purchase (no refund status retry info)", -        ); -      } -      pr.lastRefundStatusError = err; -      await tx.purchases.put(pr); -    }); -  if (err) { -    ws.notify({ -      type: NotificationType.RefundStatusOperationError, -      error: err, -    }); -  } -}  function getRefundKey(d: MerchantCoinRefundStatus): string {    return `${d.coin_pub}-${d.rtransaction_id}`; @@ -492,8 +429,6 @@ async function acceptRefunds(        if (queryDone) {          p.timestampLastRefundStatus = now; -        p.lastRefundStatusError = undefined; -        p.refundStatusRetryInfo = RetryInfo.reset();          p.refundQueryRequested = false;          if (p.abortStatus === AbortStatus.AbortRefund) {            p.abortStatus = AbortStatus.AbortFinished; @@ -502,8 +437,6 @@ async function acceptRefunds(        } else {          // No error, but we need to try again!          p.timestampLastRefundStatus = now; -        p.refundStatusRetryInfo = RetryInfo.increment(p.refundStatusRetryInfo); -        p.lastRefundStatusError = undefined;          logger.trace("refund query not done");        } @@ -621,8 +554,6 @@ export async function applyRefundFromPurchaseId(          return false;        }        p.refundQueryRequested = true; -      p.lastRefundStatusError = undefined; -      p.refundStatusRetryInfo = RetryInfo.reset();        await tx.purchases.put(p);        return true;      }); @@ -631,7 +562,7 @@ export async function applyRefundFromPurchaseId(      ws.notify({        type: NotificationType.RefundStarted,      }); -    await processPurchaseQueryRefundImpl(ws, proposalId, { +    await processPurchaseQueryRefund(ws, proposalId, {        forceNow: true,        waitForAutoRefund: false,      }); @@ -672,22 +603,6 @@ export async function applyRefundFromPurchaseId(    };  } -export async function processPurchaseQueryRefund( -  ws: InternalWalletState, -  proposalId: string, -  options: { -    forceNow?: boolean; -    waitForAutoRefund?: boolean; -  } = {}, -): Promise<void> { -  const onOpErr = (e: TalerErrorDetail): Promise<void> => -    reportPurchaseQueryRefundError(ws, proposalId, e); -  await guardOperationException( -    () => processPurchaseQueryRefundImpl(ws, proposalId, options), -    onOpErr, -  ); -} -  async function queryAndSaveAwaitingRefund(    ws: InternalWalletState,    purchase: PurchaseRecord, @@ -742,17 +657,15 @@ async function queryAndSaveAwaitingRefund(    return refundAwaiting;  } -async function processPurchaseQueryRefundImpl( +export async function processPurchaseQueryRefund(    ws: InternalWalletState,    proposalId: string,    options: {      forceNow?: boolean;      waitForAutoRefund?: boolean;    } = {}, -): Promise<void> { -  const forceNow = options.forceNow ?? false; +): Promise<OperationAttemptResult> {    const waitForAutoRefund = options.waitForAutoRefund ?? false; -  await setupPurchaseQueryRefundRetry(ws, proposalId, { reset: forceNow });    const purchase = await ws.db      .mktx((x) => ({        purchases: x.purchases, @@ -761,11 +674,11 @@ async function processPurchaseQueryRefundImpl(        return tx.purchases.get(proposalId);      });    if (!purchase) { -    return; +    return OperationAttemptResult.finishedEmpty();    }    if (!purchase.refundQueryRequested) { -    return; +    return OperationAttemptResult.finishedEmpty();    }    if (purchase.timestampFirstSuccessfulPay) { @@ -780,7 +693,9 @@ async function processPurchaseQueryRefundImpl(          purchase,          waitForAutoRefund,        ); -      if (Amounts.isZero(awaitingAmount)) return; +      if (Amounts.isZero(awaitingAmount)) { +        return OperationAttemptResult.finishedEmpty(); +      }      }      const requestUrl = new URL( @@ -873,6 +788,7 @@ async function processPurchaseQueryRefundImpl(      }      await acceptRefunds(ws, proposalId, refunds, RefundReason.AbortRefund);    } +  return OperationAttemptResult.finishedEmpty();  }  export async function abortFailedPayWithRefund( @@ -899,8 +815,6 @@ export async function abortFailedPayWithRefund(        purchase.refundQueryRequested = true;        purchase.paymentSubmitPending = false;        purchase.abortStatus = AbortStatus.AbortRefund; -      purchase.lastPayError = undefined; -      purchase.payRetryInfo = RetryInfo.reset();        await tx.purchases.put(purchase);      });    processPurchaseQueryRefund(ws, proposalId, { diff --git a/packages/taler-wallet-core/src/operations/tip.ts b/packages/taler-wallet-core/src/operations/tip.ts index 7148999c5..f19be91b2 100644 --- a/packages/taler-wallet-core/src/operations/tip.ts +++ b/packages/taler-wallet-core/src/operations/tip.ts @@ -18,29 +18,45 @@   * Imports.   */  import { -  Amounts, BlindedDenominationSignature, -  codecForMerchantTipResponseV2, codecForTipPickupGetResponse, DenomKeyType, encodeCrock, getRandomBytes, j2s, Logger, NotificationType, parseTipUri, PrepareTipResult, TalerErrorCode, TalerErrorDetail, TalerProtocolTimestamp, TipPlanchetDetail, URL +  Amounts, +  BlindedDenominationSignature, +  codecForMerchantTipResponseV2, +  codecForTipPickupGetResponse, +  DenomKeyType, +  encodeCrock, +  getRandomBytes, +  j2s, +  Logger, +  parseTipUri, +  PrepareTipResult, +  TalerErrorCode, +  TalerProtocolTimestamp, +  TipPlanchetDetail, +  URL,  } from "@gnu-taler/taler-util";  import { DerivedTipPlanchet } from "../crypto/cryptoTypes.js";  import {    CoinRecord,    CoinSourceType, -  CoinStatus, DenominationRecord, TipRecord +  CoinStatus, +  DenominationRecord, +  OperationAttemptResult, +  OperationAttemptResultType, +  TipRecord,  } from "../db.js";  import { makeErrorDetail } from "../errors.js";  import { InternalWalletState } from "../internal-wallet-state.js";  import {    getHttpResponseErrorDetails, -  readSuccessResponseJsonOrThrow +  readSuccessResponseJsonOrThrow,  } from "../util/http.js";  import { checkDbInvariant, checkLogicInvariant } from "../util/invariants.js"; -import { -  RetryInfo -} from "../util/retries.js"; -import { guardOperationException } from "./common.js";  import { updateExchangeFromUrl } from "./exchanges.js";  import { -  getCandidateWithdrawalDenoms, getExchangeWithdrawalInfo, selectWithdrawalDenominations, updateWithdrawalDenoms +  getCandidateWithdrawalDenoms, +  getExchangeWithdrawalInfo, +  selectWithdrawalDenominations, +  updateWithdrawalDenoms,  } from "./withdraw.js";  const logger = new Logger("operations/tip.ts"); @@ -114,8 +130,6 @@ export async function prepareTip(        createdTimestamp: TalerProtocolTimestamp.now(),        merchantTipId: res.merchantTipId,        tipAmountEffective: selectedDenoms.totalCoinValue, -      retryInfo: RetryInfo.reset(), -      lastError: undefined,        denomsSel: selectedDenoms,        pickedUpTimestamp: undefined,        secretSeed, @@ -144,82 +158,13 @@ export async function prepareTip(    return tipStatus;  } -async function reportTipError( -  ws: InternalWalletState, -  walletTipId: string, -  err: TalerErrorDetail, -): Promise<void> { -  await ws.db -    .mktx((x) => ({ -      tips: x.tips, -    })) -    .runReadWrite(async (tx) => { -      const t = await tx.tips.get(walletTipId); -      if (!t) { -        return; -      } -      if (!t.retryInfo) { -        logger.reportBreak(); -      } -      t.lastError = err; -      await tx.tips.put(t); -    }); -  if (err) { -    ws.notify({ type: NotificationType.TipOperationError, error: err }); -  } -} - -async function setupTipRetry( -  ws: InternalWalletState, -  walletTipId: string, -  options: { -    reset: boolean; -  }, -): Promise<void> { -  await ws.db -    .mktx((x) => ({ -      tips: x.tips, -    })) -    .runReadWrite(async (tx) => { -      const t = await tx.tips.get(walletTipId); -      if (!t) { -        return; -      } -      if (options.reset) { -        t.retryInfo = RetryInfo.reset(); -      } else { -        t.retryInfo = RetryInfo.increment(t.retryInfo); -      } -      delete t.lastError; -      await tx.tips.put(t); -    }); -} -  export async function processTip(    ws: InternalWalletState, -  tipId: string, -  options: { -    forceNow?: boolean; -  } = {}, -): Promise<void> { -  const onOpErr = (e: TalerErrorDetail): Promise<void> => -    reportTipError(ws, tipId, e); -  await guardOperationException( -    () => processTipImpl(ws, tipId, options), -    onOpErr, -  ); -} - -async function processTipImpl( -  ws: InternalWalletState,    walletTipId: string,    options: {      forceNow?: boolean;    } = {}, -): Promise<void> { -  const forceNow = options.forceNow ?? false; -  await setupTipRetry(ws, walletTipId, { reset: forceNow }); - +): Promise<OperationAttemptResult> {    const tipRecord = await ws.db      .mktx((x) => ({        tips: x.tips, @@ -228,12 +173,18 @@ async function processTipImpl(        return tx.tips.get(walletTipId);      });    if (!tipRecord) { -    return; +    return { +      type: OperationAttemptResultType.Finished, +      result: undefined, +    };    }    if (tipRecord.pickedUpTimestamp) {      logger.warn("tip already picked up"); -    return; +    return { +      type: OperationAttemptResultType.Finished, +      result: undefined, +    };    }    const denomsForWithdraw = tipRecord.denomsSel; @@ -284,22 +235,21 @@ async function processTipImpl(    logger.trace(`got tip response, status ${merchantResp.status}`); -  // Hide transient errors. +  // FIXME: Why do we do this?    if ( -    tipRecord.retryInfo.retryCounter < 5 && -    ((merchantResp.status >= 500 && merchantResp.status <= 599) || -      merchantResp.status === 424) +    (merchantResp.status >= 500 && merchantResp.status <= 599) || +    merchantResp.status === 424    ) {      logger.trace(`got transient tip error`);      // FIXME: wrap in another error code that indicates a transient error -    const err = makeErrorDetail( -      TalerErrorCode.WALLET_UNEXPECTED_REQUEST_ERROR, -      getHttpResponseErrorDetails(merchantResp), -      "tip pickup failed (transient)", -    ); -    await reportTipError(ws, tipRecord.walletTipId, err); -    // FIXME: Maybe we want to signal to the caller that the transient error happened? -    return; +    return { +      type: OperationAttemptResultType.Error, +      errorDetail: makeErrorDetail( +        TalerErrorCode.WALLET_UNEXPECTED_REQUEST_ERROR, +        getHttpResponseErrorDetails(merchantResp), +        "tip pickup failed (transient)", +      ), +    };    }    let blindedSigs: BlindedDenominationSignature[] = []; @@ -344,21 +294,14 @@ async function processTipImpl(      });      if (!isValid) { -      await ws.db -        .mktx((x) => ({ tips: x.tips })) -        .runReadWrite(async (tx) => { -          const tipRecord = await tx.tips.get(walletTipId); -          if (!tipRecord) { -            return; -          } -          tipRecord.lastError = makeErrorDetail( -            TalerErrorCode.WALLET_TIPPING_COIN_SIGNATURE_INVALID, -            {}, -            "invalid signature from the exchange (via merchant tip) after unblinding", -          ); -          await tx.tips.put(tipRecord); -        }); -      return; +      return { +        type: OperationAttemptResultType.Error, +        errorDetail: makeErrorDetail( +          TalerErrorCode.WALLET_TIPPING_COIN_SIGNATURE_INVALID, +          {}, +          "invalid signature from the exchange (via merchant tip) after unblinding", +        ), +      };      }      newCoinRecords.push({ @@ -395,13 +338,16 @@ async function processTipImpl(          return;        }        tr.pickedUpTimestamp = TalerProtocolTimestamp.now(); -      tr.lastError = undefined; -      tr.retryInfo = RetryInfo.reset();        await tx.tips.put(tr);        for (const cr of newCoinRecords) {          await tx.coins.put(cr);        }      }); + +  return { +    type: OperationAttemptResultType.Finished, +    result: undefined, +  };  }  export async function acceptTip( diff --git a/packages/taler-wallet-core/src/operations/transactions.ts b/packages/taler-wallet-core/src/operations/transactions.ts index 956d565a6..5a96fc6ff 100644 --- a/packages/taler-wallet-core/src/operations/transactions.ts +++ b/packages/taler-wallet-core/src/operations/transactions.ts @@ -38,7 +38,6 @@ import { InternalWalletState } from "../internal-wallet-state.js";  import {    AbortStatus,    RefundState, -  ReserveRecordStatus,    WalletRefundItem,    WithdrawalRecordType,  } from "../db.js"; @@ -48,6 +47,7 @@ import { processPurchasePay } from "./pay.js";  import { processRefreshGroup } from "./refresh.js";  import { processTip } from "./tip.js";  import { processWithdrawalGroup } from "./withdraw.js"; +import { RetryTags } from "../util/retries.js";  const logger = new Logger("taler-wallet-core:transactions.ts"); @@ -142,6 +142,7 @@ export async function getTransactions(        tombstones: x.tombstones,        peerPushPaymentInitiations: x.peerPushPaymentInitiations,        peerPullPaymentIncoming: x.peerPullPaymentIncoming, +      operationRetries: x.operationRetries,      }))      .runReadOnly(async (tx) => {        tx.peerPushPaymentInitiations.iter().forEachAsync(async (pi) => { @@ -220,6 +221,10 @@ export async function getTransactions(          if (shouldSkipSearch(transactionsRequest, [])) {            return;          } + +        const opId = RetryTags.forWithdrawal(wsr); +        const ort = await tx.operationRetries.get(opId); +          let withdrawalDetails: WithdrawalDetails;          if (wsr.wgInfo.withdrawalType === WithdrawalRecordType.PeerPullCredit) {            transactions.push({ @@ -242,7 +247,7 @@ export async function getTransactions(                wsr.withdrawalGroupId,              ),              frozen: false, -            ...(wsr.lastError ? { error: wsr.lastError } : {}), +            ...(ort?.lastError ? { error: ort.lastError } : {}),            });            return;          } else if ( @@ -264,7 +269,7 @@ export async function getTransactions(                wsr.withdrawalGroupId,              ),              frozen: false, -            ...(wsr.lastError ? { error: wsr.lastError } : {}), +            ...(ort?.lastError ? { error: ort.lastError } : {}),            });            return;          } else if ( @@ -310,7 +315,7 @@ export async function getTransactions(              wsr.withdrawalGroupId,            ),            frozen: false, -          ...(wsr.lastError ? { error: wsr.lastError } : {}), +          ...(ort?.lastError ? { error: ort.lastError } : {}),          });        }); @@ -319,7 +324,8 @@ export async function getTransactions(          if (shouldSkipCurrency(transactionsRequest, amount.currency)) {            return;          } - +        const opId = RetryTags.forDeposit(dg); +        const retryRecord = await tx.operationRetries.get(opId);          transactions.push({            type: TransactionType.Deposit,            amountRaw: Amounts.stringify(dg.effectiveDepositAmount), @@ -333,7 +339,7 @@ export async function getTransactions(              dg.depositGroupId,            ),            depositGroupId: dg.depositGroupId, -          ...(dg.lastError ? { error: dg.lastError } : {}), +          ...(retryRecord?.lastError ? { error: retryRecord.lastError } : {}),          });        }); @@ -456,7 +462,15 @@ export async function getTransactions(            });          } -        const err = pr.lastPayError ?? pr.lastRefundStatusError; +        const payOpId = RetryTags.forPay(pr); +        const refundQueryOpId = RetryTags.forRefundQuery(pr); +        const payRetryRecord = await tx.operationRetries.get(payOpId); +        const refundQueryRetryRecord = await tx.operationRetries.get( +          refundQueryOpId, +        ); + +        const err = +          refundQueryRetryRecord?.lastError ?? payRetryRecord?.lastError;          transactions.push({            type: TransactionType.Payment,            amountRaw: Amounts.stringify(contractData.amount), @@ -495,6 +509,8 @@ export async function getTransactions(          if (!tipRecord.acceptedTimestamp) {            return;          } +        const opId = RetryTags.forTipPickup(tipRecord); +        const retryRecord = await tx.operationRetries.get(opId);          transactions.push({            type: TransactionType.Tip,            amountEffective: Amounts.stringify(tipRecord.tipAmountEffective), @@ -507,10 +523,7 @@ export async function getTransactions(              tipRecord.walletTipId,            ),            merchantBaseUrl: tipRecord.merchantBaseUrl, -          // merchant: { -          //   name: tipRecord.merchantBaseUrl, -          // }, -          error: tipRecord.lastError, +          error: retryRecord?.lastError,          });        });      }); @@ -589,7 +602,11 @@ export async function deleteTransaction(  ): Promise<void> {    const [typeStr, ...rest] = transactionId.split(":");    const type = typeStr as TransactionType; -  if (type === TransactionType.Withdrawal || type === TransactionType.PeerPullCredit || type === TransactionType.PeerPushCredit) { +  if ( +    type === TransactionType.Withdrawal || +    type === TransactionType.PeerPullCredit || +    type === TransactionType.PeerPushCredit +  ) {      const withdrawalGroupId = rest[0];      await ws.db        .mktx((x) => ({ @@ -714,7 +731,9 @@ export async function deleteTransaction(          tombstones: x.tombstones,        }))        .runReadWrite(async (tx) => { -        const debit = await tx.peerPullPaymentIncoming.get(peerPullPaymentIncomingId); +        const debit = await tx.peerPullPaymentIncoming.get( +          peerPullPaymentIncomingId, +        );          if (debit) {            await tx.peerPullPaymentIncoming.delete(peerPullPaymentIncomingId);            await tx.tombstones.put({ @@ -737,10 +756,7 @@ export async function deleteTransaction(          if (debit) {            await tx.peerPushPaymentInitiations.delete(pursePub);            await tx.tombstones.put({ -            id: makeEventId( -              TombstoneTag.DeletePeerPushDebit, -              pursePub, -            ), +            id: makeEventId(TombstoneTag.DeletePeerPushDebit, pursePub),            });          }        }); diff --git a/packages/taler-wallet-core/src/operations/withdraw.ts b/packages/taler-wallet-core/src/operations/withdraw.ts index b80745316..ce5863b31 100644 --- a/packages/taler-wallet-core/src/operations/withdraw.ts +++ b/packages/taler-wallet-core/src/operations/withdraw.ts @@ -56,7 +56,6 @@ import {    WithdrawBatchResponse,    WithdrawResponse,    WithdrawUriInfoResponse, -  WithdrawUriResult,  } from "@gnu-taler/taler-util";  import { EddsaKeypair } from "../crypto/cryptoImplementation.js";  import { @@ -68,9 +67,10 @@ import {    DenomSelectionState,    ExchangeDetailsRecord,    ExchangeRecord, +  OperationAttemptResult, +  OperationAttemptResultType,    OperationStatus,    PlanchetRecord, -  ReserveBankInfo,    ReserveRecordStatus,    WalletStoresV1,    WgInfo, @@ -98,7 +98,6 @@ import {    WALLET_BANK_INTEGRATION_PROTOCOL_VERSION,    WALLET_EXCHANGE_PROTOCOL_VERSION,  } from "../versions.js"; -import { guardOperationException } from "./common.js";  import {    getExchangeDetails,    getExchangePaytoUri, @@ -691,31 +690,12 @@ async function processPlanchetExchangeBatchRequest(      withdrawalGroup.exchangeBaseUrl,    ).href; -  try { -    const resp = await ws.http.postJson(reqUrl, d); -    const r = await readSuccessResponseJsonOrThrow( -      resp, -      codecForWithdrawBatchResponse(), -    ); -    return r; -  } catch (e) { -    const errDetail = getErrorDetailFromException(e); -    logger.trace("withdrawal batch request failed", e); -    logger.trace(e); -    await ws.db -      .mktx((x) => ({ withdrawalGroups: x.withdrawalGroups })) -      .runReadWrite(async (tx) => { -        let wg = await tx.withdrawalGroups.get( -          withdrawalGroup.withdrawalGroupId, -        ); -        if (!wg) { -          return; -        } -        wg.lastError = errDetail; -        await tx.withdrawalGroups.put(wg); -      }); -    return; -  } +  const resp = await ws.http.postJson(reqUrl, d); +  const r = await readSuccessResponseJsonOrThrow( +    resp, +    codecForWithdrawBatchResponse(), +  ); +  return r;  }  async function processPlanchetVerifyAndStoreCoin( @@ -951,50 +931,6 @@ export async function updateWithdrawalDenoms(    }  } -async function setupWithdrawalRetry( -  ws: InternalWalletState, -  withdrawalGroupId: string, -  options: { -    reset: boolean; -  }, -): Promise<void> { -  await ws.db -    .mktx((x) => ({ withdrawalGroups: x.withdrawalGroups })) -    .runReadWrite(async (tx) => { -      const wsr = await tx.withdrawalGroups.get(withdrawalGroupId); -      if (!wsr) { -        return; -      } -      if (options.reset) { -        wsr.retryInfo = RetryInfo.reset(); -      } else { -        wsr.retryInfo = RetryInfo.increment(wsr.retryInfo); -      } -      await tx.withdrawalGroups.put(wsr); -    }); -} - -async function reportWithdrawalError( -  ws: InternalWalletState, -  withdrawalGroupId: string, -  err: TalerErrorDetail, -): Promise<void> { -  await ws.db -    .mktx((x) => ({ withdrawalGroups: x.withdrawalGroups })) -    .runReadWrite(async (tx) => { -      const wsr = await tx.withdrawalGroups.get(withdrawalGroupId); -      if (!wsr) { -        return; -      } -      if (!wsr.retryInfo) { -        logger.reportBreak(); -      } -      wsr.lastError = err; -      await tx.withdrawalGroups.put(wsr); -    }); -  ws.notify({ type: NotificationType.WithdrawOperationError, error: err }); -} -  /**   * Update the information about a reserve that is stored in the wallet   * by querying the reserve's exchange. @@ -1071,28 +1007,9 @@ async function queryReserve(  export async function processWithdrawalGroup(    ws: InternalWalletState,    withdrawalGroupId: string, -  options: { -    forceNow?: boolean; -  } = {}, -): Promise<void> { -  const onOpErr = (e: TalerErrorDetail): Promise<void> => -    reportWithdrawalError(ws, withdrawalGroupId, e); -  await guardOperationException( -    () => processWithdrawGroupImpl(ws, withdrawalGroupId, options), -    onOpErr, -  ); -} - -async function processWithdrawGroupImpl( -  ws: InternalWalletState, -  withdrawalGroupId: string, -  options: { -    forceNow?: boolean; -  } = {}, -): Promise<void> { -  const forceNow = options.forceNow ?? false; -  logger.trace("processing withdraw group", withdrawalGroupId); -  await setupWithdrawalRetry(ws, withdrawalGroupId, { reset: forceNow }); +  options: {} = {}, +): Promise<OperationAttemptResult> { +  logger.trace("processing withdrawal group", withdrawalGroupId);    const withdrawalGroup = await ws.db      .mktx((x) => ({ withdrawalGroups: x.withdrawalGroups }))      .runReadOnly(async (tx) => { @@ -1106,24 +1023,44 @@ async function processWithdrawGroupImpl(    switch (withdrawalGroup.reserveStatus) {      case ReserveRecordStatus.RegisteringBank:        await processReserveBankStatus(ws, withdrawalGroupId); -      return await processWithdrawGroupImpl(ws, withdrawalGroupId, { +      return await processWithdrawalGroup(ws, withdrawalGroupId, {          forceNow: true,        });      case ReserveRecordStatus.QueryingStatus: {        const res = await queryReserve(ws, withdrawalGroupId);        if (res.ready) { -        return await processWithdrawGroupImpl(ws, withdrawalGroupId, { +        return await processWithdrawalGroup(ws, withdrawalGroupId, {            forceNow: true,          });        } -      return; +      return { +        type: OperationAttemptResultType.Pending, +        result: undefined, +      }; +    } +    case ReserveRecordStatus.WaitConfirmBank: { +      const res = await processReserveBankStatus(ws, withdrawalGroupId); +      switch (res.status) { +        case BankStatusResultCode.Aborted: +        case BankStatusResultCode.Done: +          return { +            type: OperationAttemptResultType.Finished, +            result: undefined, +          }; +        case BankStatusResultCode.Waiting: { +          return { +            type: OperationAttemptResultType.Pending, +            result: undefined, +          }; +        } +      }      } -    case ReserveRecordStatus.WaitConfirmBank: -      await processReserveBankStatus(ws, withdrawalGroupId); -      return;      case ReserveRecordStatus.BankAborted:        // FIXME -      return; +      return { +        type: OperationAttemptResultType.Pending, +        result: undefined, +      };      case ReserveRecordStatus.Dormant:        // We can try to withdraw, nothing needs to be done with the reserve.        break; @@ -1150,11 +1087,12 @@ async function processWithdrawGroupImpl(            return;          }          wg.operationStatus = OperationStatus.Finished; -        delete wg.lastError; -        delete wg.retryInfo;          await tx.withdrawalGroups.put(wg);        }); -    return; +    return { +      type: OperationAttemptResultType.Finished, +      result: undefined, +    };    }    const numTotalCoins = withdrawalGroup.denomsSel.selectedDenoms @@ -1175,7 +1113,7 @@ async function processWithdrawGroupImpl(    if (ws.batchWithdrawal) {      const resp = await processPlanchetExchangeBatchRequest(ws, withdrawalGroup);      if (!resp) { -      return; +      throw Error("unable to do batch withdrawal");      }      for (let coinIdx = 0; coinIdx < numTotalCoins; coinIdx++) {        work.push( @@ -1236,8 +1174,6 @@ async function processWithdrawGroupImpl(          finishedForFirstTime = true;          wg.timestampFinish = TalerProtocolTimestamp.now();          wg.operationStatus = OperationStatus.Finished; -        delete wg.lastError; -        wg.retryInfo = RetryInfo.reset();        }        await tx.withdrawalGroups.put(wg); @@ -1259,6 +1195,11 @@ async function processWithdrawGroupImpl(        reservePub: withdrawalGroup.reservePub,      });    } + +  return { +    type: OperationAttemptResultType.Finished, +    result: undefined, +  };  }  const AGE_MASK_GROUPS = "8:10:12:14:16:18".split(":").map(n => parseInt(n, 10)) @@ -1529,10 +1470,7 @@ async function getWithdrawalGroupRecordTx(  }  export function getReserveRequestTimeout(r: WithdrawalGroupRecord): Duration { -  return Duration.max( -    { d_ms: 60000 }, -    Duration.min({ d_ms: 5000 }, RetryInfo.getDuration(r.retryInfo)), -  ); +  return { d_ms: 60000 };  }  export function getBankStatusUrl(talerWithdrawUri: string): string { @@ -1611,17 +1549,25 @@ async function registerReserveWithBank(        );        r.reserveStatus = ReserveRecordStatus.WaitConfirmBank;        r.operationStatus = OperationStatus.Pending; -      r.retryInfo = RetryInfo.reset();        await tx.withdrawalGroups.put(r);      });    ws.notify({ type: NotificationType.ReserveRegisteredWithBank }); -  return processReserveBankStatus(ws, withdrawalGroupId); +} + +enum BankStatusResultCode { +  Done = "done", +  Waiting = "waiting", +  Aborted = "aborted", +} + +interface BankStatusResult { +  status: BankStatusResultCode;  }  async function processReserveBankStatus(    ws: InternalWalletState,    withdrawalGroupId: string, -): Promise<void> { +): Promise<BankStatusResult> {    const withdrawalGroup = await getWithdrawalGroupRecordTx(ws.db, {      withdrawalGroupId,    }); @@ -1630,17 +1576,21 @@ async function processReserveBankStatus(      case ReserveRecordStatus.RegisteringBank:        break;      default: -      return; +      return { +        status: BankStatusResultCode.Done, +      };    }    if (      withdrawalGroup.wgInfo.withdrawalType != WithdrawalRecordType.BankIntegrated    ) { -    throw Error(); +    throw Error("wrong withdrawal record type");    }    const bankInfo = withdrawalGroup.wgInfo.bankInfo;    if (!bankInfo) { -    return; +    return { +      status: BankStatusResultCode.Done, +    };    }    const bankStatusUrl = getBankStatusUrl(bankInfo.talerWithdrawUri); @@ -1678,10 +1628,11 @@ async function processReserveBankStatus(          r.wgInfo.bankInfo.timestampBankConfirmed = now;          r.reserveStatus = ReserveRecordStatus.BankAborted;          r.operationStatus = OperationStatus.Finished; -        r.retryInfo = RetryInfo.reset();          await tx.withdrawalGroups.put(r);        }); -    return; +    return { +      status: BankStatusResultCode.Aborted, +    };    }    // Bank still needs to know our reserve info @@ -1722,15 +1673,17 @@ async function processReserveBankStatus(          r.wgInfo.bankInfo.timestampBankConfirmed = now;          r.reserveStatus = ReserveRecordStatus.QueryingStatus;          r.operationStatus = OperationStatus.Pending; -        r.retryInfo = RetryInfo.reset();        } else {          logger.info("withdrawal: transfer not yet confirmed by bank");          r.wgInfo.bankInfo.confirmUrl = status.confirm_transfer_url;          r.senderWire = status.sender_wire; -        r.retryInfo = RetryInfo.increment(r.retryInfo);        }        await tx.withdrawalGroups.put(r);      }); + +  return { +    status: BankStatusResultCode.Done, +  };  }  export async function internalCreateWithdrawalGroup( @@ -1775,14 +1728,12 @@ export async function internalCreateWithdrawalGroup(      exchangeBaseUrl: canonExchange,      instructedAmount: amount,      timestampStart: now, -    lastError: undefined,      operationStatus: OperationStatus.Pending,      rawWithdrawalAmount: initialDenomSel.totalWithdrawCost,      secretSeed,      reservePriv: reserveKeyPair.priv,      reservePub: reserveKeyPair.pub,      reserveStatus: args.reserveStatus, -    retryInfo: RetryInfo.reset(),      withdrawalGroupId,      restrictAge: args.restrictAge,      senderWire: undefined, diff --git a/packages/taler-wallet-core/src/pending-types.ts b/packages/taler-wallet-core/src/pending-types.ts index 39df9d0cb..61c7136df 100644 --- a/packages/taler-wallet-core/src/pending-types.ts +++ b/packages/taler-wallet-core/src/pending-types.ts @@ -30,14 +30,12 @@ import {    AbsoluteTime,    TalerProtocolTimestamp,  } from "@gnu-taler/taler-util"; -import { ReserveRecordStatus } from "./db.js";  import { RetryInfo } from "./util/retries.js";  export enum PendingTaskType {    ExchangeUpdate = "exchange-update",    ExchangeCheckRefresh = "exchange-check-refresh",    Pay = "pay", -  ProposalChoice = "proposal-choice",    ProposalDownload = "proposal-download",    Refresh = "refresh",    Recoup = "recoup", @@ -109,7 +107,7 @@ export interface PendingRefreshTask {    lastError?: TalerErrorDetail;    refreshGroupId: string;    finishedPerCoin: boolean[]; -  retryInfo: RetryInfo; +  retryInfo?: RetryInfo;  }  /** @@ -126,17 +124,6 @@ export interface PendingProposalDownloadTask {  }  /** - * User must choose whether to accept or reject the merchant's - * proposed contract terms. - */ -export interface PendingProposalChoiceOperation { -  type: PendingTaskType.ProposalChoice; -  merchantBaseUrl: string; -  proposalTimestamp: AbsoluteTime; -  proposalId: string; -} - -/**   * The wallet is picking up a tip that the user has accepted.   */  export interface PendingTipPickupTask { @@ -165,14 +152,14 @@ export interface PendingPayTask {  export interface PendingRefundQueryTask {    type: PendingTaskType.RefundQuery;    proposalId: string; -  retryInfo: RetryInfo; +  retryInfo?: RetryInfo;    lastError: TalerErrorDetail | undefined;  }  export interface PendingRecoupTask {    type: PendingTaskType.Recoup;    recoupGroupId: string; -  retryInfo: RetryInfo; +  retryInfo?: RetryInfo;    lastError: TalerErrorDetail | undefined;  } @@ -206,6 +193,11 @@ export interface PendingTaskInfoCommon {    type: PendingTaskType;    /** +   * Unique identifier for the pending task. +   */ +  id: string; + +  /**     * Set to true if the operation indicates that something is really in progress,     * as opposed to some regular scheduled operation that can be tried later.     */ diff --git a/packages/taler-wallet-core/src/util/query.ts b/packages/taler-wallet-core/src/util/query.ts index e954e5c78..65b67eff2 100644 --- a/packages/taler-wallet-core/src/util/query.ts +++ b/packages/taler-wallet-core/src/util/query.ts @@ -152,6 +152,19 @@ class ResultStream<T> {      return arr;    } +  async mapAsync<R>(f: (x: T) => Promise<R>): Promise<R[]> { +    const arr: R[] = []; +    while (true) { +      const x = await this.next(); +      if (x.hasValue) { +        arr.push(await f(x.value)); +      } else { +        break; +      } +    } +    return arr; +  } +    async forEachAsync(f: (x: T) => Promise<void>): Promise<void> {      while (true) {        const x = await this.next(); @@ -572,6 +585,26 @@ function makeWriteContext(    return ctx;  } +const storeList = [ +  { name: "foo" as const, value: 1 as const }, +  { name: "bar" as const, value: 2 as const }, +]; +// => { foo: { value: 1}, bar: {value: 2} } + +type StoreList = typeof storeList; + +type StoreNames = StoreList[number] extends { name: infer I } ? I : never; + +type H = StoreList[number] & { name: "foo"}; + +type Cleanup<V> = V extends { name: infer N, value: infer X} ? {name: N, value: X} : never; + +type G = { +  [X in StoreNames]: { +    X: StoreList[number] & { name: X }; +  }; +}; +  /**   * Type-safe access to a database with a particular store map.   * @@ -584,6 +617,14 @@ export class DbAccess<StoreMap> {      return this.db;    } +  mktx2< +    StoreNames extends keyof StoreMap, +    Stores extends StoreMap[StoreNames], +    StoreList extends Stores[], +  >(namePicker: (x: StoreMap) => StoreList): StoreList { +    return namePicker(this.stores); +  } +    mktx<      PickerType extends (x: StoreMap) => unknown,      BoundStores extends GetPickerType<PickerType, StoreMap>, diff --git a/packages/taler-wallet-core/src/util/retries.ts b/packages/taler-wallet-core/src/util/retries.ts index 13a05b385..3a41e8348 100644 --- a/packages/taler-wallet-core/src/util/retries.ts +++ b/packages/taler-wallet-core/src/util/retries.ts @@ -21,7 +21,29 @@  /**   * Imports.   */ -import { AbsoluteTime, Duration } from "@gnu-taler/taler-util"; +import { +  AbsoluteTime, +  Duration, +  TalerErrorDetail, +} from "@gnu-taler/taler-util"; +import { +  BackupProviderRecord, +  DepositGroupRecord, +  ExchangeRecord, +  OperationAttemptResult, +  OperationAttemptResultType, +  ProposalRecord, +  PurchaseRecord, +  RecoupGroupRecord, +  RefreshGroupRecord, +  TipRecord, +  WalletStoresV1, +  WithdrawalGroupRecord, +} from "../db.js"; +import { TalerError } from "../errors.js"; +import { InternalWalletState } from "../internal-wallet-state.js"; +import { PendingTaskType } from "../pending-types.js"; +import { GetReadWriteAccess } from "./query.js";  export interface RetryInfo {    firstTry: AbsoluteTime; @@ -108,3 +130,95 @@ export namespace RetryInfo {      return r2;    }  } + +export namespace RetryTags { +  export function forWithdrawal(wg: WithdrawalGroupRecord): string { +    return `${PendingTaskType.Withdraw}:${wg.withdrawalGroupId}`; +  } +  export function forExchangeUpdate(exch: ExchangeRecord): string { +    return `${PendingTaskType.ExchangeUpdate}:${exch.baseUrl}`; +  } +  export function forExchangeCheckRefresh(exch: ExchangeRecord): string { +    return `${PendingTaskType.ExchangeCheckRefresh}:${exch.baseUrl}`; +  } +  export function forProposalClaim(pr: ProposalRecord): string { +    return `${PendingTaskType.ProposalDownload}:${pr.proposalId}`; +  } +  export function forTipPickup(tipRecord: TipRecord): string { +    return `${PendingTaskType.TipPickup}:${tipRecord.walletTipId}`; +  } +  export function forRefresh(refreshGroupRecord: RefreshGroupRecord): string { +    return `${PendingTaskType.TipPickup}:${refreshGroupRecord.refreshGroupId}`; +  } +  export function forPay(purchaseRecord: PurchaseRecord): string { +    return `${PendingTaskType.Pay}:${purchaseRecord.proposalId}`; +  } +  export function forRefundQuery(purchaseRecord: PurchaseRecord): string { +    return `${PendingTaskType.RefundQuery}:${purchaseRecord.proposalId}`; +  } +  export function forRecoup(recoupRecord: RecoupGroupRecord): string { +    return `${PendingTaskType.Recoup}:${recoupRecord.recoupGroupId}`; +  } +  export function forDeposit(depositRecord: DepositGroupRecord): string { +    return `${PendingTaskType.Deposit}:${depositRecord.depositGroupId}`; +  } +  export function forBackup(backupRecord: BackupProviderRecord): string { +    return `${PendingTaskType.Backup}:${backupRecord.baseUrl}`; +  } +} + +export async function scheduleRetryInTx( +  ws: InternalWalletState, +  tx: GetReadWriteAccess<{ +    operationRetries: typeof WalletStoresV1.operationRetries; +  }>, +  opId: string, +  errorDetail?: TalerErrorDetail, +): Promise<void> { +  let retryRecord = await tx.operationRetries.get(opId); +  if (!retryRecord) { +    retryRecord = { +      id: opId, +      retryInfo: RetryInfo.reset(), +    }; +    if (errorDetail) { +      retryRecord.lastError = errorDetail; +    } +  } else { +    retryRecord.retryInfo = RetryInfo.increment(retryRecord.retryInfo); +    if (errorDetail) { +      retryRecord.lastError = errorDetail; +    } else { +      delete retryRecord.lastError; +    } +  } +  await tx.operationRetries.put(retryRecord); +} + +export async function scheduleRetry( +  ws: InternalWalletState, +  opId: string, +  errorDetail?: TalerErrorDetail, +): Promise<void> { +  return await ws.db +    .mktx((x) => ({ operationRetries: x.operationRetries })) +    .runReadWrite(async (tx) => { +      scheduleRetryInTx(ws, tx, opId, errorDetail); +    }); +} + +/** + * Run an operation handler, expect a success result and extract the success value. + */ +export async function runOperationHandlerForResult<T>( +  res: OperationAttemptResult<T>, +): Promise<T> { +  switch (res.type) { +    case OperationAttemptResultType.Finished: +      return res.result; +    case OperationAttemptResultType.Error: +      throw TalerError.fromUncheckedDetail(res.errorDetail); +    default: +      throw Error(`unexpected operation result (${res.type})`); +  } +} diff --git a/packages/taler-wallet-core/src/wallet.ts b/packages/taler-wallet-core/src/wallet.ts index 779fe9528..f041d9aa9 100644 --- a/packages/taler-wallet-core/src/wallet.ts +++ b/packages/taler-wallet-core/src/wallet.ts @@ -90,6 +90,7 @@ import {    ExchangeListItem,    OperationMap,    FeeDescription, +  TalerErrorDetail,  } from "@gnu-taler/taler-util";  import { TalerCryptoInterface } from "./crypto/cryptoImplementation.js";  import { @@ -101,9 +102,15 @@ import {    CoinSourceType,    exportDb,    importDb, +  OperationAttemptResult, +  OperationAttemptResultType,    WalletStoresV1,  } from "./db.js"; -import { getErrorDetailFromException, TalerError } from "./errors.js"; +import { +  getErrorDetailFromException, +  makeErrorDetail, +  TalerError, +} from "./errors.js";  import { createDenominationTimeline } from "./index.browser.js";  import {    DenomInfo, @@ -143,6 +150,7 @@ import {    getExchangeRequestTimeout,    getExchangeTrust,    updateExchangeFromUrl, +  updateExchangeFromUrlHandler,    updateExchangeTermsOfService,  } from "./operations/exchanges.js";  import { getMerchantInfo } from "./operations/merchants.js"; @@ -162,7 +170,11 @@ import {    initiatePeerToPeerPush,  } from "./operations/peer-to-peer.js";  import { getPendingOperations } from "./operations/pending.js"; -import { createRecoupGroup, processRecoupGroup } from "./operations/recoup.js"; +import { +  createRecoupGroup, +  processRecoupGroup, +  processRecoupGroupHandler, +} from "./operations/recoup.js";  import {    autoRefresh,    createRefreshGroup, @@ -210,6 +222,7 @@ import {    openPromise,  } from "./util/promiseUtils.js";  import { DbAccess, GetReadWriteAccess } from "./util/query.js"; +import { RetryInfo, runOperationHandlerForResult } from "./util/retries.js";  import { TimerAPI, TimerGroup } from "./util/timer.js";  import {    WALLET_BANK_INTEGRATION_PROTOCOL_VERSION, @@ -237,7 +250,12 @@ async function getWithdrawalDetailsForAmount(    amount: AmountJson,    restrictAge: number | undefined,  ): Promise<ManualWithdrawalDetails> { -  const wi = await getExchangeWithdrawalInfo(ws, exchangeBaseUrl, amount, restrictAge); +  const wi = await getExchangeWithdrawalInfo( +    ws, +    exchangeBaseUrl, +    amount, +    restrictAge, +  );    const paytoUris = wi.exchangeDetails.wireInfo.accounts.map(      (x) => x.payto_uri,    ); @@ -253,55 +271,153 @@ async function getWithdrawalDetailsForAmount(  }  /** - * Execute one operation based on the pending operation info record. + * Call the right handler for a pending operation without doing + * any special error handling.   */ -async function processOnePendingOperation( +async function callOperationHandler(    ws: InternalWalletState,    pending: PendingTaskInfo,    forceNow = false, -): Promise<void> { -  logger.trace(`running pending ${JSON.stringify(pending, undefined, 2)}`); +): Promise<OperationAttemptResult<unknown, unknown>> {    switch (pending.type) {      case PendingTaskType.ExchangeUpdate: -      await updateExchangeFromUrl(ws, pending.exchangeBaseUrl, { +      return await updateExchangeFromUrlHandler(ws, pending.exchangeBaseUrl, {          forceNow,        }); -      break;      case PendingTaskType.Refresh: -      await processRefreshGroup(ws, pending.refreshGroupId, { forceNow }); -      break; +      return await processRefreshGroup(ws, pending.refreshGroupId, { +        forceNow, +      });      case PendingTaskType.Withdraw:        await processWithdrawalGroup(ws, pending.withdrawalGroupId, { forceNow });        break;      case PendingTaskType.ProposalDownload: -      await processDownloadProposal(ws, pending.proposalId, { forceNow }); -      break; +      return await processDownloadProposal(ws, pending.proposalId, { +        forceNow, +      });      case PendingTaskType.TipPickup: -      await processTip(ws, pending.tipId, { forceNow }); -      break; +      return await processTip(ws, pending.tipId, { forceNow });      case PendingTaskType.Pay: -      await processPurchasePay(ws, pending.proposalId, { forceNow }); -      break; +      return await processPurchasePay(ws, pending.proposalId, { forceNow });      case PendingTaskType.RefundQuery: -      await processPurchaseQueryRefund(ws, pending.proposalId, { forceNow }); -      break; +      return await processPurchaseQueryRefund(ws, pending.proposalId, { +        forceNow, +      });      case PendingTaskType.Recoup: -      await processRecoupGroup(ws, pending.recoupGroupId, { forceNow }); -      break; +      return await processRecoupGroupHandler(ws, pending.recoupGroupId, { +        forceNow, +      });      case PendingTaskType.ExchangeCheckRefresh: -      await autoRefresh(ws, pending.exchangeBaseUrl); -      break; +      return await autoRefresh(ws, pending.exchangeBaseUrl);      case PendingTaskType.Deposit: { -      await processDepositGroup(ws, pending.depositGroupId, { +      return await processDepositGroup(ws, pending.depositGroupId, {          forceNow,        }); -      break;      }      case PendingTaskType.Backup: -      await processBackupForProvider(ws, pending.backupProviderBaseUrl); -      break; +      return await processBackupForProvider(ws, pending.backupProviderBaseUrl);      default: -      assertUnreachable(pending); +      return assertUnreachable(pending); +  } +  throw Error("not reached"); +} + +export async function storeOperationError( +  ws: InternalWalletState, +  pendingTaskId: string, +  e: TalerErrorDetail, +): Promise<void> { +  await ws.db +    .mktx((x) => ({ operationRetries: x.operationRetries })) +    .runReadWrite(async (tx) => { +      const retryRecord = await tx.operationRetries.get(pendingTaskId); +      if (!retryRecord) { +        return; +      } +      retryRecord.lastError = e; +      retryRecord.retryInfo = RetryInfo.increment(retryRecord.retryInfo); +      await tx.operationRetries.put(retryRecord); +    }); +} + +export async function storeOperationFinished( +  ws: InternalWalletState, +  pendingTaskId: string, +): Promise<void> { +  await ws.db +    .mktx((x) => ({ operationRetries: x.operationRetries })) +    .runReadWrite(async (tx) => { +      await tx.operationRetries.delete(pendingTaskId); +    }); +} + +export async function storeOperationPending( +  ws: InternalWalletState, +  pendingTaskId: string, +): Promise<void> { +  await ws.db +    .mktx((x) => ({ operationRetries: x.operationRetries })) +    .runReadWrite(async (tx) => { +      const retryRecord = await tx.operationRetries.get(pendingTaskId); +      if (!retryRecord) { +        return; +      } +      delete retryRecord.lastError; +      retryRecord.retryInfo = RetryInfo.increment(retryRecord.retryInfo); +      await tx.operationRetries.put(retryRecord); +    }); +} + +/** + * Execute one operation based on the pending operation info record. + * + * Store success/failure result in the database. + */ +async function processOnePendingOperation( +  ws: InternalWalletState, +  pending: PendingTaskInfo, +  forceNow = false, +): Promise<void> { +  logger.trace(`running pending ${JSON.stringify(pending, undefined, 2)}`); +  let maybeError: TalerErrorDetail | undefined; +  try { +    const resp = await callOperationHandler(ws, pending, forceNow); +    switch (resp.type) { +      case OperationAttemptResultType.Error: +        return await storeOperationError(ws, pending.id, resp.errorDetail); +      case OperationAttemptResultType.Finished: +        return await storeOperationFinished(ws, pending.id); +      case OperationAttemptResultType.Pending: +        return await storeOperationPending(ws, pending.id); +      case OperationAttemptResultType.Longpoll: +        break; +    } +  } catch (e: any) { +    if ( +      e instanceof TalerError && +      e.hasErrorCode(TalerErrorCode.WALLET_PENDING_OPERATION_FAILED) +    ) { +      logger.warn("operation processed resulted in error"); +      logger.warn(`error was: ${j2s(e.errorDetail)}`); +      maybeError = e.errorDetail; +    } else { +      // This is a bug, as we expect pending operations to always +      // do their own error handling and only throw WALLET_PENDING_OPERATION_FAILED +      // or return something. +      logger.error("Uncaught exception", e); +      ws.notify({ +        type: NotificationType.InternalError, +        message: "uncaught exception", +        exception: e, +      }); +      maybeError = makeErrorDetail( +        TalerErrorCode.WALLET_UNEXPECTED_EXCEPTION, +        { +          stack: e.stack, +        }, +        `unexpected exception (message: ${e.message})`, +      ); +    }    }  } @@ -317,18 +433,7 @@ export async function runPending(      if (!forceNow && !AbsoluteTime.isExpired(p.timestampDue)) {        continue;      } -    try { -      await processOnePendingOperation(ws, p, forceNow); -    } catch (e) { -      if (e instanceof TalerError) { -        console.error( -          "Pending operation failed:", -          JSON.stringify(e.errorDetail, undefined, 2), -        ); -      } else { -        console.error(e); -      } -    } +    await processOnePendingOperation(ws, p, forceNow);    }  } @@ -420,27 +525,7 @@ async function runTaskLoop(          if (!AbsoluteTime.isExpired(p.timestampDue)) {            continue;          } -        try { -          await processOnePendingOperation(ws, p); -        } catch (e) { -          if ( -            e instanceof TalerError && -            e.hasErrorCode(TalerErrorCode.WALLET_PENDING_OPERATION_FAILED) -          ) { -            logger.warn("operation processed resulted in error"); -            logger.warn(`error was: ${j2s(e.errorDetail)}`); -          } else { -            // This is a bug, as we expect pending operations to always -            // do their own error handling and only throw WALLET_PENDING_OPERATION_FAILED -            // or return something. -            logger.error("Uncaught exception", e); -            ws.notify({ -              type: NotificationType.InternalError, -              message: "uncaught exception", -              exception: e, -            }); -          } -        } +        await processOnePendingOperation(ws, p);          ws.notify({            type: NotificationType.PendingOperationProcessed,          }); @@ -629,7 +714,7 @@ async function getExchangeDetailedInfo(        denominations: x.denominations,      }))      .runReadOnly(async (tx) => { -      const ex = await tx.exchanges.get(exchangeBaseurl) +      const ex = await tx.exchanges.get(exchangeBaseurl);        const dp = ex?.detailsPointer;        if (!dp) {          return; @@ -663,11 +748,11 @@ async function getExchangeDetailedInfo(            wireInfo: exchangeDetails.wireInfo,          },          denominations: denominations, -      } +      };      });    if (!exchange) { -    throw Error(`exchange with base url "${exchangeBaseurl}" not found`) +    throw Error(`exchange with base url "${exchangeBaseurl}" not found`);    }    const feesDescription: OperationMap<FeeDescription[]> = { @@ -809,6 +894,7 @@ declare const __GIT_HASH__: string;  const VERSION = typeof __VERSION__ !== "undefined" ? __VERSION__ : "dev";  const GIT_HASH = typeof __GIT_HASH__ !== "undefined" ? __GIT_HASH__ : undefined; +  /**   * Implementation of the "wallet-core" API.   */ @@ -908,7 +994,7 @@ async function dispatchRequestInternal(          ws,          req.exchangeBaseUrl,          Amounts.parseOrThrow(req.amount), -        req.restrictAge +        req.restrictAge,        );      }      case "getBalances": { @@ -1106,7 +1192,7 @@ async function dispatchRequestInternal(          ws,          req.exchange,          amount, -        undefined +        undefined,        );        const wres = await createManualWithdrawal(ws, {          amount: amount, | 
