/*
This file is part of GNU Taler
(C) 2022 GNUnet e.V.
GNU Taler is free software; you can redistribute it and/or modify it under the
terms of the GNU General Public License as published by the Free Software
Foundation; either version 3, or (at your option) any later version.
GNU Taler is distributed in the hope that it will be useful, but WITHOUT ANY
WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
A PARTICULAR PURPOSE. See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License along with
GNU Taler; see the file COPYING. If not, see <http://www.gnu.org/licenses/>
*/
/**
* Imports.
*/
import {
AgeRestriction,
AmountJson,
Amounts,
CancellationToken,
CoinRefreshRequest,
CoinStatus,
ExchangeEntryStatus,
ExchangeListItem,
ExchangeTosStatus,
getErrorDetailFromException,
j2s,
Logger,
OperationErrorInfo,
RefreshReason,
TalerErrorCode,
TalerErrorDetail,
TombstoneIdStr,
TransactionIdStr,
TransactionType,
} from "@gnu-taler/taler-util";
import {
WalletStoresV1,
CoinRecord,
ExchangeDetailsRecord,
ExchangeRecord,
} from "../db.js";
import { makeErrorDetail, TalerError } from "@gnu-taler/taler-util";
import { InternalWalletState } from "../internal-wallet-state.js";
import { checkDbInvariant, checkLogicInvariant } from "../util/invariants.js";
import { GetReadWriteAccess } from "../util/query.js";
import {
OperationAttemptResult,
OperationAttemptResultType,
RetryInfo,
} from "../util/retries.js";
import { CryptoApiStoppedError } from "../crypto/workers/crypto-dispatcher.js";
// Logger instance shared by the functions in this file, created with the
// tag "operations/common.ts".
const logger = new Logger("operations/common.ts");
/**
 * Describes a set of coins that should be spent together.
 *
 * This is the input consumed by spendCoins.
 */
export interface CoinsSpendInfo {
/**
 * Public keys of the coins to spend. Must have the same length as
 * `contributions`; spendCoins throws otherwise.
 */
coinPubs: string[];
/**
 * Amount taken from each coin: `contributions[i]` belongs to `coinPubs[i]`.
 */
contributions: AmountJson[];
/**
 * Reason handed to the refresh group that spendCoins creates for the
 * spent coins.
 */
refreshReason: RefreshReason;
/**
* Identifier for what the coin has been spent for.
*/
allocationId: TransactionIdStr;
}
/**
 * Store a fresh coin and account for it in the availability record of its
 * (exchange, denomination, age restriction) combination.
 *
 * If a coin with the same public key is already stored, nothing is written,
 * so calling this again for the same coin does not inflate the fresh coin
 * count.
 *
 * @param ws wallet state (not used by this function)
 * @param tx read-write transaction covering the coins, coinAvailability and
 *   denominations stores
 * @param coinRecord coin to store; its status is asserted to be
 *   CoinStatus.Fresh via checkLogicInvariant, and its denomination is
 *   asserted to exist via checkDbInvariant
 */
