wallet-core: put refresh sessions into own store

This commit is contained in:
Florian Dold 2023-09-08 12:26:58 +02:00
parent 132ece8e53
commit 50b0b324ae
No known key found for this signature in database
GPG Key ID: D2E4F00F29D02A4B
4 changed files with 100 additions and 60 deletions

View File

@ -107,6 +107,8 @@ import { RetryInfo, TaskIdentifiers } from "./operations/common.js";
full contract terms from the DB quite often. full contract terms from the DB quite often.
Instead, we should probably extract what we need into a separate object Instead, we should probably extract what we need into a separate object
store. store.
- More object stores should have an "id" primary key,
as this makes referencing less expensive.
*/ */
/** /**
@ -943,9 +945,6 @@ export interface RefreshReasonDetails {
export interface RefreshGroupRecord { export interface RefreshGroupRecord {
operationStatus: RefreshOperationStatus; operationStatus: RefreshOperationStatus;
// FIXME: Put this into a different object store?
lastErrorPerCoin: { [coinIndex: number]: TalerErrorDetail };
/** /**
* Unique, randomly generated identifier for this group of * Unique, randomly generated identifier for this group of
* refresh operations. * refresh operations.
@ -969,13 +968,9 @@ export interface RefreshGroupRecord {
oldCoinPubs: string[]; oldCoinPubs: string[];
// FIXME: Should this go into a separate
// object store for faster updates?
refreshSessionPerCoin: (RefreshSessionRecord | undefined)[];
inputPerCoin: AmountString[]; inputPerCoin: AmountString[];
estimatedOutputPerCoin: AmountString[]; expectedOutputPerCoin: AmountString[];
/** /**
* Flag for each coin whether refreshing finished. * Flag for each coin whether refreshing finished.
@ -997,6 +992,13 @@ export interface RefreshGroupRecord {
* Ongoing refresh * Ongoing refresh
*/ */
export interface RefreshSessionRecord { export interface RefreshSessionRecord {
refreshGroupId: string;
/**
* Index of the coin in the refresh group.
*/
coinIndex: number;
/** /**
* 512-bit secret that can be used to derive * 512-bit secret that can be used to derive
* the other cryptographic material for the refresh session. * the other cryptographic material for the refresh session.
@ -1021,6 +1023,8 @@ export interface RefreshSessionRecord {
* The no-reveal-index after we've done the melting. * The no-reveal-index after we've done the melting.
*/ */
norevealIndex?: number; norevealIndex?: number;
lastError?: TalerErrorDetail;
} }
export enum RefundReason { export enum RefundReason {
@ -2372,6 +2376,13 @@ export const WalletStoresV1 = {
byStatus: describeIndex("byStatus", "operationStatus"), byStatus: describeIndex("byStatus", "operationStatus"),
}, },
), ),
refreshSessions: describeStore(
"refreshSessions",
describeContents<RefreshSessionRecord>({
keyPath: ["refreshGroupId", "coinIndex"],
}),
{},
),
recoupGroups: describeStore( recoupGroups: describeStore(
"recoupGroups", "recoupGroups",
describeContents<RecoupGroupRecord>({ describeContents<RecoupGroupRecord>({

View File

@ -95,14 +95,7 @@ function computeRefreshGroupAvailableAmount(r: RefreshGroupRecord): AmountJson {
return available; return available;
} }
for (let i = 0; i < r.oldCoinPubs.length; i++) { for (let i = 0; i < r.oldCoinPubs.length; i++) {
const session = r.refreshSessionPerCoin[i]; available = Amounts.add(available, r.expectedOutputPerCoin[i]).amount;
if (session) {
// We are always assuming the refresh will succeed, thus we
// report the output as available balance.
available = Amounts.add(available, session.amountRefreshOutput).amount;
} else {
available = Amounts.add(available, r.estimatedOutputPerCoin[i]).amount;
}
} }
return available; return available;
} }

View File

@ -76,7 +76,11 @@ import {
RefreshReasonDetails, RefreshReasonDetails,
WalletStoresV1, WalletStoresV1,
} from "../db.js"; } from "../db.js";
import { isWithdrawableDenom, PendingTaskType } from "../index.js"; import {
isWithdrawableDenom,
PendingTaskType,
RefreshSessionRecord,
} from "../index.js";
import { import {
EXCHANGE_COINS_LOCK, EXCHANGE_COINS_LOCK,
InternalWalletState, InternalWalletState,
@ -170,18 +174,23 @@ function updateGroupStatus(rg: RefreshGroupRecord): { final: boolean } {
/** /**
* Create a refresh session for one particular coin inside a refresh group. * Create a refresh session for one particular coin inside a refresh group.
*
* If the session already exists, return the existing one.
*
* If the session doesn't need to be created (refresh group gone or session already
* finished), return undefined.
*/ */
async function refreshCreateSession( async function provideRefreshSession(
ws: InternalWalletState, ws: InternalWalletState,
refreshGroupId: string, refreshGroupId: string,
coinIndex: number, coinIndex: number,
): Promise<void> { ): Promise<RefreshSessionRecord | undefined> {
logger.trace( logger.trace(
`creating refresh session for coin ${coinIndex} in refresh group ${refreshGroupId}`, `creating refresh session for coin ${coinIndex} in refresh group ${refreshGroupId}`,
); );
const d = await ws.db const d = await ws.db
.mktx((x) => [x.refreshGroups, x.coins]) .mktx((x) => [x.refreshGroups, x.coins, x.refreshSessions])
.runReadWrite(async (tx) => { .runReadWrite(async (tx) => {
const refreshGroup = await tx.refreshGroups.get(refreshGroupId); const refreshGroup = await tx.refreshGroups.get(refreshGroupId);
if (!refreshGroup) { if (!refreshGroup) {
@ -192,21 +201,24 @@ async function refreshCreateSession(
) { ) {
return; return;
} }
const existingRefreshSession = const existingRefreshSession = await tx.refreshSessions.get([
refreshGroup.refreshSessionPerCoin[coinIndex]; refreshGroupId,
if (existingRefreshSession) { coinIndex,
return; ]);
}
const oldCoinPub = refreshGroup.oldCoinPubs[coinIndex]; const oldCoinPub = refreshGroup.oldCoinPubs[coinIndex];
const coin = await tx.coins.get(oldCoinPub); const coin = await tx.coins.get(oldCoinPub);
if (!coin) { if (!coin) {
throw Error("Can't refresh, coin not found"); throw Error("Can't refresh, coin not found");
} }
return { refreshGroup, coin }; return { refreshGroup, coin, existingRefreshSession };
}); });
if (!d) { if (!d) {
return; return undefined;
}
if (d.existingRefreshSession) {
return d.existingRefreshSession;
} }
const { refreshGroup, coin } = d; const { refreshGroup, coin } = d;
@ -288,17 +300,23 @@ async function refreshCreateSession(
const sessionSecretSeed = encodeCrock(getRandomBytes(64)); const sessionSecretSeed = encodeCrock(getRandomBytes(64));
// Store refresh session for this coin in the database. // Store refresh session for this coin in the database.
await ws.db const newSession = await ws.db
.mktx((x) => [x.refreshGroups, x.coins]) .mktx((x) => [x.refreshGroups, x.coins, x.refreshSessions])
.runReadWrite(async (tx) => { .runReadWrite(async (tx) => {
const rg = await tx.refreshGroups.get(refreshGroupId); const rg = await tx.refreshGroups.get(refreshGroupId);
if (!rg) { if (!rg) {
return; return;
} }
if (rg.refreshSessionPerCoin[coinIndex]) { const existingSession = await tx.refreshSessions.get([
refreshGroupId,
coinIndex,
]);
if (existingSession) {
return; return;
} }
rg.refreshSessionPerCoin[coinIndex] = { const newSession: RefreshSessionRecord = {
coinIndex,
refreshGroupId,
norevealIndex: undefined, norevealIndex: undefined,
sessionSecretSeed: sessionSecretSeed, sessionSecretSeed: sessionSecretSeed,
newDenoms: newCoinDenoms.selectedDenoms.map((x) => ({ newDenoms: newCoinDenoms.selectedDenoms.map((x) => ({
@ -307,11 +325,13 @@ async function refreshCreateSession(
})), })),
amountRefreshOutput: Amounts.stringify(newCoinDenoms.totalCoinValue), amountRefreshOutput: Amounts.stringify(newCoinDenoms.totalCoinValue),
}; };
await tx.refreshGroups.put(rg); await tx.refreshSessions.put(newSession);
return newSession;
}); });
logger.trace( logger.trace(
`created refresh session for coin #${coinIndex} in ${refreshGroupId}`, `created refresh session for coin #${coinIndex} in ${refreshGroupId}`,
); );
return newSession;
} }
function getRefreshRequestTimeout(rg: RefreshGroupRecord): Duration { function getRefreshRequestTimeout(rg: RefreshGroupRecord): Duration {
@ -326,13 +346,16 @@ async function refreshMelt(
coinIndex: number, coinIndex: number,
): Promise<void> { ): Promise<void> {
const d = await ws.db const d = await ws.db
.mktx((x) => [x.refreshGroups, x.coins, x.denominations]) .mktx((x) => [x.refreshGroups, x.refreshSessions, x.coins, x.denominations])
.runReadWrite(async (tx) => { .runReadWrite(async (tx) => {
const refreshGroup = await tx.refreshGroups.get(refreshGroupId); const refreshGroup = await tx.refreshGroups.get(refreshGroupId);
if (!refreshGroup) { if (!refreshGroup) {
return; return;
} }
const refreshSession = refreshGroup.refreshSessionPerCoin[coinIndex]; const refreshSession = await tx.refreshSessions.get([
refreshGroupId,
coinIndex,
]);
if (!refreshSession) { if (!refreshSession) {
return; return;
} }
@ -442,7 +465,12 @@ async function refreshMelt(
if (resp.status === HttpStatusCode.NotFound) { if (resp.status === HttpStatusCode.NotFound) {
const errDetails = await readUnexpectedResponseDetails(resp); const errDetails = await readUnexpectedResponseDetails(resp);
const transitionInfo = await ws.db const transitionInfo = await ws.db
.mktx((x) => [x.refreshGroups, x.coins, x.coinAvailability]) .mktx((x) => [
x.refreshGroups,
x.refreshSessions,
x.coins,
x.coinAvailability,
])
.runReadWrite(async (tx) => { .runReadWrite(async (tx) => {
const rg = await tx.refreshGroups.get(refreshGroupId); const rg = await tx.refreshGroups.get(refreshGroupId);
if (!rg) { if (!rg) {
@ -456,12 +484,22 @@ async function refreshMelt(
} }
const oldTxState = computeRefreshTransactionState(rg); const oldTxState = computeRefreshTransactionState(rg);
rg.statusPerCoin[coinIndex] = RefreshCoinStatus.Failed; rg.statusPerCoin[coinIndex] = RefreshCoinStatus.Failed;
rg.lastErrorPerCoin[coinIndex] = errDetails; const refreshSession = await tx.refreshSessions.get([
refreshGroupId,
coinIndex,
]);
if (!refreshSession) {
throw Error(
"db invariant failed: missing refresh session in database",
);
}
refreshSession.lastError = errDetails;
const updateRes = updateGroupStatus(rg); const updateRes = updateGroupStatus(rg);
if (updateRes.final) { if (updateRes.final) {
await makeCoinsVisible(ws, tx, transactionId); await makeCoinsVisible(ws, tx, transactionId);
} }
await tx.refreshGroups.put(rg); await tx.refreshGroups.put(rg);
await tx.refreshSessions.put(refreshSession);
const newTxState = computeRefreshTransactionState(rg); const newTxState = computeRefreshTransactionState(rg);
return { return {
oldTxState, oldTxState,
@ -493,7 +531,7 @@ async function refreshMelt(
refreshSession.norevealIndex = norevealIndex; refreshSession.norevealIndex = norevealIndex;
await ws.db await ws.db
.mktx((x) => [x.refreshGroups]) .mktx((x) => [x.refreshGroups, x.refreshSessions])
.runReadWrite(async (tx) => { .runReadWrite(async (tx) => {
const rg = await tx.refreshGroups.get(refreshGroupId); const rg = await tx.refreshGroups.get(refreshGroupId);
if (!rg) { if (!rg) {
@ -502,7 +540,7 @@ async function refreshMelt(
if (rg.timestampFinished) { if (rg.timestampFinished) {
return; return;
} }
const rs = rg.refreshSessionPerCoin[coinIndex]; const rs = await tx.refreshSessions.get([refreshGroupId, coinIndex]);
if (!rs) { if (!rs) {
return; return;
} }
@ -510,7 +548,7 @@ async function refreshMelt(
return; return;
} }
rs.norevealIndex = norevealIndex; rs.norevealIndex = norevealIndex;
await tx.refreshGroups.put(rg); await tx.refreshSessions.put(rs);
}); });
} }
@ -581,13 +619,16 @@ async function refreshReveal(
`doing refresh reveal for ${refreshGroupId} (old coin ${coinIndex})`, `doing refresh reveal for ${refreshGroupId} (old coin ${coinIndex})`,
); );
const d = await ws.db const d = await ws.db
.mktx((x) => [x.refreshGroups, x.coins, x.denominations]) .mktx((x) => [x.refreshGroups, x.refreshSessions, x.coins, x.denominations])
.runReadOnly(async (tx) => { .runReadOnly(async (tx) => {
const refreshGroup = await tx.refreshGroups.get(refreshGroupId); const refreshGroup = await tx.refreshGroups.get(refreshGroupId);
if (!refreshGroup) { if (!refreshGroup) {
return; return;
} }
const refreshSession = refreshGroup.refreshSessionPerCoin[coinIndex]; const refreshSession = await tx.refreshSessions.get([
refreshGroupId,
coinIndex,
]);
if (!refreshSession) { if (!refreshSession) {
return; return;
} }
@ -755,6 +796,7 @@ async function refreshReveal(
x.denominations, x.denominations,
x.coinAvailability, x.coinAvailability,
x.refreshGroups, x.refreshGroups,
x.refreshSessions,
]) ])
.runReadWrite(async (tx) => { .runReadWrite(async (tx) => {
const rg = await tx.refreshGroups.get(refreshGroupId); const rg = await tx.refreshGroups.get(refreshGroupId);
@ -762,7 +804,7 @@ async function refreshReveal(
logger.warn("no refresh session found"); logger.warn("no refresh session found");
return; return;
} }
const rs = rg.refreshSessionPerCoin[coinIndex]; const rs = await tx.refreshSessions.get([refreshGroupId, coinIndex]);
if (!rs) { if (!rs) {
return; return;
} }
@ -858,10 +900,15 @@ async function processRefreshSession(
logger.trace( logger.trace(
`processing refresh session for coin ${coinIndex} of group ${refreshGroupId}`, `processing refresh session for coin ${coinIndex} of group ${refreshGroupId}`,
); );
let refreshGroup = await ws.db let { refreshGroup, refreshSession } = await ws.db
.mktx((x) => [x.refreshGroups]) .mktx((x) => [x.refreshGroups, x.refreshSessions])
.runReadOnly(async (tx) => { .runReadOnly(async (tx) => {
return tx.refreshGroups.get(refreshGroupId); const rg = await tx.refreshGroups.get(refreshGroupId);
const rs = await tx.refreshSessions.get([refreshGroupId, coinIndex]);
return {
refreshGroup: rg,
refreshSession: rs,
};
}); });
if (!refreshGroup) { if (!refreshGroup) {
return; return;
@ -869,18 +916,9 @@ async function processRefreshSession(
if (refreshGroup.statusPerCoin[coinIndex] === RefreshCoinStatus.Finished) { if (refreshGroup.statusPerCoin[coinIndex] === RefreshCoinStatus.Finished) {
return; return;
} }
if (!refreshGroup.refreshSessionPerCoin[coinIndex]) { if (!refreshSession) {
await refreshCreateSession(ws, refreshGroupId, coinIndex); refreshSession = await provideRefreshSession(ws, refreshGroupId, coinIndex);
refreshGroup = await ws.db
.mktx((x) => [x.refreshGroups])
.runReadOnly(async (tx) => {
return tx.refreshGroups.get(refreshGroupId);
});
if (!refreshGroup) {
return;
} }
}
const refreshSession = refreshGroup.refreshSessionPerCoin[coinIndex];
if (!refreshSession) { if (!refreshSession) {
if (refreshGroup.statusPerCoin[coinIndex] !== RefreshCoinStatus.Finished) { if (refreshGroup.statusPerCoin[coinIndex] !== RefreshCoinStatus.Finished) {
throw Error( throw Error(
@ -1058,13 +1096,11 @@ export async function createRefreshGroup(
timestampFinished: undefined, timestampFinished: undefined,
statusPerCoin: oldCoinPubs.map(() => RefreshCoinStatus.Pending), statusPerCoin: oldCoinPubs.map(() => RefreshCoinStatus.Pending),
oldCoinPubs: oldCoinPubs.map((x) => x.coinPub), oldCoinPubs: oldCoinPubs.map((x) => x.coinPub),
lastErrorPerCoin: {},
reasonDetails, reasonDetails,
reason, reason,
refreshGroupId, refreshGroupId,
refreshSessionPerCoin: oldCoinPubs.map(() => undefined),
inputPerCoin: oldCoinPubs.map((x) => x.amount), inputPerCoin: oldCoinPubs.map((x) => x.amount),
estimatedOutputPerCoin: estimatedOutputPerCoin.map((x) => expectedOutputPerCoin: estimatedOutputPerCoin.map((x) =>
Amounts.stringify(x), Amounts.stringify(x),
), ),
timestampCreated: TalerPreciseTimestamp.now(), timestampCreated: TalerPreciseTimestamp.now(),

View File

@ -760,7 +760,7 @@ function buildTransactionForRefresh(
).amount; ).amount;
const outputAmount = Amounts.sumOrZero( const outputAmount = Amounts.sumOrZero(
refreshGroupRecord.currency, refreshGroupRecord.currency,
refreshGroupRecord.estimatedOutputPerCoin, refreshGroupRecord.expectedOutputPerCoin,
).amount; ).amount;
return { return {
type: TransactionType.Refresh, type: TransactionType.Refresh,