diff options
author | Florian Dold <florian.dold@gmail.com> | 2016-10-13 20:02:42 +0200 |
---|---|---|
committer | Florian Dold <florian.dold@gmail.com> | 2016-10-13 20:02:42 +0200 |
commit | 0b198e08888830890622e983445c75f947186b4c (patch) | |
tree | 562eb58178c57f44c885adbf2bf52c578f1a32a8 /lib/wallet/cryptoApi.ts | |
parent | d3b49c0a2fb5d8a888e533fd63545103abd919c5 (diff) |
refactor work queue
Diffstat (limited to 'lib/wallet/cryptoApi.ts')
-rw-r--r-- | lib/wallet/cryptoApi.ts | 149 |
1 files changed, 67 insertions, 82 deletions
diff --git a/lib/wallet/cryptoApi.ts b/lib/wallet/cryptoApi.ts index ec20dd964..88b82ae3b 100644 --- a/lib/wallet/cryptoApi.ts +++ b/lib/wallet/cryptoApi.ts @@ -28,11 +28,6 @@ import {CoinWithDenom} from "./wallet"; import {PayCoinInfo} from "./types"; import {RefreshSession} from "./types"; -interface RegistryEntry { - resolve: any; - reject: any; - workerIndex: number; -} interface WorkerState { /** @@ -41,19 +36,14 @@ interface WorkerState { w: Worker|null; /** - * Are we currently running a task on this worker? + * Work we're currently executing or null if not busy. */ - busy: boolean; + currentWorkItem: WorkItem|null; /** * Timer to terminate the worker if it's not busy enough. */ terminationTimerHandle: number|null; - - /** - * Index of this worker in the list of workers. - */ - workerIndex: number; } interface WorkItem { @@ -61,6 +51,11 @@ interface WorkItem { args: any[]; resolve: any; reject: any; + + /** + * Serial id to identify a matching response. + */ + rpcId: number; } @@ -72,27 +67,21 @@ const NUM_PRIO = 5; export class CryptoApi { private nextRpcId: number = 1; - private rpcRegistry: {[n: number]: RegistryEntry} = {}; private workers: WorkerState[]; private workQueues: WorkItem[][]; /** * Number of busy workers. */ private numBusy: number = 0; - /** - * Number if pending work items. - */ - private numWaiting: number = 0; - /** * Start a worker (if not started) and set as busy. */ - wake(ws: WorkerState) { - if (ws.busy) { + wake<T>(ws: WorkerState, work: WorkItem): void { + if (ws.currentWorkItem != null) { throw Error("assertion failed"); } - ws.busy = true; + ws.currentWorkItem = work; this.numBusy++; if (!ws.w) { let w = new Worker("/lib/wallet/cryptoWorker.js"); @@ -100,27 +89,45 @@ export class CryptoApi { w.onerror = (e: ErrorEvent) => this.handleWorkerError(ws, e); ws.w = w; } + + let msg: any = { + operation: work.operation, args: work.args, + id: work.rpcId + }; + this.resetWorkerTimeout(ws); + ws.w!.postMessage(msg); + } + + resetWorkerTimeout(ws: WorkerState) { if (ws.terminationTimerHandle != null) { clearTimeout(ws.terminationTimerHandle); } let destroy = () => { - if (ws.w && !ws.busy) { + // terminate worker if it's idle + if (ws.w && ws.currentWorkItem == null) { ws.w!.terminate(); ws.w = null; } - } + }; ws.terminationTimerHandle = setTimeout(destroy, 20 * 1000); } handleWorkerError(ws: WorkerState, e: ErrorEvent) { - console.error("error in worker", e); + if (ws.currentWorkItem) { + console.error(`error in worker during ${ws.currentWorkItem!.operation}`, e); + } else { + console.error("error in worker", e); + } + console.error(e.message); try { ws.w!.terminate(); + ws.w = null; } catch (e) { console.error(e); } - if (ws.busy) { - ws.busy = false; + if (ws.currentWorkItem != null) { + ws.currentWorkItem.reject(e); + ws.currentWorkItem = null; this.numBusy--; } this.findWork(ws); @@ -132,14 +139,7 @@ export class CryptoApi { let q = this.workQueues[NUM_PRIO - i - 1]; if (q.length != 0) { let work: WorkItem = q.shift()!; - let msg: any = { - operation: work.operation, - args: work.args, - id: this.registerRpcId(work.resolve, work.reject, ws.workerIndex), - }; - - this.wake(ws); - ws.w!.postMessage(msg); + this.wake(ws, work); return; } } @@ -151,22 +151,19 @@ export class CryptoApi { console.error("rpc id must be number"); return; } - if (!this.rpcRegistry[id]) { - console.error(`RPC with id ${id} has no registry entry`); + let currentWorkItem = ws.currentWorkItem; + ws.currentWorkItem = null; + this.numBusy--; + this.findWork(ws); + if (!currentWorkItem) { + console.error("unsolicited response from worker"); return; } - let {resolve, workerIndex} = this.rpcRegistry[id]; - delete this.rpcRegistry[id]; - if (workerIndex != ws.workerIndex) { - throw Error("assertion failed"); - } - if (!ws.busy) { - throw Error("assertion failed"); + if (id != currentWorkItem.rpcId) { + console.error(`RPC with id ${id} has no registry entry`); + return; } - ws.busy = false; - this.numBusy--; - resolve(msg.data.result); - this.findWork(ws); + currentWorkItem.resolve(msg.data.result); } constructor() { @@ -175,9 +172,8 @@ export class CryptoApi { for (let i = 0; i < this.workers.length; i++) { this.workers[i] = { w: null, - busy: false, terminationTimerHandle: null, - workerIndex: i, + currentWorkItem: null, }; } this.workQueues = []; @@ -186,45 +182,34 @@ export class CryptoApi { } } - - private registerRpcId(resolve: any, reject: any, - workerIndex: number): number { - let id = this.nextRpcId++; - this.rpcRegistry[id] = {resolve, reject, workerIndex}; - return id; - } - - private doRpc<T>(operation: string, priority: number, ...args: any[]): Promise<T> { - if (this.numBusy == this.workers.length) { - let q = this.workQueues[priority]; - if (!q) { - throw Error("assertion failed"); - } - return new Promise<T>((resolve, reject) => { - this.workQueues[priority].push({operation, args, resolve, reject}); - }); - } - for (let i = 0; i < this.workers.length; i++) { - let ws = this.workers[i]; - if (ws.busy) { - continue; + return new Promise((resolve, reject) => { + let rpcId = this.nextRpcId++; + let workItem: WorkItem = {operation, args, resolve, reject, rpcId}; + + if (this.numBusy == this.workers.length) { + let q = this.workQueues[priority]; + if (!q) { + throw Error("assertion failed"); + } + this.workQueues[priority].push(workItem); + return; } - this.wake(ws); + for (let i = 0; i < this.workers.length; i++) { + let ws = this.workers[i]; + if (ws.currentWorkItem != null) { + continue; + } - return new Promise<T>((resolve, reject) => { - let msg: any = { - operation, args, - id: this.registerRpcId(resolve, reject, i), - }; - ws.w!.postMessage(msg); - }); - } + this.wake<T>(ws, workItem); + return; + } - throw Error("assertion failed"); + throw Error("assertion failed"); + }); } @@ -270,4 +255,4 @@ export class CryptoApi { meltAmount, meltFee); } -} +}
\ No newline at end of file |