diff --git a/packages/taler-wallet-core/src/operations/pay.ts b/packages/taler-wallet-core/src/operations/pay.ts index c3dd6c6d3..996e1c1ec 100644 --- a/packages/taler-wallet-core/src/operations/pay.ts +++ b/packages/taler-wallet-core/src/operations/pay.ts @@ -57,7 +57,7 @@ import { Logger } from "../util/logging"; import { parsePayUri } from "../util/taleruri"; import { guardOperationException, OperationFailedError } from "./errors"; import { createRefreshGroup, getTotalRefreshCost } from "./refresh"; -import { InternalWalletState } from "./state"; +import { InternalWalletState, EXCHANGE_COINS_LOCK } from "./state"; import { getTimestampNow, timestampAddDuration } from "../util/time"; import { strcmp, canonicalJson } from "../util/helpers"; import { readSuccessResponseJsonOrThrow } from "../util/http"; @@ -796,7 +796,9 @@ export async function submitPay( logger.trace("making pay request", JSON.stringify(reqBody, undefined, 2)); - const resp = await ws.http.postJson(payUrl, reqBody); + const resp = await ws.runSequentialized([EXCHANGE_COINS_LOCK], () => + ws.http.postJson(payUrl, reqBody), + ); const merchantResp = await readSuccessResponseJsonOrThrow( resp, diff --git a/packages/taler-wallet-core/src/operations/refresh.ts b/packages/taler-wallet-core/src/operations/refresh.ts index 52325281b..409ae58cc 100644 --- a/packages/taler-wallet-core/src/operations/refresh.ts +++ b/packages/taler-wallet-core/src/operations/refresh.ts @@ -29,7 +29,7 @@ import { } from "../types/dbTypes"; import { amountToPretty } from "../util/helpers"; import { TransactionHandle } from "../util/query"; -import { InternalWalletState } from "./state"; +import { InternalWalletState, EXCHANGE_COINS_LOCK } from "./state"; import { Logger } from "../util/logging"; import { getWithdrawDenomList } from "./withdraw"; import { updateExchangeFromUrl } from "./exchanges"; @@ -43,7 +43,7 @@ import { guardOperationException } from "./errors"; import { NotificationType } from "../types/notifications"; import { getRandomBytes, encodeCrock } from "../crypto/talerCrypto"; import { getTimestampNow } from "../util/time"; -import { readSuccessResponseJsonOrThrow } from "../util/http"; +import { readSuccessResponseJsonOrThrow, HttpResponse } from "../util/http"; import { codecForExchangeMeltResponse, codecForExchangeRevealResponse, @@ -248,7 +248,14 @@ async function refreshMelt( value_with_fee: Amounts.stringify(refreshSession.amountRefreshInput), }; logger.trace(`melt request for coin:`, meltReq); - const resp = await ws.http.postJson(reqUrl.href, meltReq); + + const resp = await ws.runSequentialized( + [EXCHANGE_COINS_LOCK], + async () => { + return await ws.http.postJson(reqUrl.href, meltReq); + }, + ); + const meltResponse = await readSuccessResponseJsonOrThrow( resp, codecForExchangeMeltResponse(), @@ -339,7 +346,13 @@ async function refreshReveal( refreshSession.exchangeBaseUrl, ); - const resp = await ws.http.postJson(reqUrl.href, req); + const resp = await ws.runSequentialized( + [EXCHANGE_COINS_LOCK], + async () => { + return await ws.http.postJson(reqUrl.href, req); + }, + ); + const reveal = await readSuccessResponseJsonOrThrow( resp, codecForExchangeRevealResponse(), @@ -446,6 +459,9 @@ async function incrementRefreshRetry( } } +/** + * Actually process a refresh group that has been created. + */ export async function processRefreshGroup( ws: InternalWalletState, refreshGroupId: string, @@ -557,15 +573,7 @@ export async function createRefreshGroup( await tx.put(Stores.refreshGroups, refreshGroup); - const processAsync = async (): Promise => { - try { - await processRefreshGroup(ws, refreshGroupId); - } catch (e) { - logger.trace(`Error during refresh: ${e}`); - } - }; - - processAsync(); + logger.trace(`created refresh group ${refreshGroupId}`); return { refreshGroupId, diff --git a/packages/taler-wallet-core/src/operations/state.ts b/packages/taler-wallet-core/src/operations/state.ts index cfec85d0f..582dd92d3 100644 --- a/packages/taler-wallet-core/src/operations/state.ts +++ b/packages/taler-wallet-core/src/operations/state.ts @@ -22,11 +22,15 @@ import { Logger } from "../util/logging"; import { PendingOperationsResponse } from "../types/pending"; import { WalletNotification } from "../types/notifications"; import { Database } from "../util/query"; +import { openPromise, OpenedPromise } from "../util/promiseUtils"; type NotificationListener = (n: WalletNotification) => void; const logger = new Logger("state.ts"); +export const EXCHANGE_COINS_LOCK = "exchange-coins-lock"; +export const EXCHANGE_RESERVES_LOCK = "exchange-reserves-lock"; + export class InternalWalletState { cachedNextUrl: { [fulfillmentUrl: string]: NextUrlResult } = {}; memoProcessReserve: AsyncOpMemoMap = new AsyncOpMemoMap(); @@ -41,6 +45,16 @@ export class InternalWalletState { listeners: NotificationListener[] = []; + /** + * Promises that are waiting for a particular resource. + */ + private resourceWaiters: Record[]> = {}; + + /** + * Resources that are currently locked. + */ + private resourceLocks: Set = new Set(); + constructor( public db: Database, public http: HttpRequestLibrary, @@ -49,7 +63,7 @@ export class InternalWalletState { this.cryptoApi = new CryptoApi(cryptoWorkerFactory); } - public notify(n: WalletNotification): void { + notify(n: WalletNotification): void { logger.trace("Notification", n); for (const l of this.listeners) { const nc = JSON.parse(JSON.stringify(n)); @@ -62,4 +76,41 @@ export class InternalWalletState { addNotificationListener(f: (n: WalletNotification) => void): void { this.listeners.push(f); } + + /** + * Run an async function after acquiring a list of locks, identified + * by string tokens. + */ + async runSequentialized(tokens: string[], f: () => Promise) { + // Make sure locks are always acquired in the same order + tokens = [... tokens].sort(); + + for (const token of tokens) { + if (this.resourceLocks.has(token)) { + const p = openPromise(); + let waitList = this.resourceWaiters[token]; + if (!waitList) { + waitList = this.resourceWaiters[token] = []; + } + waitList.push(p); + await p.promise; + } + this.resourceLocks.add(token); + } + + try { + logger.trace(`begin exclusive execution on ${JSON.stringify(tokens)}`); + const result = await f(); + logger.trace(`end exclusive execution on ${JSON.stringify(tokens)}`); + return result; + } finally { + for (const token of tokens) { + this.resourceLocks.delete(token); + let waiter = (this.resourceWaiters[token] ?? []).shift(); + if (waiter) { + waiter.resolve(); + } + } + } + } }