diff options
Diffstat (limited to 'packages')
14 files changed, 334 insertions, 130 deletions
| diff --git a/packages/taler-wallet-core/src/db.ts b/packages/taler-wallet-core/src/db.ts index a23ba0f76..cbf49c4ca 100644 --- a/packages/taler-wallet-core/src/db.ts +++ b/packages/taler-wallet-core/src/db.ts @@ -69,7 +69,7 @@ import {    StoreDescriptor,    StoreWithIndexes,  } from "./util/query.js"; -import { RetryInfo, RetryTags } from "./util/retries.js"; +import { RetryInfo, TaskIdentifiers } from "./util/retries.js";  /**   * This file contains the database schema of the Taler wallet together @@ -1945,7 +1945,7 @@ export interface OperationRetryRecord {     * Unique identifier for the operation.  Typically of     * the format `${opType}-${opUniqueKey}`     * -   * @see {@link RetryTags} +   * @see {@link TaskIdentifiers}     */    id: string; diff --git a/packages/taler-wallet-core/src/operations/backup/index.ts b/packages/taler-wallet-core/src/operations/backup/index.ts index 3dae26087..59e99b505 100644 --- a/packages/taler-wallet-core/src/operations/backup/index.ts +++ b/packages/taler-wallet-core/src/operations/backup/index.ts @@ -99,7 +99,7 @@ import {  import {    OperationAttemptResult,    OperationAttemptResultType, -  RetryTags, +  TaskIdentifiers,    scheduleRetryInTx,  } from "../../util/retries.js";  import { addAttentionRequest, removeAttentionRequest } from "../attention.js"; @@ -379,7 +379,7 @@ async function runBackupCycleForProvider(              logger.warn("backup provider not found anymore");              return;            } -          const opId = RetryTags.forBackup(prov); +          const opId = TaskIdentifiers.forBackup(prov);            await scheduleRetryInTx(ws, tx, opId);            prov.shouldRetryFreshProposal = true;            prov.state = { @@ -405,7 +405,7 @@ async function runBackupCycleForProvider(            logger.warn("backup provider not found anymore");            return;          } -        const opId = RetryTags.forBackup(prov); +        const opId = TaskIdentifiers.forBackup(prov);          await scheduleRetryInTx(ws, tx, opId);          prov.currentPaymentProposalId = result.proposalId;          prov.shouldRetryFreshProposal = false; @@ -479,7 +479,7 @@ async function runBackupCycleForProvider(          prov.lastBackupHash = encodeCrock(hash(backupEnc));          // FIXME: Allocate error code for this situation?          // FIXME: Add operation retry record! -        const opId = RetryTags.forBackup(prov); +        const opId = TaskIdentifiers.forBackup(prov);          await scheduleRetryInTx(ws, tx, opId);          prov.state = {            tag: BackupProviderStateTag.Retrying, @@ -920,7 +920,7 @@ export async function getBackupInfo(      .mktx((x) => [x.backupProviders, x.operationRetries])      .runReadOnly(async (tx) => {        return await tx.backupProviders.iter().mapAsync(async (bp) => { -        const opId = RetryTags.forBackup(bp); +        const opId = TaskIdentifiers.forBackup(bp);          const retryRecord = await tx.operationRetries.get(opId);          return {            provider: bp, diff --git a/packages/taler-wallet-core/src/operations/common.ts b/packages/taler-wallet-core/src/operations/common.ts index 35e6455bc..e5eda074c 100644 --- a/packages/taler-wallet-core/src/operations/common.ts +++ b/packages/taler-wallet-core/src/operations/common.ts @@ -218,6 +218,23 @@ export async function storeOperationError(      });  } +export async function resetOperationTimeout( +  ws: InternalWalletState, +  pendingTaskId: string, +): Promise<void> { +  await ws.db +    .mktx((x) => [x.operationRetries]) +    .runReadWrite(async (tx) => { +      let retryRecord = await tx.operationRetries.get(pendingTaskId); +      if (retryRecord) { +        // Note that we don't reset the lastError, it should still be visible +        // while the retry runs. +        retryRecord.retryInfo = RetryInfo.increment(retryRecord.retryInfo); +        await tx.operationRetries.put(retryRecord); +      } +    }); +} +  export async function storeOperationPending(    ws: InternalWalletState,    pendingTaskId: string, diff --git a/packages/taler-wallet-core/src/operations/deposits.ts b/packages/taler-wallet-core/src/operations/deposits.ts index 1e696a1d6..22283b7a8 100644 --- a/packages/taler-wallet-core/src/operations/deposits.ts +++ b/packages/taler-wallet-core/src/operations/deposits.ts @@ -122,7 +122,6 @@ export async function processDepositGroup(    ws: InternalWalletState,    depositGroupId: string,    options: { -    forceNow?: boolean;      cancellationToken?: CancellationToken;    } = {},  ): Promise<OperationAttemptResult> { diff --git a/packages/taler-wallet-core/src/operations/exchanges.ts b/packages/taler-wallet-core/src/operations/exchanges.ts index 08d30eac6..457344e06 100644 --- a/packages/taler-wallet-core/src/operations/exchanges.ts +++ b/packages/taler-wallet-core/src/operations/exchanges.ts @@ -73,7 +73,7 @@ import {  import {    OperationAttemptResult,    OperationAttemptResultType, -  RetryTags, +  TaskIdentifiers,    unwrapOperationHandlerResultOrThrow,  } from "../util/retries.js";  import { WALLET_EXCHANGE_PROTOCOL_VERSION } from "../versions.js"; @@ -552,7 +552,7 @@ export async function updateExchangeFromUrl(    return unwrapOperationHandlerResultOrThrow(      await runOperationWithErrorReporting(        ws, -      RetryTags.forExchangeUpdateFromUrl(canonUrl), +      TaskIdentifiers.forExchangeUpdateFromUrl(canonUrl),        () => updateExchangeFromUrlHandler(ws, canonUrl, options),      ),    ); diff --git a/packages/taler-wallet-core/src/operations/pay-merchant.ts b/packages/taler-wallet-core/src/operations/pay-merchant.ts index 19eb40f3a..25153f9fb 100644 --- a/packages/taler-wallet-core/src/operations/pay-merchant.ts +++ b/packages/taler-wallet-core/src/operations/pay-merchant.ts @@ -95,7 +95,7 @@ import {    TalerError,    TalerProtocolViolationError,  } from "@gnu-taler/taler-util"; -import { GetReadWriteAccess } from "../index.js"; +import { GetReadWriteAccess, PendingTaskType } from "../index.js";  import {    EXCHANGE_COINS_LOCK,    InternalWalletState, @@ -119,8 +119,9 @@ import {    OperationAttemptResult,    OperationAttemptResultType,    RetryInfo, -  RetryTags, +  TaskIdentifiers,    scheduleRetry, +  constructTaskIdentifier,  } from "../util/retries.js";  import {    makeTransactionId, @@ -360,7 +361,7 @@ export async function processDownloadProposal(      requestBody.token = proposal.claimToken;    } -  const opId = RetryTags.forPay(proposal); +  const opId = TaskIdentifiers.forPay(proposal);    const retryRecord = await ws.db      .mktx((x) => [x.operationRetries])      .runReadOnly(async (tx) => { @@ -1598,8 +1599,11 @@ export async function runPayForConfirmPay(    proposalId: string,  ): Promise<ConfirmPayResult> {    logger.trace("processing proposal for confirmPay"); -  const opId = RetryTags.byPaymentProposalId(proposalId); -  const res = await runOperationWithErrorReporting(ws, opId, async () => { +  const taskId = constructTaskIdentifier({ +    tag: PendingTaskType.Purchase, +    proposalId, +  }); +  const res = await runOperationWithErrorReporting(ws, taskId, async () => {      return await processPurchasePay(ws, proposalId, { forceNow: true });    });    logger.trace(`processPurchasePay response type ${res.type}`); @@ -1624,9 +1628,7 @@ export async function runPayForConfirmPay(        // We hide transient errors from the caller.        const opRetry = await ws.db          .mktx((x) => [x.operationRetries]) -        .runReadOnly(async (tx) => -          tx.operationRetries.get(RetryTags.byPaymentProposalId(proposalId)), -        ); +        .runReadOnly(async (tx) => tx.operationRetries.get(taskId));        return {          type: ConfirmPayResultType.Pending,          lastError: opRetry?.lastError, @@ -1792,9 +1794,7 @@ export async function confirmPay(  export async function processPurchase(    ws: InternalWalletState,    proposalId: string, -  options: { -    forceNow?: boolean; -  } = {}, +  options: Record<any, never> = {},  ): Promise<OperationAttemptResult> {    const purchase = await ws.db      .mktx((x) => [x.purchases]) @@ -1843,9 +1843,7 @@ export async function processPurchase(  export async function processPurchasePay(    ws: InternalWalletState,    proposalId: string, -  options: { -    forceNow?: boolean; -  } = {}, +  options: unknown = {},  ): Promise<OperationAttemptResult> {    const purchase = await ws.db      .mktx((x) => [x.purchases]) @@ -1935,7 +1933,7 @@ export async function processPurchasePay(          handleInsufficientFunds(ws, proposalId, err).catch(async (e) => {            console.log("handling insufficient funds failed"); -          await scheduleRetry(ws, RetryTags.forPay(purchase), { +          await scheduleRetry(ws, TaskIdentifiers.forPay(purchase), {              code: TalerErrorCode.WALLET_UNEXPECTED_EXCEPTION,              when: AbsoluteTime.now(),              message: "unexpected exception", @@ -2830,7 +2828,10 @@ export async function abortPay(    proposalId: string,    cancelImmediately?: boolean,  ): Promise<void> { -  const opId = RetryTags.byPaymentProposalId(proposalId); +  const opId = constructTaskIdentifier({ +    tag: PendingTaskType.Purchase, +    proposalId, +  });    await ws.db      .mktx((x) => [        x.purchases, diff --git a/packages/taler-wallet-core/src/operations/pay-peer.ts b/packages/taler-wallet-core/src/operations/pay-peer.ts index ff01342f8..4f65ec7ea 100644 --- a/packages/taler-wallet-core/src/operations/pay-peer.ts +++ b/packages/taler-wallet-core/src/operations/pay-peer.ts @@ -87,15 +87,17 @@ import { TalerError } from "@gnu-taler/taler-util";  import { InternalWalletState } from "../internal-wallet-state.js";  import {    makeTransactionId, +  resetOperationTimeout,    runOperationWithErrorReporting,    spendCoins,  } from "../operations/common.js";  import { readSuccessResponseJsonOrThrow } from "@gnu-taler/taler-util/http";  import { checkDbInvariant } from "../util/invariants.js";  import { +  constructTaskIdentifier,    OperationAttemptResult,    OperationAttemptResultType, -  RetryTags, +  TaskIdentifiers,  } from "../util/retries.js";  import { getPeerPaymentBalanceDetailsInTx } from "./balance.js";  import { updateExchangeFromUrl } from "./exchanges.js"; @@ -103,7 +105,10 @@ import { getTotalRefreshCost } from "./refresh.js";  import {    getExchangeWithdrawalInfo,    internalCreateWithdrawalGroup, +  processWithdrawalGroup,  } from "./withdraw.js"; +import { PendingTaskType } from "../pending-types.js"; +import { stopLongpolling } from "./transactions.js";  const logger = new Logger("operations/peer-to-peer.ts"); @@ -590,13 +595,14 @@ export async function initiatePeerPushPayment(        });      }); -  await runOperationWithErrorReporting( -    ws, -    RetryTags.byPeerPushPaymentInitiationPursePub(pursePair.pub), -    async () => { -      return await processPeerPushInitiation(ws, pursePair.pub); -    }, -  ); +  const taskId = constructTaskIdentifier({ +    tag: PendingTaskType.PeerPushInitiation, +    pursePub: pursePair.pub, +  }); + +  await runOperationWithErrorReporting(ws, taskId, async () => { +    return await processPeerPushInitiation(ws, pursePair.pub); +  });    return {      contractPriv: contractKeyPair.priv, @@ -951,7 +957,7 @@ export async function confirmPeerPushPayment(    await updateExchangeFromUrl(ws, peerInc.exchangeBaseUrl); -  const retryTag = RetryTags.forPeerPushCredit(peerInc); +  const retryTag = TaskIdentifiers.forPeerPushCredit(peerInc);    await runOperationWithErrorReporting(ws, retryTag, () =>      processPeerPushCredit(ws, req.peerPushPaymentIncomingId), @@ -1113,7 +1119,7 @@ export async function acceptIncomingPeerPullPayment(    await runOperationWithErrorReporting(      ws, -    RetryTags.forPeerPullPaymentDebit(ppi), +    TaskIdentifiers.forPeerPullPaymentDebit(ppi),      async () => {        return processPeerPullDebit(ws, ppi.peerPullPaymentIncomingId);      }, @@ -1263,7 +1269,23 @@ export async function processPeerPullCredit(    }    if (pullIni.status === OperationStatus.Finished) { -    logger.warn("peer pull payment initiation is already finished"); +    logger.warn( +      "peer pull payment initiation is already finished, retrying withdrawal", +    ); + +    const withdrawalGroupId = pullIni.withdrawalGroupId; + +    if (withdrawalGroupId) { +      const taskId = constructTaskIdentifier({ +        tag: PendingTaskType.Withdraw, +        withdrawalGroupId, +      }); +      stopLongpolling(ws, taskId); +      await resetOperationTimeout(ws, taskId); +      await runOperationWithErrorReporting(ws, taskId, () => +        processWithdrawalGroup(ws, withdrawalGroupId), +      ); +    }      return {        type: OperationAttemptResultType.Finished,        result: undefined, @@ -1514,19 +1536,19 @@ export async function initiatePeerPullPayment(    // whether purse creation has failed, or does the client/    // check this asynchronously from the transaction status? -  await runOperationWithErrorReporting( -    ws, -    RetryTags.byPeerPullPaymentInitiationPursePub(pursePair.pub), -    async () => { -      return processPeerPullCredit(ws, pursePair.pub); -    }, -  ); +  const taskId = constructTaskIdentifier({ +    tag: PendingTaskType.PeerPullInitiation, +    pursePub: pursePair.pub, +  }); + +  await runOperationWithErrorReporting(ws, taskId, async () => { +    return processPeerPullCredit(ws, pursePair.pub); +  });    // FIXME: Why do we create this only here?    // What if the previous operation didn't succeed? - -  // FIXME: Use a pre-computed withdrawal group ID -  // so we don't create it multiple times. +  // We actually should create it once we know the +  // money arrived (via long-polling).    await internalCreateWithdrawalGroup(ws, {      amount: instructedAmount, diff --git a/packages/taler-wallet-core/src/operations/pending.ts b/packages/taler-wallet-core/src/operations/pending.ts index 240c7ff65..2e3a5c9dc 100644 --- a/packages/taler-wallet-core/src/operations/pending.ts +++ b/packages/taler-wallet-core/src/operations/pending.ts @@ -39,7 +39,7 @@ import {  import { AbsoluteTime } from "@gnu-taler/taler-util";  import { InternalWalletState } from "../internal-wallet-state.js";  import { GetReadOnlyAccess } from "../util/query.js"; -import { RetryTags } from "../util/retries.js"; +import { TaskIdentifiers } from "../util/retries.js";  import { GlobalIDB } from "@gnu-taler/idb-bridge";  function getPendingCommon( @@ -74,7 +74,7 @@ async function gatherExchangePending(  ): Promise<void> {    // FIXME: We should do a range query here based on the update time.    await tx.exchanges.iter().forEachAsync(async (exch) => { -    const opTag = RetryTags.forExchangeUpdate(exch); +    const opTag = TaskIdentifiers.forExchangeUpdate(exch);      let opr = await tx.operationRetries.get(opTag);      const timestampDue =        opr?.retryInfo.nextRetry ?? AbsoluteTime.fromTimestamp(exch.nextUpdate); @@ -120,7 +120,7 @@ async function gatherRefreshPending(      if (r.timestampFinished) {        return;      } -    const opId = RetryTags.forRefresh(r); +    const opId = TaskIdentifiers.forRefresh(r);      const retryRecord = await tx.operationRetries.get(opId);      const timestampDue = retryRecord?.retryInfo.nextRetry ?? AbsoluteTime.now(); @@ -158,7 +158,7 @@ async function gatherWithdrawalPending(      if (wsr.timestampFinish) {        return;      } -    const opTag = RetryTags.forWithdrawal(wsr); +    const opTag = TaskIdentifiers.forWithdrawal(wsr);      let opr = await tx.operationRetries.get(opTag);      const now = AbsoluteTime.now();      if (!opr) { @@ -208,7 +208,7 @@ async function gatherDepositPending(          deposited = false;        }      } -    const opId = RetryTags.forDeposit(dg); +    const opId = TaskIdentifiers.forDeposit(dg);      const retryRecord = await tx.operationRetries.get(opId);      const timestampDue = retryRecord?.retryInfo.nextRetry ?? AbsoluteTime.now();      resp.pendingOperations.push({ @@ -239,7 +239,7 @@ async function gatherTipPending(      if (tip.pickedUpTimestamp) {        return;      } -    const opId = RetryTags.forTipPickup(tip); +    const opId = TaskIdentifiers.forTipPickup(tip);      const retryRecord = await tx.operationRetries.get(opId);      const timestampDue = retryRecord?.retryInfo.nextRetry ?? AbsoluteTime.now();      if (tip.acceptedTimestamp) { @@ -272,7 +272,7 @@ async function gatherPurchasePending(    await tx.purchases.indexes.byStatus      .iter(keyRange)      .forEachAsync(async (pr) => { -      const opId = RetryTags.forPay(pr); +      const opId = TaskIdentifiers.forPay(pr);        const retryRecord = await tx.operationRetries.get(opId);        const timestampDue =          retryRecord?.retryInfo.nextRetry ?? AbsoluteTime.now(); @@ -301,7 +301,7 @@ async function gatherRecoupPending(      if (rg.timestampFinished) {        return;      } -    const opId = RetryTags.forRecoup(rg); +    const opId = TaskIdentifiers.forRecoup(rg);      const retryRecord = await tx.operationRetries.get(opId);      const timestampDue = retryRecord?.retryInfo.nextRetry ?? AbsoluteTime.now();      resp.pendingOperations.push({ @@ -325,7 +325,7 @@ async function gatherBackupPending(    resp: PendingOperationsResponse,  ): Promise<void> {    await tx.backupProviders.iter().forEachAsync(async (bp) => { -    const opId = RetryTags.forBackup(bp); +    const opId = TaskIdentifiers.forBackup(bp);      const retryRecord = await tx.operationRetries.get(opId);      if (bp.state.tag === BackupProviderStateTag.Ready) {        const timestampDue = AbsoluteTime.fromTimestamp( @@ -366,7 +366,7 @@ async function gatherPeerPullInitiationPending(      if (pi.status === OperationStatus.Finished) {        return;      } -    const opId = RetryTags.forPeerPullPaymentInitiation(pi); +    const opId = TaskIdentifiers.forPeerPullPaymentInitiation(pi);      const retryRecord = await tx.operationRetries.get(opId);      const timestampDue = retryRecord?.retryInfo.nextRetry ?? AbsoluteTime.now();      resp.pendingOperations.push({ @@ -392,7 +392,7 @@ async function gatherPeerPullDebitPending(      if (pi.status === PeerPullPaymentIncomingStatus.Paid) {        return;      } -    const opId = RetryTags.forPeerPullPaymentDebit(pi); +    const opId = TaskIdentifiers.forPeerPullPaymentDebit(pi);      const retryRecord = await tx.operationRetries.get(opId);      const timestampDue = retryRecord?.retryInfo.nextRetry ?? AbsoluteTime.now();      resp.pendingOperations.push({ @@ -418,7 +418,7 @@ async function gatherPeerPushInitiationPending(      if (pi.status === PeerPushPaymentInitiationStatus.PurseCreated) {        return;      } -    const opId = RetryTags.forPeerPushPaymentInitiation(pi); +    const opId = TaskIdentifiers.forPeerPushPaymentInitiation(pi);      const retryRecord = await tx.operationRetries.get(opId);      const timestampDue = retryRecord?.retryInfo.nextRetry ?? AbsoluteTime.now();      resp.pendingOperations.push({ @@ -447,7 +447,7 @@ async function gatherPeerPushCreditPending(        case PeerPushPaymentIncomingStatus.WithdrawalCreated:          return;      } -    const opId = RetryTags.forPeerPushCredit(pi); +    const opId = TaskIdentifiers.forPeerPushCredit(pi);      const retryRecord = await tx.operationRetries.get(opId);      const timestampDue = retryRecord?.retryInfo.nextRetry ?? AbsoluteTime.now();      resp.pendingOperations.push({ diff --git a/packages/taler-wallet-core/src/operations/refresh.ts b/packages/taler-wallet-core/src/operations/refresh.ts index 773689635..2d406ec7d 100644 --- a/packages/taler-wallet-core/src/operations/refresh.ts +++ b/packages/taler-wallet-core/src/operations/refresh.ts @@ -734,9 +734,7 @@ async function refreshReveal(  export async function processRefreshGroup(    ws: InternalWalletState,    refreshGroupId: string, -  options: { -    forceNow?: boolean; -  } = {}, +  options: Record<string, never> = {},  ): Promise<OperationAttemptResult> {    logger.info(`processing refresh group ${refreshGroupId}`); diff --git a/packages/taler-wallet-core/src/operations/tip.ts b/packages/taler-wallet-core/src/operations/tip.ts index ec7546992..28c3cda52 100644 --- a/packages/taler-wallet-core/src/operations/tip.ts +++ b/packages/taler-wallet-core/src/operations/tip.ts @@ -164,9 +164,7 @@ export async function prepareTip(  export async function processTip(    ws: InternalWalletState,    walletTipId: string, -  options: { -    forceNow?: boolean; -  } = {}, +  options: Record<string, never> = {},  ): Promise<OperationAttemptResult> {    const tipRecord = await ws.db      .mktx((x) => [x.tips]) diff --git a/packages/taler-wallet-core/src/operations/transactions.ts b/packages/taler-wallet-core/src/operations/transactions.ts index c03d2aa3d..1c2ce34bb 100644 --- a/packages/taler-wallet-core/src/operations/transactions.ts +++ b/packages/taler-wallet-core/src/operations/transactions.ts @@ -63,12 +63,15 @@ import {    PeerPullPaymentInitiationRecord,  } from "../db.js";  import { InternalWalletState } from "../internal-wallet-state.js"; +import { PendingTaskType } from "../pending-types.js";  import { checkDbInvariant } from "../util/invariants.js"; -import { RetryTags } from "../util/retries.js"; +import { constructTaskIdentifier, TaskIdentifiers } from "../util/retries.js";  import {    makeTombstoneId,    makeTransactionId,    parseId, +  resetOperationTimeout, +  runOperationWithErrorReporting,    TombstoneTag,  } from "./common.js";  import { processDepositGroup } from "./deposits.js"; @@ -79,6 +82,7 @@ import {    extractContractData,    processPurchasePay,  } from "./pay-merchant.js"; +import { processPeerPullCredit } from "./pay-peer.js";  import { processRefreshGroup } from "./refresh.js";  import { processTip } from "./tip.js";  import { @@ -152,7 +156,7 @@ export async function getTransactionById(          if (!withdrawalGroupRecord) throw Error("not found"); -        const opId = RetryTags.forWithdrawal(withdrawalGroupRecord); +        const opId = TaskIdentifiers.forWithdrawal(withdrawalGroupRecord);          const ort = await tx.operationRetries.get(opId);          if ( @@ -215,7 +219,7 @@ export async function getTransactionById(            Amounts.zeroOfAmount(contractData.amount),          ); -        const payOpId = RetryTags.forPay(purchase); +        const payOpId = TaskIdentifiers.forPay(purchase);          const payRetryRecord = await tx.operationRetries.get(payOpId);          return buildTransactionForPurchase( @@ -237,7 +241,7 @@ export async function getTransactionById(          if (!tipRecord) throw Error("not found");          const retries = await tx.operationRetries.get( -          RetryTags.forTipPickup(tipRecord), +          TaskIdentifiers.forTipPickup(tipRecord),          );          return buildTransactionForTip(tipRecord, retries);        }); @@ -250,7 +254,7 @@ export async function getTransactionById(          if (!depositRecord) throw Error("not found");          const retries = await tx.operationRetries.get( -          RetryTags.forDeposit(depositRecord), +          TaskIdentifiers.forDeposit(depositRecord),          );          return buildTransactionForDeposit(depositRecord, retries);        }); @@ -359,11 +363,11 @@ export async function getTransactionById(          if (pushInc.withdrawalGroupId) {            wg = await tx.withdrawalGroups.get(pushInc.withdrawalGroupId);            if (wg) { -            const withdrawalOpId = RetryTags.forWithdrawal(wg); +            const withdrawalOpId = TaskIdentifiers.forWithdrawal(wg);              wgOrt = await tx.operationRetries.get(withdrawalOpId);            }          } -        const pushIncOpId = RetryTags.forPeerPushCredit(pushInc); +        const pushIncOpId = TaskIdentifiers.forPeerPushCredit(pushInc);          let pushIncOrt = await tx.operationRetries.get(pushIncOpId);          return buildTransactionForPeerPushCredit( @@ -394,11 +398,12 @@ export async function getTransactionById(          if (pushInc.withdrawalGroupId) {            wg = await tx.withdrawalGroups.get(pushInc.withdrawalGroupId);            if (wg) { -            const withdrawalOpId = RetryTags.forWithdrawal(wg); +            const withdrawalOpId = TaskIdentifiers.forWithdrawal(wg);              wgOrt = await tx.operationRetries.get(withdrawalOpId);            }          } -        const pushIncOpId = RetryTags.forPeerPullPaymentInitiation(pushInc); +        const pushIncOpId = +          TaskIdentifiers.forPeerPullPaymentInitiation(pushInc);          let pushIncOrt = await tx.operationRetries.get(pushIncOpId);          return buildTransactionForPeerPullCredit( @@ -1109,11 +1114,11 @@ export async function getTransactions(          if (pi.withdrawalGroupId) {            wg = await tx.withdrawalGroups.get(pi.withdrawalGroupId);            if (wg) { -            const withdrawalOpId = RetryTags.forWithdrawal(wg); +            const withdrawalOpId = TaskIdentifiers.forWithdrawal(wg);              wgOrt = await tx.operationRetries.get(withdrawalOpId);            }          } -        const pushIncOpId = RetryTags.forPeerPushCredit(pi); +        const pushIncOpId = TaskIdentifiers.forPeerPushCredit(pi);          let pushIncOrt = await tx.operationRetries.get(pushIncOpId);          checkDbInvariant(!!ct); @@ -1142,11 +1147,11 @@ export async function getTransactions(          if (pi.withdrawalGroupId) {            wg = await tx.withdrawalGroups.get(pi.withdrawalGroupId);            if (wg) { -            const withdrawalOpId = RetryTags.forWithdrawal(wg); +            const withdrawalOpId = TaskIdentifiers.forWithdrawal(wg);              wgOrt = await tx.operationRetries.get(withdrawalOpId);            }          } -        const pushIncOpId = RetryTags.forPeerPullPaymentInitiation(pi); +        const pushIncOpId = TaskIdentifiers.forPeerPullPaymentInitiation(pi);          let pushIncOrt = await tx.operationRetries.get(pushIncOpId);          checkDbInvariant(!!ct); @@ -1166,7 +1171,7 @@ export async function getTransactions(            return;          }          let required = false; -        const opId = RetryTags.forRefresh(rg); +        const opId = TaskIdentifiers.forRefresh(rg);          if (transactionsRequest?.includeRefreshes) {            required = true;          } else if (rg.operationStatus !== RefreshOperationStatus.Finished) { @@ -1195,7 +1200,7 @@ export async function getTransactions(            return;          } -        const opId = RetryTags.forWithdrawal(wsr); +        const opId = TaskIdentifiers.forWithdrawal(wsr);          const ort = await tx.operationRetries.get(opId);          switch (wsr.wgInfo.withdrawalType) { @@ -1238,7 +1243,7 @@ export async function getTransactions(          if (shouldSkipCurrency(transactionsRequest, amount.currency)) {            return;          } -        const opId = RetryTags.forDeposit(dg); +        const opId = TaskIdentifiers.forDeposit(dg);          const retryRecord = await tx.operationRetries.get(opId);          transactions.push(buildTransactionForDeposit(dg, retryRecord)); @@ -1309,7 +1314,7 @@ export async function getTransactions(            );          }); -        const payOpId = RetryTags.forPay(purchase); +        const payOpId = TaskIdentifiers.forPay(purchase);          const payRetryRecord = await tx.operationRetries.get(payOpId);          transactions.push(            await buildTransactionForPurchase( @@ -1333,7 +1338,7 @@ export async function getTransactions(          if (!tipRecord.acceptedTimestamp) {            return;          } -        const opId = RetryTags.forTipPickup(tipRecord); +        const opId = TaskIdentifiers.forTipPickup(tipRecord);          const retryRecord = await tx.operationRetries.get(opId);          transactions.push(buildTransactionForTip(tipRecord, retryRecord));        }); @@ -1359,6 +1364,77 @@ export async function getTransactions(    return { transactions: [...txNotPending, ...txPending] };  } +export type ParsedTransactionIdentifier = +  | { tag: TransactionType.Deposit; depositGroupId: string } +  | { tag: TransactionType.Payment; proposalId: string } +  | { tag: TransactionType.PeerPullDebit; peerPullPaymentIncomingId: string } +  | { tag: TransactionType.PeerPullCredit; pursePub: string } +  | { tag: TransactionType.PeerPushCredit; peerPushPaymentIncomingId: string } +  | { tag: TransactionType.PeerPushDebit; pursePub: string } +  | { tag: TransactionType.Refresh; refreshGroupId: string } +  | { tag: TransactionType.Refund; proposalId: string; executionTime: string } +  | { tag: TransactionType.Tip; walletTipId: string } +  | { tag: TransactionType.Withdrawal; withdrawalGroupId: string }; + +/** + * Parse a transaction identifier string into a typed, structured representation. + */ +export function parseTransactionIdentifier( +  transactionId: string, +): ParsedTransactionIdentifier | undefined { +  const { type, args: rest } = parseId("any", transactionId); + +  switch (type) { +    case TransactionType.Deposit: +      return { tag: TransactionType.Deposit, depositGroupId: rest[0] }; +    case TransactionType.Payment: +      return { tag: TransactionType.Payment, proposalId: rest[0] }; +    case TransactionType.PeerPullCredit: +      return { tag: TransactionType.PeerPullCredit, pursePub: rest[0] }; +    case TransactionType.PeerPullDebit: +      return { +        tag: TransactionType.PeerPullDebit, +        peerPullPaymentIncomingId: rest[0], +      }; +    case TransactionType.PeerPushCredit: +      return { +        tag: TransactionType.PeerPushCredit, +        peerPushPaymentIncomingId: rest[0], +      }; +    case TransactionType.PeerPushDebit: +      return { tag: TransactionType.PeerPushDebit, pursePub: rest[0] }; +    case TransactionType.Refresh: +      return { tag: TransactionType.Refresh, refreshGroupId: rest[0] }; +    case TransactionType.Refund: +      return { +        tag: TransactionType.Refund, +        proposalId: rest[0], +        executionTime: rest[1], +      }; +    case TransactionType.Tip: +      return { +        tag: TransactionType.Tip, +        walletTipId: rest[0], +      }; +    case TransactionType.Withdrawal: +      return { +        tag: TransactionType.Withdrawal, +        withdrawalGroupId: rest[0], +      }; +    default: +      return undefined; +  } +} + +export function stopLongpolling(ws: InternalWalletState, taskId: string) { +  const longpoll = ws.activeLongpoll[taskId]; +  if (longpoll) { +    logger.info(`cancelling long-polling for ${taskId}`); +    longpoll.cancel(); +    delete ws.activeLongpoll[taskId]; +  } +} +  /**   * Immediately retry the underlying operation   * of a transaction. @@ -1369,34 +1445,86 @@ export async function retryTransaction(  ): Promise<void> {    logger.info(`retrying transaction ${transactionId}`); -  const { type, args: rest } = parseId("any", transactionId); +  const parsedTx = parseTransactionIdentifier(transactionId); -  switch (type) { +  if (!parsedTx) { +    throw Error("invalid transaction identifier"); +  } + +  // FIXME: We currently don't cancel active long-polling tasks here. + +  switch (parsedTx.tag) { +    case TransactionType.PeerPullCredit: { +      const taskId = constructTaskIdentifier({ +        tag: PendingTaskType.PeerPullInitiation, +        pursePub: parsedTx.pursePub, +      }); +      await resetOperationTimeout(ws, taskId); +      stopLongpolling(ws, taskId); +      await runOperationWithErrorReporting(ws, taskId, () => +        processPeerPullCredit(ws, parsedTx.pursePub), +      ); +      break; +    }      case TransactionType.Deposit: { -      const depositGroupId = rest[0]; -      processDepositGroup(ws, depositGroupId, { -        forceNow: true, +      const taskId = constructTaskIdentifier({ +        tag: PendingTaskType.Deposit, +        depositGroupId: parsedTx.depositGroupId,        }); +      await resetOperationTimeout(ws, taskId); +      stopLongpolling(ws, taskId); +      await runOperationWithErrorReporting(ws, taskId, () => +        processDepositGroup(ws, parsedTx.depositGroupId), +      );        break;      }      case TransactionType.Withdrawal: { -      const withdrawalGroupId = rest[0]; -      await processWithdrawalGroup(ws, withdrawalGroupId, { forceNow: true }); +      // FIXME: Abort current long-poller! +      const taskId = constructTaskIdentifier({ +        tag: PendingTaskType.Withdraw, +        withdrawalGroupId: parsedTx.withdrawalGroupId, +      }); +      await resetOperationTimeout(ws, taskId); +      stopLongpolling(ws, taskId); +      await runOperationWithErrorReporting(ws, taskId, () => +        processWithdrawalGroup(ws, parsedTx.withdrawalGroupId), +      );        break;      }      case TransactionType.Payment: { -      const proposalId = rest[0]; -      await processPurchasePay(ws, proposalId, { forceNow: true }); +      const taskId = constructTaskIdentifier({ +        tag: PendingTaskType.Purchase, +        proposalId: parsedTx.proposalId, +      }); +      await resetOperationTimeout(ws, taskId); +      stopLongpolling(ws, taskId); +      await runOperationWithErrorReporting(ws, taskId, () => +        processPurchasePay(ws, parsedTx.proposalId), +      );        break;      }      case TransactionType.Tip: { -      const walletTipId = rest[0]; -      await processTip(ws, walletTipId, { forceNow: true }); +      const taskId = constructTaskIdentifier({ +        tag: PendingTaskType.TipPickup, +        walletTipId: parsedTx.walletTipId, +      }); +      await resetOperationTimeout(ws, taskId); +      stopLongpolling(ws, taskId); +      await runOperationWithErrorReporting(ws, taskId, () => +        processTip(ws, parsedTx.walletTipId), +      );        break;      }      case TransactionType.Refresh: { -      const refreshGroupId = rest[0]; -      await processRefreshGroup(ws, refreshGroupId, { forceNow: true }); +      const taskId = constructTaskIdentifier({ +        tag: PendingTaskType.Refresh, +        refreshGroupId: parsedTx.refreshGroupId, +      }); +      await resetOperationTimeout(ws, taskId); +      stopLongpolling(ws, taskId); +      await runOperationWithErrorReporting(ws, taskId, () => +        processRefreshGroup(ws, parsedTx.refreshGroupId), +      );        break;      }      default: diff --git a/packages/taler-wallet-core/src/operations/withdraw.ts b/packages/taler-wallet-core/src/operations/withdraw.ts index 9dfd72678..5729b8458 100644 --- a/packages/taler-wallet-core/src/operations/withdraw.ts +++ b/packages/taler-wallet-core/src/operations/withdraw.ts @@ -109,7 +109,7 @@ import { DbAccess, GetReadOnlyAccess } from "../util/query.js";  import {    OperationAttemptResult,    OperationAttemptResultType, -  RetryTags, +  TaskIdentifiers,  } from "../util/retries.js";  import {    WALLET_BANK_INTEGRATION_PROTOCOL_VERSION, @@ -1023,7 +1023,6 @@ export async function processWithdrawalGroup(    ws: InternalWalletState,    withdrawalGroupId: string,    options: { -    forceNow?: boolean;    } = {},  ): Promise<OperationAttemptResult> {    logger.trace("processing withdrawal group", withdrawalGroupId); @@ -1037,10 +1036,10 @@ export async function processWithdrawalGroup(      throw Error(`withdrawal group ${withdrawalGroupId} not found`);    } -  const retryTag = RetryTags.forWithdrawal(withdrawalGroup); +  const retryTag = TaskIdentifiers.forWithdrawal(withdrawalGroup);    // We're already running! -  if (ws.activeLongpoll[retryTag] && !options.forceNow) { +  if (ws.activeLongpoll[retryTag]) {      logger.info("withdrawal group already in long-polling, returning!");      return {        type: OperationAttemptResultType.Longpoll, @@ -1532,7 +1531,7 @@ export async function getWithdrawalDetailsForUri(            .iter(r.baseUrl)            .toArray();          const retryRecord = await tx.operationRetries.get( -          RetryTags.forExchangeUpdate(r), +          TaskIdentifiers.forExchangeUpdate(r),          );          if (exchangeDetails && denominations) {            exchanges.push( @@ -2087,7 +2086,7 @@ export async function createManualWithdrawal(    // rely on retry handling to re-process the withdrawal group.    runOperationWithErrorReporting(      ws, -    RetryTags.forWithdrawal(withdrawalGroup), +    TaskIdentifiers.forWithdrawal(withdrawalGroup),      async () => {        return await processWithdrawalGroup(ws, withdrawalGroupId, {          forceNow: true, diff --git a/packages/taler-wallet-core/src/util/retries.ts b/packages/taler-wallet-core/src/util/retries.ts index 5744bf8fe..5b6645924 100644 --- a/packages/taler-wallet-core/src/util/retries.ts +++ b/packages/taler-wallet-core/src/util/retries.ts @@ -46,6 +46,7 @@ import { TalerError } from "@gnu-taler/taler-util";  import { InternalWalletState } from "../internal-wallet-state.js";  import { PendingTaskType } from "../pending-types.js";  import { GetReadWriteAccess } from "./query.js"; +import { assertUnreachable } from "./assertUnreachable.js";  const logger = new Logger("util/retries.ts"); @@ -176,7 +177,66 @@ export namespace RetryInfo {    }  } -export namespace RetryTags { +/** + * Parsed representation of task identifiers. + */ +export type ParsedTaskIdentifier = +  | { +      tag: PendingTaskType.Withdraw; +      withdrawalGroupId: string; +    } +  | { tag: PendingTaskType.ExchangeUpdate; exchangeBaseUrl: string } +  | { tag: PendingTaskType.Backup; backupProviderBaseUrl: string } +  | { tag: PendingTaskType.Deposit; depositGroupId: string } +  | { tag: PendingTaskType.ExchangeCheckRefresh; exchangeBaseUrl: string } +  | { tag: PendingTaskType.ExchangeUpdate; exchangeBaseUrl: string } +  | { tag: PendingTaskType.PeerPullDebit; peerPullPaymentIncomingId: string } +  | { tag: PendingTaskType.PeerPullInitiation; pursePub: string } +  | { tag: PendingTaskType.PeerPushCredit; peerPushPaymentIncomingId: string } +  | { tag: PendingTaskType.PeerPushInitiation; pursePub: string } +  | { tag: PendingTaskType.Purchase; proposalId: string } +  | { tag: PendingTaskType.Recoup; recoupGroupId: string } +  | { tag: PendingTaskType.TipPickup; walletTipId: string } +  | { tag: PendingTaskType.Refresh; refreshGroupId: string }; + +export function parseTaskIdentifier(x: string): ParsedTaskIdentifier { +  throw Error("not yet implemented"); +} + +export function constructTaskIdentifier(p: ParsedTaskIdentifier): string { +  switch (p.tag) { +    case PendingTaskType.Backup: +      return `${p.tag}:${p.backupProviderBaseUrl}`; +    case PendingTaskType.Deposit: +      return `${p.tag}:${p.depositGroupId}`; +    case PendingTaskType.ExchangeCheckRefresh: +      return `${p.tag}:${p.exchangeBaseUrl}`; +    case PendingTaskType.ExchangeUpdate: +      return `${p.tag}:${p.exchangeBaseUrl}`; +    case PendingTaskType.PeerPullDebit: +      return `${p.tag}:${p.peerPullPaymentIncomingId}`; +    case PendingTaskType.PeerPushCredit: +      return `${p.tag}:${p.peerPushPaymentIncomingId}`; +    case PendingTaskType.PeerPullInitiation: +      return `${p.tag}:${p.pursePub}`; +    case PendingTaskType.PeerPushInitiation: +      return `${p.tag}:${p.pursePub}`; +    case PendingTaskType.Purchase: +      return `${p.tag}:${p.proposalId}`; +    case PendingTaskType.Recoup: +      return `${p.tag}:${p.recoupGroupId}`; +    case PendingTaskType.Refresh: +      return `${p.tag}:${p.refreshGroupId}`; +    case PendingTaskType.TipPickup: +      return `${p.tag}:${p.walletTipId}`; +    case PendingTaskType.Withdraw: +      return `${p.tag}:${p.withdrawalGroupId}`; +    default: +      assertUnreachable(p); +  } +} + +export namespace TaskIdentifiers {    export function forWithdrawal(wg: WithdrawalGroupRecord): string {      return `${PendingTaskType.Withdraw}:${wg.withdrawalGroupId}`;    } @@ -227,19 +287,6 @@ export namespace RetryTags {    ): string {      return `${PendingTaskType.PeerPushCredit}:${ppi.peerPushPaymentIncomingId}`;    } -  export function byPaymentProposalId(proposalId: string): string { -    return `${PendingTaskType.Purchase}:${proposalId}`; -  } -  export function byPeerPushPaymentInitiationPursePub( -    pursePub: string, -  ): string { -    return `${PendingTaskType.PeerPushInitiation}:${pursePub}`; -  } -  export function byPeerPullPaymentInitiationPursePub( -    pursePub: string, -  ): string { -    return `${PendingTaskType.PeerPullInitiation}:${pursePub}`; -  }  }  export async function scheduleRetryInTx( diff --git a/packages/taler-wallet-core/src/wallet.ts b/packages/taler-wallet-core/src/wallet.ts index a036be86c..47724efdc 100644 --- a/packages/taler-wallet-core/src/wallet.ts +++ b/packages/taler-wallet-core/src/wallet.ts @@ -265,7 +265,7 @@ import {    GetReadOnlyAccess,    GetReadWriteAccess,  } from "./util/query.js"; -import { OperationAttemptResult, RetryTags } from "./util/retries.js"; +import { OperationAttemptResult, TaskIdentifiers } from "./util/retries.js";  import { TimerAPI, TimerGroup } from "./util/timer.js";  import {    WALLET_BANK_INTEGRATION_PROTOCOL_VERSION, @@ -306,17 +306,15 @@ async function callOperationHandler(          forceNow,        });      case PendingTaskType.Refresh: -      return await processRefreshGroup(ws, pending.refreshGroupId, { -        forceNow, -      }); +      return await processRefreshGroup(ws, pending.refreshGroupId);      case PendingTaskType.Withdraw:        return await processWithdrawalGroup(ws, pending.withdrawalGroupId, {          forceNow,        });      case PendingTaskType.TipPickup: -      return await processTip(ws, pending.tipId, { forceNow }); +      return await processTip(ws, pending.tipId);      case PendingTaskType.Purchase: -      return await processPurchase(ws, pending.proposalId, { forceNow }); +      return await processPurchase(ws, pending.proposalId);      case PendingTaskType.Recoup:        return await processRecoupGroupHandler(ws, pending.recoupGroupId, {          forceNow, @@ -324,9 +322,7 @@ async function callOperationHandler(      case PendingTaskType.ExchangeCheckRefresh:        return await autoRefresh(ws, pending.exchangeBaseUrl);      case PendingTaskType.Deposit: { -      return await processDepositGroup(ws, pending.depositGroupId, { -        forceNow, -      }); +      return await processDepositGroup(ws, pending.depositGroupId);      }      case PendingTaskType.Backup:        return await processBackupForProvider(ws, pending.backupProviderBaseUrl); @@ -691,7 +687,7 @@ async function getExchanges(        for (const r of exchangeRecords) {          const exchangeDetails = await getExchangeDetails(tx, r.baseUrl);          const opRetryRecord = await tx.operationRetries.get( -          RetryTags.forExchangeUpdate(r), +          TaskIdentifiers.forExchangeUpdate(r),          );          exchanges.push(            makeExchangeListItem(r, exchangeDetails, opRetryRecord?.lastError), @@ -1285,9 +1281,7 @@ async function dispatchRequestInternal<Op extends WalletApiOperation>(              RefreshReason.Manual,            );          }); -      processRefreshGroup(ws, refreshGroupId.refreshGroupId, { -        forceNow: true, -      }).catch((x) => { +      processRefreshGroup(ws, refreshGroupId.refreshGroupId).catch((x) => {          logger.error(x);        });        return { @@ -1753,6 +1747,7 @@ class InternalWalletStateImpl implements InternalWalletState {      for (const key of Object.keys(this.activeLongpoll)) {        logger.trace(`cancelling active longpoll ${key}`);        this.activeLongpoll[key].cancel(); +      delete this.activeLongpoll[key];      }    } | 