export async function makeCoinAvailable(
  ws: InternalWalletState,
  tx: GetReadWriteAccess<{
    coins: typeof WalletStoresV1.coins;
    coinAvailability: typeof WalletStoresV1.coinAvailability;
    denominations: typeof WalletStoresV1.denominations;
  }>,
  coinRecord: CoinRecord,
): Promise<void> {
  checkLogicInvariant(coinRecord.status === CoinStatus.Fresh);
  const existingCoin = await tx.coins.get(coinRecord.coinPub);
  if (existingCoin) {
    // Already stored: counting it again would corrupt freshCoinCount.
    return;
  }
  const denom = await tx.denominations.get([
    coinRecord.exchangeBaseUrl,
    coinRecord.denomPubHash,
  ]);
  checkDbInvariant(!!denom);
  const ageRestriction = coinRecord.maxAge;
  let car = await tx.coinAvailability.get([
    coinRecord.exchangeBaseUrl,
    coinRecord.denomPubHash,
    ageRestriction,
  ]);
  if (!car) {
    // First coin for this combination: start a new availability record.
    car = {
      maxAge: ageRestriction,
      amountFrac: denom.amountFrac,
      amountVal: denom.amountVal,
      currency: denom.currency,
      denomPubHash: denom.denomPubHash,
      exchangeBaseUrl: denom.exchangeBaseUrl,
      freshCoinCount: 0,
    };
  }
  car.freshCoinCount++;
  await tx.coins.put(coinRecord);
  await tx.coinAvailability.put(car);
}
/**
 * Mark the given coins as spent for one allocation and schedule a refresh
 * of whatever value remains on them.
 *
 * For every fresh coin this sets the status to CoinStatus.Dormant, records
 * the spend allocation, decrements the fresh coin count of the matching
 * availability record and queues the coin (with its remaining amount) for
 * refresh.
 *
 * Coins that are no longer fresh are not modified:
 * - without a recorded spend allocation they are skipped silently;
 * - with a recorded allocation, that allocation must carry the same id and
 *   the same amount as requested here, otherwise an error is thrown.
 *
 * After the loop a refresh group is created for the coins spent by this
 * call. This happens even when that list is empty because every coin was
 * already allocated earlier.
 *
 * @param ws wallet state, used for denomination lookup and refresh group
 *   creation
 * @param tx read-write transaction covering the stores listed in its type
 * @param csi coins, per-coin contributions, refresh reason and allocation id
 * @throws Error if `coinPubs` and `contributions` differ in length, a coin is
 *   missing, an existing allocation conflicts, a contribution exceeds the
 *   denomination value, or the availability record has a zero fresh count
 */
export async function spendCoins(
  ws: InternalWalletState,
  tx: GetReadWriteAccess<{
    coins: typeof WalletStoresV1.coins;
    coinAvailability: typeof WalletStoresV1.coinAvailability;
    refreshGroups: typeof WalletStoresV1.refreshGroups;
    denominations: typeof WalletStoresV1.denominations;
  }>,
  csi: CoinsSpendInfo,
): Promise<void> {
  if (csi.coinPubs.length !== csi.contributions.length) {
    throw Error("assertion failed");
  }
  if (csi.coinPubs.length === 0) {
    return;
  }
  const refreshCoinPubs: CoinRefreshRequest[] = [];
  for (let i = 0; i < csi.coinPubs.length; i++) {
    const coin = await tx.coins.get(csi.coinPubs[i]);
    if (!coin) {
      throw Error("coin allocated for payment doesn't exist anymore");
    }
    const denom = await ws.getDenomInfo(
      ws,
      tx,
      coin.exchangeBaseUrl,
      coin.denomPubHash,
    );
    checkDbInvariant(!!denom);
    const coinAvailability = await tx.coinAvailability.get([
      coin.exchangeBaseUrl,
      coin.denomPubHash,
      coin.maxAge,
    ]);
    checkDbInvariant(!!coinAvailability);
    const contrib = csi.contributions[i];
    if (coin.status !== CoinStatus.Fresh) {
      // The coin was spent before. That is only acceptable when it was
      // spent for exactly this allocation and amount (repeated call).
      const alloc = coin.spendAllocation;
      if (!alloc) {
        continue;
      }
      if (alloc.id !== csi.allocationId) {
        // FIXME: assign error code
        logger.info("conflicting coin allocation ID");
        logger.info(`old ID: ${alloc.id}, new ID: ${csi.allocationId}`);
        throw Error("conflicting coin allocation (id)");
      }
      if (0 !== Amounts.cmp(alloc.amount, contrib)) {
        // FIXME: assign error code
        throw Error("conflicting coin allocation (contrib)");
      }
      continue;
    }
    coin.status = CoinStatus.Dormant;
    coin.spendAllocation = {
      id: csi.allocationId,
      amount: Amounts.stringify(contrib),
    };
    const remaining = Amounts.sub(denom.value, contrib);
    if (remaining.saturated) {
      // The contribution is larger than the value of the denomination.
      throw Error("not enough remaining balance on coin for payment");
    }
    refreshCoinPubs.push({
      amount: Amounts.stringify(remaining.amount),
      coinPub: coin.coinPub,
    });
    if (coinAvailability.freshCoinCount === 0) {
      throw Error(
        `invalid coin count ${coinAvailability.freshCoinCount} in DB`,
      );
    }
    coinAvailability.freshCoinCount--;
    await tx.coins.put(coin);
    await tx.coinAvailability.put(coinAvailability);
  }
  await ws.refreshOps.createRefreshGroup(
    ws,
    tx,
    Amounts.currencyOf(csi.contributions[0]),
    refreshCoinPubs,
    csi.refreshReason,
    {
      originatingTransactionId: csi.allocationId,
    },
  );
}
/**
 * Record a failed attempt of a pending task in the operationRetries store.
 *
 * If no retry record exists yet, one is created with freshly reset retry
 * info; otherwise the stored error is replaced and the retry info is
 * advanced with RetryInfo.increment.
 *
 * @param ws wallet state providing the database handle
 * @param pendingTaskId key of the retry record
 * @param e error detail to store as the record's lastError
 */
