From e2f7bc79cd4326f769f370f230041101a099d98c Mon Sep 17 00:00:00 2001 From: Florian Dold Date: Tue, 18 Aug 2020 18:23:06 +0530 Subject: introduce locking to avoid certain simultaneous requests to the exchange --- packages/taler-wallet-core/src/operations/state.ts | 53 +++++++++++++++++++++- 1 file changed, 52 insertions(+), 1 deletion(-) (limited to 'packages/taler-wallet-core/src/operations/state.ts') diff --git a/packages/taler-wallet-core/src/operations/state.ts b/packages/taler-wallet-core/src/operations/state.ts index cfec85d0f..582dd92d3 100644 --- a/packages/taler-wallet-core/src/operations/state.ts +++ b/packages/taler-wallet-core/src/operations/state.ts @@ -22,11 +22,15 @@ import { Logger } from "../util/logging"; import { PendingOperationsResponse } from "../types/pending"; import { WalletNotification } from "../types/notifications"; import { Database } from "../util/query"; +import { openPromise, OpenedPromise } from "../util/promiseUtils"; type NotificationListener = (n: WalletNotification) => void; const logger = new Logger("state.ts"); +export const EXCHANGE_COINS_LOCK = "exchange-coins-lock"; +export const EXCHANGE_RESERVES_LOCK = "exchange-reserves-lock"; + export class InternalWalletState { cachedNextUrl: { [fulfillmentUrl: string]: NextUrlResult } = {}; memoProcessReserve: AsyncOpMemoMap = new AsyncOpMemoMap(); @@ -41,6 +45,16 @@ export class InternalWalletState { listeners: NotificationListener[] = []; + /** + * Promises that are waiting for a particular resource. + */ + private resourceWaiters: Record[]> = {}; + + /** + * Resources that are currently locked. + */ + private resourceLocks: Set = new Set(); + constructor( public db: Database, public http: HttpRequestLibrary, @@ -49,7 +63,7 @@ export class InternalWalletState { this.cryptoApi = new CryptoApi(cryptoWorkerFactory); } - public notify(n: WalletNotification): void { + notify(n: WalletNotification): void { logger.trace("Notification", n); for (const l of this.listeners) { const nc = JSON.parse(JSON.stringify(n)); @@ -62,4 +76,41 @@ export class InternalWalletState { addNotificationListener(f: (n: WalletNotification) => void): void { this.listeners.push(f); } + + /** + * Run an async function after acquiring a list of locks, identified + * by string tokens. + */ + async runSequentialized(tokens: string[], f: () => Promise) { + // Make sure locks are always acquired in the same order + tokens = [... tokens].sort(); + + for (const token of tokens) { + if (this.resourceLocks.has(token)) { + const p = openPromise(); + let waitList = this.resourceWaiters[token]; + if (!waitList) { + waitList = this.resourceWaiters[token] = []; + } + waitList.push(p); + await p.promise; + } + this.resourceLocks.add(token); + } + + try { + logger.trace(`begin exclusive execution on ${JSON.stringify(tokens)}`); + const result = await f(); + logger.trace(`end exclusive execution on ${JSON.stringify(tokens)}`); + return result; + } finally { + for (const token of tokens) { + this.resourceLocks.delete(token); + let waiter = (this.resourceWaiters[token] ?? []).shift(); + if (waiter) { + waiter.resolve(); + } + } + } + } } -- cgit v1.2.3