headless/android port, PoC for CLI / headless tests

This commit is contained in:
Florian Dold 2019-08-01 23:21:15 +02:00
parent 92b04858a3
commit 5f62d83a4d
No known key found for this signature in database
GPG Key ID: D2E4F00F29D02A4B
9 changed files with 537 additions and 285 deletions

View File

@ -41,6 +41,7 @@ import { BenchmarkResult, CoinWithDenom, PayCoinInfo } from "../walletTypes";
import * as timer from "../timer"; import * as timer from "../timer";
import { startWorker } from "./startWorker"; import { startWorker } from "./startWorker";
import { throws } from "assert";
/** /**
* State of a crypto worker. * State of a crypto worker.
@ -98,6 +99,11 @@ export class CryptoApi {
*/ */
private numBusy: number = 0; private numBusy: number = 0;
/**
* Did we stop accepting new requests?
*/
private stopped: boolean = false;
public enableTracing = false; public enableTracing = false;
/** /**
@ -106,6 +112,7 @@ export class CryptoApi {
terminateWorkers() { terminateWorkers() {
for (let worker of this.workers) { for (let worker of this.workers) {
if (worker.w) { if (worker.w) {
this.enableTracing && console.log("terminating worker");
worker.w.terminate(); worker.w.terminate();
if (worker.terminationTimerHandle) { if (worker.terminationTimerHandle) {
worker.terminationTimerHandle.clear(); worker.terminationTimerHandle.clear();
@ -120,10 +127,19 @@ export class CryptoApi {
} }
} }
stop() {
this.terminateWorkers();
this.stopped = true;
}
/** /**
* Start a worker (if not started) and set as busy. * Start a worker (if not started) and set as busy.
*/ */
wake(ws: WorkerState, work: WorkItem): void { wake(ws: WorkerState, work: WorkItem): void {
if (this.stopped) {
this.enableTracing && console.log("not waking, as cryptoApi is stopped");
return;
}
if (ws.currentWorkItem !== null) { if (ws.currentWorkItem !== null) {
throw Error("assertion failed"); throw Error("assertion failed");
} }
@ -158,7 +174,7 @@ export class CryptoApi {
ws.w = null; ws.w = null;
} }
}; };
ws.terminationTimerHandle = timer.after(5 * 1000, destroy); ws.terminationTimerHandle = timer.after(15 * 1000, destroy);
} }
handleWorkerError(ws: WorkerState, e: ErrorEvent) { handleWorkerError(ws: WorkerState, e: ErrorEvent) {
@ -253,6 +269,7 @@ export class CryptoApi {
priority: number, priority: number,
...args: any[] ...args: any[]
): Promise<T> { ): Promise<T> {
this.enableTracing && console.log("cryptoApi: doRpc called");
const p: Promise<T> = new Promise<T>((resolve, reject) => { const p: Promise<T> = new Promise<T>((resolve, reject) => {
const rpcId = this.nextRpcId++; const rpcId = this.nextRpcId++;
const workItem: WorkItem = { const workItem: WorkItem = {

View File

@ -92,6 +92,7 @@ export class Worker {
* Forcibly terminate the worker thread. * Forcibly terminate the worker thread.
*/ */
terminate () { terminate () {
console.log("terminating node.js worker");
this.child.kill("SIGINT"); this.child.kill("SIGINT");
} }
} }

View File

@ -9,6 +9,9 @@ import Axios from "axios";
import URI = require("urijs"); import URI = require("urijs");
import querystring = require("querystring"); import querystring = require("querystring");
import { CheckPaymentResponse } from "../talerTypes";
const enableTracing = false;
class ConsoleNotifier implements Notifier { class ConsoleNotifier implements Notifier {
notify(): void { notify(): void {
@ -18,29 +21,29 @@ class ConsoleNotifier implements Notifier {
class ConsoleBadge implements Badge { class ConsoleBadge implements Badge {
startBusy(): void { startBusy(): void {
console.log("NOTIFICATION: busy"); enableTracing && console.log("NOTIFICATION: busy");
} }
stopBusy(): void { stopBusy(): void {
console.log("NOTIFICATION: busy end"); enableTracing && console.log("NOTIFICATION: busy end");
} }
showNotification(): void { showNotification(): void {
console.log("NOTIFICATION: show"); enableTracing && console.log("NOTIFICATION: show");
} }
clearNotification(): void { clearNotification(): void {
console.log("NOTIFICATION: cleared"); enableTracing && console.log("NOTIFICATION: cleared");
} }
} }
export class NodeHttpLib implements HttpRequestLibrary { export class NodeHttpLib implements HttpRequestLibrary {
async get(url: string): Promise<import("../http").HttpResponse> { async get(url: string): Promise<import("../http").HttpResponse> {
console.log("making GET request to", url); enableTracing && console.log("making GET request to", url);
const resp = await Axios({ const resp = await Axios({
method: "get", method: "get",
url: url, url: url,
responseType: "json", responseType: "json",
}); });
console.log("got response", resp.data); enableTracing && console.log("got response", resp.data);
console.log("resp type", typeof resp.data); enableTracing && console.log("resp type", typeof resp.data);
return { return {
responseJson: resp.data, responseJson: resp.data,
status: resp.status, status: resp.status,
@ -51,15 +54,15 @@ export class NodeHttpLib implements HttpRequestLibrary {
url: string, url: string,
body: any, body: any,
): Promise<import("../http").HttpResponse> { ): Promise<import("../http").HttpResponse> {
console.log("making POST request to", url); enableTracing && console.log("making POST request to", url);
const resp = await Axios({ const resp = await Axios({
method: "post", method: "post",
url: url, url: url,
responseType: "json", responseType: "json",
data: body, data: body,
}); });
console.log("got response", resp.data); enableTracing && console.log("got response", resp.data);
console.log("resp type", typeof resp.data); enableTracing && console.log("resp type", typeof resp.data);
return { return {
responseJson: resp.data, responseJson: resp.data,
status: resp.status, status: resp.status,
@ -70,15 +73,15 @@ export class NodeHttpLib implements HttpRequestLibrary {
url: string, url: string,
form: any, form: any,
): Promise<import("../http").HttpResponse> { ): Promise<import("../http").HttpResponse> {
console.log("making POST request to", url); enableTracing && console.log("making POST request to", url);
const resp = await Axios({ const resp = await Axios({
method: "post", method: "post",
url: url, url: url,
data: querystring.stringify(form), data: querystring.stringify(form),
responseType: "json", responseType: "json",
}); });
console.log("got response", resp.data); enableTracing && console.log("got response", resp.data);
console.log("resp type", typeof resp.data); enableTracing && console.log("resp type", typeof resp.data);
return { return {
responseJson: resp.data, responseJson: resp.data,
status: resp.status, status: resp.status,
@ -152,6 +155,66 @@ async function createBankReserve(
} }
} }
class MerchantBackendConnection {
constructor(
public merchantBaseUrl: string,
public merchantInstance: string,
public apiKey: string,
) {}
async createOrder(
amount: string,
summary: string,
fulfillmentUrl: string,
): Promise<{ orderId: string }> {
const reqUrl = new URI("order").absoluteTo(this.merchantBaseUrl).href();
const orderReq = {
order: {
amount,
summary,
fulfillment_url: fulfillmentUrl,
instance: this.merchantInstance,
},
};
const resp = await Axios({
method: "post",
url: reqUrl,
data: orderReq,
responseType: "json",
headers: {
Authorization: `ApiKey ${this.apiKey}`,
},
});
if (resp.status != 200) {
throw Error("failed to create bank reserve");
}
const orderId = resp.data.order_id;
if (!orderId) {
throw Error("no order id in response");
}
return { orderId };
}
async checkPayment(orderId: string): Promise<CheckPaymentResponse> {
const reqUrl = new URI("check-payment")
.absoluteTo(this.merchantBaseUrl)
.href();
const resp = await Axios({
method: "get",
url: reqUrl,
params: { order_id: orderId, instance: this.merchantInstance },
responseType: "json",
headers: {
Authorization: `ApiKey ${this.apiKey}`,
},
});
if (resp.status != 200) {
throw Error("failed to check payment");
}
return CheckPaymentResponse.checked(resp.data);
}
}
async function main() { async function main() {
const myNotifier = new ConsoleNotifier(); const myNotifier = new ConsoleNotifier();
@ -216,15 +279,54 @@ async function main() {
await myWallet.confirmReserve({ reservePub: reserveResponse.reservePub }); await myWallet.confirmReserve({ reservePub: reserveResponse.reservePub });
//await myWallet.waitForReserveDrained(reserveResponse.reservePub); await myWallet.processReserve(reserveResponse.reservePub);
//myWallet.clearNotification(); console.log("process reserve returned");
//myWallet.stop(); const balance = await myWallet.getBalances();
const dbContents = await exportDb(myDb); console.log(JSON.stringify(balance, null, 2));
console.log("db:", JSON.stringify(dbContents, null, 2)); const myMerchant = new MerchantBackendConnection(
"https://backend.test.taler.net/",
"default",
"sandbox",
);
const orderResp = await myMerchant.createOrder(
"TESTKUDOS:5",
"hello world",
"https://example.com/",
);
console.log("created order with orderId", orderResp.orderId);
const paymentStatus = await myMerchant.checkPayment(orderResp.orderId);
console.log("payment status", paymentStatus);
const contractUrl = paymentStatus.contract_url;
if (!contractUrl) {
throw Error("no contract URL in payment response");
}
const proposalId = await myWallet.downloadProposal(contractUrl);
console.log("proposal id", proposalId);
const checkPayResult = await myWallet.checkPay(proposalId);
console.log("check pay result", checkPayResult);
const confirmPayResult = await myWallet.confirmPay(proposalId, undefined);
console.log("confirmPayResult", confirmPayResult);
const paymentStatus2 = await myMerchant.checkPayment(orderResp.orderId);
console.log("payment status after wallet payment:", paymentStatus2);
myWallet.stop();
} }
main().catch(err => { main().catch(err => {

View File

@ -21,8 +21,8 @@
import { import {
QueryRoot, QueryRoot,
Store, Store,
openPromise,
} from "./query"; } from "./query";
import { openPromise } from "./promiseUtils";
/** /**
* Supported log levels. * Supported log levels.

39
src/promiseUtils.ts Normal file
View File

@ -0,0 +1,39 @@
/*
This file is part of GNU Taler
(C) 2019 GNUnet e.V.
TALER is free software; you can redistribute it and/or modify it under the
terms of the GNU General Public License as published by the Free Software
Foundation; either version 3, or (at your option) any later version.
TALER is distributed in the hope that it will be useful, but WITHOUT ANY
WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
A PARTICULAR PURPOSE. See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License along with
TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/>
*/
export interface OpenedPromise<T> {
promise: Promise<T>;
resolve: (val: T) => void;
reject: (err: any) => void;
}
/**
* Get an unresolved promise together with its extracted resolve / reject
* function.
*/
export function openPromise<T>(): OpenedPromise<T> {
let resolve: ((x?: any) => void) | null = null;
let reject: ((reason?: any) => void) | null = null;
const promise = new Promise<T>((res, rej) => {
resolve = res;
reject = rej;
});
if (!(resolve && reject)) {
// Never happens, unless JS implementation is broken
throw Error();
}
return { resolve, reject, promise };
}

View File

@ -14,13 +14,13 @@
TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/>
*/ */
/** /**
* Database query abstractions. * Database query abstractions.
* @module Query * @module Query
* @author Florian Dold * @author Florian Dold
*/ */
import { openPromise } from "./promiseUtils";
/** /**
* Result of an inner join. * Result of an inner join.
@ -38,18 +38,17 @@ export interface JoinLeftResult<L, R> {
right?: R; right?: R;
} }
/** /**
* Definition of an object store. * Definition of an object store.
*/ */
export class Store<T> { export class Store<T> {
constructor(public name: string, constructor(
public name: string,
public storeParams?: IDBObjectStoreParameters, public storeParams?: IDBObjectStoreParameters,
public validator?: (v: T) => T) { public validator?: (v: T) => T,
} ) {}
} }
/** /**
* Options for an index. * Options for an index.
*/ */
@ -63,7 +62,6 @@ export interface IndexOptions {
multiEntry?: boolean; multiEntry?: boolean;
} }
/** /**
* Definition of an index. * Definition of an index.
*/ */
@ -78,7 +76,12 @@ export class Index<S extends IDBValidKey, T> {
*/ */
options: IndexOptions; options: IndexOptions;
constructor(s: Store<T>, public indexName: string, public keyPath: string | string[], options?: IndexOptions) { constructor(
s: Store<T>,
public indexName: string,
public keyPath: string | string[],
options?: IndexOptions,
) {
const defaultOptions = { const defaultOptions = {
multiEntry: false, multiEntry: false,
}; };
@ -91,7 +94,7 @@ export class Index<S extends IDBValidKey, T> {
* because otherwise the compiler complains. In iterIndex the * because otherwise the compiler complains. In iterIndex the
* key type is pretty useful. * key type is pretty useful.
*/ */
protected _dummyKey: S|undefined; protected _dummyKey: S | undefined;
} }
/** /**
@ -104,21 +107,29 @@ export interface QueryStream<T> {
* The left side of the join is extracted via a function from the stream's * The left side of the join is extracted via a function from the stream's
* result, the right side of the join is the key of the index. * result, the right side of the join is the key of the index.
*/ */
indexJoin<S, I extends IDBValidKey>(index: Index<I, S>, keyFn: (obj: T) => I): QueryStream<JoinResult<T, S>>; indexJoin<S, I extends IDBValidKey>(
index: Index<I, S>,
keyFn: (obj: T) => I,
): QueryStream<JoinResult<T, S>>;
/** /**
* Join the current query with values from an index, and keep values in the * Join the current query with values from an index, and keep values in the
* current stream that don't have a match. The left side of the join is * current stream that don't have a match. The left side of the join is
* extracted via a function from the stream's result, the right side of the * extracted via a function from the stream's result, the right side of the
* join is the key of the index. * join is the key of the index.
*/ */
indexJoinLeft<S, I extends IDBValidKey>(index: Index<I, S>, indexJoinLeft<S, I extends IDBValidKey>(
keyFn: (obj: T) => I): QueryStream<JoinLeftResult<T, S>>; index: Index<I, S>,
keyFn: (obj: T) => I,
): QueryStream<JoinLeftResult<T, S>>;
/** /**
* Join the current query with values from another object store. * Join the current query with values from another object store.
* The left side of the join is extracted via a function over the current query, * The left side of the join is extracted via a function over the current query,
* the right side of the join is the key of the object store. * the right side of the join is the key of the object store.
*/ */
keyJoin<S, I extends IDBValidKey>(store: Store<S>, keyFn: (obj: T) => I): QueryStream<JoinResult<T, S>>; keyJoin<S, I extends IDBValidKey>(
store: Store<S>,
keyFn: (obj: T) => I,
): QueryStream<JoinResult<T, S>>;
/** /**
* Only keep elements in the result stream for which the predicate returns * Only keep elements in the result stream for which the predicate returns
@ -166,7 +177,6 @@ export interface QueryStream<T> {
run(): Promise<void>; run(): Promise<void>;
} }
/** /**
* Query result that consists of at most one value. * Query result that consists of at most one value.
*/ */
@ -184,20 +194,25 @@ export interface QueryValue<T> {
* branch). This is necessary since IndexedDB does not allow long-lived * branch). This is necessary since IndexedDB does not allow long-lived
* transactions. * transactions.
*/ */
cond<R>(f: (x: T) => boolean, onTrue: (r: QueryRoot) => R, onFalse: (r: QueryRoot) => R): Promise<void>; cond<R>(
f: (x: T) => boolean,
onTrue: (r: QueryRoot) => R,
onFalse: (r: QueryRoot) => R,
): Promise<void>;
} }
abstract class BaseQueryValue<T> implements QueryValue<T> { abstract class BaseQueryValue<T> implements QueryValue<T> {
constructor(public root: QueryRoot) {}
constructor(public root: QueryRoot) {
}
map<S>(f: (x: T) => S): QueryValue<S> { map<S>(f: (x: T) => S): QueryValue<S> {
return new MapQueryValue<T, S>(this, f); return new MapQueryValue<T, S>(this, f);
} }
cond<R>(f: (x: T) => boolean, onTrue: (r: QueryRoot) => R, onFalse: (r: QueryRoot) => R): Promise<void> { cond<R>(
f: (x: T) => boolean,
onTrue: (r: QueryRoot) => R,
onFalse: (r: QueryRoot) => R,
): Promise<void> {
return new Promise<void>((resolve, reject) => { return new Promise<void>((resolve, reject) => {
this.subscribeOne((v, tx) => { this.subscribeOne((v, tx) => {
if (f(v)) { if (f(v)) {
@ -247,37 +262,16 @@ class MapQueryValue<T, S> extends BaseQueryValue<S> {
} }
} }
/** /**
* Exception that should be thrown by client code to abort a transaction. * Exception that should be thrown by client code to abort a transaction.
*/ */
export const AbortTransaction = Symbol("abort_transaction"); export const AbortTransaction = Symbol("abort_transaction");
/**
* Get an unresolved promise together with its extracted resolve / reject
* function.
*/
export function openPromise<T>(): any {
let resolve: ((x?: any) => void) | null = null;
let reject: ((reason?: any) => void) | null = null;
const promise = new Promise<T>((res, rej) => {
resolve = res;
reject = rej;
});
if (!(resolve && reject)) {
// Never happens, unless JS implementation is broken
throw Error();
}
return {resolve, reject, promise};
}
abstract class QueryStreamBase<T> implements QueryStream<T> { abstract class QueryStreamBase<T> implements QueryStream<T> {
abstract subscribe(f: (isDone: boolean, abstract subscribe(
value: any, f: (isDone: boolean, value: any, tx: IDBTransaction) => void,
tx: IDBTransaction) => void): void; ): void;
constructor(public root: QueryRoot) { constructor(public root: QueryRoot) {}
}
first(): QueryValue<T> { first(): QueryValue<T> {
return new FirstQueryValue(this); return new FirstQueryValue(this);
@ -291,20 +285,36 @@ abstract class QueryStreamBase<T> implements QueryStream<T> {
return new QueryStreamMap(this, f); return new QueryStreamMap(this, f);
} }
indexJoin<S, I extends IDBValidKey>(index: Index<I, S>, indexJoin<S, I extends IDBValidKey>(
keyFn: (obj: T) => I): QueryStream<JoinResult<T, S>> { index: Index<I, S>,
keyFn: (obj: T) => I,
): QueryStream<JoinResult<T, S>> {
this.root.addStoreAccess(index.storeName, false); this.root.addStoreAccess(index.storeName, false);
return new QueryStreamIndexJoin<T, S>(this, index.storeName, index.indexName, keyFn); return new QueryStreamIndexJoin<T, S>(
this,
index.storeName,
index.indexName,
keyFn,
);
} }
indexJoinLeft<S, I extends IDBValidKey>(index: Index<I, S>, indexJoinLeft<S, I extends IDBValidKey>(
keyFn: (obj: T) => I): QueryStream<JoinLeftResult<T, S>> { index: Index<I, S>,
keyFn: (obj: T) => I,
): QueryStream<JoinLeftResult<T, S>> {
this.root.addStoreAccess(index.storeName, false); this.root.addStoreAccess(index.storeName, false);
return new QueryStreamIndexJoinLeft<T, S>(this, index.storeName, index.indexName, keyFn); return new QueryStreamIndexJoinLeft<T, S>(
this,
index.storeName,
index.indexName,
keyFn,
);
} }
keyJoin<S, I extends IDBValidKey>(store: Store<S>, keyJoin<S, I extends IDBValidKey>(
keyFn: (obj: T) => I): QueryStream<JoinResult<T, S>> { store: Store<S>,
keyFn: (obj: T) => I,
): QueryStream<JoinResult<T, S>> {
this.root.addStoreAccess(store.name, false); this.root.addStoreAccess(store.name, false);
return new QueryStreamKeyJoin<T, S>(this, store.name, keyFn); return new QueryStreamKeyJoin<T, S>(this, store.name, keyFn);
} }
@ -314,7 +324,7 @@ abstract class QueryStreamBase<T> implements QueryStream<T> {
} }
toArray(): Promise<T[]> { toArray(): Promise<T[]> {
const {resolve, promise} = openPromise(); const { resolve, promise } = openPromise<T[]>();
const values: T[] = []; const values: T[] = [];
this.subscribe((isDone, value) => { this.subscribe((isDone, value) => {
@ -331,7 +341,7 @@ abstract class QueryStreamBase<T> implements QueryStream<T> {
} }
fold<A>(f: (x: T, acc: A) => A, init: A): Promise<A> { fold<A>(f: (x: T, acc: A) => A, init: A): Promise<A> {
const {resolve, promise} = openPromise(); const { resolve, promise } = openPromise<A>();
let acc = init; let acc = init;
this.subscribe((isDone, value) => { this.subscribe((isDone, value) => {
@ -348,7 +358,7 @@ abstract class QueryStreamBase<T> implements QueryStream<T> {
} }
forEach(f: (x: T) => void): Promise<void> { forEach(f: (x: T) => void): Promise<void> {
const {resolve, promise} = openPromise(); const { resolve, promise } = openPromise<void>();
this.subscribe((isDone, value) => { this.subscribe((isDone, value) => {
if (isDone) { if (isDone) {
@ -364,7 +374,7 @@ abstract class QueryStreamBase<T> implements QueryStream<T> {
} }
run(): Promise<void> { run(): Promise<void> {
const {resolve, promise} = openPromise(); const { resolve, promise } = openPromise<void>();
this.subscribe((isDone, value) => { this.subscribe((isDone, value) => {
if (isDone) { if (isDone) {
@ -401,7 +411,6 @@ class QueryStreamFilter<T> extends QueryStreamBase<T> {
} }
} }
class QueryStreamFlatMap<T, S> extends QueryStreamBase<S> { class QueryStreamFlatMap<T, S> extends QueryStreamBase<S> {
constructor(public s: QueryStreamBase<T>, public flatMapFn: (v: T) => S[]) { constructor(public s: QueryStreamBase<T>, public flatMapFn: (v: T) => S[]) {
super(s.root); super(s.root);
@ -421,7 +430,6 @@ class QueryStreamFlatMap<T, S> extends QueryStreamBase<S> {
} }
} }
class QueryStreamMap<S, T> extends QueryStreamBase<T> { class QueryStreamMap<S, T> extends QueryStreamBase<T> {
constructor(public s: QueryStreamBase<S>, public mapFn: (v: S) => T) { constructor(public s: QueryStreamBase<S>, public mapFn: (v: S) => T) {
super(s.root); super(s.root);
@ -439,10 +447,13 @@ class QueryStreamMap<S, T> extends QueryStreamBase<T> {
} }
} }
class QueryStreamIndexJoin<T, S> extends QueryStreamBase<JoinResult<T, S>> { class QueryStreamIndexJoin<T, S> extends QueryStreamBase<JoinResult<T, S>> {
constructor(public s: QueryStreamBase<T>, public storeName: string, public indexName: string, constructor(
public key: any) { public s: QueryStreamBase<T>,
public storeName: string,
public indexName: string,
public key: any,
) {
super(s.root); super(s.root);
} }
@ -457,7 +468,7 @@ class QueryStreamIndexJoin<T, S> extends QueryStreamBase<JoinResult<T, S>> {
req.onsuccess = () => { req.onsuccess = () => {
const cursor = req.result; const cursor = req.result;
if (cursor) { if (cursor) {
f(false, {left: value, right: cursor.value}, tx); f(false, { left: value, right: cursor.value }, tx);
cursor.continue(); cursor.continue();
} }
}; };
@ -465,10 +476,15 @@ class QueryStreamIndexJoin<T, S> extends QueryStreamBase<JoinResult<T, S>> {
} }
} }
class QueryStreamIndexJoinLeft<T, S> extends QueryStreamBase<
class QueryStreamIndexJoinLeft<T, S> extends QueryStreamBase<JoinLeftResult<T, S>> { JoinLeftResult<T, S>
constructor(public s: QueryStreamBase<T>, public storeName: string, public indexName: string, > {
public key: any) { constructor(
public s: QueryStreamBase<T>,
public storeName: string,
public indexName: string,
public key: any,
) {
super(s.root); super(s.root);
} }
@ -485,11 +501,11 @@ class QueryStreamIndexJoinLeft<T, S> extends QueryStreamBase<JoinLeftResult<T, S
const cursor = req.result; const cursor = req.result;
if (cursor) { if (cursor) {
gotMatch = true; gotMatch = true;
f(false, {left: value, right: cursor.value}, tx); f(false, { left: value, right: cursor.value }, tx);
cursor.continue(); cursor.continue();
} else { } else {
if (!gotMatch) { if (!gotMatch) {
f(false, {left: value}, tx); f(false, { left: value }, tx);
} }
} }
}; };
@ -497,10 +513,12 @@ class QueryStreamIndexJoinLeft<T, S> extends QueryStreamBase<JoinLeftResult<T, S
} }
} }
class QueryStreamKeyJoin<T, S> extends QueryStreamBase<JoinResult<T, S>> { class QueryStreamKeyJoin<T, S> extends QueryStreamBase<JoinResult<T, S>> {
constructor(public s: QueryStreamBase<T>, public storeName: string, constructor(
public key: any) { public s: QueryStreamBase<T>,
public storeName: string,
public key: any,
) {
super(s.root); super(s.root);
} }
@ -515,7 +533,7 @@ class QueryStreamKeyJoin<T, S> extends QueryStreamBase<JoinResult<T, S>> {
req.onsuccess = () => { req.onsuccess = () => {
const cursor = req.result; const cursor = req.result;
if (cursor) { if (cursor) {
f(false, {left: value, right: cursor.value}, tx); f(false, { left: value, right: cursor.value }, tx);
cursor.continue(); cursor.continue();
} else { } else {
f(true, undefined, tx); f(true, undefined, tx);
@ -525,7 +543,6 @@ class QueryStreamKeyJoin<T, S> extends QueryStreamBase<JoinResult<T, S>> {
} }
} }
class IterQueryStream<T> extends QueryStreamBase<T> { class IterQueryStream<T> extends QueryStreamBase<T> {
private storeName: string; private storeName: string;
private options: any; private options: any;
@ -538,11 +555,10 @@ class IterQueryStream<T> extends QueryStreamBase<T> {
this.subscribers = []; this.subscribers = [];
const doIt = (tx: IDBTransaction) => { const doIt = (tx: IDBTransaction) => {
const {indexName = void 0, only = void 0} = this.options; const { indexName = void 0, only = void 0 } = this.options;
let s: any; let s: any;
if (indexName !== void 0) { if (indexName !== void 0) {
s = tx.objectStore(this.storeName) s = tx.objectStore(this.storeName).index(this.options.indexName);
.index(this.options.indexName);
} else { } else {
s = tx.objectStore(this.storeName); s = tx.objectStore(this.storeName);
} }
@ -574,12 +590,11 @@ class IterQueryStream<T> extends QueryStreamBase<T> {
} }
} }
/** /**
* Root wrapper around an IndexedDB for queries with a fluent interface. * Root wrapper around an IndexedDB for queries with a fluent interface.
*/ */
export class QueryRoot { export class QueryRoot {
private work: Array<((t: IDBTransaction) => void)> = []; private work: Array<(t: IDBTransaction) => void> = [];
private stores: Set<string> = new Set(); private stores: Set<string> = new Set();
private kickoffPromise: Promise<void>; private kickoffPromise: Promise<void>;
@ -595,13 +610,12 @@ export class QueryRoot {
private keys: { [keyName: string]: IDBValidKey } = {}; private keys: { [keyName: string]: IDBValidKey } = {};
constructor(public db: IDBDatabase) { constructor(public db: IDBDatabase) {}
}
/** /**
* Get a named key that was created during the query. * Get a named key that was created during the query.
*/ */
key(keyName: string): IDBValidKey|undefined { key(keyName: string): IDBValidKey | undefined {
return this.keys[keyName]; return this.keys[keyName];
} }
@ -626,7 +640,7 @@ export class QueryRoot {
*/ */
count<T>(store: Store<T>): Promise<number> { count<T>(store: Store<T>): Promise<number> {
this.checkFinished(); this.checkFinished();
const {resolve, promise} = openPromise(); const { resolve, promise } = openPromise<number>();
const doCount = (tx: IDBTransaction) => { const doCount = (tx: IDBTransaction) => {
const s = tx.objectStore(store.name); const s = tx.objectStore(store.name);
@ -640,13 +654,15 @@ export class QueryRoot {
return Promise.resolve() return Promise.resolve()
.then(() => this.finish()) .then(() => this.finish())
.then(() => promise); .then(() => promise);
} }
/** /**
* Delete all objects in a store that match a predicate. * Delete all objects in a store that match a predicate.
*/ */
deleteIf<T>(store: Store<T>, predicate: (x: T, n: number) => boolean): QueryRoot { deleteIf<T>(
store: Store<T>,
predicate: (x: T, n: number) => boolean,
): QueryRoot {
this.checkFinished(); this.checkFinished();
const doDeleteIf = (tx: IDBTransaction) => { const doDeleteIf = (tx: IDBTransaction) => {
const s = tx.objectStore(store.name); const s = tx.objectStore(store.name);
@ -666,8 +682,10 @@ export class QueryRoot {
return this; return this;
} }
iterIndex<S extends IDBValidKey, T>(index: Index<S, T>, iterIndex<S extends IDBValidKey, T>(
only?: S): QueryStream<T> { index: Index<S, T>,
only?: S,
): QueryStream<T> {
this.checkFinished(); this.checkFinished();
this.stores.add(index.storeName); this.stores.add(index.storeName);
this.scheduleFinish(); this.scheduleFinish();
@ -702,7 +720,7 @@ export class QueryRoot {
*/ */
putOrGetExisting<T>(store: Store<T>, val: T, key: IDBValidKey): Promise<T> { putOrGetExisting<T>(store: Store<T>, val: T, key: IDBValidKey): Promise<T> {
this.checkFinished(); this.checkFinished();
const {resolve, promise} = openPromise(); const { resolve, promise } = openPromise<T>();
const doPutOrGet = (tx: IDBTransaction) => { const doPutOrGet = (tx: IDBTransaction) => {
const objstore = tx.objectStore(store.name); const objstore = tx.objectStore(store.name);
const req = objstore.get(key); const req = objstore.get(key);
@ -722,10 +740,9 @@ export class QueryRoot {
return promise; return promise;
} }
putWithResult<T>(store: Store<T>, val: T): Promise<IDBValidKey> { putWithResult<T>(store: Store<T>, val: T): Promise<IDBValidKey> {
this.checkFinished(); this.checkFinished();
const {resolve, promise} = openPromise(); const { resolve, promise } = openPromise<IDBValidKey>();
const doPutWithResult = (tx: IDBTransaction) => { const doPutWithResult = (tx: IDBTransaction) => {
const req = tx.objectStore(store.name).put(val); const req = tx.objectStore(store.name).put(val);
req.onsuccess = () => { req.onsuccess = () => {
@ -739,14 +756,13 @@ export class QueryRoot {
.then(() => promise); .then(() => promise);
} }
/** /**
* Update objects inside a transaction. * Update objects inside a transaction.
* *
* If the mutation function throws AbortTransaction, the whole transaction will be aborted. * If the mutation function throws AbortTransaction, the whole transaction will be aborted.
* If the mutation function returns undefined or null, no modification will be made. * If the mutation function returns undefined or null, no modification will be made.
*/ */
mutate<T>(store: Store<T>, key: any, f: (v: T) => T|undefined): QueryRoot { mutate<T>(store: Store<T>, key: any, f: (v: T) => T | undefined): QueryRoot {
this.checkFinished(); this.checkFinished();
const doPut = (tx: IDBTransaction) => { const doPut = (tx: IDBTransaction) => {
const req = tx.objectStore(store.name).openCursor(IDBKeyRange.only(key)); const req = tx.objectStore(store.name).openCursor(IDBKeyRange.only(key));
@ -754,7 +770,7 @@ export class QueryRoot {
const cursor = req.result; const cursor = req.result;
if (cursor) { if (cursor) {
const value = cursor.value; const value = cursor.value;
let modifiedValue: T|undefined; let modifiedValue: T | undefined;
try { try {
modifiedValue = f(value); modifiedValue = f(value);
} catch (e) { } catch (e) {
@ -776,7 +792,6 @@ export class QueryRoot {
return this; return this;
} }
/** /**
* Add all object from an iterable to the given object store. * Add all object from an iterable to the given object store.
*/ */
@ -810,13 +825,13 @@ export class QueryRoot {
/** /**
* Get one object from a store by its key. * Get one object from a store by its key.
*/ */
get<T>(store: Store<T>, key: any): Promise<T|undefined> { get<T>(store: Store<T>, key: any): Promise<T | undefined> {
this.checkFinished(); this.checkFinished();
if (key === void 0) { if (key === void 0) {
throw Error("key must not be undefined"); throw Error("key must not be undefined");
} }
const {resolve, promise} = openPromise(); const { resolve, promise } = openPromise<T | undefined>();
const doGet = (tx: IDBTransaction) => { const doGet = (tx: IDBTransaction) => {
const req = tx.objectStore(store.name).get(key); const req = tx.objectStore(store.name).get(key);
@ -839,7 +854,7 @@ export class QueryRoot {
getMany<T>(store: Store<T>, keys: any[]): Promise<T[]> { getMany<T>(store: Store<T>, keys: any[]): Promise<T[]> {
this.checkFinished(); this.checkFinished();
const { resolve, promise } = openPromise(); const { resolve, promise } = openPromise<T[]>();
const results: T[] = []; const results: T[] = [];
const doGetMany = (tx: IDBTransaction) => { const doGetMany = (tx: IDBTransaction) => {
@ -866,17 +881,20 @@ export class QueryRoot {
/** /**
* Get one object from a store by its key. * Get one object from a store by its key.
*/ */
getIndexed<I extends IDBValidKey, T>(index: Index<I, T>, getIndexed<I extends IDBValidKey, T>(
key: I): Promise<T|undefined> { index: Index<I, T>,
key: I,
): Promise<T | undefined> {
this.checkFinished(); this.checkFinished();
if (key === void 0) { if (key === void 0) {
throw Error("key must not be undefined"); throw Error("key must not be undefined");
} }
const {resolve, promise} = openPromise<void>(); const { resolve, promise } = openPromise<T | undefined>();
const doGetIndexed = (tx: IDBTransaction) => { const doGetIndexed = (tx: IDBTransaction) => {
const req = tx.objectStore(index.storeName) const req = tx
.objectStore(index.storeName)
.index(index.indexName) .index(index.indexName)
.get(key); .get(key);
req.onsuccess = () => { req.onsuccess = () => {
@ -917,10 +935,12 @@ export class QueryRoot {
resolve(); resolve();
}; };
tx.onabort = () => { tx.onabort = () => {
console.warn(`aborted ${mode} transaction on stores [${[... this.stores]}]`); console.warn(
`aborted ${mode} transaction on stores [${[...this.stores]}]`,
);
reject(Error("transaction aborted")); reject(Error("transaction aborted"));
}; };
tx.onerror = (e) => { tx.onerror = e => {
console.warn(`error in transaction`, (e.target as any).error); console.warn(`error in transaction`, (e.target as any).error);
}; };
for (const w of this.work) { for (const w of this.work) {
@ -946,9 +966,11 @@ export class QueryRoot {
/** /**
* Low-level function to add a task to the internal work queue. * Low-level function to add a task to the internal work queue.
*/ */
addWork(workFn: (t: IDBTransaction) => void, addWork(
workFn: (t: IDBTransaction) => void,
storeName?: string, storeName?: string,
isWrite?: boolean) { isWrite?: boolean,
) {
this.work.push(workFn); this.work.push(workFn);
if (storeName) { if (storeName) {
this.addStoreAccess(storeName, isWrite); this.addStoreAccess(storeName, isWrite);

View File

@ -905,3 +905,30 @@ export class Proposal {
*/ */
static checked: (obj: any) => Proposal; static checked: (obj: any) => Proposal;
} }
/**
* Response from the internal merchant API.
*/
@Checkable.Class({extra: true})
export class CheckPaymentResponse {
@Checkable.Boolean()
paid: boolean;
@Checkable.Optional(Checkable.Boolean())
refunded: boolean | undefined;
@Checkable.Optional(Checkable.String())
refunded_amount: string | undefined;
@Checkable.Optional(Checkable.Value(() => ContractTerms))
contract_terms: ContractTerms | undefined;
@Checkable.Optional(Checkable.String())
contract_url: string | undefined;
/**
* Verify that a value matches the schema of this class and convert it into a
* member.
*/
static checked: (obj: any) => CheckPaymentResponse;
}

View File

@ -105,6 +105,7 @@ import {
WalletBalance, WalletBalance,
WalletBalanceEntry, WalletBalanceEntry,
} from "./walletTypes"; } from "./walletTypes";
import { openPromise } from "./promiseUtils";
interface SpeculativePayData { interface SpeculativePayData {
payCoinInfo: PayCoinInfo; payCoinInfo: PayCoinInfo;
@ -327,6 +328,7 @@ export class Wallet {
* IndexedDB database used by the wallet. * IndexedDB database used by the wallet.
*/ */
db: IDBDatabase; db: IDBDatabase;
private enableTracing = false;
private http: HttpRequestLibrary; private http: HttpRequestLibrary;
private badge: Badge; private badge: Badge;
private notifier: Notifier; private notifier: Notifier;
@ -337,6 +339,12 @@ export class Wallet {
private speculativePayData: SpeculativePayData | undefined; private speculativePayData: SpeculativePayData | undefined;
private cachedNextUrl: { [fulfillmentUrl: string]: NextUrlResult } = {}; private cachedNextUrl: { [fulfillmentUrl: string]: NextUrlResult } = {};
private activeTipOperations: { [s: string]: Promise<TipRecord> } = {}; private activeTipOperations: { [s: string]: Promise<TipRecord> } = {};
private activeProcessReserveOperations: {
[reservePub: string]: Promise<void>;
} = {};
private activeProcessPreCoinOperations: {
[preCoinPub: string]: Promise<void>;
} = {};
/** /**
* Set of identifiers for running operations. * Set of identifiers for running operations.
@ -426,14 +434,14 @@ export class Wallet {
.iter(Stores.reserves) .iter(Stores.reserves)
.forEach(reserve => { .forEach(reserve => {
console.log("resuming reserve", reserve.reserve_pub); console.log("resuming reserve", reserve.reserve_pub);
this.processReserve(reserve); this.processReserve(reserve.reserve_pub);
}); });
this.q() this.q()
.iter(Stores.precoins) .iter(Stores.precoins)
.forEach(preCoin => { .forEach(preCoin => {
console.log("resuming precoin"); console.log("resuming precoin");
this.processPreCoin(preCoin); this.processPreCoin(preCoin.coinPub);
}); });
this.q() this.q()
@ -1073,16 +1081,25 @@ export class Wallet {
* First fetch information requred to withdraw from the reserve, * First fetch information requred to withdraw from the reserve,
* then deplete the reserve, withdrawing coins until it is empty. * then deplete the reserve, withdrawing coins until it is empty.
*/ */
private async processReserve( async processReserve(reservePub: string): Promise<void> {
reserveRecord: ReserveRecord, const activeOperation = this.activeProcessReserveOperations[reservePub];
retryDelayMs: number = 250,
): Promise<void> { if (activeOperation) {
const opId = "reserve-" + reserveRecord.reserve_pub; return activeOperation;
}
const opId = "reserve-" + reservePub;
this.startOperation(opId); this.startOperation(opId);
// This opened promise gets resolved only once the
// reserve withdraw operation succeeds, even after retries.
const op = openPromise<void>();
const processReserveInternal = async (retryDelayMs: number = 250) => {
try { try {
const reserve = await this.updateReserve(reserveRecord.reserve_pub); const reserve = await this.updateReserve(reservePub);
await this.depleteReserve(reserve); await this.depleteReserve(reserve);
op.resolve();
} catch (e) { } catch (e) {
// random, exponential backoff truncated at 3 minutes // random, exponential backoff truncated at 3 minutes
const nextDelay = Math.min( const nextDelay = Math.min(
@ -1093,33 +1110,54 @@ export class Wallet {
`Failed to deplete reserve, trying again in ${retryDelayMs} ms`, `Failed to deplete reserve, trying again in ${retryDelayMs} ms`,
); );
this.timerGroup.after(retryDelayMs, () => this.timerGroup.after(retryDelayMs, () =>
this.processReserve(reserveRecord, nextDelay), processReserveInternal(nextDelay),
); );
}
};
try {
processReserveInternal();
this.activeProcessReserveOperations[reservePub] = op.promise;
await op.promise;
} finally { } finally {
this.stopOperation(opId); this.stopOperation(opId);
delete this.activeProcessReserveOperations[reservePub];
} }
} }
/** /**
* Given a planchet, withdraw a coin from the exchange. * Given a planchet, withdraw a coin from the exchange.
*/ */
private async processPreCoin( private async processPreCoin(preCoinPub: string): Promise<void> {
preCoin: PreCoinRecord, const activeOperation = this.activeProcessPreCoinOperations[preCoinPub];
retryDelayMs = 200, if (activeOperation) {
): Promise<void> { return activeOperation;
// Throttle concurrent executions of this function, so we don't withdraw too many coins at once. }
const op = openPromise<void>();
const processPreCoinInternal = async (retryDelayMs: number = 200) => {
const preCoin = await this.q().get(Stores.precoins, preCoinPub);
if (!preCoin) {
console.log("processPreCoin: preCoinPub not found");
return;
}
// Throttle concurrent executions of this function,
// so we don't withdraw too many coins at once.
if ( if (
this.processPreCoinConcurrent >= 4 || this.processPreCoinConcurrent >= 4 ||
this.processPreCoinThrottle[preCoin.exchangeBaseUrl] this.processPreCoinThrottle[preCoin.exchangeBaseUrl]
) { ) {
console.log("delaying processPreCoin"); this.enableTracing && console.log("delaying processPreCoin");
this.timerGroup.after(retryDelayMs, () => this.timerGroup.after(retryDelayMs, () =>
this.processPreCoin(preCoin, Math.min(retryDelayMs * 2, 5 * 60 * 1000)), processPreCoinInternal(Math.min(retryDelayMs * 2, 5 * 60 * 1000)),
); );
return; return op.promise;
} }
console.log("executing processPreCoin", preCoin);
//console.log("executing processPreCoin", preCoin);
this.processPreCoinConcurrent++; this.processPreCoinConcurrent++;
try { try {
const exchange = await this.q().get( const exchange = await this.q().get(
Stores.exchanges, Stores.exchanges,
@ -1139,15 +1177,8 @@ export class Wallet {
} }
const coin = await this.withdrawExecute(preCoin); const coin = await this.withdrawExecute(preCoin);
console.log("processPreCoin: got coin", coin);
const mutateReserve = (r: ReserveRecord) => { const mutateReserve = (r: ReserveRecord) => {
console.log(
`before committing coin: current ${amountToPretty(
r.current_amount!,
)}, precoin: ${amountToPretty(r.precoin_amount)})}`,
);
const x = Amounts.sub( const x = Amounts.sub(
r.precoin_amount, r.precoin_amount,
preCoin.coinValue, preCoin.coinValue,
@ -1196,6 +1227,7 @@ export class Wallet {
} }
this.notifier.notify(); this.notifier.notify();
op.resolve();
} catch (e) { } catch (e) {
console.error( console.error(
"Failed to withdraw coin from precoin, retrying in", "Failed to withdraw coin from precoin, retrying in",
@ -1206,7 +1238,7 @@ export class Wallet {
// exponential backoff truncated at one minute // exponential backoff truncated at one minute
const nextRetryDelayMs = Math.min(retryDelayMs * 2, 5 * 60 * 1000); const nextRetryDelayMs = Math.min(retryDelayMs * 2, 5 * 60 * 1000);
this.timerGroup.after(retryDelayMs, () => this.timerGroup.after(retryDelayMs, () =>
this.processPreCoin(preCoin, nextRetryDelayMs), processPreCoinInternal(nextRetryDelayMs),
); );
const currentThrottle = const currentThrottle =
@ -1219,6 +1251,15 @@ export class Wallet {
} finally { } finally {
this.processPreCoinConcurrent--; this.processPreCoinConcurrent--;
} }
};
try {
this.activeProcessPreCoinOperations[preCoinPub] = op.promise;
await processPreCoinInternal();
return op.promise;
} finally {
delete this.activeProcessPreCoinOperations[preCoinPub];
}
} }
/** /**
@ -1332,10 +1373,9 @@ export class Wallet {
.finish(); .finish();
this.notifier.notify(); this.notifier.notify();
this.processReserve(reserve); this.processReserve(reserve.reserve_pub);
} }
private async withdrawExecute(pc: PreCoinRecord): Promise<CoinRecord> { private async withdrawExecute(pc: PreCoinRecord): Promise<CoinRecord> {
const wd: any = {}; const wd: any = {};
wd.denom_pub_hash = pc.denomPubHash; wd.denom_pub_hash = pc.denomPubHash;
@ -1424,20 +1464,22 @@ export class Wallet {
r.timestamp_depleted = new Date().getTime(); r.timestamp_depleted = new Date().getTime();
} }
console.log(
`after creating precoin: current ${amountToPretty(
r.current_amount,
)}, precoin: ${amountToPretty(r.precoin_amount)})}`,
);
return r; return r;
} }
const preCoin = await this.cryptoApi.createPreCoin(denom, reserve); const preCoin = await this.cryptoApi.createPreCoin(denom, reserve);
// This will fail and throw an exception if the remaining amount in the
// reserve is too low to create a pre-coin.
try {
await this.q() await this.q()
.put(Stores.precoins, preCoin) .put(Stores.precoins, preCoin)
.mutate(Stores.reserves, reserve.reserve_pub, mutateReserve); .mutate(Stores.reserves, reserve.reserve_pub, mutateReserve)
await this.processPreCoin(preCoin); .finish();
} catch (e) {
console.log("can't create pre-coin:", e.name, e.message);
return;
}
await this.processPreCoin(preCoin.coinPub);
}); });
await Promise.all(ps); await Promise.all(ps);
@ -1746,7 +1788,10 @@ export class Wallet {
return ret; return ret;
} }
async getExchangePaytoUri(exchangeBaseUrl: string, supportedTargetTypes: string[]): Promise<string> { async getExchangePaytoUri(
exchangeBaseUrl: string,
supportedTargetTypes: string[],
): Promise<string> {
const wireInfo = await this.getWireInfo(exchangeBaseUrl); const wireInfo = await this.getWireInfo(exchangeBaseUrl);
for (let account of wireInfo.accounts) { for (let account of wireInfo.accounts) {
const paytoUri = new URI(account.url); const paytoUri = new URI(account.url);
@ -1820,8 +1865,6 @@ export class Wallet {
throw Error("exchange doesn't offer any denominations"); throw Error("exchange doesn't offer any denominations");
} }
console.log("updating exchange with wireMethodDetails", wireMethodDetails);
const r = await this.q().get<ExchangeRecord>(Stores.exchanges, baseUrl); const r = await this.q().get<ExchangeRecord>(Stores.exchanges, baseUrl);
let exchangeInfo: ExchangeRecord; let exchangeInfo: ExchangeRecord;
@ -2714,7 +2757,7 @@ export class Wallet {
*/ */
stop() { stop() {
this.timerGroup.stopCurrentAndFutureTimers(); this.timerGroup.stopCurrentAndFutureTimers();
this.cryptoApi.terminateWorkers(); this.cryptoApi.stop();
} }
async getSenderWireInfos(): Promise<SenderWireInfos> { async getSenderWireInfos(): Promise<SenderWireInfos> {
@ -3199,7 +3242,7 @@ export class Wallet {
withdrawSig: response.reserve_sigs[i].reserve_sig, withdrawSig: response.reserve_sigs[i].reserve_sig,
}; };
await this.q().put(Stores.precoins, preCoin); await this.q().put(Stores.precoins, preCoin);
this.processPreCoin(preCoin); this.processPreCoin(preCoin.coinPub);
} }
tipRecord.pickedUp = true; tipRecord.pickedUp = true;

View File

@ -48,6 +48,7 @@
"src/libtoolVersion-test.ts", "src/libtoolVersion-test.ts",
"src/libtoolVersion.ts", "src/libtoolVersion.ts",
"src/logging.ts", "src/logging.ts",
"src/promiseUtils.ts",
"src/query.ts", "src/query.ts",
"src/talerTypes.ts", "src/talerTypes.ts",
"src/timer.ts", "src/timer.ts",