export async function storeOperationError(
  ws: InternalWalletState,
  pendingTaskId: string,
  e: TalerErrorDetail,
): Promise<void> {
  await ws.db
    .mktx((x) => [x.operationRetries])
    .runReadWrite(async (tx) => {
      let retryRecord = await tx.operationRetries.get(pendingTaskId);
      if (!retryRecord) {
        retryRecord = {
          id: pendingTaskId,
          lastError: e,
          retryInfo: RetryInfo.reset(),
        };
      } else {
        retryRecord.lastError = e;
        retryRecord.retryInfo = RetryInfo.increment(retryRecord.retryInfo);
      }
      await tx.operationRetries.put(retryRecord);
    });
}
/**
 * Reset the retry info of a pending task so that its back-off starts over.
 *
 * Only an existing retry record is touched; if none is stored for the task,
 * nothing is written.
 *
 * @param ws wallet state providing the database handle
 * @param pendingTaskId key of the retry record
 */
export async function resetOperationTimeout(
  ws: InternalWalletState,
  pendingTaskId: string,
): Promise<void> {
  await ws.db
    .mktx((x) => [x.operationRetries])
    .runReadWrite(async (tx) => {
      const retryRecord = await tx.operationRetries.get(pendingTaskId);
      if (retryRecord) {
        // Note that we don't reset the lastError, it should still be visible
        // while the retry runs.
        retryRecord.retryInfo = RetryInfo.reset();
        await tx.operationRetries.put(retryRecord);
      }
    });
}
/**
 * Record that a pending task is still pending after an attempt.
 *
 * If no retry record exists yet, one is created with freshly reset retry
 * info and no error. Otherwise the stored lastError is removed and the retry
 * info is advanced with RetryInfo.increment.
 *
 * @param ws wallet state providing the database handle
 * @param pendingTaskId key of the retry record
 */
export async function storeOperationPending(
  ws: InternalWalletState,
  pendingTaskId: string,
): Promise<void> {
  await ws.db
    .mktx((x) => [x.operationRetries])
    .runReadWrite(async (tx) => {
      let retryRecord = await tx.operationRetries.get(pendingTaskId);
      if (!retryRecord) {
        retryRecord = {
          id: pendingTaskId,
          retryInfo: RetryInfo.reset(),
        };
      } else {
        delete retryRecord.lastError;
        retryRecord.retryInfo = RetryInfo.increment(retryRecord.retryInfo);
      }
      await tx.operationRetries.put(retryRecord);
    });
}
/**
 * Run one attempt of a pending operation and persist its outcome in the
 * retry bookkeeping of the task.
 *
 * Outcome handling:
 * - Error result: stored with storeOperationError, result returned.
 * - Finished result: retry record removed with storeOperationFinished.
 * - Pending result: stored with storeOperationPending.
 * - Longpoll result: returned without touching the retry record.
 * - Exception: converted into an error result. It is stored with
 *   storeOperationError, except for a CryptoApiStoppedError raised while
 *   the wallet is stopped, which is reported but not stored.
 *
 * NOTE(review): the generic parameters of this signature were reconstructed;
 * this assumes OperationAttemptResult takes two type arguments (success and
 * pending payload) — confirm against its declaration in ../util/retries.js.
 *
 * @param ws wallet state
 * @param opId identifier of the pending task whose retry record is updated
 * @param f the operation attempt to run
 * @returns the result produced by `f`, or an error result built from the
 *   exception it threw
 */
