include (pending) wallet balance in pending ops response

This commit is contained in:
Florian Dold 2020-03-06 19:39:55 +05:30
parent 7c7d3e001e
commit 4e76edf129
No known key found for this signature in database
GPG Key ID: D2E4F00F29D02A4B
4 changed files with 137 additions and 100 deletions

View File

@ -18,7 +18,7 @@
* Imports. * Imports.
*/ */
import { WalletBalance, WalletBalanceEntry } from "../types/walletTypes"; import { WalletBalance, WalletBalanceEntry } from "../types/walletTypes";
import { Database } from "../util/query"; import { Database, TransactionHandle } from "../util/query";
import { InternalWalletState } from "./state"; import { InternalWalletState } from "./state";
import { Stores, TipRecord, CoinStatus } from "../types/dbTypes"; import { Stores, TipRecord, CoinStatus } from "../types/dbTypes";
import * as Amounts from "../util/amounts"; import * as Amounts from "../util/amounts";
@ -28,13 +28,14 @@ import { Logger } from "../util/logging";
const logger = new Logger("withdraw.ts"); const logger = new Logger("withdraw.ts");
/** /**
* Get detailed balance information, sliced by exchange and by currency. * Get balance information.
*/ */
export async function getBalances( export async function getBalancesInsideTransaction(
ws: InternalWalletState, ws: InternalWalletState,
tx: TransactionHandle,
): Promise<WalletBalance> { ): Promise<WalletBalance> {
logger.trace("starting to compute balance");
/** /**
* Add amount to a balance field, both for * Add amount to a balance field, both for
* the slicing by exchange and currency. * the slicing by exchange and currency.
*/ */
@ -73,76 +74,85 @@ export async function getBalances(
byExchange: {}, byExchange: {},
}; };
await ws.db.runWithReadTransaction( await tx.iter(Stores.coins).forEach(c => {
[Stores.coins, Stores.refreshGroups, Stores.reserves, Stores.purchases, Stores.withdrawalSession], if (c.suspended) {
async tx => { return;
await tx.iter(Stores.coins).forEach(c => { }
if (c.suspended) { if (c.status === CoinStatus.Fresh) {
return; addTo(balanceStore, "available", c.currentAmount, c.exchangeBaseUrl);
} }
if (c.status === CoinStatus.Fresh) { });
addTo(balanceStore, "available", c.currentAmount, c.exchangeBaseUrl); await tx.iter(Stores.refreshGroups).forEach(r => {
} // Don't count finished refreshes, since the refresh already resulted
}); // in coins being added to the wallet.
await tx.iter(Stores.refreshGroups).forEach(r => { if (r.timestampFinished) {
// Don't count finished refreshes, since the refresh already resulted return;
// in coins being added to the wallet. }
if (r.timestampFinished) { for (let i = 0; i < r.oldCoinPubs.length; i++) {
return; const session = r.refreshSessionPerCoin[i];
} if (session) {
for (let i = 0; i < r.oldCoinPubs.length; i++) {
const session = r.refreshSessionPerCoin[i];
if (session) {
addTo(
balanceStore,
"pendingIncoming",
session.amountRefreshOutput,
session.exchangeBaseUrl,
);
addTo(
balanceStore,
"pendingIncomingRefresh",
session.amountRefreshOutput,
session.exchangeBaseUrl,
);
}
}
});
await tx.iter(Stores.withdrawalSession).forEach(wds => {
let w = wds.totalCoinValue;
for (let i = 0; i < wds.planchets.length; i++) {
if (wds.withdrawn[i]) {
const p = wds.planchets[i];
if (p) {
w = Amounts.sub(w, p.coinValue).amount;
}
}
}
addTo( addTo(
balanceStore, balanceStore,
"pendingIncoming", "pendingIncoming",
w, session.amountRefreshOutput,
wds.exchangeBaseUrl, session.exchangeBaseUrl,
); );
}); addTo(
balanceStore,
"pendingIncomingRefresh",
session.amountRefreshOutput,
session.exchangeBaseUrl,
);
}
}
});
await tx.iter(Stores.purchases).forEach(t => { await tx.iter(Stores.withdrawalSession).forEach(wds => {
if (t.timestampFirstSuccessfulPay) { let w = wds.totalCoinValue;
return; for (let i = 0; i < wds.planchets.length; i++) {
if (wds.withdrawn[i]) {
const p = wds.planchets[i];
if (p) {
w = Amounts.sub(w, p.coinValue).amount;
} }
for (const c of t.payReq.coins) { }
addTo( }
balanceStore, addTo(balanceStore, "pendingIncoming", w, wds.exchangeBaseUrl);
"pendingPayment", });
Amounts.parseOrThrow(c.contribution),
c.exchange_url, await tx.iter(Stores.purchases).forEach(t => {
); if (t.timestampFirstSuccessfulPay) {
} return;
}); }
}, for (const c of t.payReq.coins) {
); addTo(
balanceStore,
"pendingPayment",
Amounts.parseOrThrow(c.contribution),
c.exchange_url,
);
}
});
logger.trace("computed balances:", balanceStore);
return balanceStore; return balanceStore;
} }
/**
* Get detailed balance information, sliced by exchange and by currency.
*/
export async function getBalances(
ws: InternalWalletState,
): Promise<WalletBalance> {
logger.trace("starting to compute balance");
return await ws.db.runWithReadTransaction([
Stores.coins,
Stores.refreshGroups,
Stores.reserves,
Stores.purchases,
Stores.withdrawalSession,
],
async tx => {
return getBalancesInsideTransaction(ws, tx);
});
}

