diff options
author | Florian Dold <florian.dold@gmail.com> | 2016-11-03 01:33:53 +0100 |
---|---|---|
committer | Florian Dold <florian.dold@gmail.com> | 2016-11-03 01:33:53 +0100 |
commit | d1291f67551c58168af43698a359cb5ddfd266b0 (patch) | |
tree | 55a13ed29fe1915e3f42f1b1b7038dafa2e975a7 /node_modules/selenium-webdriver/lib/promise.js | |
parent | d0a0695fb5d34996850723f7d4b1b59c3df909c2 (diff) |
node_modules
Diffstat (limited to 'node_modules/selenium-webdriver/lib/promise.js')
-rw-r--r-- | node_modules/selenium-webdriver/lib/promise.js | 3039 |
1 files changed, 3039 insertions, 0 deletions
diff --git a/node_modules/selenium-webdriver/lib/promise.js b/node_modules/selenium-webdriver/lib/promise.js new file mode 100644 index 000000000..b98e7cf2e --- /dev/null +++ b/node_modules/selenium-webdriver/lib/promise.js @@ -0,0 +1,3039 @@ +// Licensed to the Software Freedom Conservancy (SFC) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The SFC licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +/** + * @fileoverview + * The promise module is centered around the {@linkplain ControlFlow}, a class + * that coordinates the execution of asynchronous tasks. The ControlFlow allows + * users to focus on the imperative commands for their script without worrying + * about chaining together every single asynchronous action, which can be + * tedious and verbose. APIs may be layered on top of the control flow to read + * as if they were synchronous. For instance, the core + * {@linkplain ./webdriver.WebDriver WebDriver} API is built on top of the + * control flow, allowing users to write + * + * driver.get('http://www.google.com/ncr'); + * driver.findElement({name: 'q'}).sendKeys('webdriver'); + * driver.findElement({name: 'btnGn'}).click(); + * + * instead of + * + * driver.get('http://www.google.com/ncr') + * .then(function() { + * return driver.findElement({name: 'q'}); + * }) + * .then(function(q) { + * return q.sendKeys('webdriver'); + * }) + * .then(function() { + * return driver.findElement({name: 'btnG'}); + * }) + * .then(function(btnG) { + * return btnG.click(); + * }); + * + * ## Tasks and Task Queues + * + * The control flow is based on the concept of tasks and task queues. Tasks are + * functions that define the basic unit of work for the control flow to execute. + * Each task is scheduled via {@link ControlFlow#execute()}, which will return + * a {@link ManagedPromise ManagedPromise} that will be resolved with the task's + * result. + * + * A task queue contains all of the tasks scheduled within a single turn of the + * [JavaScript event loop][JSEL]. The control flow will create a new task queue + * the first time a task is scheduled within an event loop. + * + * var flow = promise.controlFlow(); + * flow.execute(foo); // Creates a new task queue and inserts foo. + * flow.execute(bar); // Inserts bar into the same queue as foo. + * setTimeout(function() { + * flow.execute(baz); // Creates a new task queue and inserts baz. + * }, 0); + * + * Whenever the control flow creates a new task queue, it will automatically + * begin executing tasks in the next available turn of the event loop. This + * execution is scheduled using a "micro-task" timer, such as a (native) + * `ManagedPromise.then()` callback. + * + * setTimeout(() => console.log('a')); + * ManagedPromise.resolve().then(() => console.log('b')); // A native promise. + * flow.execute(() => console.log('c')); + * ManagedPromise.resolve().then(() => console.log('d')); + * setTimeout(() => console.log('fin')); + * // b + * // c + * // d + * // a + * // fin + * + * In the example above, b/c/d is logged before a/fin because native promises + * and this module use "micro-task" timers, which have a higher priority than + * "macro-tasks" like `setTimeout`. + * + * ## Task Execution + * + * Upon creating a task queue, and whenever an exisiting queue completes a task, + * the control flow will schedule a micro-task timer to process any scheduled + * tasks. This ensures no task is ever started within the same turn of the + * JavaScript event loop in which it was scheduled, nor is a task ever started + * within the same turn that another finishes. + * + * When the execution timer fires, a single task will be dequeued and executed. + * There are several important events that may occur while executing a task + * function: + * + * 1. A new task queue is created by a call to {@link ControlFlow#execute()}. + * Any tasks scheduled within this task queue are considered subtasks of the + * current task. + * 2. The task function throws an error. Any scheduled tasks are immediately + * discarded and the task's promised result (previously returned by + * {@link ControlFlow#execute()}) is immediately rejected with the thrown + * error. + * 3. The task function returns sucessfully. + * + * If a task function created a new task queue, the control flow will wait for + * that queue to complete before processing the task result. If the queue + * completes without error, the flow will settle the task's promise with the + * value originaly returned by the task function. On the other hand, if the task + * queue termintes with an error, the task's promise will be rejected with that + * error. + * + * flow.execute(function() { + * flow.execute(() => console.log('a')); + * flow.execute(() => console.log('b')); + * }); + * flow.execute(() => console.log('c')); + * // a + * // b + * // c + * + * ## ManagedPromise Integration + * + * In addition to the {@link ControlFlow} class, the promise module also exports + * a [ManagedPromise/A+] {@linkplain ManagedPromise implementation} that is deeply + * integrated with the ControlFlow. First and foremost, each promise + * {@linkplain ManagedPromise#then() callback} is scheduled with the + * control flow as a task. As a result, each callback is invoked in its own turn + * of the JavaScript event loop with its own task queue. If any tasks are + * scheduled within a callback, the callback's promised result will not be + * settled until the task queue has completed. + * + * promise.fulfilled().then(function() { + * flow.execute(function() { + * console.log('b'); + * }); + * }).then(() => console.log('a')); + * // b + * // a + * + * ### Scheduling ManagedPromise Callbacks <a id="scheduling_callbacks"></a> + * + * How callbacks are scheduled in the control flow depends on when they are + * attached to the promise. Callbacks attached to a _previously_ resolved + * promise are immediately enqueued as subtasks of the currently running task. + * + * var p = promise.fulfilled(); + * flow.execute(function() { + * flow.execute(() => console.log('A')); + * p.then( () => console.log('B')); + * flow.execute(() => console.log('C')); + * p.then( () => console.log('D')); + * }).then(function() { + * console.log('fin'); + * }); + * // A + * // B + * // C + * // D + * // fin + * + * When a promise is resolved while a task function is on the call stack, any + * callbacks also registered in that stack frame are scheduled as if the promise + * were already resolved: + * + * var d = promise.defer(); + * flow.execute(function() { + * flow.execute( () => console.log('A')); + * d.promise.then(() => console.log('B')); + * flow.execute( () => console.log('C')); + * d.promise.then(() => console.log('D')); + * + * d.fulfill(); + * }).then(function() { + * console.log('fin'); + * }); + * // A + * // B + * // C + * // D + * // fin + * + * Callbacks attached to an _unresolved_ promise within a task function are + * only weakly scheduled as subtasks and will be dropped if they reach the + * front of the queue before the promise is resolved. In the example below, the + * callbacks for `B` & `D` are dropped as sub-tasks since they are attached to + * an unresolved promise when they reach the front of the task queue. + * + * var d = promise.defer(); + * flow.execute(function() { + * flow.execute( () => console.log('A')); + * d.promise.then(() => console.log('B')); + * flow.execute( () => console.log('C')); + * d.promise.then(() => console.log('D')); + * + * setTimeout(d.fulfill, 20); + * }).then(function() { + * console.log('fin') + * }); + * // A + * // C + * // fin + * // B + * // D + * + * If a promise is resolved while a task function is on the call stack, any + * previously registered and unqueued callbacks (i.e. either attached while no + * task was on the call stack, or previously dropped as described above) act as + * _interrupts_ and are inserted at the front of the task queue. If multiple + * promises are fulfilled, their interrupts are enqueued in the order the + * promises are resolved. + * + * var d1 = promise.defer(); + * d1.promise.then(() => console.log('A')); + * + * var d2 = promise.defer(); + * d2.promise.then(() => console.log('B')); + * + * flow.execute(function() { + * d1.promise.then(() => console.log('C')); + * flow.execute(() => console.log('D')); + * }); + * flow.execute(function() { + * flow.execute(() => console.log('E')); + * flow.execute(() => console.log('F')); + * d1.fulfill(); + * d2.fulfill(); + * }).then(function() { + * console.log('fin'); + * }); + * // D + * // A + * // C + * // B + * // E + * // F + * // fin + * + * Within a task function (or callback), each step of a promise chain acts as + * an interrupt on the task queue: + * + * var d = promise.defer(); + * flow.execute(function() { + * d.promise. + * then(() => console.log('A')). + * then(() => console.log('B')). + * then(() => console.log('C')). + * then(() => console.log('D')); + * + * flow.execute(() => console.log('E')); + * d.fulfill(); + * }).then(function() { + * console.log('fin'); + * }); + * // A + * // B + * // C + * // D + * // E + * // fin + * + * If there are multiple promise chains derived from a single promise, they are + * processed in the order created: + * + * var d = promise.defer(); + * flow.execute(function() { + * var chain = d.promise.then(() => console.log('A')); + * + * chain.then(() => console.log('B')). + * then(() => console.log('C')); + * + * chain.then(() => console.log('D')). + * then(() => console.log('E')); + * + * flow.execute(() => console.log('F')); + * + * d.fulfill(); + * }).then(function() { + * console.log('fin'); + * }); + * // A + * // B + * // C + * // D + * // E + * // F + * // fin + * + * Even though a subtask's promised result will never resolve while the task + * function is on the stack, it will be treated as a promise resolved within the + * task. In all other scenarios, a task's promise behaves just like a normal + * promise. In the sample below, `C/D` is loggged before `B` because the + * resolution of `subtask1` interrupts the flow of the enclosing task. Within + * the final subtask, `E/F` is logged in order because `subtask1` is a resolved + * promise when that task runs. + * + * flow.execute(function() { + * var subtask1 = flow.execute(() => console.log('A')); + * var subtask2 = flow.execute(() => console.log('B')); + * + * subtask1.then(() => console.log('C')); + * subtask1.then(() => console.log('D')); + * + * flow.execute(function() { + * flow.execute(() => console.log('E')); + * subtask1.then(() => console.log('F')); + * }); + * }).then(function() { + * console.log('fin'); + * }); + * // A + * // C + * // D + * // B + * // E + * // F + * // fin + * + * Finally, consider the following: + * + * var d = promise.defer(); + * d.promise.then(() => console.log('A')); + * d.promise.then(() => console.log('B')); + * + * flow.execute(function() { + * flow.execute( () => console.log('C')); + * d.promise.then(() => console.log('D')); + * + * flow.execute( () => console.log('E')); + * d.promise.then(() => console.log('F')); + * + * d.fulfill(); + * + * flow.execute( () => console.log('G')); + * d.promise.then(() => console.log('H')); + * }).then(function() { + * console.log('fin'); + * }); + * // A + * // B + * // C + * // D + * // E + * // F + * // G + * // H + * // fin + * + * In this example, callbacks are registered on `d.promise` both before and + * during the invocation of the task function. When `d.fulfill()` is called, + * the callbacks registered before the task (`A` & `B`) are registered as + * interrupts. The remaining callbacks were all attached within the task and + * are scheduled in the flow as standard tasks. + * + * ## Generator Support + * + * [Generators][GF] may be scheduled as tasks within a control flow or attached + * as callbacks to a promise. Each time the generator yields a promise, the + * control flow will wait for that promise to settle before executing the next + * iteration of the generator. The yielded promise's fulfilled value will be + * passed back into the generator: + * + * flow.execute(function* () { + * var d = promise.defer(); + * + * setTimeout(() => console.log('...waiting...'), 25); + * setTimeout(() => d.fulfill(123), 50); + * + * console.log('start: ' + Date.now()); + * + * var value = yield d.promise; + * console.log('mid: %d; value = %d', Date.now(), value); + * + * yield promise.delayed(10); + * console.log('end: ' + Date.now()); + * }).then(function() { + * console.log('fin'); + * }); + * // start: 0 + * // ...waiting... + * // mid: 50; value = 123 + * // end: 60 + * // fin + * + * Yielding the result of a promise chain will wait for the entire chain to + * complete: + * + * promise.fulfilled().then(function* () { + * console.log('start: ' + Date.now()); + * + * var value = yield flow. + * execute(() => console.log('A')). + * then( () => console.log('B')). + * then( () => 123); + * + * console.log('mid: %s; value = %d', Date.now(), value); + * + * yield flow.execute(() => console.log('C')); + * }).then(function() { + * console.log('fin'); + * }); + * // start: 0 + * // A + * // B + * // mid: 2; value = 123 + * // C + * // fin + * + * Yielding a _rejected_ promise will cause the rejected value to be thrown + * within the generator function: + * + * flow.execute(function* () { + * console.log('start: ' + Date.now()); + * try { + * yield promise.delayed(10).then(function() { + * throw Error('boom'); + * }); + * } catch (ex) { + * console.log('caught time: ' + Date.now()); + * console.log(ex.message); + * } + * }); + * // start: 0 + * // caught time: 10 + * // boom + * + * # Error Handling + * + * ES6 promises do not require users to handle a promise rejections. This can + * result in subtle bugs as the rejections are silently "swallowed" by the + * ManagedPromise class. + * + * ManagedPromise.reject(Error('boom')); + * // ... *crickets* ... + * + * Selenium's promise module, on the other hand, requires that every rejection + * be explicitly handled. When a {@linkplain ManagedPromise ManagedPromise} is + * rejected and no callbacks are defined on that promise, it is considered an + * _unhandled rejection_ and reproted to the active task queue. If the rejection + * remains unhandled after a single turn of the [event loop][JSEL] (scheduled + * with a micro-task), it will propagate up the stack. + * + * ## Error Propagation + * + * If an unhandled rejection occurs within a task function, that task's promised + * result is rejected and all remaining subtasks are discarded: + * + * flow.execute(function() { + * // No callbacks registered on promise -> unhandled rejection + * promise.rejected(Error('boom')); + * flow.execute(function() { console.log('this will never run'); }); + * }).catch(function(e) { + * console.log(e.message); + * }); + * // boom + * + * The promised results for discarded tasks are silently rejected with a + * cancellation error and existing callback chains will never fire. + * + * flow.execute(function() { + * promise.rejected(Error('boom')); + * flow.execute(function() { console.log('a'); }). + * then(function() { console.log('b'); }); + * }).catch(function(e) { + * console.log(e.message); + * }); + * // boom + * + * An unhandled rejection takes precedence over a task function's returned + * result, even if that value is another promise: + * + * flow.execute(function() { + * promise.rejected(Error('boom')); + * return flow.execute(someOtherTask); + * }).catch(function(e) { + * console.log(e.message); + * }); + * // boom + * + * If there are multiple unhandled rejections within a task, they are packaged + * in a {@link MultipleUnhandledRejectionError}, which has an `errors` property + * that is a `Set` of the recorded unhandled rejections: + * + * flow.execute(function() { + * promise.rejected(Error('boom1')); + * promise.rejected(Error('boom2')); + * }).catch(function(ex) { + * console.log(ex instanceof MultipleUnhandledRejectionError); + * for (var e of ex.errors) { + * console.log(e.message); + * } + * }); + * // boom1 + * // boom2 + * + * When a subtask is discarded due to an unreported rejection in its parent + * frame, the existing callbacks on that task will never settle and the + * callbacks will not be invoked. If a new callback is attached ot the subtask + * _after_ it has been discarded, it is handled the same as adding a callback + * to a cancelled promise: the error-callback path is invoked. This behavior is + * intended to handle cases where the user saves a reference to a task promise, + * as illustrated below. + * + * var subTask; + * flow.execute(function() { + * promise.rejected(Error('boom')); + * subTask = flow.execute(function() {}); + * }).catch(function(e) { + * console.log(e.message); + * }).then(function() { + * return subTask.then( + * () => console.log('subtask success!'), + * (e) => console.log('subtask failed:\n' + e)); + * }); + * // boom + * // subtask failed: + * // DiscardedTaskError: Task was discarded due to a previous failure: boom + * + * When a subtask fails, its promised result is treated the same as any other + * promise: it must be handled within one turn of the rejection or the unhandled + * rejection is propagated to the parent task. This means users can catch errors + * from complex flows from the top level task: + * + * flow.execute(function() { + * flow.execute(function() { + * flow.execute(function() { + * throw Error('fail!'); + * }); + * }); + * }).catch(function(e) { + * console.log(e.message); + * }); + * // fail! + * + * ## Unhandled Rejection Events + * + * When an unhandled rejection propagates to the root of the control flow, the + * flow will emit an __uncaughtException__ event. If no listeners are registered + * on the flow, the error will be rethrown to the global error handler: an + * __uncaughtException__ event from the + * [`process`](https://nodejs.org/api/process.html) object in node, or + * `window.onerror` when running in a browser. + * + * Bottom line: you __*must*__ handle rejected promises. + * + * # ManagedPromise/A+ Compatibility + * + * This `promise` module is compliant with the [ManagedPromise/A+][] specification + * except for sections `2.2.6.1` and `2.2.6.2`: + * + * > + * > - `then` may be called multiple times on the same promise. + * > - If/when `promise` is fulfilled, all respective `onFulfilled` callbacks + * > must execute in the order of their originating calls to `then`. + * > - If/when `promise` is rejected, all respective `onRejected` callbacks + * > must execute in the order of their originating calls to `then`. + * > + * + * Specifically, the conformance tests contains the following scenario (for + * brevity, only the fulfillment version is shown): + * + * var p1 = ManagedPromise.resolve(); + * p1.then(function() { + * console.log('A'); + * p1.then(() => console.log('B')); + * }); + * p1.then(() => console.log('C')); + * // A + * // C + * // B + * + * Since the [ControlFlow](#scheduling_callbacks) executes promise callbacks as + * tasks, with this module, the result would be + * + * var p2 = promise.fulfilled(); + * p2.then(function() { + * console.log('A'); + * p2.then(() => console.log('B'); + * }); + * p2.then(() => console.log('C')); + * // A + * // B + * // C + * + * [JSEL]: https://developer.mozilla.org/en-US/docs/Web/JavaScript/EventLoop + * [GF]: https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Statements/function* + * [ManagedPromise/A+]: https://promisesaplus.com/ + */ + +'use strict'; + +const events = require('./events'); +const logging = require('./logging'); + + +/** + * Alias to help with readability and differentiate types. + * @const + */ +const NativePromise = Promise; + + +/** + * Whether to append traces of `then` to rejection errors. + * @type {boolean} + */ +var LONG_STACK_TRACES = false; // TODO: this should not be CONSTANT_CASE + + +/** @const */ +const LOG = logging.getLogger('promise'); + + +const UNIQUE_IDS = new WeakMap; +let nextId = 1; + + +function getUid(obj) { + let id = UNIQUE_IDS.get(obj); + if (!id) { + id = nextId; + nextId += 1; + UNIQUE_IDS.set(obj, id); + } + return id; +} + + +/** + * Runs the given function after a micro-task yield. + * @param {function()} fn The function to run. + */ +function asyncRun(fn) { + NativePromise.resolve().then(function() { + try { + fn(); + } catch (ignored) { + // Do nothing. + } + }); +} + + +/** + * @param {number} level What level of verbosity to log with. + * @param {(string|function(this: T): string)} loggable The message to log. + * @param {T=} opt_self The object in whose context to run the loggable + * function. + * @template T + */ +function vlog(level, loggable, opt_self) { + var logLevel = logging.Level.FINE; + if (level > 1) { + logLevel = logging.Level.FINEST; + } else if (level > 0) { + logLevel = logging.Level.FINER; + } + + if (typeof loggable === 'function') { + loggable = loggable.bind(opt_self); + } + + LOG.log(logLevel, loggable); +} + + +/** + * Generates an error to capture the current stack trace. + * @param {string} name Error name for this stack trace. + * @param {string} msg Message to record. + * @param {Function=} opt_topFn The function that should appear at the top of + * the stack; only applicable in V8. + * @return {!Error} The generated error. + */ +function captureStackTrace(name, msg, opt_topFn) { + var e = Error(msg); + e.name = name; + if (Error.captureStackTrace) { + Error.captureStackTrace(e, opt_topFn); + } else { + var stack = Error().stack; + if (stack) { + e.stack = e.toString(); + e.stack += '\n' + stack; + } + } + return e; +} + + +/** + * Error used when the computation of a promise is cancelled. + */ +class CancellationError extends Error { + /** + * @param {string=} opt_msg The cancellation message. + */ + constructor(opt_msg) { + super(opt_msg); + + /** @override */ + this.name = this.constructor.name; + + /** @private {boolean} */ + this.silent_ = false; + } + + /** + * Wraps the given error in a CancellationError. + * + * @param {*} error The error to wrap. + * @param {string=} opt_msg The prefix message to use. + * @return {!CancellationError} A cancellation error. + */ + static wrap(error, opt_msg) { + var message; + if (error instanceof CancellationError) { + return new CancellationError( + opt_msg ? (opt_msg + ': ' + error.message) : error.message); + } else if (opt_msg) { + message = opt_msg; + if (error) { + message += ': ' + error; + } + return new CancellationError(message); + } + if (error) { + message = error + ''; + } + return new CancellationError(message); + } +} + + +/** + * Error used to cancel tasks when a control flow is reset. + * @final + */ +class FlowResetError extends CancellationError { + constructor() { + super('ControlFlow was reset'); + this.silent_ = true; + } +} + + +/** + * Error used to cancel tasks that have been discarded due to an uncaught error + * reported earlier in the control flow. + * @final + */ +class DiscardedTaskError extends CancellationError { + /** @param {*} error The original error. */ + constructor(error) { + if (error instanceof DiscardedTaskError) { + return /** @type {!DiscardedTaskError} */(error); + } + + var msg = ''; + if (error) { + msg = ': ' + ( + typeof error.message === 'string' ? error.message : error); + } + + super('Task was discarded due to a previous failure' + msg); + this.silent_ = true; + } +} + + +/** + * Error used when there are multiple unhandled promise rejections detected + * within a task or callback. + * + * @final + */ +class MultipleUnhandledRejectionError extends Error { + /** + * @param {!(Set<*>)} errors The errors to report. + */ + constructor(errors) { + super('Multiple unhandled promise rejections reported'); + + /** @override */ + this.name = this.constructor.name; + + /** @type {!Set<*>} */ + this.errors = errors; + } +} + + +/** + * Property used to flag constructor's as implementing the Thenable interface + * for runtime type checking. + * @const + */ +const IMPLEMENTED_BY_SYMBOL = Symbol('promise.Thenable'); + + +/** + * Thenable is a promise-like object with a {@code then} method which may be + * used to schedule callbacks on a promised value. + * + * @interface + * @extends {IThenable<T>} + * @template T + */ +class Thenable { + /** + * Adds a property to a class prototype to allow runtime checks of whether + * instances of that class implement the Thenable interface. This function + * will also ensure the prototype's {@code then} function is exported from + * compiled code. + * @param {function(new: Thenable, ...?)} ctor The + * constructor whose prototype to modify. + */ + static addImplementation(ctor) { + ctor.prototype['then'] = ctor.prototype.then; + try { + ctor.prototype[IMPLEMENTED_BY_SYMBOL] = true; + } catch (ignored) { + // Property access denied? + } + } + + /** + * Checks if an object has been tagged for implementing the Thenable + * interface as defined by {@link Thenable.addImplementation}. + * @param {*} object The object to test. + * @return {boolean} Whether the object is an implementation of the Thenable + * interface. + */ + static isImplementation(object) { + if (!object) { + return false; + } + try { + return !!object[IMPLEMENTED_BY_SYMBOL]; + } catch (e) { + return false; // Property access seems to be forbidden. + } + } + + /** + * Cancels the computation of this promise's value, rejecting the promise in + * the process. This method is a no-op if the promise has already been + * resolved. + * + * @param {(string|Error)=} opt_reason The reason this promise is being + * cancelled. This value will be wrapped in a {@link CancellationError}. + */ + cancel(opt_reason) {} + + /** @return {boolean} Whether this promise's value is still being computed. */ + isPending() {} + + /** + * Registers listeners for when this instance is resolved. + * + * @param {?(function(T): (R|IThenable<R>))=} opt_callback The + * function to call if this promise is successfully resolved. The function + * should expect a single argument: the promise's resolved value. + * @param {?(function(*): (R|IThenable<R>))=} opt_errback + * The function to call if this promise is rejected. The function should + * expect a single argument: the rejection reason. + * @return {!ManagedPromise<R>} A new promise which will be + * resolved with the result of the invoked callback. + * @template R + */ + then(opt_callback, opt_errback) {} + + /** + * Registers a listener for when this promise is rejected. This is synonymous + * with the {@code catch} clause in a synchronous API: + * + * // Synchronous API: + * try { + * doSynchronousWork(); + * } catch (ex) { + * console.error(ex); + * } + * + * // Asynchronous promise API: + * doAsynchronousWork().catch(function(ex) { + * console.error(ex); + * }); + * + * @param {function(*): (R|IThenable<R>)} errback The + * function to call if this promise is rejected. The function should + * expect a single argument: the rejection reason. + * @return {!ManagedPromise<R>} A new promise which will be + * resolved with the result of the invoked callback. + * @template R + */ + catch(errback) {} + + /** + * Registers a listener to invoke when this promise is resolved, regardless + * of whether the promise's value was successfully computed. This function + * is synonymous with the {@code finally} clause in a synchronous API: + * + * // Synchronous API: + * try { + * doSynchronousWork(); + * } finally { + * cleanUp(); + * } + * + * // Asynchronous promise API: + * doAsynchronousWork().finally(cleanUp); + * + * __Note:__ similar to the {@code finally} clause, if the registered + * callback returns a rejected promise or throws an error, it will silently + * replace the rejection error (if any) from this promise: + * + * try { + * throw Error('one'); + * } finally { + * throw Error('two'); // Hides Error: one + * } + * + * promise.rejected(Error('one')) + * .finally(function() { + * throw Error('two'); // Hides Error: one + * }); + * + * @param {function(): (R|IThenable<R>)} callback The function to call when + * this promise is resolved. + * @return {!ManagedPromise<R>} A promise that will be fulfilled + * with the callback result. + * @template R + */ + finally(callback) {} +} + + +/** + * @enum {string} + */ +const PromiseState = { + PENDING: 'pending', + BLOCKED: 'blocked', + REJECTED: 'rejected', + FULFILLED: 'fulfilled' +}; + + +/** + * Internal map used to store cancellation handlers for {@link ManagedPromise} + * objects. This is an internal implementation detail used by the + * {@link TaskQueue} class to monitor for when a promise is cancelled without + * generating an extra promise via then(). + * + * @const {!WeakMap<!ManagedPromise, function(!CancellationError)>} + */ +const ON_CANCEL_HANDLER = new WeakMap; + + +/** + * Represents the eventual value of a completed operation. Each promise may be + * in one of three states: pending, fulfilled, or rejected. Each promise starts + * in the pending state and may make a single transition to either a + * fulfilled or rejected state, at which point the promise is considered + * resolved. + * + * @implements {Thenable<T>} + * @template T + * @see http://promises-aplus.github.io/promises-spec/ + */ +class ManagedPromise { + /** + * @param {function( + * function((T|IThenable<T>|Thenable)=), + * function(*=))} resolver + * Function that is invoked immediately to begin computation of this + * promise's value. The function should accept a pair of callback + * functions, one for fulfilling the promise and another for rejecting it. + * @param {ControlFlow=} opt_flow The control flow + * this instance was created under. Defaults to the currently active flow. + */ + constructor(resolver, opt_flow) { + getUid(this); + + /** @private {!ControlFlow} */ + this.flow_ = opt_flow || controlFlow(); + + /** @private {Error} */ + this.stack_ = null; + if (LONG_STACK_TRACES) { + this.stack_ = captureStackTrace('ManagedPromise', 'new', this.constructor); + } + + /** @private {Thenable<?>} */ + this.parent_ = null; + + /** @private {Array<!Task>} */ + this.callbacks_ = null; + + /** @private {PromiseState} */ + this.state_ = PromiseState.PENDING; + + /** @private {boolean} */ + this.handled_ = false; + + /** @private {*} */ + this.value_ = undefined; + + /** @private {TaskQueue} */ + this.queue_ = null; + + try { + var self = this; + resolver(function(value) { + self.resolve_(PromiseState.FULFILLED, value); + }, function(reason) { + self.resolve_(PromiseState.REJECTED, reason); + }); + } catch (ex) { + this.resolve_(PromiseState.REJECTED, ex); + } + } + + /** @override */ + toString() { + return 'ManagedPromise::' + getUid(this) + + ' {[[PromiseStatus]]: "' + this.state_ + '"}'; + } + + /** + * Resolves this promise. If the new value is itself a promise, this function + * will wait for it to be resolved before notifying the registered listeners. + * @param {PromiseState} newState The promise's new state. + * @param {*} newValue The promise's new value. + * @throws {TypeError} If {@code newValue === this}. + * @private + */ + resolve_(newState, newValue) { + if (PromiseState.PENDING !== this.state_) { + return; + } + + if (newValue === this) { + // See promise a+, 2.3.1 + // http://promises-aplus.github.io/promises-spec/#point-48 + newValue = new TypeError('A promise may not resolve to itself'); + newState = PromiseState.REJECTED; + } + + this.parent_ = null; + this.state_ = PromiseState.BLOCKED; + + if (newState !== PromiseState.REJECTED) { + if (Thenable.isImplementation(newValue)) { + // 2.3.2 + newValue = /** @type {!Thenable} */(newValue); + this.parent_ = newValue; + newValue.then( + this.unblockAndResolve_.bind(this, PromiseState.FULFILLED), + this.unblockAndResolve_.bind(this, PromiseState.REJECTED)); + return; + + } else if (newValue + && (typeof newValue === 'object' || typeof newValue === 'function')) { + // 2.3.3 + + try { + // 2.3.3.1 + var then = newValue['then']; + } catch (e) { + // 2.3.3.2 + this.state_ = PromiseState.REJECTED; + this.value_ = e; + this.scheduleNotifications_(); + return; + } + + if (typeof then === 'function') { + // 2.3.3.3 + this.invokeThen_(/** @type {!Object} */(newValue), then); + return; + } + } + } + + if (newState === PromiseState.REJECTED && + isError(newValue) && newValue.stack && this.stack_) { + newValue.stack += '\nFrom: ' + (this.stack_.stack || this.stack_); + } + + // 2.3.3.4 and 2.3.4 + this.state_ = newState; + this.value_ = newValue; + this.scheduleNotifications_(); + } + + /** + * Invokes a thenable's "then" method according to 2.3.3.3 of the promise + * A+ spec. + * @param {!Object} x The thenable object. + * @param {!Function} then The "then" function to invoke. + * @private + */ + invokeThen_(x, then) { + var called = false; + var self = this; + + var resolvePromise = function(value) { + if (!called) { // 2.3.3.3.3 + called = true; + // 2.3.3.3.1 + self.unblockAndResolve_(PromiseState.FULFILLED, value); + } + }; + + var rejectPromise = function(reason) { + if (!called) { // 2.3.3.3.3 + called = true; + // 2.3.3.3.2 + self.unblockAndResolve_(PromiseState.REJECTED, reason); + } + }; + + try { + // 2.3.3.3 + then.call(x, resolvePromise, rejectPromise); + } catch (e) { + // 2.3.3.3.4.2 + rejectPromise(e); + } + } + + /** + * @param {PromiseState} newState The promise's new state. + * @param {*} newValue The promise's new value. + * @private + */ + unblockAndResolve_(newState, newValue) { + if (this.state_ === PromiseState.BLOCKED) { + this.state_ = PromiseState.PENDING; + this.resolve_(newState, newValue); + } + } + + /** + * @private + */ + scheduleNotifications_() { + vlog(2, () => this + ' scheduling notifications', this); + + ON_CANCEL_HANDLER.delete(this); + if (this.value_ instanceof CancellationError + && this.value_.silent_) { + this.callbacks_ = null; + } + + if (!this.queue_) { + this.queue_ = this.flow_.getActiveQueue_(); + } + + if (!this.handled_ && + this.state_ === PromiseState.REJECTED && + !(this.value_ instanceof CancellationError)) { + this.queue_.addUnhandledRejection(this); + } + this.queue_.scheduleCallbacks(this); + } + + /** @override */ + cancel(opt_reason) { + if (!canCancel(this)) { + return; + } + + if (this.parent_ && canCancel(this.parent_)) { + this.parent_.cancel(opt_reason); + } else { + var reason = CancellationError.wrap(opt_reason); + let onCancel = ON_CANCEL_HANDLER.get(this); + if (onCancel) { + onCancel(reason); + ON_CANCEL_HANDLER.delete(this); + } + + if (this.state_ === PromiseState.BLOCKED) { + this.unblockAndResolve_(PromiseState.REJECTED, reason); + } else { + this.resolve_(PromiseState.REJECTED, reason); + } + } + + function canCancel(promise) { + if (!(promise instanceof ManagedPromise)) { + return Thenable.isImplementation(promise); + } + return promise.state_ === PromiseState.PENDING + || promise.state_ === PromiseState.BLOCKED; + } + } + + /** @override */ + isPending() { + return this.state_ === PromiseState.PENDING; + } + + /** @override */ + then(opt_callback, opt_errback) { + return this.addCallback_( + opt_callback, opt_errback, 'then', ManagedPromise.prototype.then); + } + + /** @override */ + catch(errback) { + return this.addCallback_( + null, errback, 'catch', ManagedPromise.prototype.catch); + } + + /** @override */ + finally(callback) { + var error; + var mustThrow = false; + return this.then(function() { + return callback(); + }, function(err) { + error = err; + mustThrow = true; + return callback(); + }).then(function() { + if (mustThrow) { + throw error; + } + }); + } + + /** + * Registers a new callback with this promise + * @param {(function(T): (R|IThenable<R>)|null|undefined)} callback The + * fulfillment callback. + * @param {(function(*): (R|IThenable<R>)|null|undefined)} errback The + * rejection callback. + * @param {string} name The callback name. + * @param {!Function} fn The function to use as the top of the stack when + * recording the callback's creation point. + * @return {!ManagedPromise<R>} A new promise which will be resolved with the + * esult of the invoked callback. + * @template R + * @private + */ + addCallback_(callback, errback, name, fn) { + if (typeof callback !== 'function' && typeof errback !== 'function') { + return this; + } + + this.handled_ = true; + if (this.queue_) { + this.queue_.clearUnhandledRejection(this); + } + + var cb = new Task( + this.flow_, + this.invokeCallback_.bind(this, callback, errback), + name, + LONG_STACK_TRACES ? {name: 'Promise', top: fn} : undefined); + cb.promise.parent_ = this; + + if (this.state_ !== PromiseState.PENDING && + this.state_ !== PromiseState.BLOCKED) { + this.flow_.getActiveQueue_().enqueue(cb); + } else { + if (!this.callbacks_) { + this.callbacks_ = []; + } + this.callbacks_.push(cb); + cb.blocked = true; + this.flow_.getActiveQueue_().enqueue(cb); + } + + return cb.promise; + } + + /** + * Invokes a callback function attached to this promise. + * @param {(function(T): (R|IThenable<R>)|null|undefined)} callback The + * fulfillment callback. + * @param {(function(*): (R|IThenable<R>)|null|undefined)} errback The + * rejection callback. + * @template R + * @private + */ + invokeCallback_(callback, errback) { + var callbackFn = callback; + if (this.state_ === PromiseState.REJECTED) { + callbackFn = errback; + } + + if (typeof callbackFn === 'function') { + if (isGenerator(callbackFn)) { + return consume(callbackFn, null, this.value_); + } + return callbackFn(this.value_); + } else if (this.state_ === PromiseState.REJECTED) { + throw this.value_; + } else { + return this.value_; + } + } +} +Thenable.addImplementation(ManagedPromise); + + +/** + * Represents a value that will be resolved at some point in the future. This + * class represents the protected "producer" half of a ManagedPromise - each Deferred + * has a {@code promise} property that may be returned to consumers for + * registering callbacks, reserving the ability to resolve the deferred to the + * producer. + * + * If this Deferred is rejected and there are no listeners registered before + * the next turn of the event loop, the rejection will be passed to the + * {@link ControlFlow} as an unhandled failure. + * + * @template T + */ +class Deferred { + /** + * @param {ControlFlow=} opt_flow The control flow this instance was + * created under. This should only be provided during unit tests. + */ + constructor(opt_flow) { + var fulfill, reject; + + /** @type {!ManagedPromise<T>} */ + this.promise = new ManagedPromise(function(f, r) { + fulfill = f; + reject = r; + }, opt_flow); + + var self = this; + var checkNotSelf = function(value) { + if (value === self) { + throw new TypeError('May not resolve a Deferred with itself'); + } + }; + + /** + * Resolves this deferred with the given value. It is safe to call this as a + * normal function (with no bound "this"). + * @param {(T|IThenable<T>|Thenable)=} opt_value The fulfilled value. + */ + this.fulfill = function(opt_value) { + checkNotSelf(opt_value); + fulfill(opt_value); + }; + + /** + * Rejects this promise with the given reason. It is safe to call this as a + * normal function (with no bound "this"). + * @param {*=} opt_reason The rejection reason. + */ + this.reject = function(opt_reason) { + checkNotSelf(opt_reason); + reject(opt_reason); + }; + } +} + + +/** + * Tests if a value is an Error-like object. This is more than an straight + * instanceof check since the value may originate from another context. + * @param {*} value The value to test. + * @return {boolean} Whether the value is an error. + */ +function isError(value) { + return value instanceof Error || + (!!value && typeof value === 'object' + && typeof value.message === 'string'); +} + + +/** + * Determines whether a {@code value} should be treated as a promise. + * Any object whose "then" property is a function will be considered a promise. + * + * @param {?} value The value to test. + * @return {boolean} Whether the value is a promise. + */ +function isPromise(value) { + try { + // Use array notation so the Closure compiler does not obfuscate away our + // contract. + return value + && (typeof value === 'object' || typeof value === 'function') + && typeof value['then'] === 'function'; + } catch (ex) { + return false; + } +} + + +/** + * Creates a promise that will be resolved at a set time in the future. + * @param {number} ms The amount of time, in milliseconds, to wait before + * resolving the promise. + * @return {!ManagedPromise} The promise. + */ +function delayed(ms) { + var key; + return new ManagedPromise(function(fulfill) { + key = setTimeout(function() { + key = null; + fulfill(); + }, ms); + }).catch(function(e) { + clearTimeout(key); + key = null; + throw e; + }); +} + + +/** + * Creates a new deferred object. + * @return {!Deferred<T>} The new deferred object. + * @template T + */ +function defer() { + return new Deferred(); +} + + +/** + * Creates a promise that has been resolved with the given value. + * @param {T=} opt_value The resolved value. + * @return {!ManagedPromise<T>} The resolved promise. + * @template T + */ +function fulfilled(opt_value) { + if (opt_value instanceof ManagedPromise) { + return opt_value; + } + return new ManagedPromise(function(fulfill) { + fulfill(opt_value); + }); +} + + +/** + * Creates a promise that has been rejected with the given reason. + * @param {*=} opt_reason The rejection reason; may be any value, but is + * usually an Error or a string. + * @return {!ManagedPromise<T>} The rejected promise. + * @template T + */ +function rejected(opt_reason) { + if (opt_reason instanceof ManagedPromise) { + return opt_reason; + } + return new ManagedPromise(function(_, reject) { + reject(opt_reason); + }); +} + + +/** + * Wraps a function that expects a node-style callback as its final + * argument. This callback expects two arguments: an error value (which will be + * null if the call succeeded), and the success value as the second argument. + * The callback will the resolve or reject the returned promise, based on its + * arguments. + * @param {!Function} fn The function to wrap. + * @param {...?} var_args The arguments to apply to the function, excluding the + * final callback. + * @return {!ManagedPromise} A promise that will be resolved with the + * result of the provided function's callback. + */ +function checkedNodeCall(fn, var_args) { + let args = Array.prototype.slice.call(arguments, 1); + return new ManagedPromise(function(fulfill, reject) { + try { + args.push(function(error, value) { + error ? reject(error) : fulfill(value); + }); + fn.apply(undefined, args); + } catch (ex) { + reject(ex); + } + }); +} + + +/** + * Registers an observer on a promised {@code value}, returning a new promise + * that will be resolved when the value is. If {@code value} is not a promise, + * then the return promise will be immediately resolved. + * @param {*} value The value to observe. + * @param {Function=} opt_callback The function to call when the value is + * resolved successfully. + * @param {Function=} opt_errback The function to call when the value is + * rejected. + * @return {!ManagedPromise} A new promise. + */ +function when(value, opt_callback, opt_errback) { + if (Thenable.isImplementation(value)) { + return value.then(opt_callback, opt_errback); + } + + return new ManagedPromise(function(fulfill) { + fulfill(value); + }).then(opt_callback, opt_errback); +} + + +/** + * Invokes the appropriate callback function as soon as a promised `value` is + * resolved. This function is similar to `when()`, except it does not return + * a new promise. + * @param {*} value The value to observe. + * @param {Function} callback The function to call when the value is + * resolved successfully. + * @param {Function=} opt_errback The function to call when the value is + * rejected. + */ +function asap(value, callback, opt_errback) { + if (isPromise(value)) { + value.then(callback, opt_errback); + + } else if (callback) { + callback(value); + } +} + + +/** + * Given an array of promises, will return a promise that will be fulfilled + * with the fulfillment values of the input array's values. If any of the + * input array's promises are rejected, the returned promise will be rejected + * with the same reason. + * + * @param {!Array<(T|!ManagedPromise<T>)>} arr An array of + * promises to wait on. + * @return {!ManagedPromise<!Array<T>>} A promise that is + * fulfilled with an array containing the fulfilled values of the + * input array, or rejected with the same reason as the first + * rejected value. + * @template T + */ +function all(arr) { + return new ManagedPromise(function(fulfill, reject) { + var n = arr.length; + var values = []; + + if (!n) { + fulfill(values); + return; + } + + var toFulfill = n; + var onFulfilled = function(index, value) { + values[index] = value; + toFulfill--; + if (toFulfill == 0) { + fulfill(values); + } + }; + + function processPromise(index) { + asap(arr[index], function(value) { + onFulfilled(index, value); + }, reject); + } + + for (var i = 0; i < n; ++i) { + processPromise(i); + } + }); +} + + +/** + * Calls a function for each element in an array and inserts the result into a + * new array, which is used as the fulfillment value of the promise returned + * by this function. + * + * If the return value of the mapping function is a promise, this function + * will wait for it to be fulfilled before inserting it into the new array. + * + * If the mapping function throws or returns a rejected promise, the + * promise returned by this function will be rejected with the same reason. + * Only the first failure will be reported; all subsequent errors will be + * silently ignored. + * + * @param {!(Array<TYPE>|ManagedPromise<!Array<TYPE>>)} arr The + * array to iterator over, or a promise that will resolve to said array. + * @param {function(this: SELF, TYPE, number, !Array<TYPE>): ?} fn The + * function to call for each element in the array. This function should + * expect three arguments (the element, the index, and the array itself. + * @param {SELF=} opt_self The object to be used as the value of 'this' within + * {@code fn}. + * @template TYPE, SELF + */ +function map(arr, fn, opt_self) { + return fulfilled(arr).then(function(v) { + if (!Array.isArray(v)) { + throw TypeError('not an array'); + } + var arr = /** @type {!Array} */(v); + return new ManagedPromise(function(fulfill, reject) { + var n = arr.length; + var values = new Array(n); + (function processNext(i) { + for (; i < n; i++) { + if (i in arr) { + break; + } + } + if (i >= n) { + fulfill(values); + return; + } + try { + asap( + fn.call(opt_self, arr[i], i, /** @type {!Array} */(arr)), + function(value) { + values[i] = value; + processNext(i + 1); + }, + reject); + } catch (ex) { + reject(ex); + } + })(0); + }); + }); +} + + +/** + * Calls a function for each element in an array, and if the function returns + * true adds the element to a new array. + * + * If the return value of the filter function is a promise, this function + * will wait for it to be fulfilled before determining whether to insert the + * element into the new array. + * + * If the filter function throws or returns a rejected promise, the promise + * returned by this function will be rejected with the same reason. Only the + * first failure will be reported; all subsequent errors will be silently + * ignored. + * + * @param {!(Array<TYPE>|ManagedPromise<!Array<TYPE>>)} arr The + * array to iterator over, or a promise that will resolve to said array. + * @param {function(this: SELF, TYPE, number, !Array<TYPE>): ( + * boolean|ManagedPromise<boolean>)} fn The function + * to call for each element in the array. + * @param {SELF=} opt_self The object to be used as the value of 'this' within + * {@code fn}. + * @template TYPE, SELF + */ +function filter(arr, fn, opt_self) { + return fulfilled(arr).then(function(v) { + if (!Array.isArray(v)) { + throw TypeError('not an array'); + } + var arr = /** @type {!Array} */(v); + return new ManagedPromise(function(fulfill, reject) { + var n = arr.length; + var values = []; + var valuesLength = 0; + (function processNext(i) { + for (; i < n; i++) { + if (i in arr) { + break; + } + } + if (i >= n) { + fulfill(values); + return; + } + try { + var value = arr[i]; + var include = fn.call(opt_self, value, i, /** @type {!Array} */(arr)); + asap(include, function(include) { + if (include) { + values[valuesLength++] = value; + } + processNext(i + 1); + }, reject); + } catch (ex) { + reject(ex); + } + })(0); + }); + }); +} + + +/** + * Returns a promise that will be resolved with the input value in a + * fully-resolved state. If the value is an array, each element will be fully + * resolved. Likewise, if the value is an object, all keys will be fully + * resolved. In both cases, all nested arrays and objects will also be + * fully resolved. All fields are resolved in place; the returned promise will + * resolve on {@code value} and not a copy. + * + * Warning: This function makes no checks against objects that contain + * cyclical references: + * + * var value = {}; + * value['self'] = value; + * promise.fullyResolved(value); // Stack overflow. + * + * @param {*} value The value to fully resolve. + * @return {!ManagedPromise} A promise for a fully resolved version + * of the input value. + */ +function fullyResolved(value) { + if (isPromise(value)) { + return when(value, fullyResolveValue); + } + return fullyResolveValue(value); +} + + +/** + * @param {*} value The value to fully resolve. If a promise, assumed to + * already be resolved. + * @return {!ManagedPromise} A promise for a fully resolved version + * of the input value. + */ +function fullyResolveValue(value) { + if (Array.isArray(value)) { + return fullyResolveKeys(/** @type {!Array} */ (value)); + } + + if (isPromise(value)) { + if (isPromise(value)) { + // We get here when the original input value is a promise that + // resolves to itself. When the user provides us with such a promise, + // trust that it counts as a "fully resolved" value and return it. + // Of course, since it's already a promise, we can just return it + // to the user instead of wrapping it in another promise. + return /** @type {!ManagedPromise} */ (value); + } + } + + if (value && typeof value === 'object') { + return fullyResolveKeys(/** @type {!Object} */ (value)); + } + + if (typeof value === 'function') { + return fullyResolveKeys(/** @type {!Object} */ (value)); + } + + return fulfilled(value); +} + + +/** + * @param {!(Array|Object)} obj the object to resolve. + * @return {!ManagedPromise} A promise that will be resolved with the + * input object once all of its values have been fully resolved. + */ +function fullyResolveKeys(obj) { + var isArray = Array.isArray(obj); + var numKeys = isArray ? obj.length : (function() { + let n = 0; + for (let key in obj) { + n += 1; + } + return n; + })(); + if (!numKeys) { + return fulfilled(obj); + } + + function forEachProperty(obj, fn) { + for (let key in obj) { + fn.call(null, obj[key], key, obj); + } + } + + function forEachElement(arr, fn) { + arr.forEach(fn); + } + + var numResolved = 0; + return new ManagedPromise(function(fulfill, reject) { + var forEachKey = isArray ? forEachElement: forEachProperty; + + forEachKey(obj, function(partialValue, key) { + if (!Array.isArray(partialValue) + && (!partialValue || typeof partialValue !== 'object')) { + maybeResolveValue(); + return; + } + + fullyResolved(partialValue).then( + function(resolvedValue) { + obj[key] = resolvedValue; + maybeResolveValue(); + }, + reject); + }); + + function maybeResolveValue() { + if (++numResolved == numKeys) { + fulfill(obj); + } + } + }); +} + + +////////////////////////////////////////////////////////////////////////////// +// +// ControlFlow +// +////////////////////////////////////////////////////////////////////////////// + + + +/** + * Handles the execution of scheduled tasks, each of which may be an + * asynchronous operation. The control flow will ensure tasks are executed in + * the ordered scheduled, starting each task only once those before it have + * completed. + * + * Each task scheduled within this flow may return a {@link ManagedPromise} to + * indicate it is an asynchronous operation. The ControlFlow will wait for such + * promises to be resolved before marking the task as completed. + * + * Tasks and each callback registered on a {@link ManagedPromise} will be run + * in their own ControlFlow frame. Any tasks scheduled within a frame will take + * priority over previously scheduled tasks. Furthermore, if any of the tasks in + * the frame fail, the remainder of the tasks in that frame will be discarded + * and the failure will be propagated to the user through the callback/task's + * promised result. + * + * Each time a ControlFlow empties its task queue, it will fire an + * {@link ControlFlow.EventType.IDLE IDLE} event. Conversely, + * whenever the flow terminates due to an unhandled error, it will remove all + * remaining tasks in its queue and fire an + * {@link ControlFlow.EventType.UNCAUGHT_EXCEPTION UNCAUGHT_EXCEPTION} event. + * If there are no listeners registered with the flow, the error will be + * rethrown to the global error handler. + * + * Refer to the {@link ./promise} module documentation fora detailed + * explanation of how the ControlFlow coordinates task execution. + * + * @final + */ +class ControlFlow extends events.EventEmitter { + constructor() { + super(); + + /** @private {boolean} */ + this.propagateUnhandledRejections_ = true; + + /** @private {TaskQueue} */ + this.activeQueue_ = null; + + /** @private {Set<TaskQueue>} */ + this.taskQueues_ = null; + + /** + * Micro task that controls shutting down the control flow. Upon shut down, + * the flow will emit an + * {@link ControlFlow.EventType.IDLE} event. Idle events + * always follow a brief timeout in order to catch latent errors from the + * last completed task. If this task had a callback registered, but no + * errback, and the task fails, the unhandled failure would not be reported + * by the promise system until the next turn of the event loop: + * + * // Schedule 1 task that fails. + * var result = promise.controlFlow().schedule('example', + * function() { return promise.rejected('failed'); }); + * // Set a callback on the result. This delays reporting the unhandled + * // failure for 1 turn of the event loop. + * result.then(function() {}); + * + * @private {MicroTask} + */ + this.shutdownTask_ = null; + + /** + * ID for a long running interval used to keep a Node.js process running + * while a control flow's event loop is still working. This is a cheap hack + * required since JS events are only scheduled to run when there is + * _actually_ something to run. When a control flow is waiting on a task, + * there will be nothing in the JS event loop and the process would + * terminate without this. + * @private + */ + this.hold_ = null; + } + + /** + * Returns a string representation of this control flow, which is its current + * {@linkplain #getSchedule() schedule}, sans task stack traces. + * @return {string} The string representation of this contorl flow. + * @override + */ + toString() { + return this.getSchedule(); + } + + /** + * Sets whether any unhandled rejections should propagate up through the + * control flow stack and cause rejections within parent tasks. If error + * propagation is disabled, tasks will not be aborted when an unhandled + * promise rejection is detected, but the rejection _will_ trigger an + * {@link ControlFlow.EventType.UNCAUGHT_EXCEPTION} + * event. + * + * The default behavior is to propagate all unhandled rejections. _The use + * of this option is highly discouraged._ + * + * @param {boolean} propagate whether to propagate errors. + */ + setPropagateUnhandledRejections(propagate) { + this.propagateUnhandledRejections_ = propagate; + } + + /** + * @return {boolean} Whether this flow is currently idle. + */ + isIdle() { + return !this.shutdownTask_ && (!this.taskQueues_ || !this.taskQueues_.size); + } + + /** + * Resets this instance, clearing its queue and removing all event listeners. + */ + reset() { + this.cancelQueues_(new FlowResetError); + this.emit(ControlFlow.EventType.RESET); + this.removeAllListeners(); + this.cancelShutdown_(); + } + + /** + * Generates an annotated string describing the internal state of this control + * flow, including the currently executing as well as pending tasks. If + * {@code opt_includeStackTraces === true}, the string will include the + * stack trace from when each task was scheduled. + * @param {string=} opt_includeStackTraces Whether to include the stack traces + * from when each task was scheduled. Defaults to false. + * @return {string} String representation of this flow's internal state. + */ + getSchedule(opt_includeStackTraces) { + var ret = 'ControlFlow::' + getUid(this); + var activeQueue = this.activeQueue_; + if (!this.taskQueues_ || !this.taskQueues_.size) { + return ret; + } + var childIndent = '| '; + for (var q of this.taskQueues_) { + ret += '\n' + printQ(q, childIndent); + } + return ret; + + function printQ(q, indent) { + var ret = q.toString(); + if (q === activeQueue) { + ret = '(active) ' + ret; + } + var prefix = indent + childIndent; + if (q.pending_) { + if (q.pending_.q.state_ !== TaskQueueState.FINISHED) { + ret += '\n' + prefix + '(pending) ' + q.pending_.task; + ret += '\n' + printQ(q.pending_.q, prefix + childIndent); + } else { + ret += '\n' + prefix + '(blocked) ' + q.pending_.task; + } + } + if (q.interrupts_) { + q.interrupts_.forEach((task) => { + ret += '\n' + prefix + task; + }); + } + if (q.tasks_) { + q.tasks_.forEach((task) => ret += printTask(task, '\n' + prefix)); + } + return indent + ret; + } + + function printTask(task, prefix) { + var ret = prefix + task; + if (opt_includeStackTraces && task.promise.stack_) { + ret += prefix + childIndent + + (task.promise.stack_.stack || task.promise.stack_) + .replace(/\n/g, prefix); + } + return ret; + } + } + + /** + * Returns the currently actively task queue for this flow. If there is no + * active queue, one will be created. + * @return {!TaskQueue} the currently active task queue for this flow. + * @private + */ + getActiveQueue_() { + if (this.activeQueue_) { + return this.activeQueue_; + } + + this.activeQueue_ = new TaskQueue(this); + if (!this.taskQueues_) { + this.taskQueues_ = new Set(); + } + this.taskQueues_.add(this.activeQueue_); + this.activeQueue_ + .once('end', this.onQueueEnd_, this) + .once('error', this.onQueueError_, this); + + asyncRun(() => this.activeQueue_ = null); + this.activeQueue_.start(); + return this.activeQueue_; + } + + /** + * Schedules a task for execution. If there is nothing currently in the + * queue, the task will be executed in the next turn of the event loop. If + * the task function is a generator, the task will be executed using + * {@link ./promise.consume consume()}. + * + * @param {function(): (T|ManagedPromise<T>)} fn The function to + * call to start the task. If the function returns a + * {@link ManagedPromise}, this instance will wait for it to be + * resolved before starting the next task. + * @param {string=} opt_description A description of the task. + * @return {!ManagedPromise<T>} A promise that will be resolved + * with the result of the action. + * @template T + */ + execute(fn, opt_description) { + if (isGenerator(fn)) { + let original = fn; + fn = () => consume(original); + } + + if (!this.hold_) { + var holdIntervalMs = 2147483647; // 2^31-1; max timer length for Node.js + this.hold_ = setInterval(function() {}, holdIntervalMs); + } + + var task = new Task( + this, fn, opt_description || '<anonymous>', + {name: 'Task', top: ControlFlow.prototype.execute}); + + var q = this.getActiveQueue_(); + q.enqueue(task); + this.emit(ControlFlow.EventType.SCHEDULE_TASK, task.description); + return task.promise; + } + + /** + * Inserts a {@code setTimeout} into the command queue. This is equivalent to + * a thread sleep in a synchronous programming language. + * + * @param {number} ms The timeout delay, in milliseconds. + * @param {string=} opt_description A description to accompany the timeout. + * @return {!ManagedPromise} A promise that will be resolved with + * the result of the action. + */ + timeout(ms, opt_description) { + return this.execute(function() { + return delayed(ms); + }, opt_description); + } + + /** + * Schedules a task that shall wait for a condition to hold. Each condition + * function may return any value, but it will always be evaluated as a + * boolean. + * + * Condition functions may schedule sub-tasks with this instance, however, + * their execution time will be factored into whether a wait has timed out. + * + * In the event a condition returns a ManagedPromise, the polling loop will wait for + * it to be resolved before evaluating whether the condition has been + * satisfied. The resolution time for a promise is factored into whether a + * wait has timed out. + * + * If the condition function throws, or returns a rejected promise, the + * wait task will fail. + * + * If the condition is defined as a promise, the flow will wait for it to + * settle. If the timeout expires before the promise settles, the promise + * returned by this function will be rejected. + * + * If this function is invoked with `timeout === 0`, or the timeout is + * omitted, the flow will wait indefinitely for the condition to be satisfied. + * + * @param {(!ManagedPromise<T>|function())} condition The condition to poll, + * or a promise to wait on. + * @param {number=} opt_timeout How long to wait, in milliseconds, for the + * condition to hold before timing out. If omitted, the flow will wait + * indefinitely. + * @param {string=} opt_message An optional error message to include if the + * wait times out; defaults to the empty string. + * @return {!ManagedPromise<T>} A promise that will be fulfilled + * when the condition has been satisified. The promise shall be rejected + * if the wait times out waiting for the condition. + * @throws {TypeError} If condition is not a function or promise or if timeout + * is not a number >= 0. + * @template T + */ + wait(condition, opt_timeout, opt_message) { + var timeout = opt_timeout || 0; + if (typeof timeout !== 'number' || timeout < 0) { + throw TypeError('timeout must be a number >= 0: ' + timeout); + } + + if (isPromise(condition)) { + return this.execute(function() { + if (!timeout) { + return condition; + } + return new ManagedPromise(function(fulfill, reject) { + var start = Date.now(); + var timer = setTimeout(function() { + timer = null; + reject(Error((opt_message ? opt_message + '\n' : '') + + 'Timed out waiting for promise to resolve after ' + + (Date.now() - start) + 'ms')); + }, timeout); + + /** @type {Thenable} */(condition).then( + function(value) { + timer && clearTimeout(timer); + fulfill(value); + }, + function(error) { + timer && clearTimeout(timer); + reject(error); + }); + }); + }, opt_message || '<anonymous wait: promise resolution>'); + } + + if (typeof condition !== 'function') { + throw TypeError('Invalid condition; must be a function or promise: ' + + typeof condition); + } + + if (isGenerator(condition)) { + let original = condition; + condition = () => consume(original); + } + + var self = this; + return this.execute(function() { + var startTime = Date.now(); + return new ManagedPromise(function(fulfill, reject) { + pollCondition(); + + function pollCondition() { + var conditionFn = /** @type {function()} */(condition); + self.execute(conditionFn).then(function(value) { + var elapsed = Date.now() - startTime; + if (!!value) { + fulfill(value); + } else if (timeout && elapsed >= timeout) { + reject(new Error((opt_message ? opt_message + '\n' : '') + + 'Wait timed out after ' + elapsed + 'ms')); + } else { + // Do not use asyncRun here because we need a non-micro yield + // here so the UI thread is given a chance when running in a + // browser. + setTimeout(pollCondition, 0); + } + }, reject); + } + }); + }, opt_message || '<anonymous wait>'); + } + + /** + * Executes a function in the next available turn of the JavaScript event + * loop. This ensures the function runs with its own task queue and any + * scheduled tasks will run in "parallel" to those scheduled in the current + * function. + * + * flow.execute(() => console.log('a')); + * flow.execute(() => console.log('b')); + * flow.execute(() => console.log('c')); + * flow.async(() => { + * flow.execute(() => console.log('d')); + * flow.execute(() => console.log('e')); + * }); + * flow.async(() => { + * flow.execute(() => console.log('f')); + * flow.execute(() => console.log('g')); + * }); + * flow.once('idle', () => console.log('fin')); + * // a + * // d + * // f + * // b + * // e + * // g + * // c + * // fin + * + * If the function itself throws, the error will be treated the same as an + * unhandled rejection within the control flow. + * + * __NOTE__: This function is considered _unstable_. + * + * @param {!Function} fn The function to execute. + * @param {Object=} opt_self The object in whose context to run the function. + * @param {...*} var_args Any arguments to pass to the function. + */ + async(fn, opt_self, var_args) { + asyncRun(() => { + // Clear any lingering queues, forces getActiveQueue_ to create a new one. + this.activeQueue_ = null; + var q = this.getActiveQueue_(); + try { + q.execute_(fn.bind(opt_self, var_args)); + } catch (ex) { + var cancellationError = CancellationError.wrap(ex, + 'Function passed to ControlFlow.async() threw'); + cancellationError.silent_ = true; + q.abort_(cancellationError); + } finally { + this.activeQueue_ = null; + } + }); + } + + /** + * Event handler for when a task queue is exhausted. This starts the shutdown + * sequence for this instance if there are no remaining task queues: after + * one turn of the event loop, this object will emit the + * {@link ControlFlow.EventType.IDLE IDLE} event to signal + * listeners that it has completed. During this wait, if another task is + * scheduled, the shutdown will be aborted. + * + * @param {!TaskQueue} q the completed task queue. + * @private + */ + onQueueEnd_(q) { + if (!this.taskQueues_) { + return; + } + this.taskQueues_.delete(q); + + vlog(1, () => q + ' has finished'); + vlog(1, () => this.taskQueues_.size + ' queues remain\n' + this, this); + + if (!this.taskQueues_.size) { + if (this.shutdownTask_) { + throw Error('Already have a shutdown task??'); + } + vlog(1, () => 'Scheduling shutdown\n' + this); + this.shutdownTask_ = new MicroTask(() => this.shutdown_()); + } + } + + /** + * Event handler for when a task queue terminates with an error. This triggers + * the cancellation of all other task queues and a + * {@link ControlFlow.EventType.UNCAUGHT_EXCEPTION} event. + * If there are no error event listeners registered with this instance, the + * error will be rethrown to the global error handler. + * + * @param {*} error the error that caused the task queue to terminate. + * @param {!TaskQueue} q the task queue. + * @private + */ + onQueueError_(error, q) { + if (this.taskQueues_) { + this.taskQueues_.delete(q); + } + this.cancelQueues_(CancellationError.wrap( + error, 'There was an uncaught error in the control flow')); + this.cancelShutdown_(); + this.cancelHold_(); + + setTimeout(() => { + let listeners = this.listeners(ControlFlow.EventType.UNCAUGHT_EXCEPTION); + if (!listeners.size) { + throw error; + } else { + this.reportUncaughtException_(error); + } + }, 0); + } + + /** + * Cancels all remaining task queues. + * @param {!CancellationError} reason The cancellation reason. + * @private + */ + cancelQueues_(reason) { + reason.silent_ = true; + if (this.taskQueues_) { + for (var q of this.taskQueues_) { + q.removeAllListeners(); + q.abort_(reason); + } + this.taskQueues_.clear(); + this.taskQueues_ = null; + } + } + + /** + * Reports an uncaught exception using a + * {@link ControlFlow.EventType.UNCAUGHT_EXCEPTION} event. + * + * @param {*} e the error to report. + * @private + */ + reportUncaughtException_(e) { + this.emit(ControlFlow.EventType.UNCAUGHT_EXCEPTION, e); + } + + /** @private */ + cancelHold_() { + if (this.hold_) { + clearInterval(this.hold_); + this.hold_ = null; + } + } + + /** @private */ + shutdown_() { + vlog(1, () => 'Going idle: ' + this); + this.cancelHold_(); + this.shutdownTask_ = null; + this.emit(ControlFlow.EventType.IDLE); + } + + /** + * Cancels the shutdown sequence if it is currently scheduled. + * @private + */ + cancelShutdown_() { + if (this.shutdownTask_) { + this.shutdownTask_.cancel(); + this.shutdownTask_ = null; + } + } +} + + +/** + * Events that may be emitted by an {@link ControlFlow}. + * @enum {string} + */ +ControlFlow.EventType = { + + /** Emitted when all tasks have been successfully executed. */ + IDLE: 'idle', + + /** Emitted when a ControlFlow has been reset. */ + RESET: 'reset', + + /** Emitted whenever a new task has been scheduled. */ + SCHEDULE_TASK: 'scheduleTask', + + /** + * Emitted whenever a control flow aborts due to an unhandled promise + * rejection. This event will be emitted along with the offending rejection + * reason. Upon emitting this event, the control flow will empty its task + * queue and revert to its initial state. + */ + UNCAUGHT_EXCEPTION: 'uncaughtException' +}; + + +/** + * Wraps a function to execute as a cancellable micro task. + * @final + */ +class MicroTask { + /** + * @param {function()} fn The function to run as a micro task. + */ + constructor(fn) { + /** @private {boolean} */ + this.cancelled_ = false; + asyncRun(() => { + if (!this.cancelled_) { + fn(); + } + }); + } + + /** + * Runs the given function after a micro-task yield. + * @param {function()} fn The function to run. + */ + static run(fn) { + NativePromise.resolve().then(function() { + try { + fn(); + } catch (ignored) { + // Do nothing. + } + }); + } + + /** + * Cancels the execution of this task. Note: this will not prevent the task + * timer from firing, just the invocation of the wrapped function. + */ + cancel() { + this.cancelled_ = true; + } +} + + +/** + * A task to be executed by a {@link ControlFlow}. + * + * @template T + * @final + */ +class Task extends Deferred { + /** + * @param {!ControlFlow} flow The flow this instances belongs + * to. + * @param {function(): (T|!ManagedPromise<T>)} fn The function to + * call when the task executes. If it returns a + * {@link ManagedPromise}, the flow will wait for it to be + * resolved before starting the next task. + * @param {string} description A description of the task for debugging. + * @param {{name: string, top: !Function}=} opt_stackOptions Options to use + * when capturing the stacktrace for when this task was created. + */ + constructor(flow, fn, description, opt_stackOptions) { + super(flow); + getUid(this); + + /** @type {function(): (T|!ManagedPromise<T>)} */ + this.execute = fn; + + /** @type {string} */ + this.description = description; + + /** @type {TaskQueue} */ + this.queue = null; + + /** + * Whether this task is considered block. A blocked task may be registered + * in a task queue, but will be dropped if it is still blocked when it + * reaches the front of the queue. A dropped task may always be rescheduled. + * + * Blocked tasks are used when a callback is attached to an unsettled + * promise to reserve a spot in line (in a manner of speaking). If the + * promise is not settled before the callback reaches the front of the + * of the queue, it will be dropped. Once the promise is settled, the + * dropped task will be rescheduled as an interrupt on the currently task + * queue. + * + * @type {boolean} + */ + this.blocked = false; + + if (opt_stackOptions) { + this.promise.stack_ = captureStackTrace( + opt_stackOptions.name, this.description, opt_stackOptions.top); + } + } + + /** @override */ + toString() { + return 'Task::' + getUid(this) + '<' + this.description + '>'; + } +} + + +/** @enum {string} */ +const TaskQueueState = { + NEW: 'new', + STARTED: 'started', + FINISHED: 'finished' +}; + + +/** + * @final + */ +class TaskQueue extends events.EventEmitter { + /** @param {!ControlFlow} flow . */ + constructor(flow) { + super(); + + /** @private {string} */ + this.name_ = 'TaskQueue::' + getUid(this); + + /** @private {!ControlFlow} */ + this.flow_ = flow; + + /** @private {!Array<!Task>} */ + this.tasks_ = []; + + /** @private {Array<!Task>} */ + this.interrupts_ = null; + + /** @private {({task: !Task, q: !TaskQueue}|null)} */ + this.pending_ = null; + + /** @private {TaskQueueState} */ + this.state_ = TaskQueueState.NEW; + + /** @private {!Set<!ManagedPromise>} */ + this.unhandledRejections_ = new Set(); + } + + /** @override */ + toString() { + return 'TaskQueue::' + getUid(this); + } + + /** + * @param {!ManagedPromise} promise . + */ + addUnhandledRejection(promise) { + // TODO: node 4.0.0+ + vlog(2, () => this + ' registering unhandled rejection: ' + promise, this); + this.unhandledRejections_.add(promise); + } + + /** + * @param {!ManagedPromise} promise . + */ + clearUnhandledRejection(promise) { + var deleted = this.unhandledRejections_.delete(promise); + if (deleted) { + // TODO: node 4.0.0+ + vlog(2, () => this + ' clearing unhandled rejection: ' + promise, this); + } + } + + /** + * Enqueues a new task for execution. + * @param {!Task} task The task to enqueue. + * @throws {Error} If this instance has already started execution. + */ + enqueue(task) { + if (this.state_ !== TaskQueueState.NEW) { + throw Error('TaskQueue has started: ' + this); + } + + if (task.queue) { + throw Error('Task is already scheduled in another queue'); + } + + this.tasks_.push(task); + task.queue = this; + ON_CANCEL_HANDLER.set( + task.promise, + (e) => this.onTaskCancelled_(task, e)); + + vlog(1, () => this + '.enqueue(' + task + ')', this); + vlog(2, () => this.flow_.toString(), this); + } + + /** + * Schedules the callbacks registered on the given promise in this queue. + * + * @param {!ManagedPromise} promise the promise whose callbacks should be + * registered as interrupts in this task queue. + * @throws {Error} if this queue has already finished. + */ + scheduleCallbacks(promise) { + if (this.state_ === TaskQueueState.FINISHED) { + throw new Error('cannot interrupt a finished q(' + this + ')'); + } + + if (this.pending_ && this.pending_.task.promise === promise) { + this.pending_.task.promise.queue_ = null; + this.pending_ = null; + asyncRun(() => this.executeNext_()); + } + + if (!promise.callbacks_) { + return; + } + promise.callbacks_.forEach(function(cb) { + cb.blocked = false; + if (cb.queue) { + return; + } + + ON_CANCEL_HANDLER.set( + cb.promise, + (e) => this.onTaskCancelled_(cb, e)); + + if (cb.queue === this && this.tasks_.indexOf(cb) !== -1) { + return; + } + + if (cb.queue) { + cb.queue.dropTask_(cb); + } + + cb.queue = this; + if (!this.interrupts_) { + this.interrupts_ = []; + } + this.interrupts_.push(cb); + }, this); + promise.callbacks_ = null; + vlog(2, () => this + ' interrupted\n' + this.flow_, this); + } + + /** + * Starts executing tasks in this queue. Once called, no further tasks may + * be {@linkplain #enqueue() enqueued} with this instance. + * + * @throws {Error} if this queue has already been started. + */ + start() { + if (this.state_ !== TaskQueueState.NEW) { + throw new Error('TaskQueue has already started'); + } + // Always asynchronously execute next, even if there doesn't look like + // there is anything in the queue. This will catch pending unhandled + // rejections that were registered before start was called. + asyncRun(() => this.executeNext_()); + } + + /** + * Aborts this task queue. If there are any scheduled tasks, they are silently + * cancelled and discarded (their callbacks will never fire). If this queue + * has a _pending_ task, the abortion error is used to cancel that task. + * Otherwise, this queue will emit an error event. + * + * @param {*} error The abortion reason. + * @private + */ + abort_(error) { + var cancellation; + + if (error instanceof FlowResetError) { + cancellation = error; + } else { + cancellation = new DiscardedTaskError(error); + } + + if (this.interrupts_ && this.interrupts_.length) { + this.interrupts_.forEach((t) => t.reject(cancellation)); + this.interrupts_ = []; + } + + if (this.tasks_ && this.tasks_.length) { + this.tasks_.forEach((t) => t.reject(cancellation)); + this.tasks_ = []; + } + + // Now that all of the remaining tasks have been silently cancelled (e.g. no + // exisitng callbacks on those tasks will fire), clear the silence bit on + // the cancellation error. This ensures additional callbacks registered in + // the future will actually execute. + cancellation.silent_ = false; + + if (this.pending_) { + vlog(2, () => this + '.abort(); cancelling pending task', this); + this.pending_.task.promise.cancel( + /** @type {!CancellationError} */(error)); + + } else { + vlog(2, () => this + '.abort(); emitting error event', this); + this.emit('error', error, this); + } + } + + /** @private */ + executeNext_() { + if (this.state_ === TaskQueueState.FINISHED) { + return; + } + this.state_ = TaskQueueState.STARTED; + + if (this.pending_ !== null || this.processUnhandledRejections_()) { + return; + } + + var task; + do { + task = this.getNextTask_(); + } while (task && !task.promise.isPending()); + + if (!task) { + this.state_ = TaskQueueState.FINISHED; + this.tasks_ = []; + this.interrupts_ = null; + vlog(2, () => this + '.emit(end)', this); + this.emit('end', this); + return; + } + + var self = this; + var subQ = new TaskQueue(this.flow_); + subQ.once('end', () => self.onTaskComplete_(result)) + .once('error', (e) => self.onTaskFailure_(result, e)); + vlog(2, () => self + ' created ' + subQ + ' for ' + task); + + var result = undefined; + try { + this.pending_ = {task: task, q: subQ}; + task.promise.queue_ = this; + result = subQ.execute_(task.execute); + subQ.start(); + } catch (ex) { + subQ.abort_(ex); + } + } + + /** + * @param {!Function} fn . + * @return {T} . + * @template T + * @private + */ + execute_(fn) { + try { + activeFlows.push(this.flow_); + this.flow_.activeQueue_ = this; + return fn(); + } finally { + this.flow_.activeQueue_ = null; + activeFlows.pop(); + } + } + + /** + * Process any unhandled rejections registered with this task queue. If there + * is a rejection, this queue will be aborted with the rejection error. If + * there are multiple rejections registered, this queue will be aborted with + * a {@link MultipleUnhandledRejectionError}. + * @return {boolean} whether there was an unhandled rejection. + * @private + */ + processUnhandledRejections_() { + if (!this.unhandledRejections_.size) { + return false; + } + + var errors = new Set(); + for (var rejection of this.unhandledRejections_) { + errors.add(rejection.value_); + } + this.unhandledRejections_.clear(); + + var errorToReport = errors.size === 1 + ? errors.values().next().value + : new MultipleUnhandledRejectionError(errors); + + vlog(1, () => this + ' aborting due to unhandled rejections', this); + if (this.flow_.propagateUnhandledRejections_) { + this.abort_(errorToReport); + return true; + } else { + vlog(1, 'error propagation disabled; reporting to control flow'); + this.flow_.reportUncaughtException_(errorToReport); + return false; + } + } + + /** + * @param {!Task} task The task to drop. + * @private + */ + dropTask_(task) { + var index; + if (this.interrupts_) { + index = this.interrupts_.indexOf(task); + if (index != -1) { + task.queue = null; + this.interrupts_.splice(index, 1); + return; + } + } + + index = this.tasks_.indexOf(task); + if (index != -1) { + task.queue = null; + this.tasks_.splice(index, 1); + } + } + + /** + * @param {!Task} task The task that was cancelled. + * @param {!CancellationError} reason The cancellation reason. + * @private + */ + onTaskCancelled_(task, reason) { + if (this.pending_ && this.pending_.task === task) { + this.pending_.q.abort_(reason); + } else { + this.dropTask_(task); + } + } + + /** + * @param {*} value the value originally returned by the task function. + * @private + */ + onTaskComplete_(value) { + if (this.pending_) { + this.pending_.task.fulfill(value); + } + } + + /** + * @param {*} taskFnResult the value originally returned by the task function. + * @param {*} error the error that caused the task function to terminate. + * @private + */ + onTaskFailure_(taskFnResult, error) { + if (Thenable.isImplementation(taskFnResult)) { + taskFnResult.cancel(CancellationError.wrap(error)); + } + this.pending_.task.reject(error); + } + + /** + * @return {(Task|undefined)} the next task scheduled within this queue, + * if any. + * @private + */ + getNextTask_() { + var task = undefined; + while (true) { + if (this.interrupts_) { + task = this.interrupts_.shift(); + } + if (!task && this.tasks_) { + task = this.tasks_.shift(); + } + if (task && task.blocked) { + vlog(2, () => this + ' skipping blocked task ' + task, this); + task.queue = null; + task = null; + // TODO: recurse when tail-call optimization is available in node. + } else { + break; + } + } + return task; + } +}; + + + +/** + * The default flow to use if no others are active. + * @type {!ControlFlow} + */ +var defaultFlow = new ControlFlow(); + + +/** + * A stack of active control flows, with the top of the stack used to schedule + * commands. When there are multiple flows on the stack, the flow at index N + * represents a callback triggered within a task owned by the flow at index + * N-1. + * @type {!Array<!ControlFlow>} + */ +var activeFlows = []; + + +/** + * Changes the default flow to use when no others are active. + * @param {!ControlFlow} flow The new default flow. + * @throws {Error} If the default flow is not currently active. + */ +function setDefaultFlow(flow) { + if (activeFlows.length) { + throw Error('You may only change the default flow while it is active'); + } + defaultFlow = flow; +} + + +/** + * @return {!ControlFlow} The currently active control flow. + */ +function controlFlow() { + return /** @type {!ControlFlow} */ ( + activeFlows.length ? activeFlows[activeFlows.length - 1] : defaultFlow); +} + + +/** + * Creates a new control flow. The provided callback will be invoked as the + * first task within the new flow, with the flow as its sole argument. Returns + * a promise that resolves to the callback result. + * @param {function(!ControlFlow)} callback The entry point + * to the newly created flow. + * @return {!ManagedPromise} A promise that resolves to the callback + * result. + */ +function createFlow(callback) { + var flow = new ControlFlow; + return flow.execute(function() { + return callback(flow); + }); +} + + +/** + * Tests is a function is a generator. + * @param {!Function} fn The function to test. + * @return {boolean} Whether the function is a generator. + */ +function isGenerator(fn) { + return fn.constructor.name === 'GeneratorFunction'; +} + + +/** + * Consumes a {@code GeneratorFunction}. Each time the generator yields a + * promise, this function will wait for it to be fulfilled before feeding the + * fulfilled value back into {@code next}. Likewise, if a yielded promise is + * rejected, the rejection error will be passed to {@code throw}. + * + * __Example 1:__ the Fibonacci Sequence. + * + * promise.consume(function* fibonacci() { + * var n1 = 1, n2 = 1; + * for (var i = 0; i < 4; ++i) { + * var tmp = yield n1 + n2; + * n1 = n2; + * n2 = tmp; + * } + * return n1 + n2; + * }).then(function(result) { + * console.log(result); // 13 + * }); + * + * __Example 2:__ a generator that throws. + * + * promise.consume(function* () { + * yield promise.delayed(250).then(function() { + * throw Error('boom'); + * }); + * }).catch(function(e) { + * console.log(e.toString()); // Error: boom + * }); + * + * @param {!Function} generatorFn The generator function to execute. + * @param {Object=} opt_self The object to use as "this" when invoking the + * initial generator. + * @param {...*} var_args Any arguments to pass to the initial generator. + * @return {!ManagedPromise<?>} A promise that will resolve to the + * generator's final result. + * @throws {TypeError} If the given function is not a generator. + */ +function consume(generatorFn, opt_self, var_args) { + if (!isGenerator(generatorFn)) { + throw new TypeError('Input is not a GeneratorFunction: ' + + generatorFn.constructor.name); + } + + var deferred = defer(); + var generator = generatorFn.apply( + opt_self, Array.prototype.slice.call(arguments, 2)); + callNext(); + return deferred.promise; + + /** @param {*=} opt_value . */ + function callNext(opt_value) { + pump(generator.next, opt_value); + } + + /** @param {*=} opt_error . */ + function callThrow(opt_error) { + // Dictionary lookup required because Closure compiler's built-in + // externs does not include GeneratorFunction.prototype.throw. + pump(generator['throw'], opt_error); + } + + function pump(fn, opt_arg) { + if (!deferred.promise.isPending()) { + return; // Defererd was cancelled; silently abort. + } + + try { + var result = fn.call(generator, opt_arg); + } catch (ex) { + deferred.reject(ex); + return; + } + + if (result.done) { + deferred.fulfill(result.value); + return; + } + + asap(result.value, callNext, callThrow); + } +} + + +// PUBLIC API + + +module.exports = { + CancellationError: CancellationError, + ControlFlow: ControlFlow, + Deferred: Deferred, + MultipleUnhandledRejectionError: MultipleUnhandledRejectionError, + Thenable: Thenable, + Promise: ManagedPromise, + all: all, + asap: asap, + captureStackTrace: captureStackTrace, + checkedNodeCall: checkedNodeCall, + consume: consume, + controlFlow: controlFlow, + createFlow: createFlow, + defer: defer, + delayed: delayed, + filter: filter, + fulfilled: fulfilled, + fullyResolved: fullyResolved, + isGenerator: isGenerator, + isPromise: isPromise, + map: map, + rejected: rejected, + setDefaultFlow: setDefaultFlow, + when: when, + + get LONG_STACK_TRACES() { return LONG_STACK_TRACES; }, + set LONG_STACK_TRACES(v) { LONG_STACK_TRACES = v; }, +}; |