From bbff7403fbf46f9ad92240ac213df8d30ef31b64 Mon Sep 17 00:00:00 2001 From: Florian Dold Date: Thu, 20 Sep 2018 02:56:13 +0200 Subject: update packages --- node_modules/cloneable-readable/index.js | 68 ++++++++++++++++++++++++++------ 1 file changed, 55 insertions(+), 13 deletions(-) (limited to 'node_modules/cloneable-readable/index.js') diff --git a/node_modules/cloneable-readable/index.js b/node_modules/cloneable-readable/index.js index 7696886dd..b6e490d49 100644 --- a/node_modules/cloneable-readable/index.js +++ b/node_modules/cloneable-readable/index.js @@ -1,9 +1,8 @@ 'use strict' -var through2 = require('through2') +var PassThrough = require('readable-stream').PassThrough var inherits = require('inherits') -var nextTick = require('process-nextick-args') -var Ctor = through2.ctor() +var p = require('process-nextick-args') function Cloneable (stream, opts) { if (!(this instanceof Cloneable)) { @@ -17,22 +16,33 @@ function Cloneable (stream, opts) { opts = opts || {} opts.objectMode = objectMode - Ctor.call(this, opts) + PassThrough.call(this, opts) forwardDestroy(stream, this) this.on('newListener', onData) + this.once('resume', onResume) + + this._hasListener = true } -inherits(Cloneable, Ctor) +inherits(Cloneable, PassThrough) function onData (event, listener) { if (event === 'data' || event === 'readable') { + this._hasListener = false this.removeListener('newListener', onData) - nextTick(clonePiped, this) + this.removeListener('resume', onResume) + p.nextTick(clonePiped, this) } } +function onResume () { + this._hasListener = false + this.removeListener('newListener', onData) + p.nextTick(clonePiped, this) +} + Cloneable.prototype.clone = function () { if (!this._original) { throw new Error('already started') @@ -44,22 +54,38 @@ Cloneable.prototype.clone = function () { // for starting the flow this.removeListener('newListener', onData) var clone = new Clone(this) - this.on('newListener', onData) + if (this._hasListener) { + this.on('newListener', onData) + } return clone } +Cloneable.prototype._destroy = function (err, cb) { + if (!err) { + this.push(null) + this.end() + this.emit('close') + } + + p.nextTick(cb, err) +} + function forwardDestroy (src, dest) { src.on('error', destroy) - src.on('close', destroy) + src.on('close', onClose) function destroy (err) { dest.destroy(err) } + + function onClose () { + dest.end() + } } function clonePiped (that) { - if (--that._clonesCount === 0 && !that._destroyed) { + if (--that._clonesCount === 0 && !that._readableState.destroyed) { that._original.pipe(that) that._original = undefined } @@ -77,9 +103,9 @@ function Clone (parent, opts) { this.parent = parent - Ctor.call(this, opts) + PassThrough.call(this, opts) - forwardDestroy(this.parent, this) + forwardDestroy(parent, this) parent.pipe(this) @@ -87,17 +113,23 @@ function Clone (parent, opts) { // for starting the flow // so we add the newListener handle after we are done this.on('newListener', onDataClone) + this.on('resume', onResumeClone) } function onDataClone (event, listener) { // We start the flow once all clones are piped or destroyed if (event === 'data' || event === 'readable' || event === 'close') { - nextTick(clonePiped, this.parent) + p.nextTick(clonePiped, this.parent) this.removeListener('newListener', onDataClone) } } -inherits(Clone, Ctor) +function onResumeClone () { + this.removeListener('newListener', onDataClone) + p.nextTick(clonePiped, this.parent) +} + +inherits(Clone, PassThrough) Clone.prototype.clone = function () { return this.parent.clone() @@ -107,4 +139,14 @@ Cloneable.isCloneable = function (stream) { return stream instanceof Cloneable || stream instanceof Clone } +Clone.prototype._destroy = function (err, cb) { + if (!err) { + this.push(null) + this.end() + this.emit('close') + } + + p.nextTick(cb, err) +} + module.exports = Cloneable -- cgit v1.2.3