export async function runOperationWithErrorReporting<T1, T2>(
  ws: InternalWalletState,
  opId: string,
  f: () => Promise<OperationAttemptResult<T1, T2>>,
): Promise<OperationAttemptResult<T1, T2>> {
  try {
    const resp = await f();
    switch (resp.type) {
      case OperationAttemptResultType.Error:
        await storeOperationError(ws, opId, resp.errorDetail);
        return resp;
      case OperationAttemptResultType.Finished:
        await storeOperationFinished(ws, opId);
        return resp;
      case OperationAttemptResultType.Pending:
        await storeOperationPending(ws, opId);
        return resp;
      case OperationAttemptResultType.Longpoll:
        return resp;
    }
  } catch (e) {
    if (e instanceof CryptoApiStoppedError && ws.stopped) {
      // Expected during shutdown: report it, but do not record it as a
      // failure of the task.
      logger.warn("crypto API stopped during shutdown, ignoring error");
      return {
        type: OperationAttemptResultType.Error,
        errorDetail: makeErrorDetail(
          TalerErrorCode.WALLET_UNEXPECTED_EXCEPTION,
          {},
          "Crypto API stopped during shutdown",
        ),
      };
    }
    let errorDetail: TalerErrorDetail;
    if (e instanceof TalerError) {
      logger.warn("operation processed resulted in error");
      logger.warn(`error was: ${j2s(e.errorDetail)}`);
      errorDetail = e.errorDetail;
    } else if (e instanceof Error) {
      // This is a bug, as we expect pending operations to always
      // do their own error handling and only throw WALLET_PENDING_OPERATION_FAILED
      // or return something.
      logger.error(`Uncaught exception: ${e.message}`);
      logger.error(`Stack: ${e.stack}`);
      errorDetail = makeErrorDetail(
        TalerErrorCode.WALLET_UNEXPECTED_EXCEPTION,
        {
          stack: e.stack,
        },
        `unexpected exception (message: ${e.message})`,
      );
    } else {
      logger.error("Uncaught exception, value is not even an error.");
      errorDetail = makeErrorDetail(
        TalerErrorCode.WALLET_UNEXPECTED_EXCEPTION,
        {},
        `unexpected exception (not even an error)`,
      );
    }
    await storeOperationError(ws, opId, errorDetail);
    return {
      type: OperationAttemptResultType.Error,
      errorDetail,
    };
  }
}
/**
 * Remove the retry record of a task that has finished.
 *
 * @param ws wallet state providing the database handle
 * @param pendingTaskId key of the retry record to delete
 */
export async function storeOperationFinished(
  ws: InternalWalletState,
  pendingTaskId: string,
): Promise<void> {
  await ws.db
    .mktx((x) => [x.operationRetries])
    .runReadWrite(async (tx) => {
      await tx.operationRetries.delete(pendingTaskId);
    });
}
/**
 * Tags accepted by makeTombstoneId as the type segment of a tombstone ID.
 *
 * Each member is a string enum value; the string itself ends up verbatim in
 * the generated "tmb:<tag>:..." identifier.
 */
