group refresh sessions into groups for nicer history

This commit is contained in:
Florian Dold 2019-12-15 16:59:00 +01:00
parent f4043a0f81
commit 4966376839
No known key found for this signature in database
GPG Key ID: D2E4F00F29D02A4B
16 changed files with 427 additions and 382 deletions

29
.vscode/tasks.json vendored
View File

@ -3,17 +3,6 @@
// for the documentation about the tasks.json format
"version": "2.0.0",
"tasks": [
{
"type": "typescript",
"tsconfig": "tsconfig.json",
"option": "watch",
"problemMatcher": [
"$tsc-watch"
],
"group": "build",
"isBackground": true,
"promptOnClose": false
},
{
"type": "typescript",
"tsconfig": "tsconfig.json",
@ -21,24 +10,6 @@
"$tsc"
],
"group": "build"
},
{
"label": "tslint",
"type": "shell",
"command": "make lint",
"problemMatcher": {
"owner": "tslint",
"applyTo": "allDocuments",
"fileLocation": "absolute",
"severity": "warning",
"pattern": "$tslint5"
},
"group": "build"
},
{
"label": "My Task",
"type": "shell",
"command": "echo Hello"
}
]
}

View File

@ -395,7 +395,6 @@ export class CryptoImplementation {
const newAmount = Amounts.sub(cd.coin.currentAmount, coinSpend).amount;
cd.coin.currentAmount = newAmount;
cd.coin.status = CoinStatus.Dirty;
const d = buildSigPS(SignaturePurpose.WALLET_COIN_DEPOSIT)
.put(decodeCrock(contractTermsHash))
@ -509,10 +508,7 @@ export class CryptoImplementation {
valueOutput = Amounts.add(valueOutput, denom.value).amount;
}
const refreshSessionId = encodeCrock(getRandomBytes(32));
const refreshSession: RefreshSessionRecord = {
refreshSessionId,
confirmSig: encodeCrock(confirmSig),
exchangeBaseUrl,
hash: encodeCrock(sessionHash),
@ -526,7 +522,6 @@ export class CryptoImplementation {
valueOutput,
valueWithFee,
created: getTimestampNow(),
retryInfo: initRetryInfo(),
finishedTimestamp: undefined,
lastError: undefined,
};

View File

@ -330,7 +330,7 @@ advancedCli
.requiredArgument("coinPub", clk.STRING)
.action(async args => {
await withWallet(args, async wallet => {
await wallet.refresh(args.refresh.coinPub, true);
await wallet.refresh(args.refresh.coinPub);
});
});

View File

@ -74,7 +74,7 @@ export async function getBalances(
};
await ws.db.runWithReadTransaction(
[Stores.coins, Stores.refresh, Stores.reserves, Stores.purchases, Stores.withdrawalSession],
[Stores.coins, Stores.refreshGroups, Stores.reserves, Stores.purchases, Stores.withdrawalSession],
async tx => {
await tx.iter(Stores.coins).forEach(c => {
if (c.suspended) {
@ -83,39 +83,30 @@ export async function getBalances(
if (c.status === CoinStatus.Fresh) {
addTo(balanceStore, "available", c.currentAmount, c.exchangeBaseUrl);
}
if (c.status === CoinStatus.Dirty) {
addTo(
balanceStore,
"pendingIncoming",
c.currentAmount,
c.exchangeBaseUrl,
);
addTo(
balanceStore,
"pendingIncomingDirty",
c.currentAmount,
c.exchangeBaseUrl,
);
}
});
await tx.iter(Stores.refresh).forEach(r => {
await tx.iter(Stores.refreshGroups).forEach(r => {
// Don't count finished refreshes, since the refresh already resulted
// in coins being added to the wallet.
if (r.finishedTimestamp) {
return;
}
for (let i = 0; i < r.oldCoinPubs.length; i++) {
const session = r.refreshSessionPerCoin[i];
if (session) {
addTo(
balanceStore,
"pendingIncoming",
r.valueOutput,
r.exchangeBaseUrl,
session.valueOutput,
session.exchangeBaseUrl,
);
addTo(
balanceStore,
"pendingIncomingRefresh",
r.valueOutput,
r.exchangeBaseUrl,
session.valueOutput,
session.exchangeBaseUrl,
);
}
}
});
await tx.iter(Stores.withdrawalSession).forEach(wds => {

View File

@ -45,7 +45,7 @@ export async function getHistory(
Stores.exchanges,
Stores.proposals,
Stores.purchases,
Stores.refresh,
Stores.refreshGroups,
Stores.reserves,
Stores.tips,
Stores.withdrawalSession,

View File

@ -34,6 +34,7 @@ import {
PreparePayResult,
ConfirmPayResult,
OperationError,
RefreshReason,
} from "../types/walletTypes";
import {
Database
@ -65,7 +66,7 @@ import {
parseRefundUri,
getOrderDownloadUrl,
} from "../util/taleruri";
import { getTotalRefreshCost, refresh } from "./refresh";
import { getTotalRefreshCost, createRefreshGroup } from "./refresh";
import { encodeCrock, getRandomBytes } from "../crypto/talerCrypto";
import { guardOperationException } from "./errors";
import { assertUnreachable } from "../util/assertUnreachable";
@ -782,26 +783,21 @@ export async function submitPay(
console.error("coin not found");
throw Error("coin used in payment not found");
}
c.status = CoinStatus.Dirty;
c.status = CoinStatus.Dormant;
modifiedCoins.push(c);
}
await ws.db.runWithWriteTransaction(
[Stores.coins, Stores.purchases],
[Stores.coins, Stores.purchases, Stores.refreshGroups],
async tx => {
for (let c of modifiedCoins) {
await tx.put(Stores.coins, c);
}
await createRefreshGroup(tx, modifiedCoins.map((x) => ({ coinPub: x.coinPub })), RefreshReason.Pay);
await tx.put(Stores.purchases, purchase);
},
);
for (const c of purchase.payReq.coins) {
refresh(ws, c.coin_pub).catch(e => {
console.log("error in refreshing after payment:", e);
});
}
const nextUrl = getNextUrl(purchase.contractTerms);
ws.cachedNextUrl[purchase.contractTerms.fulfillment_url] = {
nextUrl,
@ -1433,7 +1429,7 @@ async function processPurchaseApplyRefundImpl(
let allRefundsProcessed = false;
await ws.db.runWithWriteTransaction(
[Stores.purchases, Stores.coins],
[Stores.purchases, Stores.coins, Stores.refreshGroups],
async tx => {
const p = await tx.get(Stores.purchases, proposalId);
if (!p) {
@ -1456,10 +1452,11 @@ async function processPurchaseApplyRefundImpl(
}
const refundAmount = Amounts.parseOrThrow(perm.refund_amount);
const refundFee = Amounts.parseOrThrow(perm.refund_fee);
c.status = CoinStatus.Dirty;
c.status = CoinStatus.Dormant;
c.currentAmount = Amounts.add(c.currentAmount, refundAmount).amount;
c.currentAmount = Amounts.sub(c.currentAmount, refundFee).amount;
await tx.put(Stores.coins, c);
await createRefreshGroup(tx, [{ coinPub: perm.coin_pub }], RefreshReason.Refund);
},
);
if (allRefundsProcessed) {
@ -1467,7 +1464,6 @@ async function processPurchaseApplyRefundImpl(
type: NotificationType.RefundFinished,
});
}
await refresh(ws, perm.coin_pub);
}
ws.notify({

View File

@ -31,7 +31,7 @@ import {
CoinStatus,
ProposalStatus,
} from "../types/dbTypes";
import { PendingOperationsResponse } from "../types/pending";
import { PendingOperationsResponse, PendingOperationType } from "../types/pending";
function updateRetryDelay(
oldDelay: Duration,
@ -59,7 +59,7 @@ async function gatherExchangePending(
case ExchangeUpdateStatus.FINISHED:
if (e.lastError) {
resp.pendingOperations.push({
type: "bug",
type: PendingOperationType.Bug,
givesLifeness: false,
message:
"Exchange record is in FINISHED state but has lastError set",
@ -70,7 +70,7 @@ async function gatherExchangePending(
}
if (!e.details) {
resp.pendingOperations.push({
type: "bug",
type: PendingOperationType.Bug,
givesLifeness: false,
message:
"Exchange record does not have details, but no update in progress.",
@ -81,7 +81,7 @@ async function gatherExchangePending(
}
if (!e.wireInfo) {
resp.pendingOperations.push({
type: "bug",
type: PendingOperationType.Bug,
givesLifeness: false,
message:
"Exchange record does not have wire info, but no update in progress.",
@ -93,7 +93,7 @@ async function gatherExchangePending(
break;
case ExchangeUpdateStatus.FETCH_KEYS:
resp.pendingOperations.push({
type: "exchange-update",
type: PendingOperationType.ExchangeUpdate,
givesLifeness: false,
stage: "fetch-keys",
exchangeBaseUrl: e.baseUrl,
@ -103,7 +103,7 @@ async function gatherExchangePending(
break;
case ExchangeUpdateStatus.FETCH_WIRE:
resp.pendingOperations.push({
type: "exchange-update",
type: PendingOperationType.ExchangeUpdate,
givesLifeness: false,
stage: "fetch-wire",
exchangeBaseUrl: e.baseUrl,
@ -113,7 +113,7 @@ async function gatherExchangePending(
break;
default:
resp.pendingOperations.push({
type: "bug",
type: PendingOperationType.Bug,
givesLifeness: false,
message: "Unknown exchangeUpdateStatus",
details: {
@ -147,7 +147,7 @@ async function gatherReservePending(
break;
}
resp.pendingOperations.push({
type: "reserve",
type: PendingOperationType.Reserve,
givesLifeness: false,
stage: reserve.reserveStatus,
timestampCreated: reserve.created,
@ -169,7 +169,7 @@ async function gatherReservePending(
return;
}
resp.pendingOperations.push({
type: "reserve",
type: PendingOperationType.Reserve,
givesLifeness: true,
stage: reserve.reserveStatus,
timestampCreated: reserve.created,
@ -180,7 +180,7 @@ async function gatherReservePending(
break;
default:
resp.pendingOperations.push({
type: "bug",
type: PendingOperationType.Bug,
givesLifeness: false,
message: "Unknown reserve record status",
details: {
@ -199,7 +199,7 @@ async function gatherRefreshPending(
resp: PendingOperationsResponse,
onlyDue: boolean = false,
): Promise<void> {
await tx.iter(Stores.refresh).forEach(r => {
await tx.iter(Stores.refreshGroups).forEach(r => {
if (r.finishedTimestamp) {
return;
}
@ -211,43 +211,15 @@ async function gatherRefreshPending(
if (onlyDue && r.retryInfo.nextRetry.t_ms > now.t_ms) {
return;
}
let refreshStatus: string;
if (r.norevealIndex === undefined) {
refreshStatus = "melt";
} else {
refreshStatus = "reveal";
}
resp.pendingOperations.push({
type: "refresh",
type: PendingOperationType.Refresh,
givesLifeness: true,
oldCoinPub: r.meltCoinPub,
refreshStatus,
refreshOutputSize: r.newDenoms.length,
refreshSessionId: r.refreshSessionId,
refreshGroupId: r.refreshGroupId,
});
});
}
async function gatherCoinsPending(
tx: TransactionHandle,
now: Timestamp,
resp: PendingOperationsResponse,
onlyDue: boolean = false,
): Promise<void> {
// Refreshing dirty coins is always due.
await tx.iter(Stores.coins).forEach(coin => {
if (coin.status == CoinStatus.Dirty) {
resp.nextRetryDelay = { d_ms: 0 };
resp.pendingOperations.push({
givesLifeness: true,
type: "dirty-coin",
coinPub: coin.coinPub,
});
}
});
}
async function gatherWithdrawalPending(
tx: TransactionHandle,
now: Timestamp,
@ -272,7 +244,7 @@ async function gatherWithdrawalPending(
);
const numCoinsTotal = wsr.withdrawn.length;
resp.pendingOperations.push({
type: "withdraw",
type: PendingOperationType.Withdraw,
givesLifeness: true,
numCoinsTotal,
numCoinsWithdrawn,
@ -294,7 +266,7 @@ async function gatherProposalPending(
return;
}
resp.pendingOperations.push({
type: "proposal-choice",
type: PendingOperationType.ProposalChoice,
givesLifeness: false,
merchantBaseUrl: proposal.download!!.contractTerms.merchant_base_url,
proposalId: proposal.proposalId,
@ -310,7 +282,7 @@ async function gatherProposalPending(
return;
}
resp.pendingOperations.push({
type: "proposal-download",
type: PendingOperationType.ProposalDownload,
givesLifeness: true,
merchantBaseUrl: proposal.merchantBaseUrl,
orderId: proposal.orderId,
@ -343,7 +315,7 @@ async function gatherTipPending(
}
if (tip.accepted) {
resp.pendingOperations.push({
type: "tip",
type: PendingOperationType.TipPickup,
givesLifeness: true,
merchantBaseUrl: tip.merchantBaseUrl,
tipId: tip.tipId,
@ -368,7 +340,7 @@ async function gatherPurchasePending(
);
if (!onlyDue || pr.payRetryInfo.nextRetry.t_ms <= now.t_ms) {
resp.pendingOperations.push({
type: "pay",
type: PendingOperationType.Pay,
givesLifeness: true,
isReplay: false,
proposalId: pr.proposalId,
@ -385,7 +357,7 @@ async function gatherPurchasePending(
);
if (!onlyDue || pr.refundStatusRetryInfo.nextRetry.t_ms <= now.t_ms) {
resp.pendingOperations.push({
type: "refund-query",
type: PendingOperationType.RefundQuery,
givesLifeness: true,
proposalId: pr.proposalId,
retryInfo: pr.refundStatusRetryInfo,
@ -403,7 +375,7 @@ async function gatherPurchasePending(
);
if (!onlyDue || pr.refundApplyRetryInfo.nextRetry.t_ms <= now.t_ms) {
resp.pendingOperations.push({
type: "refund-apply",
type: PendingOperationType.RefundApply,
numRefundsDone,
numRefundsPending,
givesLifeness: true,
@ -429,7 +401,7 @@ export async function getPendingOperations(
[
Stores.exchanges,
Stores.reserves,
Stores.refresh,
Stores.refreshGroups,
Stores.coins,
Stores.withdrawalSession,
Stores.proposals,
@ -440,7 +412,6 @@ export async function getPendingOperations(
await gatherExchangePending(tx, now, resp, onlyDue);
await gatherReservePending(tx, now, resp, onlyDue);
await gatherRefreshPending(tx, now, resp, onlyDue);
await gatherCoinsPending(tx, now, resp, onlyDue);
await gatherWithdrawalPending(tx, now, resp, onlyDue);
await gatherProposalPending(tx, now, resp, onlyDue);
await gatherTipPending(tx, now, resp, onlyDue);

View File

@ -25,16 +25,24 @@ import {
RefreshSessionRecord,
initRetryInfo,
updateRetryInfoTimeout,
RefreshGroupRecord,
} from "../types/dbTypes";
import { amountToPretty } from "../util/helpers";
import { Database } from "../util/query";
import { Database, TransactionHandle } from "../util/query";
import { InternalWalletState } from "./state";
import { Logger } from "../util/logging";
import { getWithdrawDenomList } from "./withdraw";
import { updateExchangeFromUrl } from "./exchanges";
import { getTimestampNow, OperationError } from "../types/walletTypes";
import {
getTimestampNow,
OperationError,
CoinPublicKey,
RefreshReason,
RefreshGroupId,
} from "../types/walletTypes";
import { guardOperationException } from "./errors";
import { NotificationType } from "../types/notifications";
import { getRandomBytes, encodeCrock } from "../crypto/talerCrypto";
const logger = new Logger("refresh.ts");
@ -71,11 +79,130 @@ export function getTotalRefreshCost(
return totalCost;
}
/**
* Create a refresh session inside a refresh group.
*/
async function refreshCreateSession(
ws: InternalWalletState,
refreshGroupId: string,
coinIndex: number,
): Promise<void> {
logger.trace(
`creating refresh session for coin ${coinIndex} in refresh group ${refreshGroupId}`,
);
const refreshGroup = await ws.db.get(Stores.refreshGroups, refreshGroupId);
if (!refreshGroup) {
return;
}
if (refreshGroup.finishedPerCoin[coinIndex]) {
return;
}
const existingRefreshSession = refreshGroup.refreshSessionPerCoin[coinIndex];
if (existingRefreshSession) {
return;
}
const oldCoinPub = refreshGroup.oldCoinPubs[coinIndex];
const coin = await ws.db.get(Stores.coins, oldCoinPub);
if (!coin) {
throw Error("Can't refresh, coin not found");
}
const exchange = await updateExchangeFromUrl(ws, coin.exchangeBaseUrl);
if (!exchange) {
throw Error("db inconsistent: exchange of coin not found");
}
const oldDenom = await ws.db.get(Stores.denominations, [
exchange.baseUrl,
coin.denomPub,
]);
if (!oldDenom) {
throw Error("db inconsistent: denomination for coin not found");
}
const availableDenoms: DenominationRecord[] = await ws.db
.iterIndex(Stores.denominations.exchangeBaseUrlIndex, exchange.baseUrl)
.toArray();
const availableAmount = Amounts.sub(coin.currentAmount, oldDenom.feeRefresh)
.amount;
const newCoinDenoms = getWithdrawDenomList(availableAmount, availableDenoms);
if (newCoinDenoms.length === 0) {
logger.trace(
`not refreshing, available amount ${amountToPretty(
availableAmount,
)} too small`,
);
await ws.db.runWithWriteTransaction(
[Stores.coins, Stores.refreshGroups],
async tx => {
const rg = await tx.get(Stores.refreshGroups, refreshGroupId);
if (!rg) {
return;
}
rg.finishedPerCoin[coinIndex] = true;
await tx.put(Stores.refreshGroups, rg);
},
);
ws.notify({ type: NotificationType.RefreshRefused });
return;
}
const refreshSession: RefreshSessionRecord = await ws.cryptoApi.createRefreshSession(
exchange.baseUrl,
3,
coin,
newCoinDenoms,
oldDenom.feeRefresh,
);
// Store refresh session and subtract refreshed amount from
// coin in the same transaction.
await ws.db.runWithWriteTransaction(
[Stores.refreshGroups, Stores.coins],
async tx => {
const c = await tx.get(Stores.coins, coin.coinPub);
if (!c) {
throw Error("coin not found, but marked for refresh");
}
const r = Amounts.sub(c.currentAmount, refreshSession.valueWithFee);
if (r.saturated) {
console.log("can't refresh coin, no amount left");
return;
}
c.currentAmount = r.amount;
c.status = CoinStatus.Dormant;
const rg = await tx.get(Stores.refreshGroups, refreshGroupId);
if (!rg) {
return;
}
if (rg.refreshSessionPerCoin[coinIndex]) {
return;
}
rg.refreshSessionPerCoin[coinIndex] = refreshSession;
await tx.put(Stores.refreshGroups, rg);
await tx.put(Stores.coins, c);
},
);
logger.info(
`created refresh session for coin #${coinIndex} in ${refreshGroupId}`,
);
ws.notify({ type: NotificationType.RefreshStarted });
}
async function refreshMelt(
ws: InternalWalletState,
refreshSessionId: string,
refreshGroupId: string,
coinIndex: number,
): Promise<void> {
const refreshSession = await ws.db.get(Stores.refresh, refreshSessionId);
const refreshGroup = await ws.db.get(Stores.refreshGroups, refreshGroupId);
if (!refreshGroup) {
return;
}
const refreshSession = refreshGroup.refreshSessionPerCoin[coinIndex];
if (!refreshSession) {
return;
}
@ -122,7 +249,11 @@ async function refreshMelt(
refreshSession.norevealIndex = norevealIndex;
await ws.db.mutate(Stores.refresh, refreshSessionId, rs => {
await ws.db.mutate(Stores.refreshGroups, refreshGroupId, rg => {
const rs = rg.refreshSessionPerCoin[coinIndex];
if (!rs) {
return;
}
if (rs.norevealIndex !== undefined) {
return;
}
@ -130,7 +261,7 @@ async function refreshMelt(
return;
}
rs.norevealIndex = norevealIndex;
return rs;
return rg;
});
ws.notify({
@ -140,9 +271,14 @@ async function refreshMelt(
async function refreshReveal(
ws: InternalWalletState,
refreshSessionId: string,
refreshGroupId: string,
coinIndex: number,
): Promise<void> {
const refreshSession = await ws.db.get(Stores.refresh, refreshSessionId);
const refreshGroup = await ws.db.get(Stores.refreshGroups, refreshGroupId);
if (!refreshGroup) {
return;
}
const refreshSession = refreshGroup.refreshSessionPerCoin[coinIndex];
if (!refreshSession) {
return;
}
@ -253,23 +389,38 @@ async function refreshReveal(
}
await ws.db.runWithWriteTransaction(
[Stores.coins, Stores.refresh],
[Stores.coins, Stores.refreshGroups],
async tx => {
const rs = await tx.get(Stores.refresh, refreshSessionId);
if (!rs) {
const rg = await tx.get(Stores.refreshGroups, refreshGroupId);
if (!rg) {
console.log("no refresh session found");
return;
}
const rs = rg.refreshSessionPerCoin[coinIndex];
if (!rs) {
return;
}
if (rs.finishedTimestamp) {
console.log("refresh session already finished");
return;
}
rs.finishedTimestamp = getTimestampNow();
rs.retryInfo = initRetryInfo(false);
rg.finishedPerCoin[coinIndex] = true;
let allDone = true;
for (const f of rg.finishedPerCoin) {
if (!f) {
allDone = false;
break;
}
}
if (allDone) {
rg.finishedTimestamp = getTimestampNow();
rg.retryInfo = initRetryInfo(false);
}
for (let coin of coins) {
await tx.put(Stores.coins, coin);
}
await tx.put(Stores.refresh, rs);
await tx.put(Stores.refreshGroups, rg);
},
);
console.log("refresh finished (end of reveal)");
@ -280,11 +431,11 @@ async function refreshReveal(
async function incrementRefreshRetry(
ws: InternalWalletState,
refreshSessionId: string,
refreshGroupId: string,
err: OperationError | undefined,
): Promise<void> {
await ws.db.runWithWriteTransaction([Stores.refresh], async tx => {
const r = await tx.get(Stores.refresh, refreshSessionId);
await ws.db.runWithWriteTransaction([Stores.refreshGroups], async tx => {
const r = await tx.get(Stores.refreshGroups, refreshGroupId);
if (!r) {
return;
}
@ -294,31 +445,31 @@ async function incrementRefreshRetry(
r.retryInfo.retryCounter++;
updateRetryInfoTimeout(r.retryInfo);
r.lastError = err;
await tx.put(Stores.refresh, r);
await tx.put(Stores.refreshGroups, r);
});
ws.notify({ type: NotificationType.RefreshOperationError });
}
export async function processRefreshSession(
export async function processRefreshGroup(
ws: InternalWalletState,
refreshSessionId: string,
refreshGroupId: string,
forceNow: boolean = false,
) {
return ws.memoProcessRefresh.memo(refreshSessionId, async () => {
): Promise<void> {
await ws.memoProcessRefresh.memo(refreshGroupId, async () => {
const onOpErr = (e: OperationError) =>
incrementRefreshRetry(ws, refreshSessionId, e);
return guardOperationException(
() => processRefreshSessionImpl(ws, refreshSessionId, forceNow),
incrementRefreshRetry(ws, refreshGroupId, e);
return await guardOperationException(
async () => await processRefreshGroupImpl(ws, refreshGroupId, forceNow),
onOpErr,
);
});
}
async function resetRefreshSessionRetry(
async function resetRefreshGroupRetry(
ws: InternalWalletState,
refreshSessionId: string,
) {
await ws.db.mutate(Stores.refresh, refreshSessionId, x => {
await ws.db.mutate(Stores.refreshGroups, refreshSessionId, x => {
if (x.retryInfo.active) {
x.retryInfo = initRetryInfo();
}
@ -326,124 +477,87 @@ async function resetRefreshSessionRetry(
});
}
async function processRefreshSessionImpl(
async function processRefreshGroupImpl(
ws: InternalWalletState,
refreshSessionId: string,
refreshGroupId: string,
forceNow: boolean,
) {
if (forceNow) {
await resetRefreshSessionRetry(ws, refreshSessionId);
await resetRefreshGroupRetry(ws, refreshGroupId);
}
const refreshSession = await ws.db.get(Stores.refresh, refreshSessionId);
if (!refreshSession) {
const refreshGroup = await ws.db.get(Stores.refreshGroups, refreshGroupId);
if (!refreshGroup) {
return;
}
if (refreshSession.finishedTimestamp) {
if (refreshGroup.finishedTimestamp) {
return;
}
if (typeof refreshSession.norevealIndex !== "number") {
await refreshMelt(ws, refreshSession.refreshSessionId);
}
await refreshReveal(ws, refreshSession.refreshSessionId);
const ps = refreshGroup.oldCoinPubs.map((x, i) =>
processRefreshSession(ws, refreshGroupId, i),
);
await Promise.all(ps);
logger.trace("refresh finished");
}
export async function refresh(
async function processRefreshSession(
ws: InternalWalletState,
oldCoinPub: string,
force: boolean = false,
): Promise<void> {
const coin = await ws.db.get(Stores.coins, oldCoinPub);
if (!coin) {
console.warn("can't refresh, coin not in database");
refreshGroupId: string,
coinIndex: number,
) {
logger.trace(`processing refresh session for coin ${coinIndex} of group ${refreshGroupId}`);
let refreshGroup = await ws.db.get(Stores.refreshGroups, refreshGroupId);
if (!refreshGroup) {
return;
}
switch (coin.status) {
case CoinStatus.Dirty:
break;
case CoinStatus.Dormant:
return;
case CoinStatus.Fresh:
if (!force) {
if (refreshGroup.finishedPerCoin[coinIndex]) {
return;
}
break;
if (!refreshGroup.refreshSessionPerCoin[coinIndex]) {
await refreshCreateSession(ws, refreshGroupId, coinIndex);
refreshGroup = await ws.db.get(Stores.refreshGroups, refreshGroupId);
if (!refreshGroup) {
return;
}
const exchange = await updateExchangeFromUrl(ws, coin.exchangeBaseUrl);
if (!exchange) {
throw Error("db inconsistent: exchange of coin not found");
}
const oldDenom = await ws.db.get(Stores.denominations, [
exchange.baseUrl,
coin.denomPub,
]);
if (!oldDenom) {
throw Error("db inconsistent: denomination for coin not found");
}
const availableDenoms: DenominationRecord[] = await ws.db
.iterIndex(Stores.denominations.exchangeBaseUrlIndex, exchange.baseUrl)
.toArray();
const availableAmount = Amounts.sub(coin.currentAmount, oldDenom.feeRefresh)
.amount;
const newCoinDenoms = getWithdrawDenomList(availableAmount, availableDenoms);
if (newCoinDenoms.length === 0) {
logger.trace(
`not refreshing, available amount ${amountToPretty(
availableAmount,
)} too small`,
const refreshSession = refreshGroup.refreshSessionPerCoin[coinIndex];
if (!refreshSession) {
if (!refreshGroup.finishedPerCoin[coinIndex]) {
throw Error(
"BUG: refresh session was not created and coin not marked as finished",
);
await ws.db.mutate(Stores.coins, oldCoinPub, x => {
if (x.status != coin.status) {
// Concurrent modification?
}
return;
}
x.status = CoinStatus.Dormant;
return x;
});
ws.notify({ type: NotificationType.RefreshRefused });
return;
if (refreshSession.norevealIndex === undefined) {
await refreshMelt(ws, refreshGroupId, coinIndex);
}
const refreshSession: RefreshSessionRecord = await ws.cryptoApi.createRefreshSession(
exchange.baseUrl,
3,
coin,
newCoinDenoms,
oldDenom.feeRefresh,
);
// Store refresh session and subtract refreshed amount from
// coin in the same transaction.
await ws.db.runWithWriteTransaction(
[Stores.refresh, Stores.coins],
async tx => {
const c = await tx.get(Stores.coins, coin.coinPub);
if (!c) {
return;
}
if (c.status !== CoinStatus.Dirty) {
return;
}
const r = Amounts.sub(c.currentAmount, refreshSession.valueWithFee);
if (r.saturated) {
console.log("can't refresh coin, no amount left");
return;
}
c.currentAmount = r.amount;
c.status = CoinStatus.Dormant;
await tx.put(Stores.refresh, refreshSession);
await tx.put(Stores.coins, c);
},
);
logger.info(`created refresh session ${refreshSession.refreshSessionId}`);
ws.notify({ type: NotificationType.RefreshStarted });
await processRefreshSession(ws, refreshSession.refreshSessionId);
await refreshReveal(ws, refreshGroupId, coinIndex);
}
/**
* Create a refresh group for a list of coins.
*/
export async function createRefreshGroup(
tx: TransactionHandle,
oldCoinPubs: CoinPublicKey[],
reason: RefreshReason,
): Promise<RefreshGroupId> {
const refreshGroupId = encodeCrock(getRandomBytes(32));
const refreshGroup: RefreshGroupRecord = {
finishedTimestamp: undefined,
finishedPerCoin: oldCoinPubs.map(x => false),
lastError: undefined,
lastErrorPerCoin: oldCoinPubs.map(x => undefined),
oldCoinPubs: oldCoinPubs.map(x => x.coinPub),
reason,
refreshGroupId,
refreshSessionPerCoin: oldCoinPubs.map(x => undefined),
retryInfo: initRetryInfo(),
};
await tx.put(Stores.refreshGroups, refreshGroup);
return {
refreshGroupId,
};
}

View File

@ -458,10 +458,10 @@ async function processReserveImpl(
break;
case ReserveRecordStatus.REGISTERING_BANK:
await processReserveBankStatus(ws, reservePub);
return processReserveImpl(ws, reservePub, true);
return await processReserveImpl(ws, reservePub, true);
case ReserveRecordStatus.QUERYING_STATUS:
await updateReserve(ws, reservePub);
return processReserveImpl(ws, reservePub, true);
return await processReserveImpl(ws, reservePub, true);
case ReserveRecordStatus.WITHDRAWING:
await depleteReserve(ws, reservePub);
break;

View File

@ -15,7 +15,6 @@
*/
import { Database } from "../util/query";
import { InternalWalletState } from "./state";
import { parseTipUri } from "../util/taleruri";
import { TipStatus, getTimestampNow, OperationError } from "../types/walletTypes";

View File

@ -41,6 +41,7 @@ import {
OperationError,
Duration,
getTimestampNow,
RefreshReason,
} from "./walletTypes";
export enum ReserveRecordStatus {
@ -571,10 +572,6 @@ export enum CoinStatus {
* Withdrawn and never shown to anybody.
*/
Fresh = "fresh",
/**
* Used for a completed transaction and now dirty.
*/
Dirty = "dirty",
/**
* A coin that has been spent and refreshed.
*/
@ -849,6 +846,39 @@ export interface TipRecord {
retryInfo: RetryInfo;
}
export interface RefreshGroupRecord {
/**
* Retry info, even present when the operation isn't active to allow indexing
* on the next retry timestamp.
*/
retryInfo: RetryInfo;
lastError: OperationError | undefined;
lastErrorPerCoin: (OperationError | undefined)[];
refreshGroupId: string;
reason: RefreshReason;
oldCoinPubs: string[];
refreshSessionPerCoin: (RefreshSessionRecord | undefined)[];
/**
* Flag for each coin whether refreshing finished.
* If a coin can't be refreshed (remaining value too small),
* it will be marked as finished, but no refresh session will
* be created.
*/
finishedPerCoin: boolean[];
/**
* Timestamp when the refresh session finished.
*/
finishedTimestamp: Timestamp | undefined;
}
/**
* Ongoing refresh
*/
@ -912,31 +942,20 @@ export interface RefreshSessionRecord {
*/
hash: string;
/**
* Base URL for the exchange we're doing the refresh with.
*/
exchangeBaseUrl: string;
/**
* Timestamp when the refresh session finished.
*/
finishedTimestamp: Timestamp | undefined;
/**
* A 32-byte base32-crockford encoded random identifier.
*/
refreshSessionId: string;
/**
* When has this refresh session been created?
*/
created: Timestamp;
/**
* Retry info, even present when the operation isn't active to allow indexing
* on the next retry timestamp.
* Base URL for the exchange we're doing the refresh with.
*/
retryInfo: RetryInfo;
exchangeBaseUrl: string;
}
/**
@ -1366,8 +1385,8 @@ export namespace Stores {
export const denominations = new DenominationsStore();
export const exchanges = new ExchangesStore();
export const proposals = new ProposalsStore();
export const refresh = new Store<RefreshSessionRecord>("refresh", {
keyPath: "refreshSessionId",
export const refreshGroups = new Store<RefreshGroupRecord>("refreshGroups", {
keyPath: "refreshGroupId",
});
export const reserves = new ReservesStore();
export const purchases = new PurchasesStore();

View File

@ -1,4 +1,4 @@
import { Timestamp } from "./walletTypes";
import { Timestamp, RefreshReason } from "./walletTypes";
/*
This file is part of GNU Taler
@ -603,18 +603,6 @@ export interface HistoryRefund {
amountRefundedEffective: string;
}
/**
* Reasons for why a coin is being refreshed.
*/
export const enum RefreshReason {
Manual = "manual",
Pay = "pay",
Refund = "refund",
AbortPay = "abort-pay",
Recoup = "recoup",
BackupRestored = "backup-restored",
}
/**
* Event to indicate that a group of refresh sessions has completed.
*/

View File

@ -24,27 +24,41 @@
import { OperationError, Timestamp, Duration } from "./walletTypes";
import { WithdrawalSource, RetryInfo } from "./dbTypes";
export const enum PendingOperationType {
Bug = "bug",
ExchangeUpdate = "exchange-update",
Pay = "pay",
ProposalChoice = "proposal-choice",
ProposalDownload = "proposal-download",
Refresh = "refresh",
Reserve = "reserve",
RefundApply = "refund-apply",
RefundQuery = "refund-query",
TipChoice = "tip-choice",
TipPickup = "tip-pickup",
Withdraw = "withdraw",
}
/**
* Information about a pending operation.
*/
export type PendingOperationInfo = PendingOperationInfoCommon &
(
| PendingWithdrawOperation
| PendingReserveOperation
| PendingBugOperation
| PendingDirtyCoinOperation
| PendingExchangeUpdateOperation
| PendingRefreshOperation
| PendingTipOperation
| PendingProposalDownloadOperation
| PendingProposalChoiceOperation
| PendingPayOperation
| PendingRefundQueryOperation
| PendingProposalChoiceOperation
| PendingProposalDownloadOperation
| PendingRefreshOperation
| PendingRefundApplyOperation
| PendingRefundQueryOperation
| PendingReserveOperation
| PendingTipPickupOperation
| PendingWithdrawOperation
);
export interface PendingExchangeUpdateOperation {
type: "exchange-update";
type: PendingOperationType.ExchangeUpdate;
stage: string;
reason: string;
exchangeBaseUrl: string;
@ -52,13 +66,13 @@ export interface PendingExchangeUpdateOperation {
}
export interface PendingBugOperation {
type: "bug";
type: PendingOperationType.Bug;
message: string;
details: any;
}
export interface PendingReserveOperation {
type: "reserve";
type: PendingOperationType.Reserve;
retryInfo: RetryInfo | undefined;
stage: string;
timestampCreated: Timestamp;
@ -68,21 +82,13 @@ export interface PendingReserveOperation {
}
export interface PendingRefreshOperation {
type: "refresh";
type: PendingOperationType.Refresh;
lastError?: OperationError;
refreshSessionId: string;
oldCoinPub: string;
refreshStatus: string;
refreshOutputSize: number;
}
export interface PendingDirtyCoinOperation {
type: "dirty-coin";
coinPub: string;
refreshGroupId: string;
}
export interface PendingProposalDownloadOperation {
type: "proposal-download";
type: PendingOperationType.ProposalDownload;
merchantBaseUrl: string;
proposalTimestamp: Timestamp;
proposalId: string;
@ -96,63 +102,54 @@ export interface PendingProposalDownloadOperation {
* proposed contract terms.
*/
export interface PendingProposalChoiceOperation {
type: "proposal-choice";
type: PendingOperationType.ProposalChoice;
merchantBaseUrl: string;
proposalTimestamp: Timestamp;
proposalId: string;
}
export interface PendingTipOperation {
type: "tip";
export interface PendingTipPickupOperation {
type: PendingOperationType.TipPickup;
tipId: string;
merchantBaseUrl: string;
merchantTipId: string;
}
export interface PendingPayOperation {
type: "pay";
type: PendingOperationType.Pay;
proposalId: string;
isReplay: boolean;
retryInfo: RetryInfo,
retryInfo: RetryInfo;
lastError: OperationError | undefined;
}
export interface PendingRefundQueryOperation {
type: "refund-query";
type: PendingOperationType.RefundQuery;
proposalId: string;
retryInfo: RetryInfo,
retryInfo: RetryInfo;
lastError: OperationError | undefined;
}
export interface PendingRefundApplyOperation {
type: "refund-apply";
type: PendingOperationType.RefundApply;
proposalId: string;
retryInfo: RetryInfo,
retryInfo: RetryInfo;
lastError: OperationError | undefined;
numRefundsPending: number;
numRefundsDone: number;
}
export interface PendingOperationInfoCommon {
type: string;
givesLifeness: boolean;
}
export interface PendingWithdrawOperation {
type: "withdraw";
type: PendingOperationType.Withdraw;
source: WithdrawalSource;
withdrawSessionId: string;
numCoinsWithdrawn: number;
numCoinsTotal: number;
}
export interface PendingRefreshOperation {
type: "refresh";
}
export interface PendingPayOperation {
type: "pay";
export interface PendingOperationInfoCommon {
type: PendingOperationType;
givesLifeness: boolean;
}
export interface PendingOperationsResponse {

View File

@ -506,3 +506,29 @@ export interface PlanchetCreationRequest {
reservePub: string;
reservePriv: string;
}
/**
* Reasons for why a coin is being refreshed.
*/
export const enum RefreshReason {
Manual = "manual",
Pay = "pay",
Refund = "refund",
AbortPay = "abort-pay",
Recoup = "recoup",
BackupRestored = "backup-restored",
}
/**
* Wrapper for coin public keys.
*/
export interface CoinPublicKey {
readonly coinPub: string;
}
/**
* Wrapper for refresh group IDs.
*/
export interface RefreshGroupId {
readonly refreshGroupId: string;
}

View File

@ -39,15 +39,14 @@ export class AsyncOpMemoMap<T> {
const n = this.n++;
// Wrap the operation in case it immediately throws
const p = Promise.resolve().then(() => pg());
p.finally(() => {
this.cleanUp(key, n);
});
this.memoMap[key] = {
p,
n,
t: new Date().getTime(),
};
return p;
return p.finally(() => {
this.cleanUp(key, n);
});
}
clear() {
this.memoMap = {};

View File

@ -77,6 +77,7 @@ import {
AcceptWithdrawalResponse,
PurchaseDetails,
ExchangeWithdrawDetails,
RefreshReason,
} from "./types/walletTypes";
import { Logger } from "./util/logging";
@ -92,7 +93,7 @@ import { processReserve } from "./operations/reserves";
import { InternalWalletState } from "./operations/state";
import { createReserve, confirmReserve } from "./operations/reserves";
import { processRefreshSession, refresh } from "./operations/refresh";
import { processRefreshGroup, createRefreshGroup } from "./operations/refresh";
import { processWithdrawSession } from "./operations/withdraw";
import { getHistory } from "./operations/history";
import { getPendingOperations } from "./operations/pending";
@ -103,7 +104,7 @@ import { payback } from "./operations/payback";
import { TimerGroup } from "./util/timer";
import { AsyncCondition } from "./util/promiseUtils";
import { AsyncOpMemoSingle } from "./util/asyncMemo";
import { PendingOperationInfo, PendingOperationsResponse } from "./types/pending";
import { PendingOperationInfo, PendingOperationsResponse, PendingOperationType } from "./types/pending";
import { WalletNotification, NotificationType } from "./types/notifications";
import { HistoryQuery, HistoryEvent } from "./types/history";
@ -180,48 +181,45 @@ export class Wallet {
): Promise<void> {
console.log("running pending", pending);
switch (pending.type) {
case "bug":
case PendingOperationType.Bug:
// Nothing to do, will just be displayed to the user
return;
case "dirty-coin":
await refresh(this.ws, pending.coinPub);
break;
case "exchange-update":
case PendingOperationType.ExchangeUpdate:
await updateExchangeFromUrl(this.ws, pending.exchangeBaseUrl, forceNow);
break;
case "refresh":
await processRefreshSession(
case PendingOperationType.Refresh:
await processRefreshGroup(
this.ws,
pending.refreshSessionId,
pending.refreshGroupId,
forceNow,
);
break;
case "reserve":
case PendingOperationType.Reserve:
await processReserve(this.ws, pending.reservePub, forceNow);
break;
case "withdraw":
case PendingOperationType.Withdraw:
await processWithdrawSession(
this.ws,
pending.withdrawSessionId,
forceNow,
);
break;
case "proposal-choice":
case PendingOperationType.ProposalChoice:
// Nothing to do, user needs to accept/reject
break;
case "proposal-download":
case PendingOperationType.ProposalDownload:
await processDownloadProposal(this.ws, pending.proposalId, forceNow);
break;
case "tip":
case PendingOperationType.TipPickup:
await processTip(this.ws, pending.tipId, forceNow);
break;
case "pay":
case PendingOperationType.Pay:
await processPurchasePay(this.ws, pending.proposalId, forceNow);
break;
case "refund-query":
case PendingOperationType.RefundQuery:
await processPurchaseQueryRefund(this.ws, pending.proposalId, forceNow);
break;
case "refund-apply":
case PendingOperationType.RefundApply:
await processPurchaseApplyRefund(this.ws, pending.proposalId, forceNow);
break;
default:
@ -369,28 +367,6 @@ export class Wallet {
return preparePay(this.ws, talerPayUri);
}
/**
* Refresh all dirty coins.
* The returned promise resolves only after all refresh
* operations have completed.
*/
async refreshDirtyCoins(): Promise<{ numRefreshed: number }> {
let n = 0;
const coins = await this.db.iter(Stores.coins).toArray();
for (let coin of coins) {
if (coin.status == CoinStatus.Dirty) {
try {
await this.refresh(coin.coinPub);
} catch (e) {
console.log("error during refresh");
}
n += 1;
}
}
return { numRefreshed: n };
}
/**
* Add a contract to the wallet and sign coins, and send them.
*/
@ -496,9 +472,12 @@ export class Wallet {
return this.ws.memoGetBalance.memo(() => getBalances(this.ws));
}
async refresh(oldCoinPub: string, force: boolean = false): Promise<void> {
async refresh(oldCoinPub: string): Promise<void> {
try {
return refresh(this.ws, oldCoinPub, force);
const refreshGroupId = await this.db.runWithWriteTransaction([Stores.refreshGroups], async (tx) => {
return await createRefreshGroup(tx, [{ coinPub: oldCoinPub }], RefreshReason.Manual);
});
await processRefreshGroup(this.ws, refreshGroupId.refreshGroupId);
} catch (e) {
this.latch.trigger();
}