towards handling frozen refreshes

This commit is contained in:
Florian Dold 2021-08-24 14:25:46 +02:00
parent 7553ae7c74
commit 408d8e9fc8
No known key found for this signature in database
GPG Key ID: D2E4F00F29D02A4B
8 changed files with 174 additions and 40 deletions

View File

@ -0,0 +1,38 @@
/*
This file is part of GNU Taler
(C) 2021 Taler Systems S.A.
GNU Taler is free software; you can redistribute it and/or modify it under the
terms of the GNU General Public License as published by the Free Software
Foundation; either version 3, or (at your option) any later version.
GNU Taler is distributed in the hope that it will be useful, but WITHOUT ANY
WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
A PARTICULAR PURPOSE. See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License along with
GNU Taler; see the file COPYING. If not, see <http://www.gnu.org/licenses/>
*/
/**
* Functional programming utilities.
*/
export namespace fnutil {
export function all<T>(arr: T[], f: (x: T) => boolean): boolean {
for (const x of arr) {
if (!f(x)) {
return false;
}
}
return true;
}
export function any<T>(arr: T[], f: (x: T) => boolean): boolean {
for (const x of arr) {
if (f(x)) {
return true;
}
}
return false;
}
}

View File

@ -19,3 +19,4 @@ export * from "./walletTypes.js";
export * from "./i18n.js"; export * from "./i18n.js";
export * from "./logging.js"; export * from "./logging.js";
export * from "./url.js"; export * from "./url.js";
export { fnutil } from "./fnutils.js";

View File

@ -915,6 +915,17 @@ export interface TipRecord {
retryInfo: RetryInfo; retryInfo: RetryInfo;
} }
export enum RefreshCoinStatus {
Pending = "pending",
Finished = "finished",
/**
* The refresh for this coin has been frozen, because of a permanent error.
* More info in lastErrorPerCoin.
*/
Frozen = "frozen",
}
export interface RefreshGroupRecord { export interface RefreshGroupRecord {
/** /**
* Retry info, even present when the operation isn't active to allow indexing * Retry info, even present when the operation isn't active to allow indexing
@ -926,8 +937,15 @@ export interface RefreshGroupRecord {
lastErrorPerCoin: { [coinIndex: number]: TalerErrorDetails }; lastErrorPerCoin: { [coinIndex: number]: TalerErrorDetails };
/**
* Unique, randomly generated identifier for this group of
* refresh operations.
*/
refreshGroupId: string; refreshGroupId: string;
/**
* Reason why this refresh group has been created.
*/
reason: RefreshReason; reason: RefreshReason;
oldCoinPubs: string[]; oldCoinPubs: string[];
@ -946,7 +964,7 @@ export interface RefreshGroupRecord {
* it will be marked as finished, but no refresh session will * it will be marked as finished, but no refresh session will
* be created. * be created.
*/ */
finishedPerCoin: boolean[]; statusPerCoin: RefreshCoinStatus[];
timestampCreated: Timestamp; timestampCreated: Timestamp;
@ -954,6 +972,11 @@ export interface RefreshGroupRecord {
* Timestamp when the refresh session finished. * Timestamp when the refresh session finished.
*/ */
timestampFinished: Timestamp | undefined; timestampFinished: Timestamp | undefined;
/**
* No coins are pending, but at least one is frozen.
*/
frozen?: boolean;
} }
/** /**
@ -1162,6 +1185,9 @@ export interface PurchaseRecord {
/** /**
* Downloaded and parsed proposal data. * Downloaded and parsed proposal data.
*
* FIXME: Move this into another object store,
* to improve read/write perf on purchases.
*/ */
download: ProposalDownload; download: ProposalDownload;

View File

@ -66,6 +66,7 @@ import {
CoinSourceType, CoinSourceType,
CoinStatus, CoinStatus,
ProposalStatus, ProposalStatus,
RefreshCoinStatus,
RefundState, RefundState,
WALLET_BACKUP_STATE_KEY, WALLET_BACKUP_STATE_KEY,
} from "../../db.js"; } from "../../db.js";
@ -440,7 +441,7 @@ export async function exportBackup(
estimated_output_amount: Amounts.stringify( estimated_output_amount: Amounts.stringify(
rg.estimatedOutputPerCoin[i], rg.estimatedOutputPerCoin[i],
), ),
finished: rg.finishedPerCoin[i], finished: rg.statusPerCoin[i] === RefreshCoinStatus.Finished,
input_amount: Amounts.stringify(rg.inputPerCoin[i]), input_amount: Amounts.stringify(rg.inputPerCoin[i]),
refresh_session: refreshSession, refresh_session: refreshSession,
}); });

