introduce locking to avoid certain simultaneous requests to the exchange

This commit is contained in:
Florian Dold 2020-08-18 18:23:06 +05:30
parent 53cd347b1c
commit e2f7bc79cd
No known key found for this signature in database
GPG Key ID: D2E4F00F29D02A4B
3 changed files with 77 additions and 16 deletions

View File

@ -57,7 +57,7 @@ import { Logger } from "../util/logging";
import { parsePayUri } from "../util/taleruri"; import { parsePayUri } from "../util/taleruri";
import { guardOperationException, OperationFailedError } from "./errors"; import { guardOperationException, OperationFailedError } from "./errors";
import { createRefreshGroup, getTotalRefreshCost } from "./refresh"; import { createRefreshGroup, getTotalRefreshCost } from "./refresh";
import { InternalWalletState } from "./state"; import { InternalWalletState, EXCHANGE_COINS_LOCK } from "./state";
import { getTimestampNow, timestampAddDuration } from "../util/time"; import { getTimestampNow, timestampAddDuration } from "../util/time";
import { strcmp, canonicalJson } from "../util/helpers"; import { strcmp, canonicalJson } from "../util/helpers";
import { readSuccessResponseJsonOrThrow } from "../util/http"; import { readSuccessResponseJsonOrThrow } from "../util/http";
@ -796,7 +796,9 @@ export async function submitPay(
logger.trace("making pay request", JSON.stringify(reqBody, undefined, 2)); logger.trace("making pay request", JSON.stringify(reqBody, undefined, 2));
const resp = await ws.http.postJson(payUrl, reqBody); const resp = await ws.runSequentialized([EXCHANGE_COINS_LOCK], () =>
ws.http.postJson(payUrl, reqBody),
);
const merchantResp = await readSuccessResponseJsonOrThrow( const merchantResp = await readSuccessResponseJsonOrThrow(
resp, resp,

View File

@ -29,7 +29,7 @@ import {
} from "../types/dbTypes"; } from "../types/dbTypes";
import { amountToPretty } from "../util/helpers"; import { amountToPretty } from "../util/helpers";
import { TransactionHandle } from "../util/query"; import { TransactionHandle } from "../util/query";
import { InternalWalletState } from "./state"; import { InternalWalletState, EXCHANGE_COINS_LOCK } from "./state";
import { Logger } from "../util/logging"; import { Logger } from "../util/logging";
import { getWithdrawDenomList } from "./withdraw"; import { getWithdrawDenomList } from "./withdraw";
import { updateExchangeFromUrl } from "./exchanges"; import { updateExchangeFromUrl } from "./exchanges";
@ -43,7 +43,7 @@ import { guardOperationException } from "./errors";
import { NotificationType } from "../types/notifications"; import { NotificationType } from "../types/notifications";
import { getRandomBytes, encodeCrock } from "../crypto/talerCrypto"; import { getRandomBytes, encodeCrock } from "../crypto/talerCrypto";
import { getTimestampNow } from "../util/time"; import { getTimestampNow } from "../util/time";
import { readSuccessResponseJsonOrThrow } from "../util/http"; import { readSuccessResponseJsonOrThrow, HttpResponse } from "../util/http";
import { import {
codecForExchangeMeltResponse, codecForExchangeMeltResponse,
codecForExchangeRevealResponse, codecForExchangeRevealResponse,
@ -248,7 +248,14 @@ async function refreshMelt(
value_with_fee: Amounts.stringify(refreshSession.amountRefreshInput), value_with_fee: Amounts.stringify(refreshSession.amountRefreshInput),
}; };
logger.trace(`melt request for coin:`, meltReq); logger.trace(`melt request for coin:`, meltReq);
const resp = await ws.http.postJson(reqUrl.href, meltReq);
const resp = await ws.runSequentialized(
[EXCHANGE_COINS_LOCK],
async () => {
return await ws.http.postJson(reqUrl.href, meltReq);
},
);
const meltResponse = await readSuccessResponseJsonOrThrow( const meltResponse = await readSuccessResponseJsonOrThrow(
resp, resp,
codecForExchangeMeltResponse(), codecForExchangeMeltResponse(),
@ -339,7 +346,13 @@ async function refreshReveal(
refreshSession.exchangeBaseUrl, refreshSession.exchangeBaseUrl,
); );
const resp = await ws.http.postJson(reqUrl.href, req); const resp = await ws.runSequentialized(
[EXCHANGE_COINS_LOCK],
async () => {
return await ws.http.postJson(reqUrl.href, req);
},
);
const reveal = await readSuccessResponseJsonOrThrow( const reveal = await readSuccessResponseJsonOrThrow(
resp, resp,
codecForExchangeRevealResponse(), codecForExchangeRevealResponse(),
@ -446,6 +459,9 @@ async function incrementRefreshRetry(
} }
} }
/**
* Actually process a refresh group that has been created.
*/
export async function processRefreshGroup( export async function processRefreshGroup(
ws: InternalWalletState, ws: InternalWalletState,
refreshGroupId: string, refreshGroupId: string,
@ -557,15 +573,7 @@ export async function createRefreshGroup(
await tx.put(Stores.refreshGroups, refreshGroup); await tx.put(Stores.refreshGroups, refreshGroup);
const processAsync = async (): Promise<void> => { logger.trace(`created refresh group ${refreshGroupId}`);
try {
await processRefreshGroup(ws, refreshGroupId);
} catch (e) {
logger.trace(`Error during refresh: ${e}`);
}
};
processAsync();
return { return {
refreshGroupId, refreshGroupId,

View File

@ -22,11 +22,15 @@ import { Logger } from "../util/logging";
import { PendingOperationsResponse } from "../types/pending"; import { PendingOperationsResponse } from "../types/pending";
import { WalletNotification } from "../types/notifications"; import { WalletNotification } from "../types/notifications";
import { Database } from "../util/query"; import { Database } from "../util/query";
import { openPromise, OpenedPromise } from "../util/promiseUtils";
type NotificationListener = (n: WalletNotification) => void; type NotificationListener = (n: WalletNotification) => void;
const logger = new Logger("state.ts"); const logger = new Logger("state.ts");
export const EXCHANGE_COINS_LOCK = "exchange-coins-lock";
export const EXCHANGE_RESERVES_LOCK = "exchange-reserves-lock";
export class InternalWalletState { export class InternalWalletState {
cachedNextUrl: { [fulfillmentUrl: string]: NextUrlResult } = {}; cachedNextUrl: { [fulfillmentUrl: string]: NextUrlResult } = {};
memoProcessReserve: AsyncOpMemoMap<void> = new AsyncOpMemoMap(); memoProcessReserve: AsyncOpMemoMap<void> = new AsyncOpMemoMap();
@ -41,6 +45,16 @@ export class InternalWalletState {
listeners: NotificationListener[] = []; listeners: NotificationListener[] = [];
/**
* Promises that are waiting for a particular resource.
*/
private resourceWaiters: Record<string, OpenedPromise<void>[]> = {};
/**
* Resources that are currently locked.
*/
private resourceLocks: Set<string> = new Set();
constructor( constructor(
public db: Database, public db: Database,
public http: HttpRequestLibrary, public http: HttpRequestLibrary,
@ -49,7 +63,7 @@ export class InternalWalletState {
this.cryptoApi = new CryptoApi(cryptoWorkerFactory); this.cryptoApi = new CryptoApi(cryptoWorkerFactory);
} }
public notify(n: WalletNotification): void { notify(n: WalletNotification): void {
logger.trace("Notification", n); logger.trace("Notification", n);
for (const l of this.listeners) { for (const l of this.listeners) {
const nc = JSON.parse(JSON.stringify(n)); const nc = JSON.parse(JSON.stringify(n));
@ -62,4 +76,41 @@ export class InternalWalletState {
addNotificationListener(f: (n: WalletNotification) => void): void { addNotificationListener(f: (n: WalletNotification) => void): void {
this.listeners.push(f); this.listeners.push(f);
} }
/**
* Run an async function after acquiring a list of locks, identified
* by string tokens.
*/
async runSequentialized<T>(tokens: string[], f: () => Promise<T>) {
// Make sure locks are always acquired in the same order
tokens = [... tokens].sort();
for (const token of tokens) {
if (this.resourceLocks.has(token)) {
const p = openPromise<void>();
let waitList = this.resourceWaiters[token];
if (!waitList) {
waitList = this.resourceWaiters[token] = [];
}
waitList.push(p);
await p.promise;
}
this.resourceLocks.add(token);
}
try {
logger.trace(`begin exclusive execution on ${JSON.stringify(tokens)}`);
const result = await f();
logger.trace(`end exclusive execution on ${JSON.stringify(tokens)}`);
return result;
} finally {
for (const token of tokens) {
this.resourceLocks.delete(token);
let waiter = (this.resourceWaiters[token] ?? []).shift();
if (waiter) {
waiter.resolve();
}
}
}
}
} }