wallet-core: make changes to available amount atomic

W.r.t. transactions
This commit is contained in:
Florian Dold 2023-06-26 19:27:34 +02:00
parent 2779086a32
commit a844136489
No known key found for this signature in database
GPG Key ID: D2E4F00F29D02A4B
7 changed files with 187 additions and 52 deletions

View File

@ -119,7 +119,7 @@ export const CURRENT_DB_CONFIG_KEY = "currentMainDbName";
* backwards-compatible way or object stores and indices
* are added.
*/
export const WALLET_DB_MINOR_VERSION = 8;
export const WALLET_DB_MINOR_VERSION = 9;
/**
* Ranges for operation status fields.
@ -723,6 +723,14 @@ export interface CoinRecord {
*/
coinSource: CoinSource;
/**
* Source transaction ID of the coin.
*
* Used to make the coin visible after the transaction
* has entered a final state.
*/
sourceTransactionId?: string;
/**
* Public key of the coin.
*/
@ -768,6 +776,14 @@ export interface CoinRecord {
*/
status: CoinStatus;
/**
* Non-zero for visible.
*
* A coin is visible when it is fresh and the
* source transaction is in a final state.
*/
visible?: number;
/**
* Information about what the coin has been allocated for.
*
@ -894,7 +910,7 @@ export enum RefreshCoinStatus {
* The refresh for this coin has been frozen, because of a permanent error.
* More info in lastErrorPerCoin.
*/
Frozen = OperationStatusRange.DORMANT_START + 1,
Failed = OperationStatusRange.DORMANT_START + 1,
}
export enum OperationStatus {
@ -1748,7 +1764,6 @@ export interface DepositKycInfo {
exchangeBaseUrl: string;
}
/**
* Record for a deposits that the wallet observed
* as a result of double spending, but which is not
@ -2132,6 +2147,15 @@ export interface CoinAvailabilityRecord {
* Number of fresh coins of this denomination that are available.
*/
freshCoinCount: number;
/**
* Number of fresh coins that are available
* and visible, i.e. the source transaction is in
* a final state.
*
* (Optional for backwards compatibility, defaults to 0.)
*/
visibleCoinCount?: number;
}
export interface ContractTermsRecord {
@ -2318,6 +2342,13 @@ export const WalletStoresV1 = {
["exchangeBaseUrl", "denomPubHash", "maxAge", "status"],
),
byCoinEvHash: describeIndex("byCoinEvHash", "coinEvHash"),
bySourceTransactionId: describeIndex(
"bySourceTransactionId",
"sourceTransactionId",
{
versionAdded: 9,
},
),
},
),
reserves: describeStore(

View File

@ -20,14 +20,17 @@
* There are multiple definition of the wallet's balance.
* We use the following terminology:
*
* - "available": Balance that the wallet believes will certainly be available
* for spending, modulo any failures of the exchange or double spending issues.
* This includes available coins *not* allocated to any
* spending/refresh/... operation. Pending withdrawals are *not* counted
* towards this balance, because they are not certain to succeed.
* Pending refreshes *are* counted towards this balance.
* This balance type is nice to show to the user, because it does not
* temporarily decrease after payment when we are waiting for refreshes
* - "available": Balance that is available
* for spending from transactions in their final state and
* expected to be available from pending refreshes.
*
* - "pending-incoming": Expected (positive!) delta
* to the available balance that we expect to have
* after pending operations reach the "done" state.
*
* - "pending-outgoing": Amount that is currently allocated
* to be spent, but the spend operation could still be aborted
* and part of the pending-outgoing amount could be recovered.
*
* - "material": Balance that the wallet believes it could spend *right now*,
* without waiting for any operations to complete.
@ -61,11 +64,13 @@ import {
AllowedExchangeInfo,
RefreshGroupRecord,
WalletStoresV1,
WithdrawalGroupStatus,
} from "../db.js";
import { InternalWalletState } from "../internal-wallet-state.js";
import { checkLogicInvariant } from "../util/invariants.js";
import { GetReadOnlyAccess } from "../util/query.js";
import { getExchangeDetails } from "./exchanges.js";
import { assertUnreachable } from "../util/assertUnreachable.js";
/**
* Logger.
@ -133,7 +138,8 @@ export async function getBalancesInsideTransaction(
await tx.coinAvailability.iter().forEach((ca) => {
const b = initBalance(ca.currency);
for (let i = 0; i < ca.freshCoinCount; i++) {
const count = ca.visibleCoinCount ?? 0;
for (let i = 0; i < count; i++) {
b.available = Amounts.add(b.available, {
currency: ca.currency,
fraction: ca.amountFrac,
@ -150,14 +156,40 @@ export async function getBalancesInsideTransaction(
).amount;
});
await tx.withdrawalGroups.iter().forEach((wds) => {
if (wds.timestampFinish) {
return;
// FIXME: Use indexing to filter out final transactions.
await tx.withdrawalGroups.iter().forEach((wgRecord) => {
switch (wgRecord.status) {
case WithdrawalGroupStatus.AbortedBank:
case WithdrawalGroupStatus.AbortedExchange:
case WithdrawalGroupStatus.FailedAbortingBank:
case WithdrawalGroupStatus.FailedBankAborted:
case WithdrawalGroupStatus.Finished:
// Does not count as pendingIncoming
return;
case WithdrawalGroupStatus.PendingReady:
case WithdrawalGroupStatus.AbortingBank:
case WithdrawalGroupStatus.PendingAml:
case WithdrawalGroupStatus.PendingKyc:
case WithdrawalGroupStatus.PendingQueryingStatus:
case WithdrawalGroupStatus.SuspendedWaitConfirmBank:
case WithdrawalGroupStatus.SuspendedReady:
case WithdrawalGroupStatus.SuspendedRegisteringBank:
case WithdrawalGroupStatus.SuspendedKyc:
case WithdrawalGroupStatus.SuspendedAbortingBank:
case WithdrawalGroupStatus.SuspendedAml:
case WithdrawalGroupStatus.PendingRegisteringBank:
case WithdrawalGroupStatus.PendingWaitConfirmBank:
case WithdrawalGroupStatus.SuspendedQueryingStatus:
break;
default:
assertUnreachable(wgRecord.status);
}
const b = initBalance(Amounts.currencyOf(wds.denomsSel.totalWithdrawCost));
const b = initBalance(
Amounts.currencyOf(wgRecord.denomsSel.totalWithdrawCost),
);
b.pendingIncoming = Amounts.add(
b.pendingIncoming,
wds.denomsSel.totalCoinValue,
wgRecord.denomsSel.totalCoinValue,
).amount;
});

View File

@ -81,6 +81,38 @@ export interface CoinsSpendInfo {
allocationId: TransactionIdStr;
}
export async function makeCoinsVisible(
ws: InternalWalletState,
tx: GetReadWriteAccess<{
coins: typeof WalletStoresV1.coins;
coinAvailability: typeof WalletStoresV1.coinAvailability;
}>,
transactionId: string,
): Promise<void> {
const coins = await tx.coins.indexes.bySourceTransactionId.getAll(
transactionId,
);
for (const coinRecord of coins) {
if (!coinRecord.visible) {
coinRecord.visible = 1;
await tx.coins.put(coinRecord);
const ageRestriction = coinRecord.maxAge;
const car = await tx.coinAvailability.get([
coinRecord.exchangeBaseUrl,
coinRecord.denomPubHash,
ageRestriction,
]);
if (!car) {
logger.error("missing coin availability record");
continue;
}
const visCount = car.visibleCoinCount ?? 0;
car.visibleCoinCount = visCount + 1;
await tx.coinAvailability.put(car);
}
}
}
export async function makeCoinAvailable(
ws: InternalWalletState,
tx: GetReadWriteAccess<{
@ -195,6 +227,13 @@ export async function spendCoins(
);
}
coinAvailability.freshCoinCount--;
if (coin.visible) {
if (!coinAvailability.visibleCoinCount) {
logger.error("coin availability inconsistent");
} else {
coinAvailability.visibleCoinCount--;
}
}
await tx.coins.put(coin);
await tx.coinAvailability.put(coinAvailability);
}

View File

@ -46,16 +46,20 @@ import {
NotificationType,
RefreshGroupId,
RefreshReason,
TalerError,
TalerErrorCode,
TalerErrorDetail,
TalerPreciseTimestamp,
TalerProtocolTimestamp,
TransactionAction,
TransactionMajorState,
TransactionState,
TransactionType,
URL,
} from "@gnu-taler/taler-util";
import {
readSuccessResponseJsonOrThrow,
readUnexpectedResponseDetails,
} from "@gnu-taler/taler-util/http";
import { TalerCryptoInterface } from "../crypto/cryptoImplementation.js";
import {
DerivedRefreshSession,
@ -72,25 +76,23 @@ import {
RefreshReasonDetails,
WalletStoresV1,
} from "../db.js";
import { TalerError } from "@gnu-taler/taler-util";
import { isWithdrawableDenom, PendingTaskType } from "../index.js";
import {
EXCHANGE_COINS_LOCK,
InternalWalletState,
} from "../internal-wallet-state.js";
import { assertUnreachable } from "../util/assertUnreachable.js";
import {
readSuccessResponseJsonOrThrow,
readUnexpectedResponseDetails,
} from "@gnu-taler/taler-util/http";
import { selectWithdrawalDenominations } from "../util/coinSelection.js";
import { checkDbInvariant } from "../util/invariants.js";
import { GetReadOnlyAccess, GetReadWriteAccess } from "../util/query.js";
import { constructTaskIdentifier, makeCoinAvailable, OperationAttemptResult, OperationAttemptResultType } from "./common.js";
import { updateExchangeFromUrl } from "./exchanges.js";
import { selectWithdrawalDenominations } from "../util/coinSelection.js";
import {
isWithdrawableDenom,
PendingTaskType,
} from "../index.js";
constructTaskIdentifier,
makeCoinAvailable,
makeCoinsVisible,
OperationAttemptResult,
OperationAttemptResultType,
} from "./common.js";
import { updateExchangeFromUrl } from "./exchanges.js";
import {
constructTransactionIdentifier,
notifyTransition,
@ -144,24 +146,26 @@ export function getTotalRefreshCost(
return totalCost;
}
function updateGroupStatus(rg: RefreshGroupRecord): void {
const allDone = fnutil.all(
function updateGroupStatus(rg: RefreshGroupRecord): { final: boolean } {
const allFinal = fnutil.all(
rg.statusPerCoin,
(x) => x === RefreshCoinStatus.Finished || x === RefreshCoinStatus.Frozen,
(x) => x === RefreshCoinStatus.Finished || x === RefreshCoinStatus.Failed,
);
const anyFrozen = fnutil.any(
const anyFailed = fnutil.any(
rg.statusPerCoin,
(x) => x === RefreshCoinStatus.Frozen,
(x) => x === RefreshCoinStatus.Failed,
);
if (allDone) {
if (anyFrozen) {
if (allFinal) {
if (anyFailed) {
rg.timestampFinished = TalerPreciseTimestamp.now();
rg.operationStatus = RefreshOperationStatus.Failed;
} else {
rg.timestampFinished = TalerPreciseTimestamp.now();
rg.operationStatus = RefreshOperationStatus.Finished;
}
return { final: true };
}
return { final: false };
}
/**
@ -248,22 +252,30 @@ async function refreshCreateSession(
ws.config.testing.denomselAllowLate,
);
const transactionId = constructTransactionIdentifier({
tag: TransactionType.Refresh,
refreshGroupId,
});
if (newCoinDenoms.selectedDenoms.length === 0) {
logger.trace(
`not refreshing, available amount ${amountToPretty(
availableAmount,
)} too small`,
);
// FIXME: State transition notification missing.
await ws.db
.mktx((x) => [x.coins, x.refreshGroups])
.mktx((x) => [x.coins, x.coinAvailability, x.refreshGroups])
.runReadWrite(async (tx) => {
const rg = await tx.refreshGroups.get(refreshGroupId);
if (!rg) {
return;
}
rg.statusPerCoin[coinIndex] = RefreshCoinStatus.Finished;
updateGroupStatus(rg);
const updateRes = updateGroupStatus(rg);
if (updateRes.final) {
await makeCoinsVisible(ws, tx, transactionId);
}
await tx.refreshGroups.put(rg);
});
return;
@ -418,10 +430,15 @@ async function refreshMelt(
});
});
const transactionId = constructTransactionIdentifier({
tag: TransactionType.Refresh,
refreshGroupId,
});
if (resp.status === HttpStatusCode.NotFound) {
const errDetails = await readUnexpectedResponseDetails(resp);
await ws.db
.mktx((x) => [x.refreshGroups])
.mktx((x) => [x.refreshGroups, x.coins, x.coinAvailability])
.runReadWrite(async (tx) => {
const rg = await tx.refreshGroups.get(refreshGroupId);
if (!rg) {
@ -433,9 +450,12 @@ async function refreshMelt(
if (rg.statusPerCoin[coinIndex] !== RefreshCoinStatus.Pending) {
return;
}
rg.statusPerCoin[coinIndex] = RefreshCoinStatus.Frozen;
rg.statusPerCoin[coinIndex] = RefreshCoinStatus.Failed;
rg.lastErrorPerCoin[coinIndex] = errDetails;
updateGroupStatus(rg);
const updateRes = updateGroupStatus(rg);
if (updateRes.final) {
await makeCoinsVisible(ws, tx, transactionId);
}
await tx.refreshGroups.put(rg);
});
return;
@ -672,6 +692,11 @@ async function refreshReveal(
const coins: CoinRecord[] = [];
const transactionId = constructTransactionIdentifier({
tag: TransactionType.Refresh,
refreshGroupId,
});
for (let i = 0; i < refreshSession.newDenoms.length; i++) {
const ncd = newCoinDenoms[i];
for (let j = 0; j < refreshSession.newDenoms[i].count; j++) {
@ -701,6 +726,7 @@ async function refreshReveal(
refreshGroupId,
oldCoinPub: refreshGroup.oldCoinPubs[coinIndex],
},
sourceTransactionId: transactionId,
coinEvHash: pc.coinEvHash,
maxAge: pc.maxAge,
ageCommitmentProof: pc.ageCommitmentProof,

View File

@ -57,7 +57,7 @@ import {
readSuccessResponseJsonOrThrow,
} from "@gnu-taler/taler-util/http";
import { checkDbInvariant, checkLogicInvariant } from "../util/invariants.js";
import { constructTaskIdentifier, makeCoinAvailable, OperationAttemptResult, OperationAttemptResultType } from "./common.js";
import { constructTaskIdentifier, makeCoinAvailable, makeCoinsVisible, OperationAttemptResult, OperationAttemptResultType } from "./common.js";
import { updateExchangeFromUrl } from "./exchanges.js";
import {
getCandidateWithdrawalDenoms,
@ -387,6 +387,7 @@ export async function processTip(
coinIndex: i,
walletTipId: walletTipId,
},
sourceTransactionId: transactionId,
denomPubHash: denom.denomPubHash,
denomSig: { cipher: DenomKeyType.Rsa, rsa_signature: denomSigRsa.sig },
exchangeBaseUrl: tipRecord.exchangeBaseUrl,
@ -416,6 +417,7 @@ export async function processTip(
for (const cr of newCoinRecords) {
await makeCoinAvailable(ws, tx, cr);
}
await makeCoinsVisible(ws, tx, transactionId);
return { oldTxState, newTxState };
});
notifyTransition(ws, transactionId, transitionInfo);

View File

@ -97,6 +97,7 @@ import {
TaskIdentifiers,
constructTaskIdentifier,
makeCoinAvailable,
makeCoinsVisible,
makeExchangeListItem,
runLongpollAsync,
} from "../operations/common.js";
@ -1029,6 +1030,11 @@ async function processPlanchetVerifyAndStoreCoin(
return;
}
const transactionId = constructTransactionIdentifier({
tag: TransactionType.Withdrawal,
withdrawalGroupId: wgContext.wgRecord.withdrawalGroupId,
});
const { planchet, denomInfo } = d;
const planchetDenomPub = denomInfo.denomPub;
@ -1099,6 +1105,7 @@ async function processPlanchetVerifyAndStoreCoin(
reservePub: withdrawalGroup.reservePub,
withdrawalGroupId: withdrawalGroup.withdrawalGroupId,
},
sourceTransactionId: transactionId,
maxAge: withdrawalGroup.restrictAge ?? AgeRestriction.AGE_UNRESTRICTED,
ageCommitmentProof: planchet.ageCommitmentProof,
spendAllocation: undefined,
@ -1111,7 +1118,7 @@ async function processPlanchetVerifyAndStoreCoin(
// Check if this is the first time that the whole
// withdrawal succeeded. If so, mark the withdrawal
// group as finished.
const firstSuccess = await ws.db
const success = await ws.db
.mktx((x) => [
x.coins,
x.denominations,
@ -1130,7 +1137,9 @@ async function processPlanchetVerifyAndStoreCoin(
return true;
});
ws.notify({ type: NotificationType.BalanceChange });
if (success) {
ws.notify({ type: NotificationType.BalanceChange });
}
}
/**
@ -1495,10 +1504,7 @@ async function processWithdrawalGroupPendingReady(
};
});
notifyTransition(ws, transactionId, transitionInfo);
return {
type: OperationAttemptResultType.Finished,
result: undefined,
};
return OperationAttemptResult.finishedEmpty();
}
const numTotalCoins = withdrawalGroup.denomsSel.selectedDenoms
@ -1563,7 +1569,7 @@ async function processWithdrawalGroupPendingReady(
const maxReportedErrors = 5;
const res = await ws.db
.mktx((x) => [x.coins, x.withdrawalGroups, x.planchets])
.mktx((x) => [x.coins, x.coinAvailability, x.withdrawalGroups, x.planchets])
.runReadWrite(async (tx) => {
const wg = await tx.withdrawalGroups.get(withdrawalGroupId);
if (!wg) {
@ -1588,6 +1594,7 @@ async function processWithdrawalGroupPendingReady(
if (wg.timestampFinish === undefined && numFinished === numTotalCoins) {
wg.timestampFinish = TalerPreciseTimestamp.now();
wg.status = WithdrawalGroupStatus.Finished;
await makeCoinsVisible(ws, tx, transactionId);
}
const newTxState = computeWithdrawalTransactionStatus(wg);

View File

@ -31,7 +31,6 @@ import {
AmountJson,
AmountResponse,
Amounts,
AmountString,
CoinStatus,
ConvertAmountRequest,
DenominationInfo,
@ -42,7 +41,6 @@ import {
ForcedDenomSel,
GetAmountRequest,
GetPlanForOperationRequest,
GetPlanForOperationResponse,
j2s,
Logger,
parsePaytoUri,