View File

@ -45,6 +45,7 @@ import {
RefreshSessionRecord, RefreshSessionRecord,
WireInfo, WireInfo,
WalletStoresV1, WalletStoresV1,
RefreshCoinStatus,
} from "../../db.js"; } from "../../db.js";
import { PayCoinSelection } from "../../util/coinSelection.js"; import { PayCoinSelection } from "../../util/coinSelection.js";
import { j2s } from "@gnu-taler/taler-util"; import { j2s } from "@gnu-taler/taler-util";
@ -831,8 +832,10 @@ export async function importBackup(
lastError: undefined, lastError: undefined,
lastErrorPerCoin: {}, lastErrorPerCoin: {},
oldCoinPubs: backupRefreshGroup.old_coins.map((x) => x.coin_pub), oldCoinPubs: backupRefreshGroup.old_coins.map((x) => x.coin_pub),
finishedPerCoin: backupRefreshGroup.old_coins.map( statusPerCoin: backupRefreshGroup.old_coins.map((x) =>
(x) => x.finished, x.finished
? RefreshCoinStatus.Finished
: RefreshCoinStatus.Pending,
), ),
inputPerCoin: backupRefreshGroup.old_coins.map((x) => inputPerCoin: backupRefreshGroup.old_coins.map((x) =>
Amounts.parseOrThrow(x.input_amount), Amounts.parseOrThrow(x.input_amount),

View File

@ -27,6 +27,7 @@ import {
AbortStatus, AbortStatus,
WalletStoresV1, WalletStoresV1,
BackupProviderStateTag, BackupProviderStateTag,
RefreshCoinStatus,
} from "../db.js"; } from "../db.js";
import { import {
PendingOperationsResponse, PendingOperationsResponse,
@ -111,12 +112,17 @@ async function gatherRefreshPending(
if (r.timestampFinished) { if (r.timestampFinished) {
return; return;
} }
if (r.frozen) {
return;
}
resp.pendingOperations.push({ resp.pendingOperations.push({
type: PendingTaskType.Refresh, type: PendingTaskType.Refresh,
givesLifeness: true, givesLifeness: true,
timestampDue: r.retryInfo.nextRetry, timestampDue: r.retryInfo.nextRetry,
refreshGroupId: r.refreshGroupId, refreshGroupId: r.refreshGroupId,
finishedPerCoin: r.finishedPerCoin, finishedPerCoin: r.statusPerCoin.map(
(x) => x === RefreshCoinStatus.Finished,
),
retryInfo: r.retryInfo, retryInfo: r.retryInfo,
}); });
}); });

View File

@ -20,6 +20,7 @@ import {
CoinSourceType, CoinSourceType,
CoinStatus, CoinStatus,
DenominationRecord, DenominationRecord,
RefreshCoinStatus,
RefreshGroupRecord, RefreshGroupRecord,
RefreshPlanchet, RefreshPlanchet,
WalletStoresV1, WalletStoresV1,
@ -28,6 +29,7 @@ import {
codecForExchangeMeltResponse, codecForExchangeMeltResponse,
codecForExchangeRevealResponse, codecForExchangeRevealResponse,
CoinPublicKey, CoinPublicKey,
fnutil,
NotificationType, NotificationType,
RefreshGroupId, RefreshGroupId,
RefreshReason, RefreshReason,
@ -37,7 +39,11 @@ import {
} from "@gnu-taler/taler-util"; } from "@gnu-taler/taler-util";
import { AmountJson, Amounts } from "@gnu-taler/taler-util"; import { AmountJson, Amounts } from "@gnu-taler/taler-util";
import { amountToPretty } from "@gnu-taler/taler-util"; import { amountToPretty } from "@gnu-taler/taler-util";
import { readSuccessResponseJsonOrThrow } from "../util/http.js"; import {
HttpResponseStatus,
readSuccessResponseJsonOrThrow,
readUnexpectedResponseDetails,
} from "../util/http.js";
import { checkDbInvariant } from "../util/invariants.js"; import { checkDbInvariant } from "../util/invariants.js";
import { Logger } from "@gnu-taler/taler-util"; import { Logger } from "@gnu-taler/taler-util";
import { initRetryInfo, updateRetryInfoTimeout } from "../util/retries.js"; import { initRetryInfo, updateRetryInfoTimeout } from "../util/retries.js";
@ -99,6 +105,26 @@ export function getTotalRefreshCost(
return totalCost; return totalCost;
} }
function updateGroupStatus(rg: RefreshGroupRecord): void {
let allDone = fnutil.all(
rg.statusPerCoin,
(x) => x === RefreshCoinStatus.Finished || x === RefreshCoinStatus.Frozen,
);
let anyFrozen = fnutil.any(
rg.statusPerCoin,
(x) => x === RefreshCoinStatus.Frozen,
);
if (allDone) {
if (anyFrozen) {
rg.frozen = true;
rg.retryInfo = initRetryInfo();
} else {
rg.timestampFinished = getTimestampNow();
rg.retryInfo = initRetryInfo();
}
}
}
/** /**
* Create a refresh session for one particular coin inside a refresh group. * Create a refresh session for one particular coin inside a refresh group.
*/ */
@ -121,7 +147,9 @@ async function refreshCreateSession(
if (!refreshGroup) { if (!refreshGroup) {
return; return;
} }
if (refreshGroup.finishedPerCoin[coinIndex]) { if (
refreshGroup.statusPerCoin[coinIndex] === RefreshCoinStatus.Finished
) {
return; return;
} }
const existingRefreshSession = const existingRefreshSession =
@ -211,18 +239,9 @@ async function refreshCreateSession(
if (!rg) { if (!rg) {
return; return;
} }
rg.finishedPerCoin[coinIndex] = true; rg.statusPerCoin[coinIndex] = RefreshCoinStatus.Finished;
let allDone = true; updateGroupStatus(rg);
for (const f of rg.finishedPerCoin) {
if (!f) {
allDone = false;
break;
}
}
if (allDone) {
rg.timestampFinished = getTimestampNow();
rg.retryInfo = initRetryInfo();
}
await tx.refreshGroups.put(rg); await tx.refreshGroups.put(rg);
}); });
ws.notify({ type: NotificationType.RefreshUnwarranted }); ws.notify({ type: NotificationType.RefreshUnwarranted });
@ -358,6 +377,31 @@ async function refreshMelt(
}); });
}); });
if (resp.status === HttpResponseStatus.NotFound) {
const errDetails = await readUnexpectedResponseDetails(resp);
await ws.db
.mktx((x) => ({
refreshGroups: x.refreshGroups,
}))
.runReadWrite(async (tx) => {
const rg = await tx.refreshGroups.get(refreshGroupId);
if (!rg) {
return;
}
if (rg.timestampFinished) {
return;
}
if (rg.statusPerCoin[coinIndex] !== RefreshCoinStatus.Pending) {
return;
}
rg.statusPerCoin[coinIndex] = RefreshCoinStatus.Frozen;
rg.lastErrorPerCoin[coinIndex] = errDetails;
updateGroupStatus(rg);
await tx.refreshGroups.put(rg);
});
return;
}
const meltResponse = await readSuccessResponseJsonOrThrow( const meltResponse = await readSuccessResponseJsonOrThrow(
resp, resp,
codecForExchangeMeltResponse(), codecForExchangeMeltResponse(),
@ -598,18 +642,8 @@ async function refreshReveal(
if (!rs) { if (!rs) {
return; return;
} }
rg.finishedPerCoin[coinIndex] = true; rg.statusPerCoin[coinIndex] = RefreshCoinStatus.Finished;
let allDone = true; updateGroupStatus(rg);
for (const f of rg.finishedPerCoin) {
if (!f) {
allDone = false;
break;
}
}
if (allDone) {
rg.timestampFinished = getTimestampNow();
rg.retryInfo = initRetryInfo();
}
for (const coin of coins) { for (const coin of coins) {
await tx.coins.put(coin); await tx.coins.put(coin);
} }
@ -728,7 +762,7 @@ async function processRefreshSession(
if (!refreshGroup) { if (!refreshGroup) {
return; return;
} }
if (refreshGroup.finishedPerCoin[coinIndex]) { if (refreshGroup.statusPerCoin[coinIndex] === RefreshCoinStatus.Finished) {
return; return;
} }
if (!refreshGroup.refreshSessionPerCoin[coinIndex]) { if (!refreshGroup.refreshSessionPerCoin[coinIndex]) {
@ -744,7 +778,7 @@ async function processRefreshSession(
} }
const refreshSession = refreshGroup.refreshSessionPerCoin[coinIndex]; const refreshSession = refreshGroup.refreshSessionPerCoin[coinIndex];
if (!refreshSession) { if (!refreshSession) {
if (!refreshGroup.finishedPerCoin[coinIndex]) { if (refreshGroup.statusPerCoin[coinIndex] !== RefreshCoinStatus.Finished) {
throw Error( throw Error(
"BUG: refresh session was not created and coin not marked as finished", "BUG: refresh session was not created and coin not marked as finished",
); );
@ -826,13 +860,13 @@ export async function createRefreshGroup(
const refreshGroup: RefreshGroupRecord = { const refreshGroup: RefreshGroupRecord = {
timestampFinished: undefined, timestampFinished: undefined,
finishedPerCoin: oldCoinPubs.map((x) => false), statusPerCoin: oldCoinPubs.map(() => RefreshCoinStatus.Pending),
lastError: undefined, lastError: undefined,
lastErrorPerCoin: {}, lastErrorPerCoin: {},
oldCoinPubs: oldCoinPubs.map((x) => x.coinPub), oldCoinPubs: oldCoinPubs.map((x) => x.coinPub),
reason, reason,
refreshGroupId, refreshGroupId,
refreshSessionPerCoin: oldCoinPubs.map((x) => undefined), refreshSessionPerCoin: oldCoinPubs.map(() => undefined),
retryInfo: initRetryInfo(), retryInfo: initRetryInfo(),
inputPerCoin, inputPerCoin,
estimatedOutputPerCoin, estimatedOutputPerCoin,

View File

@ -24,10 +24,7 @@
/** /**
* Imports * Imports
*/ */
import { import { OperationFailedError, makeErrorDetails } from "../errors.js";
OperationFailedError,
makeErrorDetails,
} from "../errors.js";
import { import {
Logger, Logger,
Duration, Duration,
@ -68,6 +65,7 @@ export enum HttpResponseStatus {
Gone = 210, Gone = 210,
NotModified = 304, NotModified = 304,
PaymentRequired = 402, PaymentRequired = 402,
NotFound = 404,
Conflict = 409, Conflict = 409,
} }
@ -158,6 +156,33 @@ export async function readTalerErrorResponse(
return errJson; return errJson;
} }
export async function readUnexpectedResponseDetails(
httpResponse: HttpResponse,
): Promise<TalerErrorDetails> {
const errJson = await httpResponse.json();
const talerErrorCode = errJson.code;
if (typeof talerErrorCode !== "number") {
return makeErrorDetails(
TalerErrorCode.WALLET_RECEIVED_MALFORMED_RESPONSE,
"Error response did not contain error code",
{
requestUrl: httpResponse.requestUrl,
requestMethod: httpResponse.requestMethod,
httpStatusCode: httpResponse.status,
},
);
}
return makeErrorDetails(
TalerErrorCode.WALLET_UNEXPECTED_REQUEST_ERROR,
"Unexpected error code in response",
{
requestUrl: httpResponse.requestUrl,
httpStatusCode: httpResponse.status,
errorResponse: errJson,
},
);
}
export async function readSuccessResponseJsonOrErrorCode<T>( export async function readSuccessResponseJsonOrErrorCode<T>(
httpResponse: HttpResponse, httpResponse: HttpResponse,
codec: Codec<T>, codec: Codec<T>,