throttling diagnostics and request timeouts

This commit is contained in:
Florian Dold 2020-08-20 16:27:20 +05:30
parent ddf9171c5b
commit 421e613f92
No known key found for this signature in database
GPG Key ID: D2E4F00F29D02A4B
11 changed files with 159 additions and 35 deletions

View File

@ -3202,6 +3202,13 @@ export enum TalerErrorCode {
*/
WALLET_WITHDRAWAL_OPERATION_ABORTED_BY_BANK = 7012,
/**
* An HTTP request made by the wallet timed out.
* Returned with an HTTP status code of #MHD_HTTP_UNINITIALIZED (0).
* (A value of 0 indicates that the error is generated client-side).
*/
WALLET_HTTP_REQUEST_TIMEOUT = 7013,
/**
* End of error code range.
* Returned with an HTTP status code of #MHD_HTTP_UNINITIALIZED (0).

View File

@ -29,6 +29,7 @@ import { RequestThrottler } from "../util/RequestThrottler";
import Axios from "axios";
import { OperationFailedError, makeErrorDetails } from "../operations/errors";
import { TalerErrorCode } from "../TalerErrorCode";
import { URL } from "../util/url";
/**
* Implementation of the HTTP request library interface for node.
@ -50,8 +51,20 @@ export class NodeHttpLib implements HttpRequestLibrary {
body: any,
opt?: HttpRequestOptions,
): Promise<HttpResponse> {
const parsedUrl = new URL(url);
if (this.throttlingEnabled && this.throttle.applyThrottle(url)) {
throw Error("request throttled");
throw OperationFailedError.fromCode(
TalerErrorCode.WALLET_HTTP_REQUEST_THROTTLED,
`request to origin ${parsedUrl.origin} was throttled`,
{
requestMethod: method,
requestUrl: url,
throttleStats: this.throttle.getThrottleStats(url),
});
}
let timeout: number | undefined;
if (typeof opt?.timeout?.d_ms === "number") {
timeout = opt.timeout.d_ms;
}
const resp = await Axios({
method,
@ -61,6 +74,7 @@ export class NodeHttpLib implements HttpRequestLibrary {
validateStatus: () => true,
transformResponse: (x) => x,
data: body,
timeout,
});
const respText = resp.data;

View File

@ -43,7 +43,7 @@ import {
WALLET_CACHE_BREAKER_CLIENT_VERSION,
WALLET_EXCHANGE_PROTOCOL_VERSION,
} from "./versions";
import { getTimestampNow } from "../util/time";
import { getTimestampNow, Duration } from "../util/time";
import { compare } from "../util/libtoolVersion";
import { createRecoupGroup, processRecoupGroup } from "./recoup";
import { TalerErrorCode } from "../TalerErrorCode";
@ -96,6 +96,10 @@ async function setExchangeError(
await ws.db.mutate(Stores.exchanges, baseUrl, mut);
}
function getExchangeRequestTimeout(e: ExchangeRecord): Duration {
return { d_ms: 5000 };
}
/**
* Fetch the exchange's /keys and update our database accordingly.
*
@ -117,7 +121,9 @@ async function updateExchangeWithKeys(
const keysUrl = new URL("keys", baseUrl);
keysUrl.searchParams.set("cacheBreaker", WALLET_CACHE_BREAKER_CLIENT_VERSION);
const resp = await ws.http.get(keysUrl.href);
const resp = await ws.http.get(keysUrl.href, {
timeout: getExchangeRequestTimeout(existingExchangeRecord),
});
const exchangeKeysJson = await readSuccessResponseJsonOrThrow(
resp,
codecForExchangeKeysJson(),
@ -303,7 +309,10 @@ async function updateExchangeWithTermsOfService(
Accept: "text/plain",
};
const resp = await ws.http.get(reqUrl.href, { headers });
const resp = await ws.http.get(reqUrl.href, {
headers,
timeout: getExchangeRequestTimeout(exchange),
});
const tosText = await readSuccessResponseTextOrThrow(resp);
const tosEtag = resp.headers.get("etag") || undefined;
@ -361,7 +370,9 @@ async function updateExchangeWithWireInfo(
const reqUrl = new URL("wire", exchangeBaseUrl);
reqUrl.searchParams.set("cacheBreaker", WALLET_CACHE_BREAKER_CLIENT_VERSION);
const resp = await ws.http.get(reqUrl.href);
const resp = await ws.http.get(reqUrl.href, {
timeout: getExchangeRequestTimeout(exchange),
});
const wireInfo = await readSuccessResponseJsonOrThrow(
resp,
codecForExchangeWireJson(),

View File

@ -35,6 +35,7 @@ import {
updateRetryInfoTimeout,
PayEventRecord,
WalletContractData,
getRetryDuration,
} from "../types/dbTypes";
import { NotificationType } from "../types/notifications";
import {
@ -58,7 +59,13 @@ import { parsePayUri } from "../util/taleruri";
import { guardOperationException, OperationFailedError } from "./errors";
import { createRefreshGroup, getTotalRefreshCost } from "./refresh";
import { InternalWalletState, EXCHANGE_COINS_LOCK } from "./state";
import { getTimestampNow, timestampAddDuration } from "../util/time";
import {
getTimestampNow,
timestampAddDuration,
Duration,
durationMax,
durationMin,
} from "../util/time";
import { strcmp, canonicalJson } from "../util/helpers";
import {
readSuccessResponseJsonOrThrow,
@ -588,6 +595,17 @@ async function resetDownloadProposalRetry(
});
}
function getProposalRequestTimeout(proposal: ProposalRecord): Duration {
return durationMax(
{ d_ms: 60000 },
durationMin({ d_ms: 5000 }, getRetryDuration(proposal.retryInfo)),
);
}
function getPurchaseRequestTimeout(purchase: PurchaseRecord): Duration {
return { d_ms: 5000 };
}
async function processDownloadProposalImpl(
ws: InternalWalletState,
proposalId: string,
@ -620,7 +638,9 @@ async function processDownloadProposalImpl(
requestBody.token = proposal.claimToken;
}
const resp = await ws.http.postJson(orderClaimUrl, requestBody);
const resp = await ws.http.postJson(orderClaimUrl, requestBody, {
timeout: getProposalRequestTimeout(proposal),
});
const proposalResp = await readSuccessResponseJsonOrThrow(
resp,
codecForProposal(),
@ -886,7 +906,9 @@ export async function submitPay(
logger.trace("making pay request", JSON.stringify(reqBody, undefined, 2));
const resp = await ws.runSequentialized([EXCHANGE_COINS_LOCK], () =>
ws.http.postJson(payUrl, reqBody),
ws.http.postJson(payUrl, reqBody, {
timeout: getPurchaseRequestTimeout(purchase),
}),
);
const merchantResp = await readSuccessResponseJsonOrThrow(

View File

@ -40,7 +40,7 @@ import {
import { codecForRecoupConfirmation } from "../types/talerTypes";
import { NotificationType } from "../types/notifications";
import { forceQueryReserve } from "./reserves";
import { forceQueryReserve, getReserveRequestTimeout } from "./reserves";
import { Amounts } from "../util/amounts";
import { createRefreshGroup, processRefreshGroup } from "./refresh";
@ -154,7 +154,9 @@ async function recoupWithdrawCoin(
const recoupRequest = await ws.cryptoApi.createRecoupRequest(coin);
const reqUrl = new URL(`/coins/${coin.coinPub}/recoup`, coin.exchangeBaseUrl);
const resp = await ws.http.postJson(reqUrl.href, recoupRequest);
const resp = await ws.http.postJson(reqUrl.href, recoupRequest, {
timeout: getReserveRequestTimeout(reserve),
});
const recoupConfirmation = await readSuccessResponseJsonOrThrow(
resp,
codecForRecoupConfirmation(),

View File

@ -42,7 +42,7 @@ import {
import { guardOperationException } from "./errors";
import { NotificationType } from "../types/notifications";
import { getRandomBytes, encodeCrock } from "../crypto/talerCrypto";
import { getTimestampNow } from "../util/time";
import { getTimestampNow, Duration } from "../util/time";
import { readSuccessResponseJsonOrThrow, HttpResponse } from "../util/http";
import {
codecForExchangeMeltResponse,
@ -211,6 +211,10 @@ async function refreshCreateSession(
ws.notify({ type: NotificationType.RefreshStarted });
}
function getRefreshRequestTimeout(rg: RefreshGroupRecord): Duration {
return { d_ms: 5000 };
}
async function refreshMelt(
ws: InternalWalletState,
refreshGroupId: string,
@ -249,12 +253,11 @@ async function refreshMelt(
};
logger.trace(`melt request for coin:`, meltReq);
const resp = await ws.runSequentialized(
[EXCHANGE_COINS_LOCK],
async () => {
return await ws.http.postJson(reqUrl.href, meltReq);
},
);
const resp = await ws.runSequentialized([EXCHANGE_COINS_LOCK], async () => {
return await ws.http.postJson(reqUrl.href, meltReq, {
timeout: getRefreshRequestTimeout(refreshGroup),
});
});
const meltResponse = await readSuccessResponseJsonOrThrow(
resp,
@ -346,12 +349,11 @@ async function refreshReveal(
refreshSession.exchangeBaseUrl,
);
const resp = await ws.runSequentialized(
[EXCHANGE_COINS_LOCK],
async () => {
return await ws.http.postJson(reqUrl.href, req);
},
);
const resp = await ws.runSequentialized([EXCHANGE_COINS_LOCK], async () => {
return await ws.http.postJson(reqUrl.href, req, {
timeout: getRefreshRequestTimeout(refreshGroup),
});
});
const reveal = await readSuccessResponseJsonOrThrow(
resp,

View File

@ -35,6 +35,7 @@ import {
WithdrawalSourceType,
ReserveHistoryRecord,
ReserveBankInfo,
getRetryDuration,
} from "../types/dbTypes";
import { Logger } from "../util/logging";
import { Amounts } from "../util/amounts";
@ -64,7 +65,12 @@ import {
} from "./errors";
import { NotificationType } from "../types/notifications";
import { codecForReserveStatus } from "../types/ReserveStatus";
import { getTimestampNow } from "../util/time";
import {
getTimestampNow,
Duration,
durationMin,
durationMax,
} from "../util/time";
import {
reconcileReserveHistory,
summarizeReserveHistory,
@ -331,10 +337,16 @@ async function registerReserveWithBank(
return;
}
const bankStatusUrl = bankInfo.statusUrl;
const httpResp = await ws.http.postJson(bankStatusUrl, {
reserve_pub: reservePub,
selected_exchange: bankInfo.exchangePaytoUri,
});
const httpResp = await ws.http.postJson(
bankStatusUrl,
{
reserve_pub: reservePub,
selected_exchange: bankInfo.exchangePaytoUri,
},
{
timeout: getReserveRequestTimeout(reserve),
},
);
await readSuccessResponseJsonOrThrow(
httpResp,
codecForBankWithdrawalOperationPostResponse(),
@ -371,6 +383,13 @@ async function processReserveBankStatus(
);
}
export function getReserveRequestTimeout(r: ReserveRecord): Duration {
return durationMax(
{ d_ms: 60000 },
durationMin({ d_ms: 5000 }, getRetryDuration(r.retryInfo)),
);
}
async function processReserveBankStatusImpl(
ws: InternalWalletState,
reservePub: string,
@ -388,7 +407,9 @@ async function processReserveBankStatusImpl(
return;
}
const statusResp = await ws.http.get(bankStatusUrl);
const statusResp = await ws.http.get(bankStatusUrl, {
timeout: getReserveRequestTimeout(reserve),
});
const status = await readSuccessResponseJsonOrThrow(
statusResp,
codecForWithdrawOperationStatusResponse(),
@ -501,6 +522,9 @@ async function updateReserve(
const resp = await ws.http.get(
new URL(`reserves/${reservePub}`, reserve.exchangeBaseUrl).href,
{
timeout: getReserveRequestTimeout(reserve),
},
);
const result = await readSuccessResponseJsonOrErrorCode(

View File

@ -117,6 +117,17 @@ export function updateRetryInfoTimeout(
r.nextRetry = { t_ms: t };
}
export function getRetryDuration(
r: RetryInfo,
p: RetryPolicy = defaultRetryPolicy,
): Duration {
if (p.backoffDelta.d_ms === "forever") {
return { d_ms: "forever" };
}
const t = p.backoffDelta.d_ms * Math.pow(p.backoffBase, r.retryCounter);
return { d_ms: t };
}
export function initRetryInfo(
active = true,
p: RetryPolicy = defaultRetryPolicy,

View File

@ -30,25 +30,25 @@ const logger = new Logger("RequestThrottler.ts");
/**
* Maximum request per second, per origin.
*/
const MAX_PER_SECOND = 50;
const MAX_PER_SECOND = 100;
/**
* Maximum request per minute, per origin.
*/
const MAX_PER_MINUTE = 100;
const MAX_PER_MINUTE = 500;
/**
* Maximum request per hour, per origin.
*/
const MAX_PER_HOUR = 1000;
const MAX_PER_HOUR = 2000;
/**
* Throttling state for one origin.
*/
class OriginState {
private tokensSecond: number = MAX_PER_SECOND;
private tokensMinute: number = MAX_PER_MINUTE;
private tokensHour: number = MAX_PER_HOUR;
tokensSecond: number = MAX_PER_SECOND;
tokensMinute: number = MAX_PER_MINUTE;
tokensHour: number = MAX_PER_HOUR;
private lastUpdate = getTimestampNow();
private refill(): void {
@ -57,6 +57,9 @@ class OriginState {
if (d.d_ms === "forever") {
throw Error("assertion failed");
}
if (d.d_ms < 0) {
return;
}
const d_s = d.d_ms / 1000;
this.tokensSecond = Math.min(
MAX_PER_SECOND,
@ -129,4 +132,20 @@ export class RequestThrottler {
const origin = new URL(requestUrl).origin;
return this.getState(origin).applyThrottle();
}
/**
* Get the throttle statistics for a particular URL.
*/
getThrottleStats(requestUrl: string): Record<string, unknown> {
const origin = new URL(requestUrl).origin;
const state = this.getState(origin);
return {
tokensHour: state.tokensHour,
tokensMinute: state.tokensMinute,
tokensSecond: state.tokensSecond,
maxTokensHour: MAX_PER_HOUR,
maxTokensMinute: MAX_PER_MINUTE,
maxTokensSecond: MAX_PER_SECOND,
}
}
}

View File

@ -26,6 +26,7 @@ import { Codec } from "./codec";
import { OperationFailedError, makeErrorDetails } from "../operations/errors";
import { TalerErrorCode } from "../TalerErrorCode";
import { Logger } from "./logging";
import { Duration } from "./time";
const logger = new Logger("http.ts");
@ -43,6 +44,7 @@ export interface HttpResponse {
export interface HttpRequestOptions {
headers?: { [name: string]: string };
timeout?: Duration,
}
export enum HttpResponseStatus {

View File

@ -95,6 +95,16 @@ export function durationMin(d1: Duration, d2: Duration): Duration {
return { d_ms: Math.min(d1.d_ms, d2.d_ms) };
}
export function durationMax(d1: Duration, d2: Duration): Duration {
if (d1.d_ms === "forever") {
return { d_ms: "forever" };
}
if (d2.d_ms === "forever") {
return { d_ms: "forever" };
}
return { d_ms: Math.max(d1.d_ms, d2.d_ms) };
}
export function timestampCmp(t1: Timestamp, t2: Timestamp): number {
if (t1.t_ms === "never") {
if (t2.t_ms === "never") {