aboutsummaryrefslogtreecommitdiff
path: root/node_modules/cloneable-readable/index.js
diff options
context:
space:
mode:
authorFlorian Dold <florian.dold@gmail.com>2018-09-20 02:56:13 +0200
committerFlorian Dold <florian.dold@gmail.com>2018-09-20 02:56:13 +0200
commitbbff7403fbf46f9ad92240ac213df8d30ef31b64 (patch)
treec58400ec5124da1c7d56b01aea83309f80a56c3b /node_modules/cloneable-readable/index.js
parent003fb34971cf63466184351b4db5f7c67df4f444 (diff)
update packages
Diffstat (limited to 'node_modules/cloneable-readable/index.js')
-rw-r--r--node_modules/cloneable-readable/index.js68
1 files changed, 55 insertions, 13 deletions
diff --git a/node_modules/cloneable-readable/index.js b/node_modules/cloneable-readable/index.js
index 7696886dd..b6e490d49 100644
--- a/node_modules/cloneable-readable/index.js
+++ b/node_modules/cloneable-readable/index.js
@@ -1,9 +1,8 @@
'use strict'
-var through2 = require('through2')
+var PassThrough = require('readable-stream').PassThrough
var inherits = require('inherits')
-var nextTick = require('process-nextick-args')
-var Ctor = through2.ctor()
+var p = require('process-nextick-args')
function Cloneable (stream, opts) {
if (!(this instanceof Cloneable)) {
@@ -17,22 +16,33 @@ function Cloneable (stream, opts) {
opts = opts || {}
opts.objectMode = objectMode
- Ctor.call(this, opts)
+ PassThrough.call(this, opts)
forwardDestroy(stream, this)
this.on('newListener', onData)
+ this.once('resume', onResume)
+
+ this._hasListener = true
}
-inherits(Cloneable, Ctor)
+inherits(Cloneable, PassThrough)
function onData (event, listener) {
if (event === 'data' || event === 'readable') {
+ this._hasListener = false
this.removeListener('newListener', onData)
- nextTick(clonePiped, this)
+ this.removeListener('resume', onResume)
+ p.nextTick(clonePiped, this)
}
}
+function onResume () {
+ this._hasListener = false
+ this.removeListener('newListener', onData)
+ p.nextTick(clonePiped, this)
+}
+
Cloneable.prototype.clone = function () {
if (!this._original) {
throw new Error('already started')
@@ -44,22 +54,38 @@ Cloneable.prototype.clone = function () {
// for starting the flow
this.removeListener('newListener', onData)
var clone = new Clone(this)
- this.on('newListener', onData)
+ if (this._hasListener) {
+ this.on('newListener', onData)
+ }
return clone
}
+Cloneable.prototype._destroy = function (err, cb) {
+ if (!err) {
+ this.push(null)
+ this.end()
+ this.emit('close')
+ }
+
+ p.nextTick(cb, err)
+}
+
function forwardDestroy (src, dest) {
src.on('error', destroy)
- src.on('close', destroy)
+ src.on('close', onClose)
function destroy (err) {
dest.destroy(err)
}
+
+ function onClose () {
+ dest.end()
+ }
}
function clonePiped (that) {
- if (--that._clonesCount === 0 && !that._destroyed) {
+ if (--that._clonesCount === 0 && !that._readableState.destroyed) {
that._original.pipe(that)
that._original = undefined
}
@@ -77,9 +103,9 @@ function Clone (parent, opts) {
this.parent = parent
- Ctor.call(this, opts)
+ PassThrough.call(this, opts)
- forwardDestroy(this.parent, this)
+ forwardDestroy(parent, this)
parent.pipe(this)
@@ -87,17 +113,23 @@ function Clone (parent, opts) {
// for starting the flow
// so we add the newListener handle after we are done
this.on('newListener', onDataClone)
+ this.on('resume', onResumeClone)
}
function onDataClone (event, listener) {
// We start the flow once all clones are piped or destroyed
if (event === 'data' || event === 'readable' || event === 'close') {
- nextTick(clonePiped, this.parent)
+ p.nextTick(clonePiped, this.parent)
this.removeListener('newListener', onDataClone)
}
}
-inherits(Clone, Ctor)
+function onResumeClone () {
+ this.removeListener('newListener', onDataClone)
+ p.nextTick(clonePiped, this.parent)
+}
+
+inherits(Clone, PassThrough)
Clone.prototype.clone = function () {
return this.parent.clone()
@@ -107,4 +139,14 @@ Cloneable.isCloneable = function (stream) {
return stream instanceof Cloneable || stream instanceof Clone
}
+Clone.prototype._destroy = function (err, cb) {
+ if (!err) {
+ this.push(null)
+ this.end()
+ this.emit('close')
+ }
+
+ p.nextTick(cb, err)
+}
+
module.exports = Cloneable