export enum TombstoneTag {
DeleteWithdrawalGroup = "delete-withdrawal-group",
DeleteReserve = "delete-reserve",
DeletePayment = "delete-payment",
DeleteTip = "delete-tip",
DeleteRefreshGroup = "delete-refresh-group",
DeleteDepositGroup = "delete-deposit-group",
DeleteRefund = "delete-refund",
DeletePeerPullDebit = "delete-peer-pull-debit",
DeletePeerPushDebit = "delete-peer-push-debit",
DeletePeerPullCredit = "delete-peer-pull-credit",
DeletePeerPushCredit = "delete-peer-push-credit",
}
/**
 * Build a transaction ID of the form `txn:<type>:<arg>:<arg>...`.
 *
 * Every argument is passed through encodeURIComponent, so a ":" inside an
 * argument cannot be mistaken for a separator.
 *
 * @param type transaction type, inserted verbatim as the second segment
 * @param args key components identifying the transaction
 * @returns the assembled transaction ID
 *
 * @deprecated use constructTransactionIdentifier instead
 */
export function makeTransactionId(
  type: TransactionType,
  ...args: string[]
): TransactionIdStr {
  const encodedArgs: string[] = [];
  for (const arg of args) {
    encodedArgs.push(encodeURIComponent(arg));
  }
  const suffix = encodedArgs.join(":");
  return `txn:${type}:${suffix}`;
}
/**
 * Split an identifier of the form `<prefix>:<type>:<arg>[:<arg>...]` into
 * its type and arguments.
 *
 * The arguments are returned exactly as they appear in the identifier; they
 * are not URI-decoded here.
 *
 * @param idType required prefix ("txn" or "tmb"), or "any" to accept every
 *   prefix
 * @param txId identifier to parse
 * @returns the type segment (cast to TransactionType without validation) and
 *   the remaining segments
 * @throws Error if the identifier has fewer than three ":"-separated parts
 *   or its prefix does not match `idType`
 */
export function parseId(
  idType: "txn" | "tmb" | "any",
  txId: string,
): {
  type: TransactionType;
  args: string[];
} {
  const segments = txId.split(":");
  if (!(segments.length >= 3)) {
    throw Error("id should have al least 3 parts separated by ':'");
  }
  const prefix = segments[0];
  const type = segments[1] as TransactionType;
  const args = segments.slice(2);
  const prefixAccepted = idType === "any" || prefix === idType;
  if (!prefixAccepted) {
    throw Error(`id should start with ${idType}`);
  }
  // With at least three segments there is always at least one argument, so
  // this cannot trigger; it is kept as a guard.
  if (args.length < 1) {
    throw Error("id should have one or more arguments");
  }
  return { type, args };
}
/**
 * Build a tombstone ID of the form `tmb:<tag>:<arg>:<arg>...`.
 *
 * Every argument is passed through encodeURIComponent, so a ":" inside an
 * argument cannot be mistaken for a separator.
 *
 * @param type tombstone tag, inserted verbatim as the second segment
 * @param args key components identifying the record the tombstone is for
 * @returns the assembled tombstone ID
 */
export function makeTombstoneId(
  type: TombstoneTag,
  ...args: string[]
): TombstoneIdStr {
  const encodedArgs: string[] = [];
  for (const arg of args) {
    encodedArgs.push(encodeURIComponent(arg));
  }
  const suffix = encodedArgs.join(":");
  return `tmb:${type}:${suffix}`;
}
/**
 * Classify the terms-of-service state of an exchange.
 *
 * @param exchangeDetails exchange details carrying the accepted ToS (if any)
 *   and the current ToS etag
 * @returns New when no ToS acceptance is recorded, Accepted when the accepted
 *   etag equals the current etag, Changed otherwise
 */
export function getExchangeTosStatus(
  exchangeDetails: ExchangeDetailsRecord,
): ExchangeTosStatus {
  const accepted = exchangeDetails.tosAccepted;
  if (!accepted) {
    return ExchangeTosStatus.New;
  }
  // Loose equality on purpose: null and undefined etags compare as equal.
  return accepted.etag == exchangeDetails.tosCurrentEtag
    ? ExchangeTosStatus.Accepted
    : ExchangeTosStatus.Changed;
}
/**
 * Convert an exchange record (plus optional details and last error) into
 * the list item representation.
 *
 * Without details, the item carries only the base URL, the `permanent` flag
 * and the error info; currency is undefined, statuses are Unknown and the
 * list fields are empty. With details, the exchange status is always
 * reported as Ok.
 *
 * @param r exchange record supplying base URL and `permanent` flag
 * @param exchangeDetails details of the exchange, if available
 * @param lastError last error to expose as `lastUpdateErrorInfo`, if any
 * @returns the list item for this exchange
 */
