diff --git a/src/crypto/cryptoApi.ts b/src/crypto/cryptoApi.ts index d3a93ff8d..5e3237836 100644 --- a/src/crypto/cryptoApi.ts +++ b/src/crypto/cryptoApi.ts @@ -41,6 +41,7 @@ import { BenchmarkResult, CoinWithDenom, PayCoinInfo } from "../walletTypes"; import * as timer from "../timer"; import { startWorker } from "./startWorker"; +import { throws } from "assert"; /** * State of a crypto worker. @@ -98,6 +99,11 @@ export class CryptoApi { */ private numBusy: number = 0; + /** + * Did we stop accepting new requests? + */ + private stopped: boolean = false; + public enableTracing = false; /** @@ -106,6 +112,7 @@ export class CryptoApi { terminateWorkers() { for (let worker of this.workers) { if (worker.w) { + this.enableTracing && console.log("terminating worker"); worker.w.terminate(); if (worker.terminationTimerHandle) { worker.terminationTimerHandle.clear(); @@ -120,10 +127,19 @@ export class CryptoApi { } } + stop() { + this.terminateWorkers(); + this.stopped = true; + } + /** * Start a worker (if not started) and set as busy. */ wake(ws: WorkerState, work: WorkItem): void { + if (this.stopped) { + this.enableTracing && console.log("not waking, as cryptoApi is stopped"); + return; + } if (ws.currentWorkItem !== null) { throw Error("assertion failed"); } @@ -158,7 +174,7 @@ export class CryptoApi { ws.w = null; } }; - ws.terminationTimerHandle = timer.after(5 * 1000, destroy); + ws.terminationTimerHandle = timer.after(15 * 1000, destroy); } handleWorkerError(ws: WorkerState, e: ErrorEvent) { @@ -253,6 +269,7 @@ export class CryptoApi { priority: number, ...args: any[] ): Promise { + this.enableTracing && console.log("cryptoApi: doRpc called"); const p: Promise = new Promise((resolve, reject) => { const rpcId = this.nextRpcId++; const workItem: WorkItem = { diff --git a/src/crypto/nodeWorker.ts b/src/crypto/nodeWorker.ts index fa942387a..b5a2e8b44 100644 --- a/src/crypto/nodeWorker.ts +++ b/src/crypto/nodeWorker.ts @@ -92,6 +92,7 @@ export class Worker { * Forcibly terminate the worker thread. */ terminate () { + console.log("terminating node.js worker"); this.child.kill("SIGINT"); } } diff --git a/src/headless/taler-wallet-cli.ts b/src/headless/taler-wallet-cli.ts index c57c3ab00..e2b8b54ac 100644 --- a/src/headless/taler-wallet-cli.ts +++ b/src/headless/taler-wallet-cli.ts @@ -9,6 +9,9 @@ import Axios from "axios"; import URI = require("urijs"); import querystring = require("querystring"); +import { CheckPaymentResponse } from "../talerTypes"; + +const enableTracing = false; class ConsoleNotifier implements Notifier { notify(): void { @@ -18,29 +21,29 @@ class ConsoleNotifier implements Notifier { class ConsoleBadge implements Badge { startBusy(): void { - console.log("NOTIFICATION: busy"); + enableTracing && console.log("NOTIFICATION: busy"); } stopBusy(): void { - console.log("NOTIFICATION: busy end"); + enableTracing && console.log("NOTIFICATION: busy end"); } showNotification(): void { - console.log("NOTIFICATION: show"); + enableTracing && console.log("NOTIFICATION: show"); } clearNotification(): void { - console.log("NOTIFICATION: cleared"); + enableTracing && console.log("NOTIFICATION: cleared"); } } export class NodeHttpLib implements HttpRequestLibrary { async get(url: string): Promise { - console.log("making GET request to", url); + enableTracing && console.log("making GET request to", url); const resp = await Axios({ method: "get", url: url, responseType: "json", }); - console.log("got response", resp.data); - console.log("resp type", typeof resp.data); + enableTracing && console.log("got response", resp.data); + enableTracing && console.log("resp type", typeof resp.data); return { responseJson: resp.data, status: resp.status, @@ -51,15 +54,15 @@ export class NodeHttpLib implements HttpRequestLibrary { url: string, body: any, ): Promise { - console.log("making POST request to", url); + enableTracing && console.log("making POST request to", url); const resp = await Axios({ method: "post", url: url, responseType: "json", data: body, }); - console.log("got response", resp.data); - console.log("resp type", typeof resp.data); + enableTracing && console.log("got response", resp.data); + enableTracing && console.log("resp type", typeof resp.data); return { responseJson: resp.data, status: resp.status, @@ -70,15 +73,15 @@ export class NodeHttpLib implements HttpRequestLibrary { url: string, form: any, ): Promise { - console.log("making POST request to", url); + enableTracing && console.log("making POST request to", url); const resp = await Axios({ method: "post", url: url, data: querystring.stringify(form), responseType: "json", }); - console.log("got response", resp.data); - console.log("resp type", typeof resp.data); + enableTracing && console.log("got response", resp.data); + enableTracing && console.log("resp type", typeof resp.data); return { responseJson: resp.data, status: resp.status, @@ -152,6 +155,66 @@ async function createBankReserve( } } +class MerchantBackendConnection { + constructor( + public merchantBaseUrl: string, + public merchantInstance: string, + public apiKey: string, + ) {} + + async createOrder( + amount: string, + summary: string, + fulfillmentUrl: string, + ): Promise<{ orderId: string }> { + const reqUrl = new URI("order").absoluteTo(this.merchantBaseUrl).href(); + const orderReq = { + order: { + amount, + summary, + fulfillment_url: fulfillmentUrl, + instance: this.merchantInstance, + }, + }; + const resp = await Axios({ + method: "post", + url: reqUrl, + data: orderReq, + responseType: "json", + headers: { + Authorization: `ApiKey ${this.apiKey}`, + }, + }); + if (resp.status != 200) { + throw Error("failed to create bank reserve"); + } + const orderId = resp.data.order_id; + if (!orderId) { + throw Error("no order id in response"); + } + return { orderId }; + } + + async checkPayment(orderId: string): Promise { + const reqUrl = new URI("check-payment") + .absoluteTo(this.merchantBaseUrl) + .href(); + const resp = await Axios({ + method: "get", + url: reqUrl, + params: { order_id: orderId, instance: this.merchantInstance }, + responseType: "json", + headers: { + Authorization: `ApiKey ${this.apiKey}`, + }, + }); + if (resp.status != 200) { + throw Error("failed to check payment"); + } + return CheckPaymentResponse.checked(resp.data); + } +} + async function main() { const myNotifier = new ConsoleNotifier(); @@ -216,15 +279,54 @@ async function main() { await myWallet.confirmReserve({ reservePub: reserveResponse.reservePub }); - //await myWallet.waitForReserveDrained(reserveResponse.reservePub); + await myWallet.processReserve(reserveResponse.reservePub); - //myWallet.clearNotification(); + console.log("process reserve returned"); - //myWallet.stop(); + const balance = await myWallet.getBalances(); - const dbContents = await exportDb(myDb); + console.log(JSON.stringify(balance, null, 2)); - console.log("db:", JSON.stringify(dbContents, null, 2)); + const myMerchant = new MerchantBackendConnection( + "https://backend.test.taler.net/", + "default", + "sandbox", + ); + + const orderResp = await myMerchant.createOrder( + "TESTKUDOS:5", + "hello world", + "https://example.com/", + ); + + console.log("created order with orderId", orderResp.orderId); + + const paymentStatus = await myMerchant.checkPayment(orderResp.orderId); + + console.log("payment status", paymentStatus); + + const contractUrl = paymentStatus.contract_url; + if (!contractUrl) { + throw Error("no contract URL in payment response"); + } + + const proposalId = await myWallet.downloadProposal(contractUrl); + + console.log("proposal id", proposalId); + + const checkPayResult = await myWallet.checkPay(proposalId); + + console.log("check pay result", checkPayResult); + + const confirmPayResult = await myWallet.confirmPay(proposalId, undefined); + + console.log("confirmPayResult", confirmPayResult); + + const paymentStatus2 = await myMerchant.checkPayment(orderResp.orderId); + + console.log("payment status after wallet payment:", paymentStatus2); + + myWallet.stop(); } main().catch(err => { diff --git a/src/logging.ts b/src/logging.ts index ca073c10c..4e7b60b93 100644 --- a/src/logging.ts +++ b/src/logging.ts @@ -21,8 +21,8 @@ import { QueryRoot, Store, - openPromise, } from "./query"; +import { openPromise } from "./promiseUtils"; /** * Supported log levels. diff --git a/src/promiseUtils.ts b/src/promiseUtils.ts new file mode 100644 index 000000000..eb649471b --- /dev/null +++ b/src/promiseUtils.ts @@ -0,0 +1,39 @@ +/* + This file is part of GNU Taler + (C) 2019 GNUnet e.V. + + TALER is free software; you can redistribute it and/or modify it under the + terms of the GNU General Public License as published by the Free Software + Foundation; either version 3, or (at your option) any later version. + + TALER is distributed in the hope that it will be useful, but WITHOUT ANY + WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR + A PARTICULAR PURPOSE. See the GNU General Public License for more details. + + You should have received a copy of the GNU General Public License along with + TALER; see the file COPYING. If not, see + */ + + export interface OpenedPromise { + promise: Promise; + resolve: (val: T) => void; + reject: (err: any) => void; + } + +/** + * Get an unresolved promise together with its extracted resolve / reject + * function. + */ +export function openPromise(): OpenedPromise { + let resolve: ((x?: any) => void) | null = null; + let reject: ((reason?: any) => void) | null = null; + const promise = new Promise((res, rej) => { + resolve = res; + reject = rej; + }); + if (!(resolve && reject)) { + // Never happens, unless JS implementation is broken + throw Error(); + } + return { resolve, reject, promise }; +} \ No newline at end of file diff --git a/src/query.ts b/src/query.ts index 5feb29a55..7308d9ede 100644 --- a/src/query.ts +++ b/src/query.ts @@ -14,13 +14,13 @@ TALER; see the file COPYING. If not, see */ - /** * Database query abstractions. * @module Query * @author Florian Dold */ + import { openPromise } from "./promiseUtils"; /** * Result of an inner join. @@ -38,18 +38,17 @@ export interface JoinLeftResult { right?: R; } - /** * Definition of an object store. */ export class Store { - constructor(public name: string, - public storeParams?: IDBObjectStoreParameters, - public validator?: (v: T) => T) { - } + constructor( + public name: string, + public storeParams?: IDBObjectStoreParameters, + public validator?: (v: T) => T, + ) {} } - /** * Options for an index. */ @@ -63,7 +62,6 @@ export interface IndexOptions { multiEntry?: boolean; } - /** * Definition of an index. */ @@ -78,7 +76,12 @@ export class Index { */ options: IndexOptions; - constructor(s: Store, public indexName: string, public keyPath: string | string[], options?: IndexOptions) { + constructor( + s: Store, + public indexName: string, + public keyPath: string | string[], + options?: IndexOptions, + ) { const defaultOptions = { multiEntry: false, }; @@ -91,7 +94,7 @@ export class Index { * because otherwise the compiler complains. In iterIndex the * key type is pretty useful. */ - protected _dummyKey: S|undefined; + protected _dummyKey: S | undefined; } /** @@ -104,21 +107,29 @@ export interface QueryStream { * The left side of the join is extracted via a function from the stream's * result, the right side of the join is the key of the index. */ - indexJoin(index: Index, keyFn: (obj: T) => I): QueryStream>; + indexJoin( + index: Index, + keyFn: (obj: T) => I, + ): QueryStream>; /** * Join the current query with values from an index, and keep values in the * current stream that don't have a match. The left side of the join is * extracted via a function from the stream's result, the right side of the * join is the key of the index. */ - indexJoinLeft(index: Index, - keyFn: (obj: T) => I): QueryStream>; + indexJoinLeft( + index: Index, + keyFn: (obj: T) => I, + ): QueryStream>; /** * Join the current query with values from another object store. * The left side of the join is extracted via a function over the current query, * the right side of the join is the key of the object store. */ - keyJoin(store: Store, keyFn: (obj: T) => I): QueryStream>; + keyJoin( + store: Store, + keyFn: (obj: T) => I, + ): QueryStream>; /** * Only keep elements in the result stream for which the predicate returns @@ -166,7 +177,6 @@ export interface QueryStream { run(): Promise; } - /** * Query result that consists of at most one value. */ @@ -184,20 +194,25 @@ export interface QueryValue { * branch). This is necessary since IndexedDB does not allow long-lived * transactions. */ - cond(f: (x: T) => boolean, onTrue: (r: QueryRoot) => R, onFalse: (r: QueryRoot) => R): Promise; + cond( + f: (x: T) => boolean, + onTrue: (r: QueryRoot) => R, + onFalse: (r: QueryRoot) => R, + ): Promise; } - abstract class BaseQueryValue implements QueryValue { - - constructor(public root: QueryRoot) { - } + constructor(public root: QueryRoot) {} map(f: (x: T) => S): QueryValue { return new MapQueryValue(this, f); } - cond(f: (x: T) => boolean, onTrue: (r: QueryRoot) => R, onFalse: (r: QueryRoot) => R): Promise { + cond( + f: (x: T) => boolean, + onTrue: (r: QueryRoot) => R, + onFalse: (r: QueryRoot) => R, + ): Promise { return new Promise((resolve, reject) => { this.subscribeOne((v, tx) => { if (f(v)) { @@ -228,7 +243,7 @@ class FirstQueryValue extends BaseQueryValue { return; } if (isDone) { - f(undefined, tx); + f(undefined, tx); } else { f(value, tx); } @@ -247,37 +262,16 @@ class MapQueryValue extends BaseQueryValue { } } - /** * Exception that should be thrown by client code to abort a transaction. */ export const AbortTransaction = Symbol("abort_transaction"); -/** - * Get an unresolved promise together with its extracted resolve / reject - * function. - */ -export function openPromise(): any { - let resolve: ((x?: any) => void) | null = null; - let reject: ((reason?: any) => void) | null = null; - const promise = new Promise((res, rej) => { - resolve = res; - reject = rej; - }); - if (!(resolve && reject)) { - // Never happens, unless JS implementation is broken - throw Error(); - } - return {resolve, reject, promise}; -} - - abstract class QueryStreamBase implements QueryStream { - abstract subscribe(f: (isDone: boolean, - value: any, - tx: IDBTransaction) => void): void; - constructor(public root: QueryRoot) { - } + abstract subscribe( + f: (isDone: boolean, value: any, tx: IDBTransaction) => void, + ): void; + constructor(public root: QueryRoot) {} first(): QueryValue { return new FirstQueryValue(this); @@ -291,20 +285,36 @@ abstract class QueryStreamBase implements QueryStream { return new QueryStreamMap(this, f); } - indexJoin(index: Index, - keyFn: (obj: T) => I): QueryStream> { + indexJoin( + index: Index, + keyFn: (obj: T) => I, + ): QueryStream> { this.root.addStoreAccess(index.storeName, false); - return new QueryStreamIndexJoin(this, index.storeName, index.indexName, keyFn); + return new QueryStreamIndexJoin( + this, + index.storeName, + index.indexName, + keyFn, + ); } - indexJoinLeft(index: Index, - keyFn: (obj: T) => I): QueryStream> { + indexJoinLeft( + index: Index, + keyFn: (obj: T) => I, + ): QueryStream> { this.root.addStoreAccess(index.storeName, false); - return new QueryStreamIndexJoinLeft(this, index.storeName, index.indexName, keyFn); + return new QueryStreamIndexJoinLeft( + this, + index.storeName, + index.indexName, + keyFn, + ); } - keyJoin(store: Store, - keyFn: (obj: T) => I): QueryStream> { + keyJoin( + store: Store, + keyFn: (obj: T) => I, + ): QueryStream> { this.root.addStoreAccess(store.name, false); return new QueryStreamKeyJoin(this, store.name, keyFn); } @@ -314,7 +324,7 @@ abstract class QueryStreamBase implements QueryStream { } toArray(): Promise { - const {resolve, promise} = openPromise(); + const { resolve, promise } = openPromise(); const values: T[] = []; this.subscribe((isDone, value) => { @@ -326,12 +336,12 @@ abstract class QueryStreamBase implements QueryStream { }); return Promise.resolve() - .then(() => this.root.finish()) - .then(() => promise); + .then(() => this.root.finish()) + .then(() => promise); } fold(f: (x: T, acc: A) => A, init: A): Promise { - const {resolve, promise} = openPromise(); + const { resolve, promise } = openPromise(); let acc = init; this.subscribe((isDone, value) => { @@ -343,12 +353,12 @@ abstract class QueryStreamBase implements QueryStream { }); return Promise.resolve() - .then(() => this.root.finish()) - .then(() => promise); + .then(() => this.root.finish()) + .then(() => promise); } forEach(f: (x: T) => void): Promise { - const {resolve, promise} = openPromise(); + const { resolve, promise } = openPromise(); this.subscribe((isDone, value) => { if (isDone) { @@ -359,12 +369,12 @@ abstract class QueryStreamBase implements QueryStream { }); return Promise.resolve() - .then(() => this.root.finish()) - .then(() => promise); + .then(() => this.root.finish()) + .then(() => promise); } run(): Promise { - const {resolve, promise} = openPromise(); + const { resolve, promise } = openPromise(); this.subscribe((isDone, value) => { if (isDone) { @@ -374,8 +384,8 @@ abstract class QueryStreamBase implements QueryStream { }); return Promise.resolve() - .then(() => this.root.finish()) - .then(() => promise); + .then(() => this.root.finish()) + .then(() => promise); } } @@ -401,7 +411,6 @@ class QueryStreamFilter extends QueryStreamBase { } } - class QueryStreamFlatMap extends QueryStreamBase { constructor(public s: QueryStreamBase, public flatMapFn: (v: T) => S[]) { super(s.root); @@ -421,7 +430,6 @@ class QueryStreamFlatMap extends QueryStreamBase { } } - class QueryStreamMap extends QueryStreamBase { constructor(public s: QueryStreamBase, public mapFn: (v: S) => T) { super(s.root); @@ -439,10 +447,13 @@ class QueryStreamMap extends QueryStreamBase { } } - class QueryStreamIndexJoin extends QueryStreamBase> { - constructor(public s: QueryStreamBase, public storeName: string, public indexName: string, - public key: any) { + constructor( + public s: QueryStreamBase, + public storeName: string, + public indexName: string, + public key: any, + ) { super(s.root); } @@ -457,7 +468,7 @@ class QueryStreamIndexJoin extends QueryStreamBase> { req.onsuccess = () => { const cursor = req.result; if (cursor) { - f(false, {left: value, right: cursor.value}, tx); + f(false, { left: value, right: cursor.value }, tx); cursor.continue(); } }; @@ -465,10 +476,15 @@ class QueryStreamIndexJoin extends QueryStreamBase> { } } - -class QueryStreamIndexJoinLeft extends QueryStreamBase> { - constructor(public s: QueryStreamBase, public storeName: string, public indexName: string, - public key: any) { +class QueryStreamIndexJoinLeft extends QueryStreamBase< + JoinLeftResult +> { + constructor( + public s: QueryStreamBase, + public storeName: string, + public indexName: string, + public key: any, + ) { super(s.root); } @@ -485,11 +501,11 @@ class QueryStreamIndexJoinLeft extends QueryStreamBase extends QueryStreamBase extends QueryStreamBase> { - constructor(public s: QueryStreamBase, public storeName: string, - public key: any) { + constructor( + public s: QueryStreamBase, + public storeName: string, + public key: any, + ) { super(s.root); } @@ -515,7 +533,7 @@ class QueryStreamKeyJoin extends QueryStreamBase> { req.onsuccess = () => { const cursor = req.result; if (cursor) { - f(false, {left: value, right: cursor.value}, tx); + f(false, { left: value, right: cursor.value }, tx); cursor.continue(); } else { f(true, undefined, tx); @@ -525,7 +543,6 @@ class QueryStreamKeyJoin extends QueryStreamBase> { } } - class IterQueryStream extends QueryStreamBase { private storeName: string; private options: any; @@ -538,11 +555,10 @@ class IterQueryStream extends QueryStreamBase { this.subscribers = []; const doIt = (tx: IDBTransaction) => { - const {indexName = void 0, only = void 0} = this.options; + const { indexName = void 0, only = void 0 } = this.options; let s: any; if (indexName !== void 0) { - s = tx.objectStore(this.storeName) - .index(this.options.indexName); + s = tx.objectStore(this.storeName).index(this.options.indexName); } else { s = tx.objectStore(this.storeName); } @@ -574,12 +590,11 @@ class IterQueryStream extends QueryStreamBase { } } - /** * Root wrapper around an IndexedDB for queries with a fluent interface. */ export class QueryRoot { - private work: Array<((t: IDBTransaction) => void)> = []; + private work: Array<(t: IDBTransaction) => void> = []; private stores: Set = new Set(); private kickoffPromise: Promise; @@ -595,13 +610,12 @@ export class QueryRoot { private keys: { [keyName: string]: IDBValidKey } = {}; - constructor(public db: IDBDatabase) { - } + constructor(public db: IDBDatabase) {} /** * Get a named key that was created during the query. */ - key(keyName: string): IDBValidKey|undefined { + key(keyName: string): IDBValidKey | undefined { return this.keys[keyName]; } @@ -626,7 +640,7 @@ export class QueryRoot { */ count(store: Store): Promise { this.checkFinished(); - const {resolve, promise} = openPromise(); + const { resolve, promise } = openPromise(); const doCount = (tx: IDBTransaction) => { const s = tx.objectStore(store.name); @@ -638,15 +652,17 @@ export class QueryRoot { this.addWork(doCount, store.name, false); return Promise.resolve() - .then(() => this.finish()) - .then(() => promise); - + .then(() => this.finish()) + .then(() => promise); } /** * Delete all objects in a store that match a predicate. */ - deleteIf(store: Store, predicate: (x: T, n: number) => boolean): QueryRoot { + deleteIf( + store: Store, + predicate: (x: T, n: number) => boolean, + ): QueryRoot { this.checkFinished(); const doDeleteIf = (tx: IDBTransaction) => { const s = tx.objectStore(store.name); @@ -666,8 +682,10 @@ export class QueryRoot { return this; } - iterIndex(index: Index, - only?: S): QueryStream { + iterIndex( + index: Index, + only?: S, + ): QueryStream { this.checkFinished(); this.stores.add(index.storeName); this.scheduleFinish(); @@ -688,7 +706,7 @@ export class QueryRoot { const req = tx.objectStore(store.name).put(val); if (keyName) { req.onsuccess = () => { - this.keys[keyName] = req.result; + this.keys[keyName] = req.result; }; } }; @@ -702,7 +720,7 @@ export class QueryRoot { */ putOrGetExisting(store: Store, val: T, key: IDBValidKey): Promise { this.checkFinished(); - const {resolve, promise} = openPromise(); + const { resolve, promise } = openPromise(); const doPutOrGet = (tx: IDBTransaction) => { const objstore = tx.objectStore(store.name); const req = objstore.get(key); @@ -722,10 +740,9 @@ export class QueryRoot { return promise; } - putWithResult(store: Store, val: T): Promise { this.checkFinished(); - const {resolve, promise} = openPromise(); + const { resolve, promise } = openPromise(); const doPutWithResult = (tx: IDBTransaction) => { const req = tx.objectStore(store.name).put(val); req.onsuccess = () => { @@ -735,18 +752,17 @@ export class QueryRoot { }; this.addWork(doPutWithResult, store.name, true); return Promise.resolve() - .then(() => this.finish()) - .then(() => promise); + .then(() => this.finish()) + .then(() => promise); } - /** * Update objects inside a transaction. * * If the mutation function throws AbortTransaction, the whole transaction will be aborted. * If the mutation function returns undefined or null, no modification will be made. */ - mutate(store: Store, key: any, f: (v: T) => T|undefined): QueryRoot { + mutate(store: Store, key: any, f: (v: T) => T | undefined): QueryRoot { this.checkFinished(); const doPut = (tx: IDBTransaction) => { const req = tx.objectStore(store.name).openCursor(IDBKeyRange.only(key)); @@ -754,7 +770,7 @@ export class QueryRoot { const cursor = req.result; if (cursor) { const value = cursor.value; - let modifiedValue: T|undefined; + let modifiedValue: T | undefined; try { modifiedValue = f(value); } catch (e) { @@ -776,7 +792,6 @@ export class QueryRoot { return this; } - /** * Add all object from an iterable to the given object store. */ @@ -810,13 +825,13 @@ export class QueryRoot { /** * Get one object from a store by its key. */ - get(store: Store, key: any): Promise { + get(store: Store, key: any): Promise { this.checkFinished(); if (key === void 0) { throw Error("key must not be undefined"); } - const {resolve, promise} = openPromise(); + const { resolve, promise } = openPromise(); const doGet = (tx: IDBTransaction) => { const req = tx.objectStore(store.name).get(key); @@ -827,8 +842,8 @@ export class QueryRoot { this.addWork(doGet, store.name, false); return Promise.resolve() - .then(() => this.finish()) - .then(() => promise); + .then(() => this.finish()) + .then(() => promise); } /** @@ -839,7 +854,7 @@ export class QueryRoot { getMany(store: Store, keys: any[]): Promise { this.checkFinished(); - const { resolve, promise } = openPromise(); + const { resolve, promise } = openPromise(); const results: T[] = []; const doGetMany = (tx: IDBTransaction) => { @@ -859,26 +874,29 @@ export class QueryRoot { this.addWork(doGetMany, store.name, false); return Promise.resolve() - .then(() => this.finish()) - .then(() => promise); + .then(() => this.finish()) + .then(() => promise); } /** * Get one object from a store by its key. */ - getIndexed(index: Index, - key: I): Promise { + getIndexed( + index: Index, + key: I, + ): Promise { this.checkFinished(); if (key === void 0) { throw Error("key must not be undefined"); } - const {resolve, promise} = openPromise(); + const { resolve, promise } = openPromise(); const doGetIndexed = (tx: IDBTransaction) => { - const req = tx.objectStore(index.storeName) - .index(index.indexName) - .get(key); + const req = tx + .objectStore(index.storeName) + .index(index.indexName) + .get(key); req.onsuccess = () => { resolve(req.result); }; @@ -886,8 +904,8 @@ export class QueryRoot { this.addWork(doGetIndexed, index.storeName, false); return Promise.resolve() - .then(() => this.finish()) - .then(() => promise); + .then(() => this.finish()) + .then(() => promise); } private scheduleFinish() { @@ -917,10 +935,12 @@ export class QueryRoot { resolve(); }; tx.onabort = () => { - console.warn(`aborted ${mode} transaction on stores [${[... this.stores]}]`); + console.warn( + `aborted ${mode} transaction on stores [${[...this.stores]}]`, + ); reject(Error("transaction aborted")); }; - tx.onerror = (e) => { + tx.onerror = e => { console.warn(`error in transaction`, (e.target as any).error); }; for (const w of this.work) { @@ -946,9 +966,11 @@ export class QueryRoot { /** * Low-level function to add a task to the internal work queue. */ - addWork(workFn: (t: IDBTransaction) => void, - storeName?: string, - isWrite?: boolean) { + addWork( + workFn: (t: IDBTransaction) => void, + storeName?: string, + isWrite?: boolean, + ) { this.work.push(workFn); if (storeName) { this.addStoreAccess(storeName, isWrite); diff --git a/src/talerTypes.ts b/src/talerTypes.ts index e8bb2e510..9176daf77 100644 --- a/src/talerTypes.ts +++ b/src/talerTypes.ts @@ -905,3 +905,30 @@ export class Proposal { */ static checked: (obj: any) => Proposal; } + +/** + * Response from the internal merchant API. + */ +@Checkable.Class({extra: true}) +export class CheckPaymentResponse { + @Checkable.Boolean() + paid: boolean; + + @Checkable.Optional(Checkable.Boolean()) + refunded: boolean | undefined; + + @Checkable.Optional(Checkable.String()) + refunded_amount: string | undefined; + + @Checkable.Optional(Checkable.Value(() => ContractTerms)) + contract_terms: ContractTerms | undefined; + + @Checkable.Optional(Checkable.String()) + contract_url: string | undefined; + + /** + * Verify that a value matches the schema of this class and convert it into a + * member. + */ + static checked: (obj: any) => CheckPaymentResponse; +} \ No newline at end of file diff --git a/src/wallet.ts b/src/wallet.ts index 4fc108a11..6d4eeb26c 100644 --- a/src/wallet.ts +++ b/src/wallet.ts @@ -105,6 +105,7 @@ import { WalletBalance, WalletBalanceEntry, } from "./walletTypes"; +import { openPromise } from "./promiseUtils"; interface SpeculativePayData { payCoinInfo: PayCoinInfo; @@ -327,6 +328,7 @@ export class Wallet { * IndexedDB database used by the wallet. */ db: IDBDatabase; + private enableTracing = false; private http: HttpRequestLibrary; private badge: Badge; private notifier: Notifier; @@ -337,6 +339,12 @@ export class Wallet { private speculativePayData: SpeculativePayData | undefined; private cachedNextUrl: { [fulfillmentUrl: string]: NextUrlResult } = {}; private activeTipOperations: { [s: string]: Promise } = {}; + private activeProcessReserveOperations: { + [reservePub: string]: Promise; + } = {}; + private activeProcessPreCoinOperations: { + [preCoinPub: string]: Promise; + } = {}; /** * Set of identifiers for running operations. @@ -426,14 +434,14 @@ export class Wallet { .iter(Stores.reserves) .forEach(reserve => { console.log("resuming reserve", reserve.reserve_pub); - this.processReserve(reserve); + this.processReserve(reserve.reserve_pub); }); this.q() .iter(Stores.precoins) .forEach(preCoin => { console.log("resuming precoin"); - this.processPreCoin(preCoin); + this.processPreCoin(preCoin.coinPub); }); this.q() @@ -1073,151 +1081,184 @@ export class Wallet { * First fetch information requred to withdraw from the reserve, * then deplete the reserve, withdrawing coins until it is empty. */ - private async processReserve( - reserveRecord: ReserveRecord, - retryDelayMs: number = 250, - ): Promise { - const opId = "reserve-" + reserveRecord.reserve_pub; + async processReserve(reservePub: string): Promise { + const activeOperation = this.activeProcessReserveOperations[reservePub]; + + if (activeOperation) { + return activeOperation; + } + + const opId = "reserve-" + reservePub; this.startOperation(opId); + // This opened promise gets resolved only once the + // reserve withdraw operation succeeds, even after retries. + const op = openPromise(); + + const processReserveInternal = async (retryDelayMs: number = 250) => { + try { + const reserve = await this.updateReserve(reservePub); + await this.depleteReserve(reserve); + op.resolve(); + } catch (e) { + // random, exponential backoff truncated at 3 minutes + const nextDelay = Math.min( + 2 * retryDelayMs + retryDelayMs * Math.random(), + 3000 * 60, + ); + console.warn( + `Failed to deplete reserve, trying again in ${retryDelayMs} ms`, + ); + this.timerGroup.after(retryDelayMs, () => + processReserveInternal(nextDelay), + ); + } + }; + try { - const reserve = await this.updateReserve(reserveRecord.reserve_pub); - await this.depleteReserve(reserve); - } catch (e) { - // random, exponential backoff truncated at 3 minutes - const nextDelay = Math.min( - 2 * retryDelayMs + retryDelayMs * Math.random(), - 3000 * 60, - ); - console.warn( - `Failed to deplete reserve, trying again in ${retryDelayMs} ms`, - ); - this.timerGroup.after(retryDelayMs, () => - this.processReserve(reserveRecord, nextDelay), - ); + processReserveInternal(); + this.activeProcessReserveOperations[reservePub] = op.promise; + await op.promise; } finally { this.stopOperation(opId); + delete this.activeProcessReserveOperations[reservePub]; } } /** * Given a planchet, withdraw a coin from the exchange. */ - private async processPreCoin( - preCoin: PreCoinRecord, - retryDelayMs = 200, - ): Promise { - // Throttle concurrent executions of this function, so we don't withdraw too many coins at once. - if ( - this.processPreCoinConcurrent >= 4 || - this.processPreCoinThrottle[preCoin.exchangeBaseUrl] - ) { - console.log("delaying processPreCoin"); - this.timerGroup.after(retryDelayMs, () => - this.processPreCoin(preCoin, Math.min(retryDelayMs * 2, 5 * 60 * 1000)), - ); - return; + private async processPreCoin(preCoinPub: string): Promise { + const activeOperation = this.activeProcessPreCoinOperations[preCoinPub]; + if (activeOperation) { + return activeOperation; } - console.log("executing processPreCoin", preCoin); - this.processPreCoinConcurrent++; - try { - const exchange = await this.q().get( - Stores.exchanges, - preCoin.exchangeBaseUrl, - ); - if (!exchange) { - console.error("db inconsistent: exchange for precoin not found"); + + const op = openPromise(); + + const processPreCoinInternal = async (retryDelayMs: number = 200) => { + const preCoin = await this.q().get(Stores.precoins, preCoinPub); + if (!preCoin) { + console.log("processPreCoin: preCoinPub not found"); return; } - const denom = await this.q().get(Stores.denominations, [ - preCoin.exchangeBaseUrl, - preCoin.denomPub, - ]); - if (!denom) { - console.error("db inconsistent: denom for precoin not found"); - return; + // Throttle concurrent executions of this function, + // so we don't withdraw too many coins at once. + if ( + this.processPreCoinConcurrent >= 4 || + this.processPreCoinThrottle[preCoin.exchangeBaseUrl] + ) { + this.enableTracing && console.log("delaying processPreCoin"); + this.timerGroup.after(retryDelayMs, () => + processPreCoinInternal(Math.min(retryDelayMs * 2, 5 * 60 * 1000)), + ); + return op.promise; } - const coin = await this.withdrawExecute(preCoin); - console.log("processPreCoin: got coin", coin); + //console.log("executing processPreCoin", preCoin); + this.processPreCoinConcurrent++; - const mutateReserve = (r: ReserveRecord) => { - console.log( - `before committing coin: current ${amountToPretty( - r.current_amount!, - )}, precoin: ${amountToPretty(r.precoin_amount)})}`, + try { + const exchange = await this.q().get( + Stores.exchanges, + preCoin.exchangeBaseUrl, ); - - const x = Amounts.sub( - r.precoin_amount, - preCoin.coinValue, - denom.feeWithdraw, - ); - if (x.saturated) { - console.error("database inconsistent"); - throw AbortTransaction; + if (!exchange) { + console.error("db inconsistent: exchange for precoin not found"); + return; + } + const denom = await this.q().get(Stores.denominations, [ + preCoin.exchangeBaseUrl, + preCoin.denomPub, + ]); + if (!denom) { + console.error("db inconsistent: denom for precoin not found"); + return; } - r.precoin_amount = x.amount; - return r; - }; - await this.q() - .mutate(Stores.reserves, preCoin.reservePub, mutateReserve) - .delete(Stores.precoins, coin.coinPub) - .add(Stores.coins, coin) - .finish(); + const coin = await this.withdrawExecute(preCoin); - if (coin.status === CoinStatus.TainedByTip) { - const tip = await this.q().getIndexed( - Stores.tips.coinPubIndex, - coin.coinPub, - ); - if (!tip) { - throw Error( - `inconsistent DB: tip for coin pub ${coin.coinPub} not found.`, + const mutateReserve = (r: ReserveRecord) => { + const x = Amounts.sub( + r.precoin_amount, + preCoin.coinValue, + denom.feeWithdraw, ); - } + if (x.saturated) { + console.error("database inconsistent"); + throw AbortTransaction; + } + r.precoin_amount = x.amount; + return r; + }; - if (tip.accepted) { - console.log("untainting already accepted tip"); - // Transactionally set coin to fresh. - const mutateCoin = (c: CoinRecord) => { - if (c.status === CoinStatus.TainedByTip) { - c.status = CoinStatus.Fresh; - } - return c; - }; - await this.q().mutate(Stores.coins, coin.coinPub, mutateCoin); - // Show notifications only for accepted tips + await this.q() + .mutate(Stores.reserves, preCoin.reservePub, mutateReserve) + .delete(Stores.precoins, coin.coinPub) + .add(Stores.coins, coin) + .finish(); + + if (coin.status === CoinStatus.TainedByTip) { + const tip = await this.q().getIndexed( + Stores.tips.coinPubIndex, + coin.coinPub, + ); + if (!tip) { + throw Error( + `inconsistent DB: tip for coin pub ${coin.coinPub} not found.`, + ); + } + + if (tip.accepted) { + console.log("untainting already accepted tip"); + // Transactionally set coin to fresh. + const mutateCoin = (c: CoinRecord) => { + if (c.status === CoinStatus.TainedByTip) { + c.status = CoinStatus.Fresh; + } + return c; + }; + await this.q().mutate(Stores.coins, coin.coinPub, mutateCoin); + // Show notifications only for accepted tips + this.badge.showNotification(); + } + } else { this.badge.showNotification(); } - } else { - this.badge.showNotification(); + + this.notifier.notify(); + op.resolve(); + } catch (e) { + console.error( + "Failed to withdraw coin from precoin, retrying in", + retryDelayMs, + "ms", + e, + ); + // exponential backoff truncated at one minute + const nextRetryDelayMs = Math.min(retryDelayMs * 2, 5 * 60 * 1000); + this.timerGroup.after(retryDelayMs, () => + processPreCoinInternal(nextRetryDelayMs), + ); + + const currentThrottle = + this.processPreCoinThrottle[preCoin.exchangeBaseUrl] || 0; + this.processPreCoinThrottle[preCoin.exchangeBaseUrl] = + currentThrottle + 1; + this.timerGroup.after(retryDelayMs, () => { + this.processPreCoinThrottle[preCoin.exchangeBaseUrl]--; + }); + } finally { + this.processPreCoinConcurrent--; } + }; - this.notifier.notify(); - } catch (e) { - console.error( - "Failed to withdraw coin from precoin, retrying in", - retryDelayMs, - "ms", - e, - ); - // exponential backoff truncated at one minute - const nextRetryDelayMs = Math.min(retryDelayMs * 2, 5 * 60 * 1000); - this.timerGroup.after(retryDelayMs, () => - this.processPreCoin(preCoin, nextRetryDelayMs), - ); - - const currentThrottle = - this.processPreCoinThrottle[preCoin.exchangeBaseUrl] || 0; - this.processPreCoinThrottle[preCoin.exchangeBaseUrl] = - currentThrottle + 1; - this.timerGroup.after(retryDelayMs, () => { - this.processPreCoinThrottle[preCoin.exchangeBaseUrl]--; - }); + try { + this.activeProcessPreCoinOperations[preCoinPub] = op.promise; + await processPreCoinInternal(); + return op.promise; } finally { - this.processPreCoinConcurrent--; + delete this.activeProcessPreCoinOperations[preCoinPub]; } } @@ -1332,9 +1373,8 @@ export class Wallet { .finish(); this.notifier.notify(); - this.processReserve(reserve); + this.processReserve(reserve.reserve_pub); } - private async withdrawExecute(pc: PreCoinRecord): Promise { const wd: any = {}; @@ -1424,20 +1464,22 @@ export class Wallet { r.timestamp_depleted = new Date().getTime(); } - console.log( - `after creating precoin: current ${amountToPretty( - r.current_amount, - )}, precoin: ${amountToPretty(r.precoin_amount)})}`, - ); - return r; } const preCoin = await this.cryptoApi.createPreCoin(denom, reserve); - await this.q() - .put(Stores.precoins, preCoin) - .mutate(Stores.reserves, reserve.reserve_pub, mutateReserve); - await this.processPreCoin(preCoin); + // This will fail and throw an exception if the remaining amount in the + // reserve is too low to create a pre-coin. + try { + await this.q() + .put(Stores.precoins, preCoin) + .mutate(Stores.reserves, reserve.reserve_pub, mutateReserve) + .finish(); + } catch (e) { + console.log("can't create pre-coin:", e.name, e.message); + return; + } + await this.processPreCoin(preCoin.coinPub); }); await Promise.all(ps); @@ -1746,7 +1788,10 @@ export class Wallet { return ret; } - async getExchangePaytoUri(exchangeBaseUrl: string, supportedTargetTypes: string[]): Promise { + async getExchangePaytoUri( + exchangeBaseUrl: string, + supportedTargetTypes: string[], + ): Promise { const wireInfo = await this.getWireInfo(exchangeBaseUrl); for (let account of wireInfo.accounts) { const paytoUri = new URI(account.url); @@ -1820,8 +1865,6 @@ export class Wallet { throw Error("exchange doesn't offer any denominations"); } - console.log("updating exchange with wireMethodDetails", wireMethodDetails); - const r = await this.q().get(Stores.exchanges, baseUrl); let exchangeInfo: ExchangeRecord; @@ -2714,7 +2757,7 @@ export class Wallet { */ stop() { this.timerGroup.stopCurrentAndFutureTimers(); - this.cryptoApi.terminateWorkers(); + this.cryptoApi.stop(); } async getSenderWireInfos(): Promise { @@ -3199,7 +3242,7 @@ export class Wallet { withdrawSig: response.reserve_sigs[i].reserve_sig, }; await this.q().put(Stores.precoins, preCoin); - this.processPreCoin(preCoin); + this.processPreCoin(preCoin.coinPub); } tipRecord.pickedUp = true; diff --git a/tsconfig.json b/tsconfig.json index 7cbde9642..4a2f09e87 100644 --- a/tsconfig.json +++ b/tsconfig.json @@ -48,6 +48,7 @@ "src/libtoolVersion-test.ts", "src/libtoolVersion.ts", "src/logging.ts", + "src/promiseUtils.ts", "src/query.ts", "src/talerTypes.ts", "src/timer.ts",