View File

@ -28,9 +28,16 @@ import {
PendingOperationType, PendingOperationType,
ExchangeUpdateOperationStage, ExchangeUpdateOperationStage,
} from "../types/pending"; } from "../types/pending";
import { Duration, getTimestampNow, Timestamp, getDurationRemaining, durationMin } from "../util/time"; import {
Duration,
getTimestampNow,
Timestamp,
getDurationRemaining,
durationMin,
} from "../util/time";
import { TransactionHandle } from "../util/query"; import { TransactionHandle } from "../util/query";
import { InternalWalletState } from "./state"; import { InternalWalletState } from "./state";
import { getBalances, getBalancesInsideTransaction } from "./balance";
function updateRetryDelay( function updateRetryDelay(
oldDelay: Duration, oldDelay: Duration,
@ -38,7 +45,7 @@ function updateRetryDelay(
retryTimestamp: Timestamp, retryTimestamp: Timestamp,
): Duration { ): Duration {
const remaining = getDurationRemaining(retryTimestamp, now); const remaining = getDurationRemaining(retryTimestamp, now);
const nextDelay = durationMin(oldDelay, remaining); const nextDelay = durationMin(oldDelay, remaining);
return nextDelay; return nextDelay;
} }
@ -110,14 +117,14 @@ async function gatherExchangePending(
}); });
break; break;
case ExchangeUpdateStatus.FinalizeUpdate: case ExchangeUpdateStatus.FinalizeUpdate:
resp.pendingOperations.push({ resp.pendingOperations.push({
type: PendingOperationType.ExchangeUpdate, type: PendingOperationType.ExchangeUpdate,
givesLifeness: false, givesLifeness: false,
stage: ExchangeUpdateOperationStage.FinalizeUpdate, stage: ExchangeUpdateOperationStage.FinalizeUpdate,
exchangeBaseUrl: e.baseUrl, exchangeBaseUrl: e.baseUrl,
lastError: e.lastError, lastError: e.lastError,
reason: e.updateReason || "unknown", reason: e.updateReason || "unknown",
}); });
break; break;
default: default:
resp.pendingOperations.push({ resp.pendingOperations.push({
@ -400,15 +407,10 @@ async function gatherPurchasePending(
export async function getPendingOperations( export async function getPendingOperations(
ws: InternalWalletState, ws: InternalWalletState,
onlyDue: boolean = false, { onlyDue = false } = {},
): Promise<PendingOperationsResponse> { ): Promise<PendingOperationsResponse> {
const resp: PendingOperationsResponse = {
nextRetryDelay: { d_ms: Number.MAX_SAFE_INTEGER },
onlyDue: onlyDue,
pendingOperations: [],
};
const now = getTimestampNow(); const now = getTimestampNow();
await ws.db.runWithReadTransaction( return await ws.db.runWithReadTransaction(
[ [
Stores.exchanges, Stores.exchanges,
Stores.reserves, Stores.reserves,
@ -420,6 +422,13 @@ export async function getPendingOperations(
Stores.purchases, Stores.purchases,
], ],
async tx => { async tx => {
const walletBalance = await getBalancesInsideTransaction(ws, tx);
const resp: PendingOperationsResponse = {
nextRetryDelay: { d_ms: Number.MAX_SAFE_INTEGER },
onlyDue: onlyDue,
walletBalance,
pendingOperations: [],
};
await gatherExchangePending(tx, now, resp, onlyDue); await gatherExchangePending(tx, now, resp, onlyDue);
await gatherReservePending(tx, now, resp, onlyDue); await gatherReservePending(tx, now, resp, onlyDue);
await gatherRefreshPending(tx, now, resp, onlyDue); await gatherRefreshPending(tx, now, resp, onlyDue);
@ -427,7 +436,7 @@ export async function getPendingOperations(
await gatherProposalPending(tx, now, resp, onlyDue); await gatherProposalPending(tx, now, resp, onlyDue);
await gatherTipPending(tx, now, resp, onlyDue); await gatherTipPending(tx, now, resp, onlyDue);
await gatherPurchasePending(tx, now, resp, onlyDue); await gatherPurchasePending(tx, now, resp, onlyDue);
return resp;
}, },
); );
return resp;
} }

View File

@ -21,7 +21,7 @@
/** /**
* Imports. * Imports.
*/ */
import { OperationError } from "./walletTypes"; import { OperationError, WalletBalance } from "./walletTypes";
import { WithdrawalSource, RetryInfo, ReserveRecordStatus } from "./dbTypes"; import { WithdrawalSource, RetryInfo, ReserveRecordStatus } from "./dbTypes";
import { Timestamp, Duration } from "../util/time"; import { Timestamp, Duration } from "../util/time";
@ -231,7 +231,19 @@ export interface PendingOperationInfoCommon {
* Response returned from the pending operations API. * Response returned from the pending operations API.
*/ */
export interface PendingOperationsResponse { export interface PendingOperationsResponse {
/**
* List of pending operations.
*/
pendingOperations: PendingOperationInfo[]; pendingOperations: PendingOperationInfo[];
/**
* Current wallet balance, including pending balances.
*/
walletBalance: WalletBalance;
/**
* When is the next pending operation due to be re-tried?
*/
nextRetryDelay: Duration; nextRetryDelay: Duration;
/** /**

View File

@ -82,7 +82,10 @@ import {
getExchangePaytoUri, getExchangePaytoUri,
acceptExchangeTermsOfService, acceptExchangeTermsOfService,
} from "./operations/exchanges"; } from "./operations/exchanges";
import { processReserve, createTalerWithdrawReserve } from "./operations/reserves"; import {
processReserve,
createTalerWithdrawReserve,
} from "./operations/reserves";
import { InternalWalletState } from "./operations/state"; import { InternalWalletState } from "./operations/state";
import { createReserve, confirmReserve } from "./operations/reserves"; import { createReserve, confirmReserve } from "./operations/reserves";
@ -111,7 +114,6 @@ import {
} from "./operations/refund"; } from "./operations/refund";
import { durationMin, Duration } from "./util/time"; import { durationMin, Duration } from "./util/time";
const builtinCurrencies: CurrencyRecord[] = [ const builtinCurrencies: CurrencyRecord[] = [
{ {
auditors: [ auditors: [
@ -225,7 +227,7 @@ export class Wallet {
*/ */
public async runPending(forceNow: boolean = false): Promise<void> { public async runPending(forceNow: boolean = false): Promise<void> {
const onlyDue = !forceNow; const onlyDue = !forceNow;
const pendingOpsResponse = await this.getPendingOperations(onlyDue); const pendingOpsResponse = await this.getPendingOperations({ onlyDue });
for (const p of pendingOpsResponse.pendingOperations) { for (const p of pendingOpsResponse.pendingOperations) {
try { try {
await this.processOnePendingOperation(p, forceNow); await this.processOnePendingOperation(p, forceNow);
@ -260,7 +262,7 @@ export class Wallet {
await p; await p;
} }
/** /**
* Run the wallet until there are no more pending operations that give * Run the wallet until there are no more pending operations that give
* liveness left. The wallet will be in a stopped state when this function * liveness left. The wallet will be in a stopped state when this function
* returns without resolving to an exception. * returns without resolving to an exception.
@ -304,10 +306,10 @@ export class Wallet {
private async runRetryLoopImpl(): Promise<void> { private async runRetryLoopImpl(): Promise<void> {
while (!this.stopped) { while (!this.stopped) {
console.log("running wallet retry loop iteration"); console.log("running wallet retry loop iteration");
let pending = await this.getPendingOperations(true); let pending = await this.getPendingOperations({ onlyDue: true });
console.log("pending ops", JSON.stringify(pending, undefined, 2)); console.log("pending ops", JSON.stringify(pending, undefined, 2));
if (pending.pendingOperations.length === 0) { if (pending.pendingOperations.length === 0) {
const allPending = await this.getPendingOperations(false); const allPending = await this.getPendingOperations({ onlyDue: false });
let numPending = 0; let numPending = 0;
let numGivingLiveness = 0; let numGivingLiveness = 0;
for (const p of allPending.pendingOperations) { for (const p of allPending.pendingOperations) {
@ -324,7 +326,7 @@ export class Wallet {
// Wait for 5 seconds // Wait for 5 seconds
dt = { d_ms: 5000 }; dt = { d_ms: 5000 };
} else { } else {
dt = durationMin({ d_ms: 5000}, allPending.nextRetryDelay); dt = durationMin({ d_ms: 5000 }, allPending.nextRetryDelay);
} }
const timeout = this.timerGroup.resolveAfter(dt); const timeout = this.timerGroup.resolveAfter(dt);
this.ws.notify({ this.ws.notify({
@ -524,11 +526,11 @@ export class Wallet {
return getHistory(this.ws, historyQuery); return getHistory(this.ws, historyQuery);
} }
async getPendingOperations( async getPendingOperations({ onlyDue = false } = {}): Promise<
onlyDue: boolean = false, PendingOperationsResponse
): Promise<PendingOperationsResponse> { > {
return this.ws.memoGetPending.memo(() => return this.ws.memoGetPending.memo(() =>
getPendingOperations(this.ws, onlyDue), getPendingOperations(this.ws, { onlyDue }),
); );
} }
@ -702,7 +704,11 @@ export class Wallet {
selectedExchange: string, selectedExchange: string,
): Promise<AcceptWithdrawalResponse> { ): Promise<AcceptWithdrawalResponse> {
try { try {
return createTalerWithdrawReserve(this.ws, talerWithdrawUri, selectedExchange); return createTalerWithdrawReserve(
this.ws,
talerWithdrawUri,
selectedExchange,
);
} finally { } finally {
this.latch.trigger(); this.latch.trigger();
} }