diff --git a/packages/taler-util/src/twrpc-impl.missing.ts b/packages/taler-util/src/twrpc-impl.missing.ts new file mode 100644 index 000000000..d9ed37815 --- /dev/null +++ b/packages/taler-util/src/twrpc-impl.missing.ts @@ -0,0 +1,17 @@ +/* + This file is part of GNU Taler + (C) 2023 Taler Systems S.A. + + GNU Taler is free software; you can redistribute it and/or modify it under the + terms of the GNU General Public License as published by the Free Software + Foundation; either version 3, or (at your option) any later version. + + GNU Taler is distributed in the hope that it will be useful, but WITHOUT ANY + WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR + A PARTICULAR PURPOSE. See the GNU General Public License for more details. + + You should have received a copy of the GNU General Public License along with + GNU Taler; see the file COPYING. If not, see + */ + +// Not implemented. diff --git a/packages/taler-util/src/twrpc-impl.node.ts b/packages/taler-util/src/twrpc-impl.node.ts new file mode 100644 index 000000000..52ab65b73 --- /dev/null +++ b/packages/taler-util/src/twrpc-impl.node.ts @@ -0,0 +1,222 @@ +/* + This file is part of GNU Taler + (C) 2023 Taler Systems S.A. + + GNU Taler is free software; you can redistribute it and/or modify it under the + terms of the GNU General Public License as published by the Free Software + Foundation; either version 3, or (at your option) any later version. + + GNU Taler is distributed in the hope that it will be useful, but WITHOUT ANY + WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR + A PARTICULAR PURPOSE. See the GNU General Public License for more details. + + You should have received a copy of the GNU General Public License along with + GNU Taler; see the file COPYING. If not, see + */ + +import * as net from "node:net"; +import * as fs from "node:fs"; +import { Logger } from "./logging.js"; +import { bytesToString, typedArrayConcat } from "./taler-crypto.js"; +import type { RpcConnectArgs, RpcServerArgs } from "./twrpc.js"; + +interface ReadLinewiseArgs { + onLine(lineData: Uint8Array): void; + sock: net.Socket; +} + +const logger = new Logger("twrpc-impl.node.ts"); + +function readStreamLinewise(args: ReadLinewiseArgs): void { + let chunks: Uint8Array[] = []; + args.sock.on("data", (buf: Uint8Array) => { + logger.info(`received ${buf.length} bytes`); + // Process all newlines in the newly received buffer + while (1) { + const newlineIdx = buf.indexOf("\n".charCodeAt(0)); + if (newlineIdx >= 0) { + let left = buf.subarray(0, newlineIdx + 1); + let right = buf.subarray(newlineIdx + 1); + chunks.push(left); + const line = typedArrayConcat(chunks); + args.onLine(line); + chunks = []; + buf = right; + } else { + chunks.push(buf); + break; + } + } + }); +} + +export async function connectRpc(args: RpcConnectArgs): Promise { + let sockFilename = args.socketFilename; + return new Promise((resolve, reject) => { + const client = net.createConnection(sockFilename); + client.on("connect", () => { + let parsingBody: string | undefined = undefined; + let bodyChunks: string[] = []; + + logger.info("connected!"); + client.write("%hello-from-client\n"); + const res = args.onEstablished({ + sendMessage(m) { + client.write("%request\n"); + client.write(JSON.stringify(m)); + client.write("\n"); + client.write("%end\n"); + }, + close() { + client.destroy(); + }, + }); + readStreamLinewise({ + sock: client, + onLine(line) { + const lineStr = bytesToString(line); + logger.info(`got line from server: ${lineStr}`); + // Are we currently parsing the body of a request? + if (!parsingBody) { + const strippedLine = lineStr.trim(); + if (strippedLine == "%message") { + logger.info("got message start"); + parsingBody = "message"; + } else if (strippedLine == "%hello-from-server") { + logger.info("got hello from server"); + } else if (strippedLine.startsWith("%error:")) { + logger.info("got error from server, disconnecting"); + client.end(); + res.onDisconnect(); + } else { + logger.info("got unknown request"); + client.write("%error: invalid message\n"); + client.end(); + } + } else if (parsingBody == "message") { + const strippedLine = lineStr.trim(); + if (strippedLine == "%end") { + logger.info("finished request"); + let req = bodyChunks.join(""); + let reqJson: any = undefined; + try { + reqJson = JSON.parse(req); + } catch (e) { + logger.warn("JSON request was invalid"); + } + if (reqJson !== undefined) { + logger.info(`request: ${req}`); + res.onMessage(reqJson); + } else { + client.write("%error: invalid JSON"); + client.end(); + } + bodyChunks = []; + } else { + bodyChunks.push(lineStr); + } + } else { + logger.info("invalid parser state"); + client.write("%error: internal error\n"); + client.end(); + } + }, + }); + client.on("close", () => { + res.onDisconnect(); + }); + client.on("data", () => {}); + resolve(res.result); + }); + }); +} + +export async function runRpcServer(args: RpcServerArgs): Promise { + let sockFilename = args.socketFilename; + try { + fs.unlinkSync(sockFilename); + } catch (e) { + // Do nothing! + } + return new Promise((resolve, reject) => { + const server = net.createServer((sock) => { + // Are we currently parsing the body of a request? + let parsingBody: string | undefined = undefined; + let bodyChunks: string[] = []; + + logger.info("got new connection"); + sock.write("%hello-from-server\n"); + const handlers = args.onConnect({ + sendResponse(message) { + sock.write("%message\n"); + sock.write(JSON.stringify(message)); + sock.write("\n"); + sock.write("%end\n"); + }, + }); + + sock.on("error", (err) => { + logger.info(`connection error: ${err}`); + }); + + function processLine(line: Uint8Array) { + const lineStr = bytesToString(line); + logger.info(`got line: ${lineStr}`); + if (!parsingBody) { + const strippedLine = lineStr.trim(); + if (strippedLine == "%request") { + logger.info("got request start"); + parsingBody = "request"; + } else if (strippedLine === "%hello-from-client") { + console.log("got hello from client"); + } else if (strippedLine.startsWith("%error:")) { + console.log("got error from client"); + sock.end(); + handlers.onDisconnect(); + } else { + logger.info("got unknown request"); + sock.write("%error: invalid request\n"); + sock.end(); + } + } else if (parsingBody == "request") { + const strippedLine = lineStr.trim(); + if (strippedLine == "%end") { + logger.info("finished request"); + let req = bodyChunks.join(""); + let reqJson: any = undefined; + try { + reqJson = JSON.parse(req); + } catch (e) { + logger.warn("JSON request was invalid"); + } + if (reqJson !== undefined) { + logger.info(`request: ${req}`); + handlers.onMessage(reqJson); + } else { + sock.write("%error: invalid JSON"); + sock.end(); + } + bodyChunks = []; + } else { + bodyChunks.push(lineStr); + } + } else { + logger.info("invalid parser state"); + sock.write("%error: internal error\n"); + sock.end(); + } + } + + readStreamLinewise({ + sock, + onLine: processLine, + }); + + sock.on("close", (hadError: boolean) => { + logger.info(`connection closed, hadError=${hadError}`); + handlers.onDisconnect(); + }); + }); + server.listen("wallet-core.sock"); + }); +} diff --git a/packages/taler-util/src/twrpc.ts b/packages/taler-util/src/twrpc.ts new file mode 100644 index 000000000..615491b42 --- /dev/null +++ b/packages/taler-util/src/twrpc.ts @@ -0,0 +1,64 @@ +/* + This file is part of GNU Taler + (C) 2023 Taler Systems S.A. + + GNU Taler is free software; you can redistribute it and/or modify it under the + terms of the GNU General Public License as published by the Free Software + Foundation; either version 3, or (at your option) any later version. + + GNU Taler is distributed in the hope that it will be useful, but WITHOUT ANY + WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR + A PARTICULAR PURPOSE. See the GNU General Public License for more details. + + You should have received a copy of the GNU General Public License along with + GNU Taler; see the file COPYING. If not, see + */ + +/** + * Implementation for the wallet-core IPC protocol. + * + * Currently the protcol is completely unstable and only used internally + * by the wallet for testing purposes. + */ + + +// Platform-specific implementation +export { connectRpc, runRpcServer } from "#twrpc-impl"; + +export type JsonMessage = + | string + | number + | boolean + | null + | JsonMessage[] + | { [key: string]: JsonMessage }; + +export interface RpcServerClientHandlers { + onMessage(msg: JsonMessage): void; + onDisconnect(): void; +} + +export interface RpcServerClient { + sendResponse(message: JsonMessage): void; +} + +export interface RpcServerArgs { + socketFilename: string; + onConnect(client: RpcServerClient): RpcServerClientHandlers; +} + +export interface RpcClientServerConnection { + sendMessage(m: JsonMessage): void; + close(): void; +} + +export interface RpcConnectArgs { + socketFilename: string; + onEstablished(connection: RpcClientServerConnection): { + result: T; + onDisconnect(): void; + onMessage(m: JsonMessage): void; + }; +} + +