simplify pending transactions, make more tests pass again

This commit is contained in:
Florian Dold 2021-06-10 16:32:37 +02:00
parent 7b7e3b4565
commit 8ad36d89f5
No known key found for this signature in database
GPG Key ID: D2E4F00F29D02A4B
12 changed files with 176 additions and 431 deletions

View File

@ -217,6 +217,14 @@ export function timestampDifference(t1: Timestamp, t2: Timestamp): Duration {
return { d_ms: Math.abs(t1.t_ms - t2.t_ms) }; return { d_ms: Math.abs(t1.t_ms - t2.t_ms) };
} }
export function timestampToIsoString(t: Timestamp): string {
if (t.t_ms === "never") {
return "<never>";
} else {
return new Date(t.t_ms).toISOString();
}
}
export function timestampIsBetween( export function timestampIsBetween(
t: Timestamp, t: Timestamp,
start: Timestamp, start: Timestamp,

View File

@ -167,6 +167,10 @@ export async function runTimetravelAutorefreshTest(t: GlobalTestState) {
merchant, merchant,
}); });
// At this point, the original coins should've been refreshed.
// It would be too late to refresh them now, as we're past
// the two year deposit expiration.
await wallet.runUntilDone(); await wallet.runUntilDone();
const orderResp = await MerchantPrivateApi.createOrder(merchant, "default", { const orderResp = await MerchantPrivateApi.createOrder(merchant, "default", {

View File

@ -515,25 +515,11 @@ export interface DenominationRecord {
exchangeBaseUrl: string; exchangeBaseUrl: string;
} }
export enum ExchangeUpdateStatus {
FetchKeys = "fetch-keys",
FetchWire = "fetch-wire",
FetchTerms = "fetch-terms",
FinalizeUpdate = "finalize-update",
Finished = "finished",
}
export interface ExchangeBankAccount { export interface ExchangeBankAccount {
payto_uri: string; payto_uri: string;
master_sig: string; master_sig: string;
} }
export enum ExchangeUpdateReason {
Initial = "initial",
Forced = "forced",
Scheduled = "scheduled",
}
export interface ExchangeDetailsRecord { export interface ExchangeDetailsRecord {
/** /**
* Master public key of the exchange. * Master public key of the exchange.
@ -582,16 +568,6 @@ export interface ExchangeDetailsRecord {
*/ */
termsOfServiceAcceptedEtag: string | undefined; termsOfServiceAcceptedEtag: string | undefined;
/**
* Timestamp for last update.
*/
lastUpdateTime: Timestamp;
/**
* When should we next update the information about the exchange?
*/
nextUpdateTime: Timestamp;
wireInfo: WireInfo; wireInfo: WireInfo;
} }
@ -629,27 +605,16 @@ export interface ExchangeRecord {
permanent: boolean; permanent: boolean;
/** /**
* Time when the update to the exchange has been started or * Last time when the exchange was updated.
* undefined if no update is in progress.
*/ */
updateStarted: Timestamp | undefined; lastUpdate: Timestamp | undefined;
/** /**
* Status of updating the info about the exchange. * Next scheduled update for the exchange.
* *
* FIXME: Adapt this to recent changes regarding how * (This field must always be present, so we can index on the timestamp.)
* updating exchange details works.
*/ */
updateStatus: ExchangeUpdateStatus; nextUpdate: Timestamp;
updateReason?: ExchangeUpdateReason;
lastError?: TalerErrorDetails;
/**
* Retry status for fetching updated information about the exchange.
*/
retryInfo: RetryInfo;
/** /**
* Next time that we should check if coins need to be refreshed. * Next time that we should check if coins need to be refreshed.
@ -657,7 +622,14 @@ export interface ExchangeRecord {
* Updated whenever the exchange's denominations are updated or when * Updated whenever the exchange's denominations are updated or when
* the refresh check has been done. * the refresh check has been done.
*/ */
nextRefreshCheck?: Timestamp; nextRefreshCheck: Timestamp;
lastError?: TalerErrorDetails;
/**
* Retry status for fetching updated information about the exchange.
*/
retryInfo: RetryInfo;
} }
/** /**

View File

@ -31,7 +31,6 @@ import {
import { import {
WalletContractData, WalletContractData,
DenomSelectionState, DenomSelectionState,
ExchangeUpdateStatus,
DenominationStatus, DenominationStatus,
CoinSource, CoinSource,
CoinSourceType, CoinSourceType,
@ -265,8 +264,9 @@ export async function importBackup(
}, },
permanent: true, permanent: true,
retryInfo: initRetryInfo(false), retryInfo: initRetryInfo(false),
updateStarted: { t_ms: "never" }, lastUpdate: undefined,
updateStatus: ExchangeUpdateStatus.Finished, nextUpdate: getTimestampNow(),
nextRefreshCheck: getTimestampNow(),
}); });
} }
@ -307,9 +307,7 @@ export async function importBackup(
auditor_url: x.auditor_url, auditor_url: x.auditor_url,
denomination_keys: x.denomination_keys, denomination_keys: x.denomination_keys,
})), })),
lastUpdateTime: { t_ms: "never" },
masterPublicKey: backupExchangeDetails.master_public_key, masterPublicKey: backupExchangeDetails.master_public_key,
nextUpdateTime: { t_ms: "never" },
protocolVersion: backupExchangeDetails.protocol_version, protocolVersion: backupExchangeDetails.protocol_version,
reserveClosingDelay: backupExchangeDetails.reserve_closing_delay, reserveClosingDelay: backupExchangeDetails.reserve_closing_delay,
signingKeys: backupExchangeDetails.signing_keys.map((x) => ({ signingKeys: backupExchangeDetails.signing_keys.map((x) => ({

View File

@ -42,9 +42,7 @@ import {
DenominationRecord, DenominationRecord,
DenominationStatus, DenominationStatus,
ExchangeRecord, ExchangeRecord,
ExchangeUpdateStatus,
WireFee, WireFee,
ExchangeUpdateReason,
ExchangeDetailsRecord, ExchangeDetailsRecord,
WireInfo, WireInfo,
WalletStoresV1, WalletStoresV1,
@ -299,11 +297,11 @@ async function provideExchangeRecord(
r = { r = {
permanent: true, permanent: true,
baseUrl: baseUrl, baseUrl: baseUrl,
updateStatus: ExchangeUpdateStatus.FetchKeys,
updateStarted: now,
updateReason: ExchangeUpdateReason.Initial,
retryInfo: initRetryInfo(false), retryInfo: initRetryInfo(false),
detailsPointer: undefined, detailsPointer: undefined,
lastUpdate: undefined,
nextUpdate: now,
nextRefreshCheck: now,
}; };
await tx.exchanges.put(r); await tx.exchanges.put(r);
} }
@ -411,6 +409,27 @@ async function updateExchangeFromUrlImpl(
const r = await provideExchangeRecord(ws, baseUrl, now); const r = await provideExchangeRecord(ws, baseUrl, now);
if (!forceNow && r && !isTimestampExpired(r.nextUpdate)) {
const res = await ws.db.mktx((x) => ({
exchanges: x.exchanges,
exchangeDetails: x.exchangeDetails,
})).runReadOnly(async (tx) => {
const exchange = await tx.exchanges.get(baseUrl);
if (!exchange) {
return;
}
const exchangeDetails = await getExchangeDetails(tx, baseUrl);
if (!exchangeDetails) {
return;
}
return { exchange, exchangeDetails };
});
if (res) {
logger.info("using existing exchange info");
return res;
}
}
logger.info("updating exchange /keys info"); logger.info("updating exchange /keys info");
const timeout = getExchangeRequestTimeout(r); const timeout = getExchangeRequestTimeout(r);
@ -460,11 +479,9 @@ async function updateExchangeFromUrlImpl(
details = { details = {
auditors: keysInfo.auditors, auditors: keysInfo.auditors,
currency: keysInfo.currency, currency: keysInfo.currency,
lastUpdateTime: now,
masterPublicKey: keysInfo.masterPublicKey, masterPublicKey: keysInfo.masterPublicKey,
protocolVersion: keysInfo.protocolVersion, protocolVersion: keysInfo.protocolVersion,
signingKeys: keysInfo.signingKeys, signingKeys: keysInfo.signingKeys,
nextUpdateTime: keysInfo.expiry,
reserveClosingDelay: keysInfo.reserveClosingDelay, reserveClosingDelay: keysInfo.reserveClosingDelay,
exchangeBaseUrl: r.baseUrl, exchangeBaseUrl: r.baseUrl,
wireInfo, wireInfo,
@ -472,12 +489,13 @@ async function updateExchangeFromUrlImpl(
termsOfServiceAcceptedEtag: undefined, termsOfServiceAcceptedEtag: undefined,
termsOfServiceLastEtag: tosDownload.tosEtag, termsOfServiceLastEtag: tosDownload.tosEtag,
}; };
r.updateStatus = ExchangeUpdateStatus.FetchWire;
// FIXME: only update if pointer got updated // FIXME: only update if pointer got updated
r.lastError = undefined; r.lastError = undefined;
r.retryInfo = initRetryInfo(false); r.retryInfo = initRetryInfo(false);
r.lastUpdate = getTimestampNow();
r.nextUpdate = keysInfo.expiry,
// New denominations might be available. // New denominations might be available.
r.nextRefreshCheck = undefined; r.nextRefreshCheck = getTimestampNow();
r.detailsPointer = { r.detailsPointer = {
currency: details.currency, currency: details.currency,
masterPublicKey: details.masterPublicKey, masterPublicKey: details.masterPublicKey,

View File

@ -468,7 +468,7 @@ async function recordConfirmPay(
const p = await tx.proposals.get(proposal.proposalId); const p = await tx.proposals.get(proposal.proposalId);
if (p) { if (p) {
p.proposalStatus = ProposalStatus.ACCEPTED; p.proposalStatus = ProposalStatus.ACCEPTED;
p.lastError = undefined; delete p.lastError;
p.retryInfo = initRetryInfo(false); p.retryInfo = initRetryInfo(false);
await tx.proposals.put(p); await tx.proposals.put(p);
} }

View File

@ -18,7 +18,6 @@
* Imports. * Imports.
*/ */
import { import {
ExchangeUpdateStatus,
ProposalStatus, ProposalStatus,
ReserveRecordStatus, ReserveRecordStatus,
AbortStatus, AbortStatus,
@ -27,31 +26,13 @@ import {
import { import {
PendingOperationsResponse, PendingOperationsResponse,
PendingOperationType, PendingOperationType,
ExchangeUpdateOperationStage,
ReserveType, ReserveType,
} from "../pending-types"; } from "../pending-types";
import { import { getTimestampNow, Timestamp } from "@gnu-taler/taler-util";
Duration,
getTimestampNow,
Timestamp,
getDurationRemaining,
durationMin,
} from "@gnu-taler/taler-util";
import { InternalWalletState } from "./state"; import { InternalWalletState } from "./state";
import { getBalancesInsideTransaction } from "./balance"; import { getBalancesInsideTransaction } from "./balance";
import { getExchangeDetails } from "./exchanges.js";
import { GetReadOnlyAccess } from "../util/query.js"; import { GetReadOnlyAccess } from "../util/query.js";
function updateRetryDelay(
oldDelay: Duration,
now: Timestamp,
retryTimestamp: Timestamp,
): Duration {
const remaining = getDurationRemaining(retryTimestamp, now);
const nextDelay = durationMin(oldDelay, remaining);
return nextDelay;
}
async function gatherExchangePending( async function gatherExchangePending(
tx: GetReadOnlyAccess<{ tx: GetReadOnlyAccess<{
exchanges: typeof WalletStoresV1.exchanges; exchanges: typeof WalletStoresV1.exchanges;
@ -59,97 +40,22 @@ async function gatherExchangePending(
}>, }>,
now: Timestamp, now: Timestamp,
resp: PendingOperationsResponse, resp: PendingOperationsResponse,
onlyDue = false,
): Promise<void> { ): Promise<void> {
await tx.exchanges.iter().forEachAsync(async (e) => { await tx.exchanges.iter().forEachAsync(async (e) => {
switch (e.updateStatus) {
case ExchangeUpdateStatus.Finished:
if (e.lastError) {
resp.pendingOperations.push({
type: PendingOperationType.Bug,
givesLifeness: false,
message:
"Exchange record is in FINISHED state but has lastError set",
details: {
exchangeBaseUrl: e.baseUrl,
},
});
}
const details = await getExchangeDetails(tx, e.baseUrl);
const keysUpdateRequired =
details && details.nextUpdateTime.t_ms < now.t_ms;
if (keysUpdateRequired) {
resp.pendingOperations.push({ resp.pendingOperations.push({
type: PendingOperationType.ExchangeUpdate, type: PendingOperationType.ExchangeUpdate,
givesLifeness: false, givesLifeness: false,
stage: ExchangeUpdateOperationStage.FetchKeys, timestampDue: e.nextUpdate,
exchangeBaseUrl: e.baseUrl, exchangeBaseUrl: e.baseUrl,
lastError: e.lastError, lastError: e.lastError,
reason: "scheduled",
}); });
}
if (
details &&
(!e.nextRefreshCheck || e.nextRefreshCheck.t_ms < now.t_ms)
) {
resp.pendingOperations.push({ resp.pendingOperations.push({
type: PendingOperationType.ExchangeCheckRefresh, type: PendingOperationType.ExchangeCheckRefresh,
exchangeBaseUrl: e.baseUrl, timestampDue: e.nextRefreshCheck,
givesLifeness: false, givesLifeness: false,
});
}
break;
case ExchangeUpdateStatus.FetchKeys:
if (onlyDue && e.retryInfo.nextRetry.t_ms > now.t_ms) {
return;
}
resp.pendingOperations.push({
type: PendingOperationType.ExchangeUpdate,
givesLifeness: false,
stage: ExchangeUpdateOperationStage.FetchKeys,
exchangeBaseUrl: e.baseUrl, exchangeBaseUrl: e.baseUrl,
lastError: e.lastError,
reason: e.updateReason || "unknown",
}); });
break;
case ExchangeUpdateStatus.FetchWire:
if (onlyDue && e.retryInfo.nextRetry.t_ms > now.t_ms) {
return;
}
resp.pendingOperations.push({
type: PendingOperationType.ExchangeUpdate,
givesLifeness: false,
stage: ExchangeUpdateOperationStage.FetchWire,
exchangeBaseUrl: e.baseUrl,
lastError: e.lastError,
reason: e.updateReason || "unknown",
});
break;
case ExchangeUpdateStatus.FinalizeUpdate:
if (onlyDue && e.retryInfo.nextRetry.t_ms > now.t_ms) {
return;
}
resp.pendingOperations.push({
type: PendingOperationType.ExchangeUpdate,
givesLifeness: false,
stage: ExchangeUpdateOperationStage.FinalizeUpdate,
exchangeBaseUrl: e.baseUrl,
lastError: e.lastError,
reason: e.updateReason || "unknown",
});
break;
default:
resp.pendingOperations.push({
type: PendingOperationType.Bug,
givesLifeness: false,
message: "Unknown exchangeUpdateStatus",
details: {
exchangeBaseUrl: e.baseUrl,
exchangeUpdateStatus: e.updateStatus,
},
});
break;
}
}); });
} }
@ -157,16 +63,11 @@ async function gatherReservePending(
tx: GetReadOnlyAccess<{ reserves: typeof WalletStoresV1.reserves }>, tx: GetReadOnlyAccess<{ reserves: typeof WalletStoresV1.reserves }>,
now: Timestamp, now: Timestamp,
resp: PendingOperationsResponse, resp: PendingOperationsResponse,
onlyDue = false,
): Promise<void> { ): Promise<void> {
// FIXME: this should be optimized by using an index for "onlyDue==true".
await tx.reserves.iter().forEach((reserve) => { await tx.reserves.iter().forEach((reserve) => {
const reserveType = reserve.bankInfo const reserveType = reserve.bankInfo
? ReserveType.TalerBankWithdraw ? ReserveType.TalerBankWithdraw
: ReserveType.Manual; : ReserveType.Manual;
if (!reserve.retryInfo.active) {
return;
}
switch (reserve.reserveStatus) { switch (reserve.reserveStatus) {
case ReserveRecordStatus.DORMANT: case ReserveRecordStatus.DORMANT:
// nothing to report as pending // nothing to report as pending
@ -174,17 +75,10 @@ async function gatherReservePending(
case ReserveRecordStatus.WAIT_CONFIRM_BANK: case ReserveRecordStatus.WAIT_CONFIRM_BANK:
case ReserveRecordStatus.QUERYING_STATUS: case ReserveRecordStatus.QUERYING_STATUS:
case ReserveRecordStatus.REGISTERING_BANK: case ReserveRecordStatus.REGISTERING_BANK:
resp.nextRetryDelay = updateRetryDelay(
resp.nextRetryDelay,
now,
reserve.retryInfo.nextRetry,
);
if (onlyDue && reserve.retryInfo.nextRetry.t_ms > now.t_ms) {
return;
}
resp.pendingOperations.push({ resp.pendingOperations.push({
type: PendingOperationType.Reserve, type: PendingOperationType.Reserve,
givesLifeness: true, givesLifeness: true,
timestampDue: reserve.retryInfo.nextRetry,
stage: reserve.reserveStatus, stage: reserve.reserveStatus,
timestampCreated: reserve.timestampCreated, timestampCreated: reserve.timestampCreated,
reserveType, reserveType,
@ -193,15 +87,7 @@ async function gatherReservePending(
}); });
break; break;
default: default:
resp.pendingOperations.push({ // FIXME: report problem!
type: PendingOperationType.Bug,
givesLifeness: false,
message: "Unknown reserve record status",
details: {
reservePub: reserve.reservePub,
reserveStatus: reserve.reserveStatus,
},
});
break; break;
} }
}); });
@ -211,24 +97,15 @@ async function gatherRefreshPending(
tx: GetReadOnlyAccess<{ refreshGroups: typeof WalletStoresV1.refreshGroups }>, tx: GetReadOnlyAccess<{ refreshGroups: typeof WalletStoresV1.refreshGroups }>,
now: Timestamp, now: Timestamp,
resp: PendingOperationsResponse, resp: PendingOperationsResponse,
onlyDue = false,
): Promise<void> { ): Promise<void> {
await tx.refreshGroups.iter().forEach((r) => { await tx.refreshGroups.iter().forEach((r) => {
if (r.timestampFinished) { if (r.timestampFinished) {
return; return;
} }
resp.nextRetryDelay = updateRetryDelay(
resp.nextRetryDelay,
now,
r.retryInfo.nextRetry,
);
if (onlyDue && r.retryInfo.nextRetry.t_ms > now.t_ms) {
return;
}
resp.pendingOperations.push({ resp.pendingOperations.push({
type: PendingOperationType.Refresh, type: PendingOperationType.Refresh,
givesLifeness: true, givesLifeness: true,
timestampDue: r.retryInfo.nextRetry,
refreshGroupId: r.refreshGroupId, refreshGroupId: r.refreshGroupId,
finishedPerCoin: r.finishedPerCoin, finishedPerCoin: r.finishedPerCoin,
retryInfo: r.retryInfo, retryInfo: r.retryInfo,
@ -243,20 +120,11 @@ async function gatherWithdrawalPending(
}>, }>,
now: Timestamp, now: Timestamp,
resp: PendingOperationsResponse, resp: PendingOperationsResponse,
onlyDue = false,
): Promise<void> { ): Promise<void> {
await tx.withdrawalGroups.iter().forEachAsync(async (wsr) => { await tx.withdrawalGroups.iter().forEachAsync(async (wsr) => {
if (wsr.timestampFinish) { if (wsr.timestampFinish) {
return; return;
} }
resp.nextRetryDelay = updateRetryDelay(
resp.nextRetryDelay,
now,
wsr.retryInfo.nextRetry,
);
if (onlyDue && wsr.retryInfo.nextRetry.t_ms > now.t_ms) {
return;
}
let numCoinsWithdrawn = 0; let numCoinsWithdrawn = 0;
let numCoinsTotal = 0; let numCoinsTotal = 0;
await tx.planchets.indexes.byGroup await tx.planchets.indexes.byGroup
@ -270,8 +138,7 @@ async function gatherWithdrawalPending(
resp.pendingOperations.push({ resp.pendingOperations.push({
type: PendingOperationType.Withdraw, type: PendingOperationType.Withdraw,
givesLifeness: true, givesLifeness: true,
numCoinsTotal, timestampDue: wsr.retryInfo.nextRetry,
numCoinsWithdrawn,
withdrawalGroupId: wsr.withdrawalGroupId, withdrawalGroupId: wsr.withdrawalGroupId,
lastError: wsr.lastError, lastError: wsr.lastError,
retryInfo: wsr.retryInfo, retryInfo: wsr.retryInfo,
@ -283,42 +150,15 @@ async function gatherProposalPending(
tx: GetReadOnlyAccess<{ proposals: typeof WalletStoresV1.proposals }>, tx: GetReadOnlyAccess<{ proposals: typeof WalletStoresV1.proposals }>,
now: Timestamp, now: Timestamp,
resp: PendingOperationsResponse, resp: PendingOperationsResponse,
onlyDue = false,
): Promise<void> { ): Promise<void> {
await tx.proposals.iter().forEach((proposal) => { await tx.proposals.iter().forEach((proposal) => {
if (proposal.proposalStatus == ProposalStatus.PROPOSED) { if (proposal.proposalStatus == ProposalStatus.PROPOSED) {
if (onlyDue) { // Nothing to do, user needs to choose.
return;
}
const dl = proposal.download;
if (!dl) {
resp.pendingOperations.push({
type: PendingOperationType.Bug,
message: "proposal is in invalid state",
details: {},
givesLifeness: false,
});
} else {
resp.pendingOperations.push({
type: PendingOperationType.ProposalChoice,
givesLifeness: false,
merchantBaseUrl: dl.contractData.merchantBaseUrl,
proposalId: proposal.proposalId,
proposalTimestamp: proposal.timestamp,
});
}
} else if (proposal.proposalStatus == ProposalStatus.DOWNLOADING) { } else if (proposal.proposalStatus == ProposalStatus.DOWNLOADING) {
resp.nextRetryDelay = updateRetryDelay(
resp.nextRetryDelay,
now,
proposal.retryInfo.nextRetry,
);
if (onlyDue && proposal.retryInfo.nextRetry.t_ms > now.t_ms) {
return;
}
resp.pendingOperations.push({ resp.pendingOperations.push({
type: PendingOperationType.ProposalDownload, type: PendingOperationType.ProposalDownload,
givesLifeness: true, givesLifeness: true,
timestampDue: proposal.retryInfo.nextRetry,
merchantBaseUrl: proposal.merchantBaseUrl, merchantBaseUrl: proposal.merchantBaseUrl,
orderId: proposal.orderId, orderId: proposal.orderId,
proposalId: proposal.proposalId, proposalId: proposal.proposalId,
@ -334,24 +174,16 @@ async function gatherTipPending(
tx: GetReadOnlyAccess<{ tips: typeof WalletStoresV1.tips }>, tx: GetReadOnlyAccess<{ tips: typeof WalletStoresV1.tips }>,
now: Timestamp, now: Timestamp,
resp: PendingOperationsResponse, resp: PendingOperationsResponse,
onlyDue = false,
): Promise<void> { ): Promise<void> {
await tx.tips.iter().forEach((tip) => { await tx.tips.iter().forEach((tip) => {
if (tip.pickedUpTimestamp) { if (tip.pickedUpTimestamp) {
return; return;
} }
resp.nextRetryDelay = updateRetryDelay(
resp.nextRetryDelay,
now,
tip.retryInfo.nextRetry,
);
if (onlyDue && tip.retryInfo.nextRetry.t_ms > now.t_ms) {
return;
}
if (tip.acceptedTimestamp) { if (tip.acceptedTimestamp) {
resp.pendingOperations.push({ resp.pendingOperations.push({
type: PendingOperationType.TipPickup, type: PendingOperationType.TipPickup,
givesLifeness: true, givesLifeness: true,
timestampDue: tip.retryInfo.nextRetry,
merchantBaseUrl: tip.merchantBaseUrl, merchantBaseUrl: tip.merchantBaseUrl,
tipId: tip.walletTipId, tipId: tip.walletTipId,
merchantTipId: tip.merchantTipId, merchantTipId: tip.merchantTipId,
@ -364,42 +196,29 @@ async function gatherPurchasePending(
tx: GetReadOnlyAccess<{ purchases: typeof WalletStoresV1.purchases }>, tx: GetReadOnlyAccess<{ purchases: typeof WalletStoresV1.purchases }>,
now: Timestamp, now: Timestamp,
resp: PendingOperationsResponse, resp: PendingOperationsResponse,
onlyDue = false,
): Promise<void> { ): Promise<void> {
await tx.purchases.iter().forEach((pr) => { await tx.purchases.iter().forEach((pr) => {
if (pr.paymentSubmitPending && pr.abortStatus === AbortStatus.None) { if (pr.paymentSubmitPending && pr.abortStatus === AbortStatus.None) {
resp.nextRetryDelay = updateRetryDelay(
resp.nextRetryDelay,
now,
pr.payRetryInfo.nextRetry,
);
if (!onlyDue || pr.payRetryInfo.nextRetry.t_ms <= now.t_ms) {
resp.pendingOperations.push({ resp.pendingOperations.push({
type: PendingOperationType.Pay, type: PendingOperationType.Pay,
givesLifeness: true, givesLifeness: true,
timestampDue: pr.payRetryInfo.nextRetry,
isReplay: false, isReplay: false,
proposalId: pr.proposalId, proposalId: pr.proposalId,
retryInfo: pr.payRetryInfo, retryInfo: pr.payRetryInfo,
lastError: pr.lastPayError, lastError: pr.lastPayError,
}); });
} }
}
if (pr.refundQueryRequested) { if (pr.refundQueryRequested) {
resp.nextRetryDelay = updateRetryDelay(
resp.nextRetryDelay,
now,
pr.refundStatusRetryInfo.nextRetry,
);
if (!onlyDue || pr.refundStatusRetryInfo.nextRetry.t_ms <= now.t_ms) {
resp.pendingOperations.push({ resp.pendingOperations.push({
type: PendingOperationType.RefundQuery, type: PendingOperationType.RefundQuery,
givesLifeness: true, givesLifeness: true,
timestampDue: pr.refundStatusRetryInfo.nextRetry,
proposalId: pr.proposalId, proposalId: pr.proposalId,
retryInfo: pr.refundStatusRetryInfo, retryInfo: pr.refundStatusRetryInfo,
lastError: pr.lastRefundStatusError, lastError: pr.lastRefundStatusError,
}); });
} }
}
}); });
} }
@ -407,23 +226,15 @@ async function gatherRecoupPending(
tx: GetReadOnlyAccess<{ recoupGroups: typeof WalletStoresV1.recoupGroups }>, tx: GetReadOnlyAccess<{ recoupGroups: typeof WalletStoresV1.recoupGroups }>,
now: Timestamp, now: Timestamp,
resp: PendingOperationsResponse, resp: PendingOperationsResponse,
onlyDue = false,
): Promise<void> { ): Promise<void> {
await tx.recoupGroups.iter().forEach((rg) => { await tx.recoupGroups.iter().forEach((rg) => {
if (rg.timestampFinished) { if (rg.timestampFinished) {
return; return;
} }
resp.nextRetryDelay = updateRetryDelay(
resp.nextRetryDelay,
now,
rg.retryInfo.nextRetry,
);
if (onlyDue && rg.retryInfo.nextRetry.t_ms > now.t_ms) {
return;
}
resp.pendingOperations.push({ resp.pendingOperations.push({
type: PendingOperationType.Recoup, type: PendingOperationType.Recoup,
givesLifeness: true, givesLifeness: true,
timestampDue: rg.retryInfo.nextRetry,
recoupGroupId: rg.recoupGroupId, recoupGroupId: rg.recoupGroupId,
retryInfo: rg.retryInfo, retryInfo: rg.retryInfo,
lastError: rg.lastError, lastError: rg.lastError,
@ -435,23 +246,15 @@ async function gatherDepositPending(
tx: GetReadOnlyAccess<{ depositGroups: typeof WalletStoresV1.depositGroups }>, tx: GetReadOnlyAccess<{ depositGroups: typeof WalletStoresV1.depositGroups }>,
now: Timestamp, now: Timestamp,
resp: PendingOperationsResponse, resp: PendingOperationsResponse,
onlyDue = false,
): Promise<void> { ): Promise<void> {
await tx.depositGroups.iter().forEach((dg) => { await tx.depositGroups.iter().forEach((dg) => {
if (dg.timestampFinished) { if (dg.timestampFinished) {
return; return;
} }
resp.nextRetryDelay = updateRetryDelay(
resp.nextRetryDelay,
now,
dg.retryInfo.nextRetry,
);
if (onlyDue && dg.retryInfo.nextRetry.t_ms > now.t_ms) {
return;
}
resp.pendingOperations.push({ resp.pendingOperations.push({
type: PendingOperationType.Deposit, type: PendingOperationType.Deposit,
givesLifeness: true, givesLifeness: true,
timestampDue: dg.retryInfo.nextRetry,
depositGroupId: dg.depositGroupId, depositGroupId: dg.depositGroupId,
retryInfo: dg.retryInfo, retryInfo: dg.retryInfo,
lastError: dg.lastError, lastError: dg.lastError,
@ -461,7 +264,6 @@ async function gatherDepositPending(
export async function getPendingOperations( export async function getPendingOperations(
ws: InternalWalletState, ws: InternalWalletState,
{ onlyDue = false } = {},
): Promise<PendingOperationsResponse> { ): Promise<PendingOperationsResponse> {
const now = getTimestampNow(); const now = getTimestampNow();
return await ws.db return await ws.db
@ -482,20 +284,18 @@ export async function getPendingOperations(
.runReadWrite(async (tx) => { .runReadWrite(async (tx) => {
const walletBalance = await getBalancesInsideTransaction(ws, tx); const walletBalance = await getBalancesInsideTransaction(ws, tx);
const resp: PendingOperationsResponse = { const resp: PendingOperationsResponse = {
nextRetryDelay: { d_ms: Number.MAX_SAFE_INTEGER },
onlyDue: onlyDue,
walletBalance, walletBalance,
pendingOperations: [], pendingOperations: [],
}; };
await gatherExchangePending(tx, now, resp, onlyDue); await gatherExchangePending(tx, now, resp);
await gatherReservePending(tx, now, resp, onlyDue); await gatherReservePending(tx, now, resp);
await gatherRefreshPending(tx, now, resp, onlyDue); await gatherRefreshPending(tx, now, resp);
await gatherWithdrawalPending(tx, now, resp, onlyDue); await gatherWithdrawalPending(tx, now, resp);
await gatherProposalPending(tx, now, resp, onlyDue); await gatherProposalPending(tx, now, resp);
await gatherTipPending(tx, now, resp, onlyDue); await gatherTipPending(tx, now, resp);
await gatherPurchasePending(tx, now, resp, onlyDue); await gatherPurchasePending(tx, now, resp);
await gatherRecoupPending(tx, now, resp, onlyDue); await gatherRecoupPending(tx, now, resp);
await gatherDepositPending(tx, now, resp, onlyDue); await gatherDepositPending(tx, now, resp);
return resp; return resp;
}); });
} }

View File

@ -32,6 +32,7 @@ import {
RefreshGroupId, RefreshGroupId,
RefreshReason, RefreshReason,
TalerErrorDetails, TalerErrorDetails,
timestampToIsoString,
} from "@gnu-taler/taler-util"; } from "@gnu-taler/taler-util";
import { AmountJson, Amounts } from "@gnu-taler/taler-util"; import { AmountJson, Amounts } from "@gnu-taler/taler-util";
import { amountToPretty } from "@gnu-taler/taler-util"; import { amountToPretty } from "@gnu-taler/taler-util";
@ -864,7 +865,12 @@ export async function autoRefresh(
ws: InternalWalletState, ws: InternalWalletState,
exchangeBaseUrl: string, exchangeBaseUrl: string,
): Promise<void> { ): Promise<void> {
logger.info(`doing auto-refresh check for '${exchangeBaseUrl}'`);
await updateExchangeFromUrl(ws, exchangeBaseUrl, true); await updateExchangeFromUrl(ws, exchangeBaseUrl, true);
let minCheckThreshold = timestampAddDuration(
getTimestampNow(),
durationFromSpec({ days: 1 }),
);
await ws.db await ws.db
.mktx((x) => ({ .mktx((x) => ({
coins: x.coins, coins: x.coins,
@ -899,28 +905,20 @@ export async function autoRefresh(
const executeThreshold = getAutoRefreshExecuteThreshold(denom); const executeThreshold = getAutoRefreshExecuteThreshold(denom);
if (isTimestampExpired(executeThreshold)) { if (isTimestampExpired(executeThreshold)) {
refreshCoins.push(coin); refreshCoins.push(coin);
} else {
const checkThreshold = getAutoRefreshCheckThreshold(denom);
minCheckThreshold = timestampMin(minCheckThreshold, checkThreshold);
} }
} }
if (refreshCoins.length > 0) { if (refreshCoins.length > 0) {
await createRefreshGroup(ws, tx, refreshCoins, RefreshReason.Scheduled); await createRefreshGroup(ws, tx, refreshCoins, RefreshReason.Scheduled);
} }
logger.info(
const denoms = await tx.denominations.indexes.byExchangeBaseUrl `current wallet time: ${timestampToIsoString(getTimestampNow())}`,
.iter(exchangeBaseUrl) );
.toArray(); logger.info(
let minCheckThreshold = timestampAddDuration( `next refresh check at ${timestampToIsoString(minCheckThreshold)}`,
getTimestampNow(),
durationFromSpec({ days: 1 }),
); );
for (const denom of denoms) {
const checkThreshold = getAutoRefreshCheckThreshold(denom);
const executeThreshold = getAutoRefreshExecuteThreshold(denom);
if (isTimestampExpired(executeThreshold)) {
// No need to consider this denomination, we already did an auto refresh check.
continue;
}
minCheckThreshold = timestampMin(minCheckThreshold, checkThreshold);
}
exchange.nextRefreshCheck = minCheckThreshold; exchange.nextRefreshCheck = minCheckThreshold;
await tx.exchanges.put(exchange); await tx.exchanges.put(exchange);
}); });

View File

@ -34,7 +34,6 @@ import { ReserveRecordStatus } from "./db.js";
import { RetryInfo } from "./util/retries.js"; import { RetryInfo } from "./util/retries.js";
export enum PendingOperationType { export enum PendingOperationType {
Bug = "bug",
ExchangeUpdate = "exchange-update", ExchangeUpdate = "exchange-update",
ExchangeCheckRefresh = "exchange-check-refresh", ExchangeCheckRefresh = "exchange-check-refresh",
Pay = "pay", Pay = "pay",
@ -44,7 +43,6 @@ export enum PendingOperationType {
Reserve = "reserve", Reserve = "reserve",
Recoup = "recoup", Recoup = "recoup",
RefundQuery = "refund-query", RefundQuery = "refund-query",
TipChoice = "tip-choice",
TipPickup = "tip-pickup", TipPickup = "tip-pickup",
Withdraw = "withdraw", Withdraw = "withdraw",
Deposit = "deposit", Deposit = "deposit",
@ -55,16 +53,13 @@ export enum PendingOperationType {
*/ */
export type PendingOperationInfo = PendingOperationInfoCommon & export type PendingOperationInfo = PendingOperationInfoCommon &
( (
| PendingBugOperation
| PendingExchangeUpdateOperation | PendingExchangeUpdateOperation
| PendingExchangeCheckRefreshOperation | PendingExchangeCheckRefreshOperation
| PendingPayOperation | PendingPayOperation
| PendingProposalChoiceOperation
| PendingProposalDownloadOperation | PendingProposalDownloadOperation
| PendingRefreshOperation | PendingRefreshOperation
| PendingRefundQueryOperation | PendingRefundQueryOperation
| PendingReserveOperation | PendingReserveOperation
| PendingTipChoiceOperation
| PendingTipPickupOperation | PendingTipPickupOperation
| PendingWithdrawOperation | PendingWithdrawOperation
| PendingRecoupOperation | PendingRecoupOperation
@ -76,8 +71,6 @@ export type PendingOperationInfo = PendingOperationInfoCommon &
*/ */
export interface PendingExchangeUpdateOperation { export interface PendingExchangeUpdateOperation {
type: PendingOperationType.ExchangeUpdate; type: PendingOperationType.ExchangeUpdate;
stage: ExchangeUpdateOperationStage;
reason: string;
exchangeBaseUrl: string; exchangeBaseUrl: string;
lastError: TalerErrorDetails | undefined; lastError: TalerErrorDetails | undefined;
} }
@ -91,26 +84,6 @@ export interface PendingExchangeCheckRefreshOperation {
exchangeBaseUrl: string; exchangeBaseUrl: string;
} }
/**
* Some internal error happened in the wallet. This pending operation
* should *only* be reported for problems in the wallet, not when
* a problem with a merchant/exchange/etc. occurs.
*/
export interface PendingBugOperation {
type: PendingOperationType.Bug;
message: string;
details: any;
}
/**
* Current state of an exchange update operation.
*/
export enum ExchangeUpdateOperationStage {
FetchKeys = "fetch-keys",
FetchWire = "fetch-wire",
FinalizeUpdate = "finalize-update",
}
export enum ReserveType { export enum ReserveType {
/** /**
* Manually created. * Manually created.
@ -183,17 +156,6 @@ export interface PendingTipPickupOperation {
merchantTipId: string; merchantTipId: string;
} }
/**
* The wallet has been offered a tip, and the user now needs to
* decide whether to accept or reject the tip.
*/
export interface PendingTipChoiceOperation {
type: PendingOperationType.TipChoice;
tipId: string;
merchantBaseUrl: string;
merchantTipId: string;
}
/** /**
* The wallet is signing coins and then sending them to * The wallet is signing coins and then sending them to
* the merchant. * the merchant.
@ -232,8 +194,6 @@ export interface PendingWithdrawOperation {
lastError: TalerErrorDetails | undefined; lastError: TalerErrorDetails | undefined;
retryInfo: RetryInfo; retryInfo: RetryInfo;
withdrawalGroupId: string; withdrawalGroupId: string;
numCoinsWithdrawn: number;
numCoinsTotal: number;
} }
/** /**
@ -257,13 +217,18 @@ export interface PendingOperationInfoCommon {
/** /**
* Set to true if the operation indicates that something is really in progress, * Set to true if the operation indicates that something is really in progress,
* as opposed to some regular scheduled operation or a permanent failure. * as opposed to some regular scheduled operation that can be tried later.
*/ */
givesLifeness: boolean; givesLifeness: boolean;
/** /**
* Retry info, not available on all pending operations. * Timestamp when the pending operation should be executed next.
* If it is available, it must have the same name. */
timestampDue: Timestamp;
/**
* Retry info. Currently used to stop the wallet after any operation
* exceeds a number of retries.
*/ */
retryInfo?: RetryInfo; retryInfo?: RetryInfo;
} }
@ -281,15 +246,4 @@ export interface PendingOperationsResponse {
* Current wallet balance, including pending balances. * Current wallet balance, including pending balances.
*/ */
walletBalance: BalancesResponse; walletBalance: BalancesResponse;
/**
* When is the next pending operation due to be re-tried?
*/
nextRetryDelay: Duration;
/**
* Does this response only include pending operations that
* are due to be executed right now?
*/
onlyDue: boolean;
} }

View File

@ -121,7 +121,6 @@ export namespace ContractTermsUtil {
* to forgettable fields and other restrictions for forgettable JSON. * to forgettable fields and other restrictions for forgettable JSON.
*/ */
export function validateForgettable(anyJson: any): boolean { export function validateForgettable(anyJson: any): boolean {
console.warn("calling validateForgettable", anyJson);
if (typeof anyJson === "string") { if (typeof anyJson === "string") {
return true; return true;
} }
@ -206,7 +205,6 @@ export namespace ContractTermsUtil {
} }
} }
} else { } else {
console.warn("invalid type");
return false; return false;
} }
} }

View File

@ -81,10 +81,11 @@ export function initRetryInfo(
retryCounter: 0, retryCounter: 0,
}; };
} }
const now = getTimestampNow();
const info = { const info = {
firstTry: getTimestampNow(), firstTry: now,
active: true, active: true,
nextRetry: { t_ms: 0 }, nextRetry: now,
retryCounter: 0, retryCounter: 0,
}; };
updateRetryInfoTimeout(info, p); updateRetryInfoTimeout(info, p);

View File

@ -27,7 +27,15 @@ import {
codecForAny, codecForAny,
codecForDeleteTransactionRequest, codecForDeleteTransactionRequest,
DeleteTransactionRequest, DeleteTransactionRequest,
durationFromSpec,
durationMax,
durationMin,
getDurationRemaining,
isTimestampExpired,
j2s,
TalerErrorCode, TalerErrorCode,
Timestamp,
timestampMin,
WalletCurrencyInfo, WalletCurrencyInfo,
} from "@gnu-taler/taler-util"; } from "@gnu-taler/taler-util";
import { CryptoWorkerFactory } from "./crypto/workers/cryptoApi"; import { CryptoWorkerFactory } from "./crypto/workers/cryptoApi";
@ -105,11 +113,8 @@ import {
AuditorTrustRecord, AuditorTrustRecord,
CoinRecord, CoinRecord,
CoinSourceType, CoinSourceType,
DenominationRecord,
ExchangeDetailsRecord, ExchangeDetailsRecord,
ExchangeRecord, ExchangeRecord,
PurchaseRecord,
RefundState,
ReserveRecord, ReserveRecord,
ReserveRecordStatus, ReserveRecordStatus,
WalletStoresV1, WalletStoresV1,
@ -164,7 +169,6 @@ import {
ManualWithdrawalDetails, ManualWithdrawalDetails,
PreparePayResult, PreparePayResult,
PrepareTipResult, PrepareTipResult,
PurchaseDetails,
RecoveryLoadRequest, RecoveryLoadRequest,
RefreshReason, RefreshReason,
ReturnCoinsRequest, ReturnCoinsRequest,
@ -180,7 +184,6 @@ import { AsyncOpMemoSingle } from "./util/asyncMemo";
import { HttpRequestLibrary } from "./util/http"; import { HttpRequestLibrary } from "./util/http";
import { Logger } from "@gnu-taler/taler-util"; import { Logger } from "@gnu-taler/taler-util";
import { AsyncCondition } from "./util/promiseUtils"; import { AsyncCondition } from "./util/promiseUtils";
import { Duration, durationMin } from "@gnu-taler/taler-util";
import { TimerGroup } from "./util/timer"; import { TimerGroup } from "./util/timer";
import { getExchangeTrust } from "./operations/currencies.js"; import { getExchangeTrust } from "./operations/currencies.js";
import { DbAccess } from "./util/query.js"; import { DbAccess } from "./util/query.js";
@ -261,9 +264,6 @@ export class Wallet {
): Promise<void> { ): Promise<void> {
logger.trace(`running pending ${JSON.stringify(pending, undefined, 2)}`); logger.trace(`running pending ${JSON.stringify(pending, undefined, 2)}`);
switch (pending.type) { switch (pending.type) {
case PendingOperationType.Bug:
// Nothing to do, will just be displayed to the user
return;
case PendingOperationType.ExchangeUpdate: case PendingOperationType.ExchangeUpdate:
await updateExchangeFromUrl(this.ws, pending.exchangeBaseUrl, forceNow); await updateExchangeFromUrl(this.ws, pending.exchangeBaseUrl, forceNow);
break; break;
@ -280,15 +280,9 @@ export class Wallet {
forceNow, forceNow,
); );
break; break;
case PendingOperationType.ProposalChoice:
// Nothing to do, user needs to accept/reject
break;
case PendingOperationType.ProposalDownload: case PendingOperationType.ProposalDownload:
await processDownloadProposal(this.ws, pending.proposalId, forceNow); await processDownloadProposal(this.ws, pending.proposalId, forceNow);
break; break;
case PendingOperationType.TipChoice:
// Nothing to do, user needs to accept/reject
break;
case PendingOperationType.TipPickup: case PendingOperationType.TipPickup:
await processTip(this.ws, pending.tipId, forceNow); await processTip(this.ws, pending.tipId, forceNow);
break; break;
@ -316,9 +310,11 @@ export class Wallet {
* Process pending operations. * Process pending operations.
*/ */
public async runPending(forceNow = false): Promise<void> { public async runPending(forceNow = false): Promise<void> {
const onlyDue = !forceNow; const pendingOpsResponse = await this.getPendingOperations();
const pendingOpsResponse = await this.getPendingOperations({ onlyDue });
for (const p of pendingOpsResponse.pendingOperations) { for (const p of pendingOpsResponse.pendingOperations) {
if (!forceNow && !isTimestampExpired(p.timestampDue)) {
continue;
}
try { try {
await this.processOnePendingOperation(p, forceNow); await this.processOnePendingOperation(p, forceNow);
} catch (e) { } catch (e) {
@ -364,7 +360,7 @@ export class Wallet {
if (!maxRetries) { if (!maxRetries) {
return; return;
} }
this.getPendingOperations({ onlyDue: false }) this.getPendingOperations()
.then((pending) => { .then((pending) => {
for (const p of pending.pendingOperations) { for (const p of pending.pendingOperations) {
if (p.retryInfo && p.retryInfo.retryCounter > maxRetries) { if (p.retryInfo && p.retryInfo.retryCounter > maxRetries) {
@ -408,51 +404,53 @@ export class Wallet {
} }
private async runRetryLoopImpl(): Promise<void> { private async runRetryLoopImpl(): Promise<void> {
let iteration = 0; for (let iteration = 0; !this.stopped; iteration++) {
for (; !this.stopped; iteration++) { const pending = await this.getPendingOperations();
const pending = await this.getPendingOperations({ onlyDue: true }); logger.trace(`pending operations: ${j2s(pending)}`);
let numDueAndLive = 0;
for (const p of pending.pendingOperations) {
if (p.givesLifeness) {
numDueAndLive++;
}
}
// Make sure that we run tasks that don't give lifeness at least
// one time.
if (iteration !== 0 && numDueAndLive === 0) {
const allPending = await this.getPendingOperations({ onlyDue: false });
let numPending = 0;
let numGivingLiveness = 0; let numGivingLiveness = 0;
for (const p of allPending.pendingOperations) { let numDue = 0;
numPending++; let minDue: Timestamp = { t_ms: "never" };
for (const p of pending.pendingOperations) {
minDue = timestampMin(minDue, p.timestampDue);
if (isTimestampExpired(p.timestampDue)) {
numDue++;
}
if (p.givesLifeness) { if (p.givesLifeness) {
numGivingLiveness++; numGivingLiveness++;
} }
} }
let dt: Duration; // Make sure that we run tasks that don't give lifeness at least
if ( // one time.
allPending.pendingOperations.length === 0 || if (iteration !== 0 && numDue === 0) {
allPending.nextRetryDelay.d_ms === Number.MAX_SAFE_INTEGER // We've executed pending, due operations at least one.
) { // Now we don't have any more operations available,
// Wait for 5 seconds // and need to wait.
dt = { d_ms: 5000 };
} else { // Wait for at most 5 seconds to the next check.
dt = durationMin({ d_ms: 5000 }, allPending.nextRetryDelay); const dt = durationMin(
} durationFromSpec({
seconds: 5,
}),
getDurationRemaining(minDue),
);
logger.trace(`waiting for at most ${dt.d_ms} ms`)
const timeout = this.timerGroup.resolveAfter(dt); const timeout = this.timerGroup.resolveAfter(dt);
this.ws.notify({ this.ws.notify({
type: NotificationType.WaitingForRetry, type: NotificationType.WaitingForRetry,
numGivingLiveness, numGivingLiveness,
numPending, numPending: pending.pendingOperations.length,
}); });
// Wait until either the timeout, or we are notified (via the latch)
// that more work might be available.
await Promise.race([timeout, this.latch.wait()]); await Promise.race([timeout, this.latch.wait()]);
} else { } else {
// FIXME: maybe be a bit smarter about executing these
// operations in parallel?
logger.trace( logger.trace(
`running ${pending.pendingOperations.length} pending operations`, `running ${pending.pendingOperations.length} pending operations`,
); );
for (const p of pending.pendingOperations) { for (const p of pending.pendingOperations) {
if (!isTimestampExpired(p.timestampDue)) {
continue;
}
try { try {
await this.processOnePendingOperation(p); await this.processOnePendingOperation(p);
} catch (e) { } catch (e) {
@ -650,12 +648,8 @@ export class Wallet {
} }
} }
async getPendingOperations({ async getPendingOperations(): Promise<PendingOperationsResponse> {
onlyDue = false, return this.ws.memoGetPending.memo(() => getPendingOperations(this.ws));
} = {}): Promise<PendingOperationsResponse> {
return this.ws.memoGetPending.memo(() =>
getPendingOperations(this.ws, { onlyDue }),
);
} }
async acceptExchangeTermsOfService( async acceptExchangeTermsOfService(