implement and test auto-refresh

This commit is contained in:
Florian Dold 2020-09-03 20:38:26 +05:30
parent bf9c2ae7f9
commit f51a59bc72
No known key found for this signature in database
GPG Key ID: D2E4F00F29D02A4B
9 changed files with 152 additions and 9 deletions

View File

@ -68,6 +68,7 @@ import {
AmountString, AmountString,
ApplyRefundRequest, ApplyRefundRequest,
codecForApplyRefundResponse, codecForApplyRefundResponse,
codecForAny,
} from "taler-wallet-core"; } from "taler-wallet-core";
import { URL } from "url"; import { URL } from "url";
import axios, { AxiosError } from "axios"; import axios, { AxiosError } from "axios";
@ -79,6 +80,7 @@ import {
MerchantOrderPrivateStatusResponse, MerchantOrderPrivateStatusResponse,
} from "./merchantApiTypes"; } from "./merchantApiTypes";
import { ApplyRefundResponse } from "taler-wallet-core"; import { ApplyRefundResponse } from "taler-wallet-core";
import { PendingOperationsResponse } from "taler-wallet-core";
const exec = util.promisify(require("child_process").exec); const exec = util.promisify(require("child_process").exec);
@ -1562,6 +1564,15 @@ export class WalletCli {
throw new OperationFailedError(resp.error); throw new OperationFailedError(resp.error);
} }
async getPendingOperations(): Promise<PendingOperationsResponse> {
const resp = await this.apiRequest("getPendingOperations", {});
if (resp.type === "response") {
// FIXME: validate properly!
return codecForAny().decode(resp.result);
}
throw new OperationFailedError(resp.error);
}
async getTransactions(): Promise<TransactionsResponse> { async getTransactions(): Promise<TransactionsResponse> {
const resp = await this.apiRequest("getTransactions", {}); const resp = await this.apiRequest("getTransactions", {});
if (resp.type === "response") { if (resp.type === "response") {

View File

@ -64,3 +64,4 @@ export * from "./types/talerTypes";
export * from "./types/walletTypes"; export * from "./types/walletTypes";
export * from "./types/notifications"; export * from "./types/notifications";
export * from "./types/transactions"; export * from "./types/transactions";
export * from "./types/pending";

View File

@ -303,6 +303,9 @@ async function updateExchangeFinalize(
} }
r.addComplete = true; r.addComplete = true;
r.updateStatus = ExchangeUpdateStatus.Finished; r.updateStatus = ExchangeUpdateStatus.Finished;
// Reset time to next auto refresh check,
// as now new denominations might be available.
r.nextRefreshCheck = undefined;
await tx.put(Stores.exchanges, r); await tx.put(Stores.exchanges, r);
const updateEvent: ExchangeUpdatedEventRecord = { const updateEvent: ExchangeUpdatedEventRecord = {
exchangeBaseUrl: exchange.baseUrl, exchangeBaseUrl: exchange.baseUrl,

View File

@ -36,6 +36,8 @@ import {
PayEventRecord, PayEventRecord,
WalletContractData, WalletContractData,
getRetryDuration, getRetryDuration,
CoinRecord,
DenominationRecord,
} from "../types/dbTypes"; } from "../types/dbTypes";
import { NotificationType } from "../types/notifications"; import { NotificationType } from "../types/notifications";
import { import {
@ -65,6 +67,7 @@ import {
Duration, Duration,
durationMax, durationMax,
durationMin, durationMin,
isTimestampExpired,
} from "../util/time"; } from "../util/time";
import { strcmp, canonicalJson } from "../util/helpers"; import { strcmp, canonicalJson } from "../util/helpers";
import { import {
@ -285,6 +288,19 @@ export function selectPayCoins(
return undefined; return undefined;
} }
export function isSpendableCoin(coin: CoinRecord, denom: DenominationRecord): boolean {
if (coin.suspended) {
return false;
}
if (coin.status !== CoinStatus.Fresh) {
return false;
}
if (isTimestampExpired(denom.stampExpireDeposit)) {
return false;
}
return true;
}
/** /**
* Select coins from the wallet's database that can be used * Select coins from the wallet's database that can be used
* to pay for the given contract. * to pay for the given contract.
@ -370,10 +386,7 @@ async function getCoinsForPayment(
); );
continue; continue;
} }
if (coin.suspended) { if (!isSpendableCoin(coin, denom)) {
continue;
}
if (coin.status !== CoinStatus.Fresh) {
continue; continue;
} }
acis.push({ acis.push({

View File

@ -102,7 +102,13 @@ async function gatherExchangePending(
lastError: e.lastError, lastError: e.lastError,
reason: "scheduled", reason: "scheduled",
}); });
break; }
if (e.details && (!e.nextRefreshCheck || e.nextRefreshCheck.t_ms < now.t_ms)) {
resp.pendingOperations.push({
type: PendingOperationType.ExchangeCheckRefresh,
exchangeBaseUrl: e.baseUrl,
givesLifeness: false,
});
} }
break; break;
case ExchangeUpdateStatus.FetchKeys: case ExchangeUpdateStatus.FetchKeys:

View File

@ -42,8 +42,23 @@ import {
import { guardOperationException } from "./errors"; import { guardOperationException } from "./errors";
import { NotificationType } from "../types/notifications"; import { NotificationType } from "../types/notifications";
import { getRandomBytes, encodeCrock } from "../crypto/talerCrypto"; import { getRandomBytes, encodeCrock } from "../crypto/talerCrypto";
import { getTimestampNow, Duration } from "../util/time"; import {
import { readSuccessResponseJsonOrThrow, HttpResponse } from "../util/http"; getTimestampNow,
Duration,
Timestamp,
isTimestampExpired,
durationFromSpec,
timestampMin,
timestampAddDuration,
timestampDifference,
durationMax,
durationMul,
} from "../util/time";
import {
readSuccessResponseJsonOrThrow,
HttpResponse,
throwUnexpectedRequestError,
} from "../util/http";
import { import {
codecForExchangeMeltResponse, codecForExchangeMeltResponse,
codecForExchangeRevealResponse, codecForExchangeRevealResponse,
@ -635,7 +650,86 @@ export async function createRefreshGroup(
}; };
} }
/**
* Timestamp after which the wallet would do the next check for an auto-refresh.
*/
function getAutoRefreshCheckThreshold(d: DenominationRecord): Timestamp {
const delta = timestampDifference(d.stampExpireWithdraw, d.stampExpireDeposit);
const deltaDiv = durationMul(delta, 0.75);
return timestampAddDuration(d.stampExpireWithdraw, deltaDiv);
}
/**
* Timestamp after which the wallet would do an auto-refresh.
*/
function getAutoRefreshExecuteThreshold(d: DenominationRecord): Timestamp {
const delta = timestampDifference(d.stampExpireWithdraw, d.stampExpireDeposit);
const deltaDiv = durationMul(delta, 0.5);
return timestampAddDuration(d.stampExpireWithdraw, deltaDiv);
}
export async function autoRefresh( export async function autoRefresh(
ws: InternalWalletState, ws: InternalWalletState,
exchangeBaseUrl: string, exchangeBaseUrl: string,
): Promise<void> {} ): Promise<void> {
await ws.db.runWithWriteTransaction(
[
Stores.coins,
Stores.denominations,
Stores.refreshGroups,
Stores.exchanges,
],
async (tx) => {
const exchange = await tx.get(Stores.exchanges, exchangeBaseUrl);
if (!exchange) {
return;
}
const coins = await tx
.iterIndexed(Stores.coins.exchangeBaseUrlIndex, exchangeBaseUrl)
.toArray();
const refreshCoins: CoinPublicKey[] = [];
for (const coin of coins) {
if (coin.status !== CoinStatus.Fresh) {
continue;
}
if (coin.suspended) {
continue;
}
const denom = await tx.get(Stores.denominations, [
exchangeBaseUrl,
coin.denomPub,
]);
if (!denom) {
logger.warn("denomination not in database");
continue;
}
const executeThreshold = getAutoRefreshExecuteThreshold(denom);
if (isTimestampExpired(executeThreshold)) {
refreshCoins.push(coin);
}
}
if (refreshCoins.length > 0) {
await createRefreshGroup(ws, tx, refreshCoins, RefreshReason.Scheduled);
}
const denoms = await tx
.iterIndexed(Stores.denominations.exchangeBaseUrlIndex, exchangeBaseUrl)
.toArray();
let minCheckThreshold = timestampAddDuration(
getTimestampNow(),
durationFromSpec({ days: 1 }),
);
for (const denom of denoms) {
const checkThreshold = getAutoRefreshCheckThreshold(denom);
const executeThreshold = getAutoRefreshExecuteThreshold(denom);
if (isTimestampExpired(executeThreshold)) {
// No need to consider this denomination, we already did an auto refresh check.
continue;
}
minCheckThreshold = timestampMin(minCheckThreshold, checkThreshold);
}
exchange.nextRefreshCheck = minCheckThreshold;
await tx.put(Stores.exchanges, exchange);
},
);
}

View File

@ -538,6 +538,7 @@ export enum RefreshReason {
AbortPay = "abort-pay", AbortPay = "abort-pay",
Recoup = "recoup", Recoup = "recoup",
BackupRestored = "backup-restored", BackupRestored = "backup-restored",
Scheduled = "scheduled",
} }
/** /**

View File

@ -144,6 +144,13 @@ export function durationMax(d1: Duration, d2: Duration): Duration {
return { d_ms: Math.max(d1.d_ms, d2.d_ms) }; return { d_ms: Math.max(d1.d_ms, d2.d_ms) };
} }
export function durationMul(d: Duration, n: number): Duration {
if (d.d_ms === "forever") {
return { d_ms: "forever" };
}
return { d_ms: Math.round( d.d_ms * n) };
}
export function timestampCmp(t1: Timestamp, t2: Timestamp): number { export function timestampCmp(t1: Timestamp, t2: Timestamp): number {
if (t1.t_ms === "never") { if (t1.t_ms === "never") {
if (t2.t_ms === "never") { if (t2.t_ms === "never") {

View File

@ -373,7 +373,13 @@ export class Wallet {
private async runRetryLoopImpl(): Promise<void> { private async runRetryLoopImpl(): Promise<void> {
while (!this.stopped) { while (!this.stopped) {
const pending = await this.getPendingOperations({ onlyDue: true }); const pending = await this.getPendingOperations({ onlyDue: true });
if (pending.pendingOperations.length === 0) { let numDueAndLive = 0;
for (const p of pending.pendingOperations) {
if (p.givesLifeness) {
numDueAndLive++;
}
}
if (numDueAndLive === 0) {
const allPending = await this.getPendingOperations({ onlyDue: false }); const allPending = await this.getPendingOperations({ onlyDue: false });
let numPending = 0; let numPending = 0;
let numGivingLiveness = 0; let numGivingLiveness = 0;
@ -404,6 +410,7 @@ export class Wallet {
} else { } else {
// FIXME: maybe be a bit smarter about executing these // FIXME: maybe be a bit smarter about executing these
// operations in parallel? // operations in parallel?
logger.trace(`running ${pending.pendingOperations.length} pending operations`);
for (const p of pending.pendingOperations) { for (const p of pending.pendingOperations) {
try { try {
await this.processOnePendingOperation(p); await this.processOnePendingOperation(p);