schedule exchange updating

This commit is contained in:
Florian Dold 2020-09-02 14:44:36 +05:30
parent 8d0081b622
commit 8a3ac7f08b
No known key found for this signature in database
GPG Key ID: D2E4F00F29D02A4B
6 changed files with 113 additions and 40 deletions

View File

@ -8,7 +8,7 @@ import { IDBFactory, IDBDatabase } from "idb-bridge";
* with each major change. When incrementing the major version, * with each major change. When incrementing the major version,
* the wallet should import data from the previous version. * the wallet should import data from the previous version.
*/ */
const TALER_DB_NAME = "taler-walletdb-v8"; const TALER_DB_NAME = "taler-walletdb-v9";
/** /**
* Current database minor version, should be incremented * Current database minor version, should be incremented

View File

@ -30,6 +30,8 @@ import {
WireFee, WireFee,
ExchangeUpdateReason, ExchangeUpdateReason,
ExchangeUpdatedEventRecord, ExchangeUpdatedEventRecord,
initRetryInfo,
updateRetryInfoTimeout,
} from "../types/dbTypes"; } from "../types/dbTypes";
import { canonicalizeBaseUrl } from "../util/helpers"; import { canonicalizeBaseUrl } from "../util/helpers";
import * as Amounts from "../util/amounts"; import * as Amounts from "../util/amounts";
@ -43,7 +45,12 @@ import {
WALLET_CACHE_BREAKER_CLIENT_VERSION, WALLET_CACHE_BREAKER_CLIENT_VERSION,
WALLET_EXCHANGE_PROTOCOL_VERSION, WALLET_EXCHANGE_PROTOCOL_VERSION,
} from "./versions"; } from "./versions";
import { getTimestampNow, Duration, isTimestampExpired } from "../util/time"; import {
getTimestampNow,
Duration,
isTimestampExpired,
durationFromSpec,
} from "../util/time";
import { compare } from "../util/libtoolVersion"; import { compare } from "../util/libtoolVersion";
import { createRecoupGroup, processRecoupGroup } from "./recoup"; import { createRecoupGroup, processRecoupGroup } from "./recoup";
import { TalerErrorCode } from "../TalerErrorCode"; import { TalerErrorCode } from "../TalerErrorCode";
@ -56,6 +63,7 @@ import { Logger } from "../util/logging";
import { URL } from "../util/url"; import { URL } from "../util/url";
import { reconcileReserveHistory } from "../util/reserveHistoryUtil"; import { reconcileReserveHistory } from "../util/reserveHistoryUtil";
import { checkDbInvariant } from "../util/invariants"; import { checkDbInvariant } from "../util/invariants";
import { NotificationType } from "../types/notifications";
const logger = new Logger("exchanges.ts"); const logger = new Logger("exchanges.ts");
@ -86,17 +94,23 @@ async function denominationRecordFromKeys(
return d; return d;
} }
async function setExchangeError( async function handleExchangeUpdateError(
ws: InternalWalletState, ws: InternalWalletState,
baseUrl: string, baseUrl: string,
err: TalerErrorDetails, err: TalerErrorDetails,
): Promise<void> { ): Promise<void> {
logger.warn(`last error for exchange ${baseUrl}:`, err); await ws.db.runWithWriteTransaction([Stores.exchanges], async (tx) => {
const mut = (exchange: ExchangeRecord): ExchangeRecord => { const exchange = await tx.get(Stores.exchanges, baseUrl);
if (!exchange) {
return;
}
exchange.retryInfo.retryCounter++;
updateRetryInfoTimeout(exchange.retryInfo);
exchange.lastError = err; exchange.lastError = err;
return exchange; });
}; if (err) {
await ws.db.mutate(Stores.exchanges, baseUrl, mut); ws.notify({ type: NotificationType.ExchangeOperationError, error: err });
}
} }
function getExchangeRequestTimeout(e: ExchangeRecord): Duration { function getExchangeRequestTimeout(e: ExchangeRecord): Duration {
@ -142,7 +156,7 @@ async function updateExchangeWithKeys(
exchangeBaseUrl: baseUrl, exchangeBaseUrl: baseUrl,
}, },
); );
await setExchangeError(ws, baseUrl, opErr); await handleExchangeUpdateError(ws, baseUrl, opErr);
throw new OperationFailedAndReportedError(opErr); throw new OperationFailedAndReportedError(opErr);
} }
@ -158,7 +172,7 @@ async function updateExchangeWithKeys(
walletProtocolVersion: WALLET_EXCHANGE_PROTOCOL_VERSION, walletProtocolVersion: WALLET_EXCHANGE_PROTOCOL_VERSION,
}, },
); );
await setExchangeError(ws, baseUrl, opErr); await handleExchangeUpdateError(ws, baseUrl, opErr);
throw new OperationFailedAndReportedError(opErr); throw new OperationFailedAndReportedError(opErr);
} }
@ -198,10 +212,13 @@ async function updateExchangeWithKeys(
masterPublicKey: exchangeKeysJson.master_public_key, masterPublicKey: exchangeKeysJson.master_public_key,
protocolVersion: protocolVersion, protocolVersion: protocolVersion,
signingKeys: exchangeKeysJson.signkeys, signingKeys: exchangeKeysJson.signkeys,
nextUpdateTime: getExpiryTimestamp(resp), nextUpdateTime: getExpiryTimestamp(resp, {
minDuration: durationFromSpec({ hours: 1 }),
}),
}; };
r.updateStatus = ExchangeUpdateStatus.FetchWire; r.updateStatus = ExchangeUpdateStatus.FetchWire;
r.lastError = undefined; r.lastError = undefined;
r.retryInfo = initRetryInfo(false);
await tx.put(Stores.exchanges, r); await tx.put(Stores.exchanges, r);
for (const newDenom of newDenominations) { for (const newDenom of newDenominations) {
@ -433,6 +450,7 @@ async function updateExchangeWithWireInfo(
}; };
r.updateStatus = ExchangeUpdateStatus.FetchTerms; r.updateStatus = ExchangeUpdateStatus.FetchTerms;
r.lastError = undefined; r.lastError = undefined;
r.retryInfo = initRetryInfo(false);
await tx.put(Stores.exchanges, r); await tx.put(Stores.exchanges, r);
}); });
} }
@ -443,7 +461,7 @@ export async function updateExchangeFromUrl(
forceNow = false, forceNow = false,
): Promise<ExchangeRecord> { ): Promise<ExchangeRecord> {
const onOpErr = (e: TalerErrorDetails): Promise<void> => const onOpErr = (e: TalerErrorDetails): Promise<void> =>
setExchangeError(ws, baseUrl, e); handleExchangeUpdateError(ws, baseUrl, e);
return await guardOperationException( return await guardOperationException(
() => updateExchangeFromUrlImpl(ws, baseUrl, forceNow), () => updateExchangeFromUrlImpl(ws, baseUrl, forceNow),
onOpErr, onOpErr,
@ -460,6 +478,7 @@ async function updateExchangeFromUrlImpl(
baseUrl: string, baseUrl: string,
forceNow = false, forceNow = false,
): Promise<ExchangeRecord> { ): Promise<ExchangeRecord> {
logger.trace(`updating exchange info for ${baseUrl}`);
const now = getTimestampNow(); const now = getTimestampNow();
baseUrl = canonicalizeBaseUrl(baseUrl); baseUrl = canonicalizeBaseUrl(baseUrl);
@ -480,6 +499,7 @@ async function updateExchangeFromUrlImpl(
termsOfServiceAcceptedTimestamp: undefined, termsOfServiceAcceptedTimestamp: undefined,
termsOfServiceLastEtag: undefined, termsOfServiceLastEtag: undefined,
termsOfServiceText: undefined, termsOfServiceText: undefined,
retryInfo: initRetryInfo(false),
}; };
await ws.db.put(Stores.exchanges, newExchangeRecord); await ws.db.put(Stores.exchanges, newExchangeRecord);
} else { } else {
@ -488,40 +508,30 @@ async function updateExchangeFromUrlImpl(
if (!rec) { if (!rec) {
return; return;
} }
if (rec.updateStatus != ExchangeUpdateStatus.FetchKeys && !forceNow) { if (rec.updateStatus != ExchangeUpdateStatus.FetchKeys) {
const t = rec.details?.nextUpdateTime;
if (!forceNow && t && !isTimestampExpired(t)) {
return; return;
} }
}
if (rec.updateStatus != ExchangeUpdateStatus.FetchKeys && forceNow) { if (rec.updateStatus != ExchangeUpdateStatus.FetchKeys && forceNow) {
rec.updateReason = ExchangeUpdateReason.Forced; rec.updateReason = ExchangeUpdateReason.Forced;
} }
rec.updateStarted = now; rec.updateStarted = now;
rec.updateStatus = ExchangeUpdateStatus.FetchKeys; rec.updateStatus = ExchangeUpdateStatus.FetchKeys;
rec.lastError = undefined; rec.lastError = undefined;
rec.retryInfo = initRetryInfo(false);
t.put(Stores.exchanges, rec); t.put(Stores.exchanges, rec);
}); });
} }
r = await ws.db.get(Stores.exchanges, baseUrl);
checkDbInvariant(!!r);
const t = r.details?.nextUpdateTime;
if (!forceNow && t && !isTimestampExpired(t)) {
logger.trace("using cached exchange info");
return r;
}
await updateExchangeWithKeys(ws, baseUrl); await updateExchangeWithKeys(ws, baseUrl);
await updateExchangeWithWireInfo(ws, baseUrl); await updateExchangeWithWireInfo(ws, baseUrl);
await updateExchangeWithTermsOfService(ws, baseUrl); await updateExchangeWithTermsOfService(ws, baseUrl);
await updateExchangeFinalize(ws, baseUrl); await updateExchangeFinalize(ws, baseUrl);
const updatedExchange = await ws.db.get(Stores.exchanges, baseUrl); const updatedExchange = await ws.db.get(Stores.exchanges, baseUrl);
checkDbInvariant(!!updatedExchange);
if (!updatedExchange) {
// This should practically never happen
throw Error("exchange not found");
}
return updatedExchange; return updatedExchange;
} }

View File

@ -56,10 +56,6 @@ async function gatherExchangePending(
resp: PendingOperationsResponse, resp: PendingOperationsResponse,
onlyDue = false, onlyDue = false,
): Promise<void> { ): Promise<void> {
if (onlyDue) {
// FIXME: exchanges should also be updated regularly
return;
}
await tx.iter(Stores.exchanges).forEach((e) => { await tx.iter(Stores.exchanges).forEach((e) => {
switch (e.updateStatus) { switch (e.updateStatus) {
case ExchangeUpdateStatus.Finished: case ExchangeUpdateStatus.Finished:
@ -79,7 +75,7 @@ async function gatherExchangePending(
type: PendingOperationType.Bug, type: PendingOperationType.Bug,
givesLifeness: false, givesLifeness: false,
message: message:
"Exchange record does not have details, but no update in progress.", "Exchange record does not have details, but no update finished.",
details: { details: {
exchangeBaseUrl: e.baseUrl, exchangeBaseUrl: e.baseUrl,
}, },
@ -90,14 +86,28 @@ async function gatherExchangePending(
type: PendingOperationType.Bug, type: PendingOperationType.Bug,
givesLifeness: false, givesLifeness: false,
message: message:
"Exchange record does not have wire info, but no update in progress.", "Exchange record does not have wire info, but no update finished.",
details: { details: {
exchangeBaseUrl: e.baseUrl, exchangeBaseUrl: e.baseUrl,
}, },
}); });
} }
if (e.details && e.details.nextUpdateTime.t_ms < now.t_ms) {
resp.pendingOperations.push({
type: PendingOperationType.ExchangeUpdate,
givesLifeness: false,
stage: ExchangeUpdateOperationStage.FetchKeys,
exchangeBaseUrl: e.baseUrl,
lastError: e.lastError,
reason: "scheduled",
});
break;
}
break; break;
case ExchangeUpdateStatus.FetchKeys: case ExchangeUpdateStatus.FetchKeys:
if (onlyDue && e.retryInfo.nextRetry.t_ms > now.t_ms) {
return;
}
resp.pendingOperations.push({ resp.pendingOperations.push({
type: PendingOperationType.ExchangeUpdate, type: PendingOperationType.ExchangeUpdate,
givesLifeness: false, givesLifeness: false,
@ -108,6 +118,9 @@ async function gatherExchangePending(
}); });
break; break;
case ExchangeUpdateStatus.FetchWire: case ExchangeUpdateStatus.FetchWire:
if (onlyDue && e.retryInfo.nextRetry.t_ms > now.t_ms) {
return;
}
resp.pendingOperations.push({ resp.pendingOperations.push({
type: PendingOperationType.ExchangeUpdate, type: PendingOperationType.ExchangeUpdate,
givesLifeness: false, givesLifeness: false,
@ -118,6 +131,9 @@ async function gatherExchangePending(
}); });
break; break;
case ExchangeUpdateStatus.FinalizeUpdate: case ExchangeUpdateStatus.FinalizeUpdate:
if (onlyDue && e.retryInfo.nextRetry.t_ms > now.t_ms) {
return;
}
resp.pendingOperations.push({ resp.pendingOperations.push({
type: PendingOperationType.ExchangeUpdate, type: PendingOperationType.ExchangeUpdate,
givesLifeness: false, givesLifeness: false,

View File

@ -113,6 +113,7 @@ export function updateRetryInfoTimeout(
r.nextRetry = { t_ms: "never" }; r.nextRetry = { t_ms: "never" };
return; return;
} }
r.active = true;
const t = const t =
now.t_ms + p.backoffDelta.d_ms * Math.pow(p.backoffBase, r.retryCounter); now.t_ms + p.backoffDelta.d_ms * Math.pow(p.backoffBase, r.retryCounter);
r.nextRetry = { t_ms: t }; r.nextRetry = { t_ms: t };
@ -642,6 +643,11 @@ export interface ExchangeRecord {
updateReason?: ExchangeUpdateReason; updateReason?: ExchangeUpdateReason;
lastError?: TalerErrorDetails; lastError?: TalerErrorDetails;
/**
* Retry status for fetching updated information about the exchange.
*/
retryInfo: RetryInfo;
} }

