diff options
author | Florian Dold <florian@dold.me> | 2023-01-25 18:49:00 +0100 |
---|---|---|
committer | Florian Dold <florian@dold.me> | 2023-01-25 18:49:00 +0100 |
commit | c57ba4c0cea133059ac30eae3c7e527886240059 (patch) | |
tree | 9028ccde2bbd990b00449b9bb14c77e2b4732dcc /packages/taler-wallet-cli/src/index.ts | |
parent | 3aa077e0975128b66ade8866ad8227e3666a1b13 (diff) |
wallet-cli: daemonized wallet MVP
Diffstat (limited to 'packages/taler-wallet-cli/src/index.ts')
-rw-r--r-- | packages/taler-wallet-cli/src/index.ts | 304 |
1 files changed, 286 insertions, 18 deletions
diff --git a/packages/taler-wallet-cli/src/index.ts b/packages/taler-wallet-cli/src/index.ts index 4f84bb6c4..14000aefd 100644 --- a/packages/taler-wallet-cli/src/index.ts +++ b/packages/taler-wallet-cli/src/index.ts @@ -24,6 +24,9 @@ import { clk, codecForList, codecForString, + CoreApiMessageEnvelope, + CoreApiRequestEnvelope, + CoreApiResponse, decodeCrock, encodeCrock, getRandomBytes, @@ -35,8 +38,15 @@ import { setDangerousTimetravel, setGlobalLogLevelFromString, TalerUriType, + WalletNotification, } from "@gnu-taler/taler-util"; -import type { TalerCryptoInterface } from "@gnu-taler/taler-wallet-core"; +import { + OpenedPromise, + openPromise, + TalerCryptoInterface, + TalerError, + WalletCoreResponseType, +} from "@gnu-taler/taler-wallet-core"; import { CryptoDispatcher, getDefaultNodeWallet, @@ -54,6 +64,7 @@ import { } from "@gnu-taler/taler-wallet-core"; import fs from "fs"; import os from "os"; +import { connectRpc, JsonMessage, runRpcServer } from "./rpc.js"; // This module also serves as the entry point for the crypto // thread worker, and thus must expose these two handlers. @@ -154,7 +165,10 @@ export const walletCli = clk help: "Command line interface for the GNU Taler wallet.", }) .maybeOption("walletDbFile", ["--wallet-db"], clk.STRING, { - help: "location of the wallet database file", + help: "Location of the wallet database file", + }) + .maybeOption("walletConnection", ["--wallet-connection"], clk.STRING, { + help: "Connect to an RPC wallet", }) .maybeOption("timetravel", ["--timetravel"], clk.INT, { help: "modify system time by given offset in microseconds", @@ -199,10 +213,33 @@ function checkEnvFlag(name: string): boolean { return false; } -async function withWallet<T>( +export interface WalletContext { + /** + * High-level client for making API requests to wallet-core. + */ + client: WalletCoreApiClient; + + /** + * Low-level interface for making API requests to wallet-core. + */ + makeCoreApiRequest( + operation: string, + payload: unknown, + ): Promise<CoreApiResponse>; + + /** + * Return a promise that resolves after the wallet has emitted a notification + * that meets the criteria of the "cond" predicate. + */ + waitForNotificationCond( + cond: (n: WalletNotification) => boolean, + ): Promise<void>; +} + +async function createLocalWallet( walletCliArgs: WalletCliArgsType, - f: (w: { client: WalletCoreApiClient; ws: Wallet }) => Promise<T>, -): Promise<T> { + notificationHandler?: (n: WalletNotification) => void, +): Promise<Wallet> { const dbPath = walletCliArgs.wallet.walletDbFile ?? defaultWalletDbPath; const myHttpLib = new NodeHttpLib(); if (walletCliArgs.wallet.noThrottle) { @@ -213,6 +250,9 @@ async function withWallet<T>( httpLib: myHttpLib, notifyHandler: (n) => { logger.info(`wallet notification: ${j2s(n)}`); + if (notificationHandler) { + notificationHandler(n); + } }, cryptoWorkerType: walletCliArgs.wallet.cryptoWorker as any, }); @@ -223,15 +263,10 @@ async function withWallet<T>( applyVerbose(walletCliArgs.wallet.verbose); try { - const w = { - ws: wallet, - client: wallet.client, - }; await wallet.handleCoreApiRequest("initWallet", "native-init", { skipDefaults: walletCliArgs.wallet.skipDefaults, }); - const ret = await f(w); - return ret; + return wallet; } catch (e) { const ed = getErrorDetailFromException(e); console.error("Operation failed: " + summarizeTalerErrorDetail(ed)); @@ -239,11 +274,189 @@ async function withWallet<T>( process.exit(1); } finally { logger.trace("operation with wallet finished, stopping"); - wallet.stop(); logger.trace("stopped wallet"); } } +export interface RemoteWallet { + /** + * Low-level interface for making API requests to wallet-core. + */ + makeCoreApiRequest( + operation: string, + payload: unknown, + ): Promise<CoreApiResponse>; + + /** + * Close the connection to the remote wallet. + */ + close(): void; +} + +async function createRemoteWallet( + notificationHandler?: (n: WalletNotification) => void, +): Promise<RemoteWallet> { + let nextRequestId = 1; + let requestMap: Map< + string, + { + promiseCapability: OpenedPromise<CoreApiResponse>; + } + > = new Map(); + + const ctx = await connectRpc<RemoteWallet>({ + socketFilename: "wallet-core.sock", + onEstablished(connection) { + const ctx: RemoteWallet = { + makeCoreApiRequest(operation, payload) { + const id = `req-${nextRequestId}`; + const req: CoreApiRequestEnvelope = { + operation, + id, + args: payload, + }; + const promiseCap = openPromise<CoreApiResponse>(); + requestMap.set(id, { + promiseCapability: promiseCap, + }); + connection.sendMessage(req as unknown as JsonMessage); + return promiseCap.promise; + }, + close() { + connection.close(); + }, + }; + return { + result: ctx, + onDisconnect() { + logger.info("remote wallet disconnected"); + }, + onMessage(m) { + // FIXME: use a codec for parsing the response envelope! + + logger.info(`got message from remote wallet: ${j2s(m)}`); + if (typeof m !== "object" || m == null) { + logger.warn("message from wallet not understood (wrong type)"); + return; + } + const type = (m as any).type; + if (type === "response" || type === "error") { + const id = (m as any).id; + if (typeof id !== "string") { + logger.warn( + "message from wallet not understood (no id in response)", + ); + return; + } + const h = requestMap.get(id); + if (!h) { + logger.warn(`no handler registered for response id ${id}`); + return; + } + h.promiseCapability.resolve(m as any); + } else if (type === "notification") { + logger.info("got notification"); + if (notificationHandler) { + notificationHandler((m as any).payload); + } + } else { + logger.warn("message from wallet not understood"); + } + }, + }; + }, + }); + return ctx; +} + +/** + * Get a high-level API client from a remove wallet. + */ +function getClientFromRemoteWallet(w: RemoteWallet): WalletCoreApiClient { + const client: WalletCoreApiClient = { + async call(op, payload): Promise<any> { + const res = await w.makeCoreApiRequest(op, payload); + switch (res.type) { + case "error": + throw TalerError.fromUncheckedDetail(res.error); + case "response": + return res.result; + } + }, + }; + return client; +} + +async function withWallet<T>( + walletCliArgs: WalletCliArgsType, + f: (ctx: WalletContext) => Promise<T>, +): Promise<T> { + // Bookkeeping for waiting on notification conditions + let nextCondIndex = 1; + const condMap: Map< + number, + { + condition: (n: WalletNotification) => boolean; + promiseCapability: OpenedPromise<void>; + } + > = new Map(); + function onNotification(n: WalletNotification) { + condMap.forEach((cond, condKey) => { + if (cond.condition(n)) { + cond.promiseCapability.resolve(); + } + }); + } + function waitForNotificationCond(cond: (n: WalletNotification) => boolean) { + const promCap = openPromise<void>(); + condMap.set(nextCondIndex++, { + condition: cond, + promiseCapability: promCap, + }); + return promCap.promise; + } + + if (walletCliArgs.wallet.walletConnection) { + logger.info("creating remote wallet"); + const w = await createRemoteWallet(onNotification); + const ctx: WalletContext = { + makeCoreApiRequest(operation, payload) { + return w.makeCoreApiRequest(operation, payload); + }, + client: getClientFromRemoteWallet(w), + waitForNotificationCond, + }; + const res = await f(ctx); + w.close(); + return res; + } else { + const w = await createLocalWallet(walletCliArgs, onNotification); + const ctx: WalletContext = { + client: w.client, + waitForNotificationCond, + makeCoreApiRequest(operation, payload) { + return w.handleCoreApiRequest(operation, "my-req", payload); + }, + }; + return await f(ctx); + } +} + +/** + * Run a function with a local wallet. + * + * Stops the wallet after the function is done. + */ +async function withLocalWallet<T>( + walletCliArgs: WalletCliArgsType, + f: (w: { client: WalletCoreApiClient; ws: Wallet }) => Promise<T>, +): Promise<T> { + const w = await createLocalWallet(walletCliArgs); + const res = await f({ client: w.client, ws: w }); + w.stop(); + return res; +} + walletCli .subcommand("balance", "balance", { help: "Show wallet balance." }) .flag("json", ["--json"], { @@ -277,9 +490,8 @@ walletCli process.exit(1); } try { - const resp = await wallet.ws.handleCoreApiRequest( + const resp = await wallet.makeCoreApiRequest( args.api.operation, - "reqid-1", requestJson, ); console.log(JSON.stringify(resp, undefined, 2)); @@ -338,7 +550,7 @@ transactionsCli help: "Identifier of the transaction to delete", }) .flag("force", ["--force"], { - help: "Force aborting the transaction. Might lose money." + help: "Force aborting the transaction. Might lose money.", }) .action(async (args) => { await withWallet(args, async (wallet) => { @@ -383,7 +595,7 @@ walletCli .maybeOption("maxRetries", ["--max-retries"], clk.INT) .flag("failOnMaxRetries", ["--fail-on-max-retries"]) .action(async (args) => { - await withWallet(args, async (wallet) => { + await withLocalWallet(args, async (wallet) => { logger.info("running until pending operations are finished"); const resp = await wallet.ws.runTaskLoop({ maxRetries: args.finishPendingOpt.maxRetries, @@ -802,7 +1014,7 @@ depositCli .requiredArgument("amount", clk.STRING) .requiredArgument("targetPayto", clk.STRING) .action(async (args) => { - await withWallet(args, async (wallet) => { + await withLocalWallet(args, async (wallet) => { const resp = await wallet.client.call( WalletApiOperation.CreateDepositGroup, { @@ -815,6 +1027,7 @@ depositCli }); }); +// FIXME: should probably be removed depositCli .subcommand("trackDepositArgs", "track") .requiredArgument("depositGroupId", clk.STRING) @@ -835,6 +1048,61 @@ const advancedCli = walletCli.subcommand("advancedArgs", "advanced", { }); advancedCli + .subcommand("serve", "serve", { + help: "Serve the wallet API via a unix domain socket.", + }) + .action(async (args) => { + const w = await createLocalWallet(args); + w.runTaskLoop() + .then((res) => { + logger.warn("task loop exited unexpectedly"); + }) + .catch((e) => { + logger.error(`error in task loop: ${e}`); + }); + let nextClientId = 1; + const notifyHandlers = new Map<number, (n: WalletNotification) => void>(); + w.addNotificationListener((n) => { + notifyHandlers.forEach((v, k) => { + v(n); + }); + }); + await runRpcServer({ + socketFilename: "wallet-core.sock", + onConnect(client) { + logger.info("connected"); + const clientId = nextClientId++; + notifyHandlers.set(clientId, (n: WalletNotification) => { + client.sendResponse({ + type: "notification", + payload: n as unknown as JsonMessage, + }); + }); + return { + onDisconnect() { + notifyHandlers.delete(clientId); + logger.info("disconnected"); + }, + onMessage(msg) { + logger.info(`message: ${j2s(msg)}`); + const op = (msg as any).operation; + const id = (msg as any).id; + const payload = (msg as any).args; + w.handleCoreApiRequest(op, id, payload) + .then((resp) => { + logger.info("sending response"); + client.sendResponse(resp as unknown as JsonMessage); + }) + .catch((e) => { + logger.error(`unexpected error: ${e}`); + }); + }, + }; + }, + }); + }); + +advancedCli .subcommand("init", "init", { help: "Initialize the wallet (with DB) and exit.", }) @@ -848,7 +1116,7 @@ advancedCli }) .flag("forceNow", ["-f", "--force-now"]) .action(async (args) => { - await withWallet(args, async (wallet) => { + await withLocalWallet(args, async (wallet) => { await wallet.ws.runPending(args.runPendingOpt.forceNow); }); }); |