export function makeExchangeListItem(
  r: ExchangeRecord,
  exchangeDetails: ExchangeDetailsRecord | undefined,
  lastError: TalerErrorDetail | undefined,
): ExchangeListItem {
  let lastUpdateErrorInfo: OperationErrorInfo | undefined = undefined;
  if (lastError) {
    lastUpdateErrorInfo = { error: lastError };
  }

  if (exchangeDetails) {
    const paytoUris = exchangeDetails.wireInfo.accounts.map(
      (account) => account.payto_uri,
    );
    const ageRestrictionOptions = exchangeDetails.ageMask
      ? AgeRestriction.getAgeGroupsFromMask(exchangeDetails.ageMask)
      : [];
    return {
      exchangeBaseUrl: r.baseUrl,
      currency: exchangeDetails.currency,
      tosStatus: getExchangeTosStatus(exchangeDetails),
      paytoUris,
      exchangeStatus: ExchangeEntryStatus.Ok,
      permanent: r.permanent,
      ageRestrictionOptions,
      lastUpdateErrorInfo,
    };
  }

  // No details known yet: report placeholders.
  return {
    exchangeBaseUrl: r.baseUrl,
    currency: undefined,
    tosStatus: ExchangeTosStatus.Unknown,
    paytoUris: [],
    exchangeStatus: ExchangeEntryStatus.Unknown,
    permanent: r.permanent,
    ageRestrictionOptions: [],
    lastUpdateErrorInfo,
  };
}
/**
 * Outcome of a single long-poll request as consumed by runLongpollAsync.
 */
export interface LongpollResult {
// When false, runLongpollAsync records the task as still pending.
ready: boolean;
}
/**
 * Start a long-poll request in the background and record its outcome in the
 * retry bookkeeping of the task. Returns immediately.
 *
 * While the request runs, a cancel handle is registered in
 * `ws.activeLongpoll[retryTag]`; it is removed again when the request
 * settles.
 *
 * Outcomes:
 * - wallet already stopped: the task is stored as pending, no request is made;
 * - request throws: the error is stored with storeOperationError;
 * - request resolves with `ready: false`: the task is stored as pending;
 * - in both resolved cases `ws.workAvailable` is triggered afterwards.
 *
 * @param ws wallet state
 * @param retryTag identifier of the pending task
 * @param reqFn performs the long-poll request; receives a cancellation token
 */
export function runLongpollAsync(
  ws: InternalWalletState,
  retryTag: string,
  reqFn: (ct: CancellationToken) => Promise<LongpollResult>,
): void {
  const asyncFn = async (): Promise<void> => {
    if (ws.stopped) {
      logger.trace("not long-polling reserve, wallet already stopped");
      await storeOperationPending(ws, retryTag);
      return;
    }
    const cts = CancellationToken.create();
    let res: LongpollResult | undefined = undefined;
    try {
      ws.activeLongpoll[retryTag] = {
        cancel: () => {
          logger.trace("cancel of reserve longpoll requested");
          cts.cancel();
        },
      };
      res = await reqFn(cts.token);
    } catch (e) {
      await storeOperationError(ws, retryTag, getErrorDetailFromException(e));
      return;
    } finally {
      delete ws.activeLongpoll[retryTag];
    }
    if (!res.ready) {
      await storeOperationPending(ws, retryTag);
    }
    ws.workAvailable.trigger();
  };
  // Fire-and-forget: the bookkeeping calls above can themselves reject
  // (they write to the database), so attach a handler instead of leaving an
  // unhandled rejection.
  asyncFn().catch((e) => {
    const reason = e instanceof Error ? e.message : String(e);
    logger.error(`long-poll task ${retryTag} failed unexpectedly: ${reason}`);
  });
}