View File

@ -26,7 +26,7 @@ import { Codec } from "./codec";
import { OperationFailedError, makeErrorDetails } from "../operations/errors"; import { OperationFailedError, makeErrorDetails } from "../operations/errors";
import { TalerErrorCode } from "../TalerErrorCode"; import { TalerErrorCode } from "../TalerErrorCode";
import { Logger } from "./logging"; import { Logger } from "./logging";
import { Duration, Timestamp, getTimestampNow } from "./time"; import { Duration, Timestamp, getTimestampNow, timestampAddDuration, timestampMin, timestampMax } from "./time";
const logger = new Logger("http.ts"); const logger = new Logger("http.ts");
@ -257,15 +257,24 @@ export async function readSuccessResponseTextOrThrow<T>(
/** /**
* Get the timestamp at which the response's content is considered expired. * Get the timestamp at which the response's content is considered expired.
*/ */
export function getExpiryTimestamp(httpResponse: HttpResponse): Timestamp { export function getExpiryTimestamp(
httpResponse: HttpResponse,
opt: { minDuration?: Duration },
): Timestamp {
const expiryDateMs = new Date( const expiryDateMs = new Date(
httpResponse.headers.get("expiry") ?? "", httpResponse.headers.get("expiry") ?? "",
).getTime(); ).getTime();
let t: Timestamp;
if (Number.isNaN(expiryDateMs)) { if (Number.isNaN(expiryDateMs)) {
return getTimestampNow(); t = getTimestampNow();
} else { } else {
return { t = {
t_ms: expiryDateMs, t_ms: expiryDateMs,
};
} }
if (opt.minDuration) {
const t2 = timestampAddDuration(getTimestampNow(), opt.minDuration);
return timestampMax(t, t2);
} }
return t;
} }

View File

@ -76,6 +76,38 @@ export function timestampMin(t1: Timestamp, t2: Timestamp): Timestamp {
return { t_ms: Math.min(t1.t_ms, t2.t_ms) }; return { t_ms: Math.min(t1.t_ms, t2.t_ms) };
} }
export function timestampMax(t1: Timestamp, t2: Timestamp): Timestamp {
if (t1.t_ms === "never") {
return { t_ms: "never" };
}
if (t2.t_ms === "never") {
return { t_ms: "never" };
}
return { t_ms: Math.max(t1.t_ms, t2.t_ms) };
}
const SECONDS = 1000
const MINUTES = SECONDS * 60;
const HOURS = MINUTES * 60;
export function durationFromSpec(spec: {
seconds?: number,
hours?: number,
minutes?: number,
}): Duration {
let d_ms = 0;
if (spec.seconds) {
d_ms += spec.seconds * SECONDS;
}
if (spec.minutes) {
d_ms += spec.minutes * MINUTES;
}
if (spec.hours) {
d_ms += spec.hours * HOURS;
}
return { d_ms };
}
/** /**
* Truncate a timestamp so that that it represents a multiple * Truncate a timestamp so that that it represents a multiple
* of seconds. The timestamp is always rounded down. * of seconds. The timestamp is always rounded down.