hide internal wallet state, keep it internal to package

This commit is contained in:
Florian Dold 2021-06-17 21:06:45 +02:00
parent 954ed23911
commit 99550b0011
No known key found for this signature in database
GPG Key ID: D2E4F00F29D02A4B
6 changed files with 252 additions and 125 deletions

View File

@ -33,8 +33,7 @@ import {
WALLET_EXCHANGE_PROTOCOL_VERSION,
WALLET_MERCHANT_PROTOCOL_VERSION,
runRetryLoop,
handleCoreApiRequest,
InternalWalletState,
Wallet,
} from "@gnu-taler/taler-wallet-core";
import fs from "fs";
@ -156,8 +155,8 @@ function sendAkonoMessage(ev: CoreApiEnvelope): void {
class AndroidWalletMessageHandler {
walletArgs: DefaultNodeWalletArgs | undefined;
maybeWallet: InternalWalletState | undefined;
wp = openPromise<InternalWalletState>();
maybeWallet: Wallet | undefined;
wp = openPromise<Wallet>();
httpLib = new NodeHttpLib();
/**
@ -180,8 +179,8 @@ class AndroidWalletMessageHandler {
const reinit = async () => {
const w = await getDefaultNodeWallet(this.walletArgs);
this.maybeWallet = w;
await handleCoreApiRequest(w, "initWallet", "akono-init", {});
runRetryLoop(w).catch((e) => {
await w.handleCoreApiRequest("initWallet", "akono-init", {});
w.runRetryLoop().catch((e) => {
console.error("Error during wallet retry loop", e);
});
this.wp.resolve(w);
@ -230,14 +229,14 @@ class AndroidWalletMessageHandler {
}
const wallet = await this.wp.promise;
wallet.stop();
this.wp = openPromise<InternalWalletState>();
this.wp = openPromise<Wallet>();
this.maybeWallet = undefined;
await reinit();
return wrapResponse({});
}
default: {
const wallet = await this.wp.promise;
return await handleCoreApiRequest(wallet, operation, id, args);
return await wallet.handleCoreApiRequest(operation, id, args);
}
}
}

View File

@ -51,7 +51,7 @@ import {
getClientFromWalletState,
WalletApiOperation,
WalletCoreApiClient,
InternalWalletState,
Wallet,
} from "@gnu-taler/taler-wallet-core";
// This module also serves as the entry point for the crypto
@ -172,10 +172,7 @@ type WalletCliArgsType = clk.GetArgType<typeof walletCli>;
async function withWallet<T>(
walletCliArgs: WalletCliArgsType,
f: (w: {
client: WalletCoreApiClient;
ws: InternalWalletState;
}) => Promise<T>,
f: (w: { client: WalletCoreApiClient; ws: Wallet }) => Promise<T>,
): Promise<T> {
const dbPath = walletCliArgs.wallet.walletDbFile ?? defaultWalletDbPath;
const myHttpLib = new NodeHttpLib();
@ -190,7 +187,7 @@ async function withWallet<T>(
try {
const w = {
ws: wallet,
client: await getClientFromWalletState(wallet),
client: wallet.client,
};
const ret = await f(w);
return ret;
@ -242,8 +239,7 @@ walletCli
console.error("Invalid JSON");
process.exit(1);
}
const resp = await handleCoreApiRequest(
wallet.ws,
const resp = await wallet.ws.handleCoreApiRequest(
args.api.operation,
"reqid-1",
requestJson,
@ -294,7 +290,7 @@ walletCli
.flag("forceNow", ["-f", "--force-now"])
.action(async (args) => {
await withWallet(args, async (wallet) => {
await runPending(wallet.ws, args.runPendingOpt.forceNow);
await wallet.ws.runPending(args.runPendingOpt.forceNow);
});
});
@ -318,7 +314,7 @@ walletCli
.maybeOption("maxRetries", ["--max-retries"], clk.INT)
.action(async (args) => {
await withWallet(args, async (wallet) => {
await runUntilDone(wallet.ws, {
await wallet.ws.runUntilDone({
maxRetries: args.finishPendingOpt.maxRetries,
});
wallet.ws.stop();
@ -607,7 +603,7 @@ depositCli
},
);
console.log(`Created deposit ${resp.depositGroupId}`);
await runPending(wallet.ws);
await wallet.ws.runPending();
});
});

View File

@ -40,8 +40,6 @@ import {
import { DbAccess, GetReadOnlyAccess } from "./util/query.js";
import { TimerGroup } from "./util/timer.js";
type NotificationListener = (n: WalletNotification) => void;
const logger = new Logger("state.ts");
export const EXCHANGE_COINS_LOCK = "exchange-coins-lock";
@ -79,114 +77,51 @@ export interface ExchangeOperations {
}>;
}
export type NotificationListener = (n: WalletNotification) => void;
/**
* Internal state of the wallet.
* Internal, shard wallet state that is used by the implementation
* of wallet operations.
*
* FIXME: This should not be exported anywhere from the taler-wallet-core package,
* as it's an opaque implementation detail.
*/
export class InternalWalletState implements InternalWalletState {
memoProcessReserve: AsyncOpMemoMap<void> = new AsyncOpMemoMap();
memoMakePlanchet: AsyncOpMemoMap<void> = new AsyncOpMemoMap();
memoGetPending: AsyncOpMemoSingle<PendingOperationsResponse> = new AsyncOpMemoSingle();
memoGetBalance: AsyncOpMemoSingle<BalancesResponse> = new AsyncOpMemoSingle();
memoProcessRefresh: AsyncOpMemoMap<void> = new AsyncOpMemoMap();
memoProcessRecoup: AsyncOpMemoMap<void> = new AsyncOpMemoMap();
memoProcessDeposit: AsyncOpMemoMap<void> = new AsyncOpMemoMap();
export interface InternalWalletState {
memoProcessReserve: AsyncOpMemoMap<void>;
memoMakePlanchet: AsyncOpMemoMap<void>;
memoGetPending: AsyncOpMemoSingle<PendingOperationsResponse>;
memoGetBalance: AsyncOpMemoSingle<BalancesResponse>;
memoProcessRefresh: AsyncOpMemoMap<void>;
memoProcessRecoup: AsyncOpMemoMap<void>;
memoProcessDeposit: AsyncOpMemoMap<void>;
cryptoApi: CryptoApi;
timerGroup: TimerGroup = new TimerGroup();
latch = new AsyncCondition();
stopped = false;
memoRunRetryLoop = new AsyncOpMemoSingle<void>();
timerGroup: TimerGroup;
latch: AsyncCondition;
stopped: boolean;
memoRunRetryLoop: AsyncOpMemoSingle<void>;
listeners: NotificationListener[] = [];
listeners: NotificationListener[];
initCalled: boolean = false;
initCalled: boolean;
// FIXME: This should be done in wallet.ts, here we should only give declarations
exchangeOps: ExchangeOperations = {
getExchangeDetails,
getExchangeTrust,
updateExchangeFromUrl,
};
exchangeOps: ExchangeOperations;
/**
* Promises that are waiting for a particular resource.
*/
private resourceWaiters: Record<string, OpenedPromise<void>[]> = {};
db: DbAccess<typeof WalletStoresV1>;
http: HttpRequestLibrary;
/**
* Resources that are currently locked.
*/
private resourceLocks: Set<string> = new Set();
notify(n: WalletNotification): void;
constructor(
// FIXME: Make this a getter and make
// the actual value nullable.
// Check if we are in a DB migration / garbage collection
// and throw an error in that case.
public db: DbAccess<typeof WalletStoresV1>,
public http: HttpRequestLibrary,
cryptoWorkerFactory: CryptoWorkerFactory,
) {
this.cryptoApi = new CryptoApi(cryptoWorkerFactory);
}
notify(n: WalletNotification): void {
logger.trace("Notification", n);
for (const l of this.listeners) {
const nc = JSON.parse(JSON.stringify(n));
setTimeout(() => {
l(nc);
}, 0);
}
}
addNotificationListener(f: (n: WalletNotification) => void): void {
this.listeners.push(f);
}
addNotificationListener(f: (n: WalletNotification) => void): void;
/**
* Stop ongoing processing.
*/
stop(): void {
this.stopped = true;
this.timerGroup.stopCurrentAndFutureTimers();
this.cryptoApi.stop();
}
stop(): void;
/**
* Run an async function after acquiring a list of locks, identified
* by string tokens.
*/
async runSequentialized<T>(tokens: string[], f: () => Promise<T>) {
// Make sure locks are always acquired in the same order
tokens = [...tokens].sort();
for (const token of tokens) {
if (this.resourceLocks.has(token)) {
const p = openPromise<void>();
let waitList = this.resourceWaiters[token];
if (!waitList) {
waitList = this.resourceWaiters[token] = [];
}
waitList.push(p);
await p.promise;
}
this.resourceLocks.add(token);
}
try {
logger.trace(`begin exclusive execution on ${JSON.stringify(tokens)}`);
const result = await f();
logger.trace(`end exclusive execution on ${JSON.stringify(tokens)}`);
return result;
} finally {
for (const token of tokens) {
this.resourceLocks.delete(token);
let waiter = (this.resourceWaiters[token] ?? []).shift();
if (waiter) {
waiter.resolve();
}
}
}
}
runSequentialized<T>(tokens: string[], f: () => Promise<T>): Promise<T>;
}

View File

@ -36,6 +36,7 @@ import { SynchronousCryptoWorkerFactory } from "../crypto/workers/synchronousWor
import type { IDBFactory } from "@gnu-taler/idb-bridge";
import { WalletNotification } from "@gnu-taler/taler-util";
import { InternalWalletState } from "../common.js";
import { Wallet } from "../wallet.js";
const logger = new Logger("headless/helpers.ts");
@ -93,7 +94,7 @@ function makeId(length: number): string {
*/
export async function getDefaultNodeWallet(
args: DefaultNodeWalletArgs = {},
): Promise<InternalWalletState> {
): Promise<Wallet> {
BridgeIDBFactory.enableTracing = false;
const myBackend = new MemoryBackend();
myBackend.enableTracing = false;
@ -172,7 +173,7 @@ export async function getDefaultNodeWallet(
workerFactory = new SynchronousCryptoWorkerFactory();
}
const w = new InternalWalletState(myDb, myHttpLib, workerFactory);
const w = await Wallet.create(myDb, myHttpLib, workerFactory);
if (args.notifyHandler) {
w.addNotificationListener(args.notifyHandler);

View File

@ -23,6 +23,7 @@
* Imports.
*/
import {
BalancesResponse,
codecForAny,
codecForDeleteTransactionRequest,
codecForRetryTransactionRequest,
@ -32,9 +33,11 @@ import {
getDurationRemaining,
isTimestampExpired,
j2s,
PreparePayResultType,
TalerErrorCode,
Timestamp,
timestampMin,
WalletNotification,
} from "@gnu-taler/taler-util";
import {
addBackupProvider,
@ -59,6 +62,7 @@ import {
import {
acceptExchangeTermsOfService,
getExchangeDetails,
getExchangeTrust,
updateExchangeFromUrl,
} from "./operations/exchanges.js";
import {
@ -85,7 +89,11 @@ import {
getFundingPaytoUris,
processReserve,
} from "./operations/reserves.js";
import { InternalWalletState } from "./common.js";
import {
ExchangeOperations,
InternalWalletState,
NotificationListener,
} from "./common.js";
import {
runIntegrationTest,
testPay,
@ -106,16 +114,16 @@ import {
AuditorTrustRecord,
CoinSourceType,
ReserveRecordStatus,
WalletStoresV1,
} from "./db.js";
import { NotificationType } from "@gnu-taler/taler-util";
import {
PendingOperationInfo,
PendingOperationsResponse,
PendingOperationType,
} from "./pending-types.js";
import { CoinDumpJson } from "@gnu-taler/taler-util";
import {
codecForTransactionsRequest,
} from "@gnu-taler/taler-util";
import { codecForTransactionsRequest } from "@gnu-taler/taler-util";
import {
AcceptManualWithdrawalResult,
AcceptWithdrawalResponse,
@ -151,6 +159,16 @@ import { assertUnreachable } from "./util/assertUnreachable.js";
import { Logger } from "@gnu-taler/taler-util";
import { setWalletDeviceId } from "./operations/backup/state.js";
import { WalletCoreApiClient } from "./wallet-api-types.js";
import { AsyncOpMemoMap, AsyncOpMemoSingle } from "./util/asyncMemo.js";
import { CryptoApi, CryptoWorkerFactory } from "./crypto/workers/cryptoApi.js";
import { TimerGroup } from "./util/timer.js";
import {
AsyncCondition,
OpenedPromise,
openPromise,
} from "./util/promiseUtils.js";
import { DbAccess } from "./util/query.js";
import { HttpRequestLibrary } from "./util/http.js";
const builtinAuditors: AuditorTrustRecord[] = [
{
@ -618,7 +636,6 @@ async function dumpCoins(ws: InternalWalletState): Promise<CoinDumpJson> {
return coinsJson;
}
/**
* Get an API client from an internal wallet state object.
*/
@ -936,3 +953,178 @@ export async function handleCoreApiRequest(
}
}
}
/**
* Public handle to a running wallet.
*/
export class Wallet {
private ws: InternalWalletState;
private _client: WalletCoreApiClient;
private constructor(
db: DbAccess<typeof WalletStoresV1>,
http: HttpRequestLibrary,
cryptoWorkerFactory: CryptoWorkerFactory,
) {
this.ws = new InternalWalletStateImpl(db, http, cryptoWorkerFactory);
}
get client() {
return this._client;
}
static async create(
db: DbAccess<typeof WalletStoresV1>,
http: HttpRequestLibrary,
cryptoWorkerFactory: CryptoWorkerFactory,
): Promise<Wallet> {
const w = new Wallet(db, http, cryptoWorkerFactory);
w._client = await getClientFromWalletState(w.ws);
return w;
}
addNotificationListener(f: (n: WalletNotification) => void): void {
return this.ws.addNotificationListener(f);
}
stop(): void {
this.ws.stop();
}
runRetryLoop(): Promise<void> {
return runRetryLoop(this.ws);
}
runPending(forceNow: boolean = false) {
return runPending(this.ws, forceNow);
}
runUntilDone(
req: {
maxRetries?: number;
} = {},
) {
return runUntilDone(this.ws, req);
}
handleCoreApiRequest(
operation: string,
id: string,
payload: unknown,
): Promise<CoreApiResponse> {
return handleCoreApiRequest(this.ws, operation, id, payload);
}
}
/**
* Internal state of the wallet.
*
* This ties together all the operation implementations.
*/
class InternalWalletStateImpl implements InternalWalletState {
memoProcessReserve: AsyncOpMemoMap<void> = new AsyncOpMemoMap();
memoMakePlanchet: AsyncOpMemoMap<void> = new AsyncOpMemoMap();
memoGetPending: AsyncOpMemoSingle<PendingOperationsResponse> = new AsyncOpMemoSingle();
memoGetBalance: AsyncOpMemoSingle<BalancesResponse> = new AsyncOpMemoSingle();
memoProcessRefresh: AsyncOpMemoMap<void> = new AsyncOpMemoMap();
memoProcessRecoup: AsyncOpMemoMap<void> = new AsyncOpMemoMap();
memoProcessDeposit: AsyncOpMemoMap<void> = new AsyncOpMemoMap();
cryptoApi: CryptoApi;
timerGroup: TimerGroup = new TimerGroup();
latch = new AsyncCondition();
stopped = false;
memoRunRetryLoop = new AsyncOpMemoSingle<void>();
listeners: NotificationListener[] = [];
initCalled: boolean = false;
exchangeOps: ExchangeOperations = {
getExchangeDetails,
getExchangeTrust,
updateExchangeFromUrl,
};
/**
* Promises that are waiting for a particular resource.
*/
private resourceWaiters: Record<string, OpenedPromise<void>[]> = {};
/**
* Resources that are currently locked.
*/
private resourceLocks: Set<string> = new Set();
constructor(
// FIXME: Make this a getter and make
// the actual value nullable.
// Check if we are in a DB migration / garbage collection
// and throw an error in that case.
public db: DbAccess<typeof WalletStoresV1>,
public http: HttpRequestLibrary,
cryptoWorkerFactory: CryptoWorkerFactory,
) {
this.cryptoApi = new CryptoApi(cryptoWorkerFactory);
}
notify(n: WalletNotification): void {
logger.trace("Notification", n);
for (const l of this.listeners) {
const nc = JSON.parse(JSON.stringify(n));
setTimeout(() => {
l(nc);
}, 0);
}
}
addNotificationListener(f: (n: WalletNotification) => void): void {
this.listeners.push(f);
}
/**
* Stop ongoing processing.
*/
stop(): void {
this.stopped = true;
this.timerGroup.stopCurrentAndFutureTimers();
this.cryptoApi.stop();
}
/**
* Run an async function after acquiring a list of locks, identified
* by string tokens.
*/
async runSequentialized<T>(tokens: string[], f: () => Promise<T>) {
// Make sure locks are always acquired in the same order
tokens = [...tokens].sort();
for (const token of tokens) {
if (this.resourceLocks.has(token)) {
const p = openPromise<void>();
let waitList = this.resourceWaiters[token];
if (!waitList) {
waitList = this.resourceWaiters[token] = [];
}
waitList.push(p);
await p.promise;
}
this.resourceLocks.add(token);
}
try {
logger.trace(`begin exclusive execution on ${JSON.stringify(tokens)}`);
const result = await f();
logger.trace(`end exclusive execution on ${JSON.stringify(tokens)}`);
return result;
} finally {
for (const token of tokens) {
this.resourceLocks.delete(token);
let waiter = (this.resourceWaiters[token] ?? []).shift();
if (waiter) {
waiter.resolve();
}
}
}
}
}

View File

@ -37,6 +37,8 @@ import {
runRetryLoop,
handleNotifyReserve,
InternalWalletState,
Wallet,
WalletApiOperation,
} from "@gnu-taler/taler-wallet-core";
import {
classifyTalerUri,
@ -52,8 +54,10 @@ import { BrowserCryptoWorkerFactory } from "./browserCryptoWorkerFactory";
/**
* Currently active wallet instance. Might be unloaded and
* re-instantiated when the database is reset.
*
* FIXME: Maybe move the wallet reseting into the Wallet class?
*/
let currentWallet: InternalWalletState | undefined;
let currentWallet: Wallet | undefined;
let currentDatabase: DbAccess<typeof WalletStoresV1> | undefined;
@ -170,7 +174,7 @@ async function dispatch(
};
break;
}
r = await handleCoreApiRequest(w, req.operation, req.id, req.payload);
r = await w.handleCoreApiRequest(req.operation, req.id, req.payload);
break;
}
}
@ -256,7 +260,7 @@ async function reinitWallet(): Promise<void> {
}
const http = new BrowserHttpLib();
console.log("setting wallet");
const wallet = new InternalWalletState(
const wallet = await Wallet.create(
currentDatabase,
http,
new BrowserCryptoWorkerFactory(),
@ -270,7 +274,7 @@ async function reinitWallet(): Promise<void> {
}
}
});
runRetryLoop(wallet).catch((e) => {
wallet.runRetryLoop().catch((e) => {
console.log("error during wallet retry loop", e);
});
// Useful for debugging in the background page.
@ -360,7 +364,8 @@ function headerListener(
if (!w) {
return;
}
handleNotifyReserve(w);
// FIXME: Is this still useful?
// handleNotifyReserve(w);
});
break;
default:
@ -451,4 +456,3 @@ export async function wxMain(): Promise<void> {
setupHeaderListener();
});
}