diff options
| author | Florian Dold <florian@dold.me> | 2022-09-23 18:56:21 +0200 | 
|---|---|---|
| committer | Florian Dold <florian@dold.me> | 2022-09-23 20:38:26 +0200 | 
| commit | 72336b149b4c27715e4e2f7610ec4007ecccdbd9 (patch) | |
| tree | f209095fc98fd47d4681499662276f4cde5486ba /packages/taler-wallet-core/src | |
| parent | 9811e19252ef859099fa5c16d703808f6c778a94 (diff) | |
wallet-core: do not block when accepting a manual withdrawal
Diffstat (limited to 'packages/taler-wallet-core/src')
| -rw-r--r-- | packages/taler-wallet-core/src/internal-wallet-state.ts | 16 | ||||
| -rw-r--r-- | packages/taler-wallet-core/src/operations/withdraw.ts | 80 | ||||
| -rw-r--r-- | packages/taler-wallet-core/src/wallet.ts | 71 | 
3 files changed, 113 insertions, 54 deletions
diff --git a/packages/taler-wallet-core/src/internal-wallet-state.ts b/packages/taler-wallet-core/src/internal-wallet-state.ts index ad6afe3c3..b8415a469 100644 --- a/packages/taler-wallet-core/src/internal-wallet-state.ts +++ b/packages/taler-wallet-core/src/internal-wallet-state.ts @@ -127,6 +127,12 @@ export interface RecoupOperations {  export type NotificationListener = (n: WalletNotification) => void; +export interface ActiveLongpollInfo { +  [opId: string]: { +    cancel: () => void; +  }; +} +  /**   * Internal, shard wallet state that is used by the implementation   * of wallet operations. @@ -135,12 +141,10 @@ export type NotificationListener = (n: WalletNotification) => void;   * as it's an opaque implementation detail.   */  export interface InternalWalletState { -  memoProcessReserve: AsyncOpMemoMap<void>; -  memoMakePlanchet: AsyncOpMemoMap<void>; -  memoGetPending: AsyncOpMemoSingle<PendingOperationsResponse>; -  memoGetBalance: AsyncOpMemoSingle<BalancesResponse>; -  memoProcessRefresh: AsyncOpMemoMap<void>; -  memoProcessRecoup: AsyncOpMemoMap<void>; +  /** +   * Active longpoll operations. +   */ +  activeLongpoll: ActiveLongpollInfo;    cryptoApi: TalerCryptoInterface; diff --git a/packages/taler-wallet-core/src/operations/withdraw.ts b/packages/taler-wallet-core/src/operations/withdraw.ts index cedb62361..4901fdc86 100644 --- a/packages/taler-wallet-core/src/operations/withdraw.ts +++ b/packages/taler-wallet-core/src/operations/withdraw.ts @@ -28,6 +28,7 @@ import {    Amounts,    AmountString,    BankWithdrawDetails, +  CancellationToken,    canonicalizeBaseUrl,    codecForBankWithdrawalOperationPostResponse,    codecForReserveStatus, @@ -106,7 +107,12 @@ import {    WALLET_BANK_INTEGRATION_PROTOCOL_VERSION,    WALLET_EXCHANGE_PROTOCOL_VERSION,  } from "../versions.js"; -import { makeCoinAvailable, storeOperationPending } from "../wallet.js"; +import { +  makeCoinAvailable, +  runOperationWithErrorReporting, +  storeOperationError, +  storeOperationPending, +} from "../wallet.js";  import {    getExchangeDetails,    getExchangePaytoUri, @@ -962,6 +968,7 @@ export async function updateWithdrawalDenoms(  async function queryReserve(    ws: InternalWalletState,    withdrawalGroupId: string, +  cancellationToken: CancellationToken,  ): Promise<{ ready: boolean }> {    const withdrawalGroup = await getWithdrawalGroupRecordTx(ws.db, {      withdrawalGroupId, @@ -982,6 +989,7 @@ async function queryReserve(    const resp = await ws.http.get(reserveUrl.href, {      timeout: getReserveRequestTimeout(withdrawalGroup), +    cancellationToken,    });    const result = await readSuccessResponseJsonOrErrorCode( @@ -1044,6 +1052,16 @@ export async function processWithdrawalGroup(      throw Error(`withdrawal group ${withdrawalGroupId} not found`);    } +  const retryTag = RetryTags.forWithdrawal(withdrawalGroup); + +  // We're already running! +  if (ws.activeLongpoll[retryTag]) { +    logger.info("withdrawal group already in long-polling, returning!"); +    return { +      type: OperationAttemptResultType.Longpoll, +    }; +  } +    switch (withdrawalGroup.status) {      case WithdrawalGroupStatus.RegisteringBank:        await processReserveBankStatus(ws, withdrawalGroupId); @@ -1051,15 +1069,45 @@ export async function processWithdrawalGroup(          forceNow: true,        });      case WithdrawalGroupStatus.QueryingStatus: { -      const res = await queryReserve(ws, withdrawalGroupId); -      if (res.ready) { -        return await processWithdrawalGroup(ws, withdrawalGroupId, { -          forceNow: true, -        }); -      } +      const doQueryAsync = async () => { +        if (ws.stopped) { +          logger.info("not long-polling reserve, wallet already stopped"); +          await storeOperationPending(ws, retryTag); +          return; +        } +        const cts = CancellationToken.create(); +        let res: { ready: boolean } | undefined = undefined; +        try { +          ws.activeLongpoll[retryTag] = { +            cancel: () => { +              logger.info("cancel of reserve longpoll requested"); +              cts.cancel(); +            }, +          }; +          res = await queryReserve(ws, withdrawalGroupId, cts.token); +        } catch (e) { +          await storeOperationError( +            ws, +            retryTag, +            getErrorDetailFromException(e), +          ); +          return; +        } +        delete ws.activeLongpoll[retryTag]; +        logger.info( +          `active longpoll keys (2) ${Object.keys(ws.activeLongpoll)}`, +        ); +        if (!res.ready) { +          await storeOperationPending(ws, retryTag); +        } +        ws.latch.trigger(); +      }; +      doQueryAsync(); +      logger.info( +        "returning early from withdrawal for long-polling in background", +      );        return { -        type: OperationAttemptResultType.Pending, -        result: undefined, +        type: OperationAttemptResultType.Longpoll,        };      }      case WithdrawalGroupStatus.WaitConfirmBank: { @@ -1912,10 +1960,16 @@ export async function createManualWithdrawal(        return await getFundingPaytoUris(tx, withdrawalGroup.withdrawalGroupId);      }); -  // Start withdrawal in the background. -  await processWithdrawalGroup(ws, withdrawalGroupId, { forceNow: true }).catch( -    (err) => { -      logger.error("Processing withdrawal (after creation) failed:", err); +  // Start withdrawal in the background (do not await!) +  // FIXME: We could also interrupt the task look if it is waiting and +  // rely on retry handling to re-process the withdrawal group. +  runOperationWithErrorReporting( +    ws, +    RetryTags.forWithdrawal(withdrawalGroup), +    async () => { +      return await processWithdrawalGroup(ws, withdrawalGroupId, { +        forceNow: true, +      });      },    ); diff --git a/packages/taler-wallet-core/src/wallet.ts b/packages/taler-wallet-core/src/wallet.ts index 7890259f2..dd47d7ce3 100644 --- a/packages/taler-wallet-core/src/wallet.ts +++ b/packages/taler-wallet-core/src/wallet.ts @@ -64,38 +64,37 @@ import {    codecForSetWalletDeviceIdRequest,    codecForTestPayArgs,    codecForTrackDepositGroupRequest, +  codecForTransactionByIdRequest,    codecForTransactionsRequest,    codecForWithdrawFakebankRequest,    codecForWithdrawTestBalance,    CoinDumpJson,    CoreApiResponse, +  DenominationInfo,    Duration,    durationFromSpec,    durationMin,    ExchangeFullDetails, +  ExchangeListItem,    ExchangesListResponse, +  FeeDescription,    GetExchangeTosResult,    j2s,    KnownBankAccounts,    Logger,    ManualWithdrawalDetails,    NotificationType, +  OperationMap,    parsePaytoUri, -  PaytoUri,    RefreshReason,    TalerErrorCode, -  URL, -  WalletNotification, -  WalletCoreVersion, -  ExchangeListItem, -  OperationMap, -  FeeDescription,    TalerErrorDetail, -  codecForTransactionByIdRequest, -  DenominationInfo,    KnownBankAccountsInfo,    codecForAddKnownBankAccounts,    codecForForgetKnownBankAccounts, +  URL, +  WalletCoreVersion, +  WalletNotification,  } from "@gnu-taler/taler-util";  import { TalerCryptoInterface } from "./crypto/cryptoImplementation.js";  import { @@ -119,6 +118,7 @@ import {    TalerError,  } from "./errors.js";  import { +  ActiveLongpollInfo,    ExchangeOperations,    InternalWalletState,    MerchantInfo, @@ -235,7 +235,6 @@ import {    OperationAttemptResult,    OperationAttemptResultType,    RetryInfo, -  runOperationHandlerForResult,  } from "./util/retries.js";  import { TimerAPI, TimerGroup } from "./util/timer.js";  import { @@ -392,27 +391,21 @@ export async function storeOperationPending(      });  } -/** - * Execute one operation based on the pending operation info record. - * - * Store success/failure result in the database. - */ -async function processOnePendingOperation( +export async function runOperationWithErrorReporting(    ws: InternalWalletState, -  pending: PendingTaskInfo, -  forceNow = false, +  opId: string, +  f: () => Promise<OperationAttemptResult>,  ): Promise<void> { -  logger.trace(`running pending ${JSON.stringify(pending, undefined, 2)}`);    let maybeError: TalerErrorDetail | undefined;    try { -    const resp = await callOperationHandler(ws, pending, forceNow); +    const resp = await f();      switch (resp.type) {        case OperationAttemptResultType.Error: -        return await storeOperationError(ws, pending.id, resp.errorDetail); +        return await storeOperationError(ws, opId, resp.errorDetail);        case OperationAttemptResultType.Finished: -        return await storeOperationFinished(ws, pending.id); +        return await storeOperationFinished(ws, opId);        case OperationAttemptResultType.Pending: -        return await storeOperationPending(ws, pending.id); +        return await storeOperationPending(ws, opId);        case OperationAttemptResultType.Longpoll:          break;      } @@ -421,7 +414,7 @@ async function processOnePendingOperation(        logger.warn("operation processed resulted in error");        logger.warn(`error was: ${j2s(e.errorDetail)}`);        maybeError = e.errorDetail; -      return await storeOperationError(ws, pending.id, maybeError!); +      return await storeOperationError(ws, opId, maybeError!);      } else if (e instanceof Error) {        // This is a bug, as we expect pending operations to always        // do their own error handling and only throw WALLET_PENDING_OPERATION_FAILED @@ -435,7 +428,7 @@ async function processOnePendingOperation(          },          `unexpected exception (message: ${e.message})`,        ); -      return await storeOperationError(ws, pending.id, maybeError); +      return await storeOperationError(ws, opId, maybeError);      } else {        logger.error("Uncaught exception, value is not even an error.");        maybeError = makeErrorDetail( @@ -443,7 +436,7 @@ async function processOnePendingOperation(          {},          `unexpected exception (not even an error)`,        ); -      return await storeOperationError(ws, pending.id, maybeError); +      return await storeOperationError(ws, opId, maybeError);      }    }  } @@ -460,7 +453,10 @@ export async function runPending(      if (!forceNow && !AbsoluteTime.isExpired(p.timestampDue)) {        continue;      } -    await processOnePendingOperation(ws, p, forceNow); +    await runOperationWithErrorReporting(ws, p.id, async () => { +      logger.trace(`running pending ${JSON.stringify(p, undefined, 2)}`); +      return await callOperationHandler(ws, p, forceNow); +    });    }  } @@ -563,7 +559,10 @@ async function runTaskLoop(          if (!AbsoluteTime.isExpired(p.timestampDue)) {            continue;          } -        await processOnePendingOperation(ws, p); +        await runOperationWithErrorReporting(ws, p.id, async () => { +          logger.trace(`running pending ${JSON.stringify(p, undefined, 2)}`); +          return await callOperationHandler(ws, p); +        });          ws.notify({            type: NotificationType.PendingOperationProcessed,          }); @@ -1613,13 +1612,10 @@ export class Wallet {   * This ties together all the operation implementations.   */  class InternalWalletStateImpl implements InternalWalletState { -  memoProcessReserve: AsyncOpMemoMap<void> = new AsyncOpMemoMap(); -  memoMakePlanchet: AsyncOpMemoMap<void> = new AsyncOpMemoMap(); -  memoGetPending: AsyncOpMemoSingle<PendingOperationsResponse> = -    new AsyncOpMemoSingle(); -  memoGetBalance: AsyncOpMemoSingle<BalancesResponse> = new AsyncOpMemoSingle(); -  memoProcessRefresh: AsyncOpMemoMap<void> = new AsyncOpMemoMap(); -  memoProcessRecoup: AsyncOpMemoMap<void> = new AsyncOpMemoMap(); +  /** +   * @see {@link InternalWalletState.activeLongpoll} +   */ +  activeLongpoll: ActiveLongpollInfo = {};    cryptoApi: TalerCryptoInterface;    cryptoDispatcher: CryptoDispatcher; @@ -1719,9 +1715,14 @@ class InternalWalletStateImpl implements InternalWalletState {     * Stop ongoing processing.     */    stop(): void { +    logger.info("stopping (at internal wallet state)");      this.stopped = true;      this.timerGroup.stopCurrentAndFutureTimers();      this.cryptoDispatcher.stop(); +    for (const key of Object.keys(this.activeLongpoll)) { +      logger.info(`cancelling active longpoll ${key}`); +      this.activeLongpoll[key].cancel(); +    }    }    async runUntilDone(  | 
