wallet-core/node_modules/cloneable-readable/index.js

153 lines
3.1 KiB
JavaScript
Raw Normal View History

2016-10-10 03:43:44 +02:00
'use strict'
2018-09-20 02:56:13 +02:00
var PassThrough = require('readable-stream').PassThrough
2016-10-10 03:43:44 +02:00
var inherits = require('inherits')
2018-09-20 02:56:13 +02:00
var p = require('process-nextick-args')
2016-10-10 03:43:44 +02:00
/**
 * Wraps a readable stream in a PassThrough that can hand out clones.
 *
 * The wrapped stream is not piped immediately: it is kept in `_original`
 * until `clonePiped` has been called once for this wrapper and once for
 * every clone (tracked by `_clonesCount`).
 *
 * Can be called with or without `new`.
 *
 * @param {Object} stream - source stream; its `_readableState.objectMode`
 *   decides the object mode of the wrapper.
 * @param {Object} [opts] - options forwarded to PassThrough. `objectMode`
 *   is always overridden with the source's value. The object passed by the
 *   caller is not modified.
 */
function Cloneable (stream, opts) {
  if (!(this instanceof Cloneable)) {
    return new Cloneable(stream, opts)
  }

  // Work on a copy so the caller's options object is left untouched.
  var options = {}
  for (var key in opts) {
    options[key] = opts[key]
  }
  options.objectMode = stream._readableState.objectMode

  this._original = stream
  // Starts at 1: the wrapper itself counts as a consumer.
  this._clonesCount = 1

  PassThrough.call(this, options)

  forwardDestroy(stream, this)

  // Either a 'data'/'readable' listener or an explicit resume() marks the
  // wrapper as ready to flow.
  this.on('newListener', onData)
  this.once('resume', onResume)

  // True while the 'newListener' hook above is still installed.
  this._hasListener = true
}
2018-09-20 02:56:13 +02:00
// Cloneable inherits from PassThrough; the constructor chains to it via
// PassThrough.call(this, opts).
inherits(Cloneable, PassThrough)
2016-10-10 03:43:44 +02:00
/**
 * 'newListener' hook installed on a Cloneable (invoked with the Cloneable
 * as `this`).
 *
 * The first 'data' or 'readable' listener marks the Cloneable as ready:
 * both start hooks are removed so it is only counted once, and
 * `clonePiped` is scheduled on the next tick.
 *
 * @param {string} event - name of the event a listener is being added for.
 * @param {Function} listener - the listener being added (unused).
 */
function onData (event, listener) {
  if (event !== 'data' && event !== 'readable') {
    return
  }

  this._hasListener = false
  this.removeListener('newListener', onData)
  // The resume hook would otherwise count this stream a second time.
  this.removeListener('resume', onResume)
  p.nextTick(clonePiped, this)
}
2018-09-20 02:56:13 +02:00
/**
 * 'resume' hook of a Cloneable (invoked with the Cloneable as `this`).
 * Clears the listener flag, drops the 'newListener' hook and schedules
 * `clonePiped` for this stream on the next tick.
 */
function onResume () {
  var cloneable = this

  cloneable._hasListener = false
  cloneable.removeListener('newListener', onData)
  p.nextTick(clonePiped, cloneable)
}
2016-10-10 03:43:44 +02:00
/**
 * Creates a new Clone fed by this Cloneable.
 *
 * @returns {Clone} the new clone.
 * @throws {Error} 'already started' once the original stream has been
 *   piped (i.e. `_original` has been cleared).
 */
Cloneable.prototype.clone = function () {
  if (!this._original) {
    throw new Error('already started')
  }

  this._clonesCount++

  // the events added by the clone should not count
  // for starting the flow
  this.removeListener('newListener', onData)
  var clone = new Clone(this)
  // Re-install the hook only if it has not fired yet.
  if (this._hasListener) {
    this.on('newListener', onData)
  }

  return clone
}
2018-09-20 02:56:13 +02:00
/**
 * Destroy hook for a Cloneable.
 *
 * Without an error the stream is shut down in order: end of readable data
 * (`push(null)`), end of the writable side, then a 'close' event. With an
 * error none of that happens. In both cases `cb` is invoked with `err` on
 * the next tick.
 *
 * @param {Error|null|undefined} err - error passed to destroy(), if any.
 * @param {Function} cb - completion callback.
 */
Cloneable.prototype._destroy = function (err, cb) {
  if (err) {
    p.nextTick(cb, err)
    return
  }

  this.push(null)
  this.end()
  this.emit('close')
  p.nextTick(cb, err)
}
2016-10-10 03:43:44 +02:00
/**
 * Propagates the end of life of `src` to `dest`.
 *
 * - an 'error' on `src` destroys `dest` with the same error;
 * - a 'close' on `src` ends `dest`.
 *
 * @param {Object} src - emitter to observe ('error' and 'close' events).
 * @param {Object} dest - stream exposing `destroy(err)` and `end()`.
 */
function forwardDestroy (src, dest) {
  src.on('error', destroy)
  src.on('close', onClose)

  function destroy (err) {
    // dest is being torn down with an error: a 'close' emitted by src
    // afterwards must not call end() on the destroyed stream.
    src.removeListener('close', onClose)
    dest.destroy(err)
  }

  function onClose () {
    dest.end()
  }
}
/**
 * Records that one more consumer of `that` (the Cloneable itself or one of
 * its clones) is ready.
 *
 * The counter is decremented on every call. When it reaches exactly zero
 * and the Cloneable has not been destroyed, the original stream is piped
 * into it and the `_original` reference is cleared.
 *
 * @param {Object} that - the Cloneable whose counter is updated.
 */
function clonePiped (that) {
  that._clonesCount -= 1

  if (that._clonesCount !== 0) {
    return
  }
  if (that._readableState.destroyed) {
    return
  }

  that._original.pipe(that)
  that._original = undefined
}
/**
 * A PassThrough fed by a parent Cloneable.
 *
 * Can be called with or without `new`.
 *
 * @param {Object} parent - the Cloneable this clone reads from; its
 *   `_readableState.objectMode` decides the clone's object mode.
 * @param {Object} [opts] - options forwarded to PassThrough. `objectMode`
 *   is always overridden. The object passed by the caller is not modified.
 */
function Clone (parent, opts) {
  if (!(this instanceof Clone)) {
    return new Clone(parent, opts)
  }

  // Work on a copy so the caller's options object is left untouched.
  var options = {}
  for (var key in opts) {
    options[key] = opts[key]
  }
  options.objectMode = parent._readableState.objectMode

  this.parent = parent

  PassThrough.call(this, options)

  forwardDestroy(parent, this)

  parent.pipe(this)

  // the events added by the clone should not count
  // for starting the flow
  // so we add the newListener handle after we are done
  this.on('newListener', onDataClone)
  // `once`, like the parent's own resume hook: a clone must be counted a
  // single time no matter how often it is paused and resumed.
  this.once('resume', onResumeClone)
}
/**
 * 'newListener' hook installed on a Clone (invoked with the Clone as
 * `this`).
 *
 * The first 'data', 'readable' or 'close' listener marks the clone as
 * ready: `clonePiped` is scheduled for the parent and both start hooks are
 * removed so the clone is counted exactly once.
 *
 * @param {string} event - name of the event a listener is being added for.
 * @param {Function} listener - the listener being added (unused).
 */
function onDataClone (event, listener) {
  // We start the flow once all clones are piped or destroyed
  if (event !== 'data' && event !== 'readable' && event !== 'close') {
    return
  }

  p.nextTick(clonePiped, this.parent)
  this.removeListener('newListener', onDataClone)
  // Adding a 'data' listener usually resumes the stream; without this the
  // resume hook would decrement the parent's counter a second time.
  this.removeListener('resume', onResumeClone)
}
2018-09-20 02:56:13 +02:00
/**
 * 'resume' hook of a Clone (invoked with the Clone as `this`).
 *
 * Marks the clone as ready by scheduling `clonePiped` for the parent, and
 * removes both start hooks (itself included) so that later pause/resume
 * cycles do not decrement the parent's counter again.
 */
function onResumeClone () {
  this.removeListener('newListener', onDataClone)
  this.removeListener('resume', onResumeClone)
  p.nextTick(clonePiped, this.parent)
}
// Clone inherits from PassThrough; the constructor chains to it via
// PassThrough.call(this, opts).
inherits(Clone, PassThrough)
2016-10-10 03:43:44 +02:00
/**
 * Cloning a clone delegates to the parent stream's `clone()`.
 *
 * @returns {*} whatever the parent's `clone()` returns.
 */
Clone.prototype.clone = function () {
  var owner = this.parent
  return owner.clone()
}
/**
 * Tells whether `stream` was produced by this module.
 *
 * @param {*} stream - value to test.
 * @returns {boolean} true for Cloneable and Clone instances.
 */
Cloneable.isCloneable = function (stream) {
  if (stream instanceof Cloneable) {
    return true
  }
  return stream instanceof Clone
}
2018-09-20 02:56:13 +02:00
/**
 * Destroy hook for a Clone.
 *
 * When destroyed without an error the clone signals end of readable data,
 * ends its writable side and emits 'close'. When destroyed with an error
 * those steps are skipped. `cb` always receives `err` on the next tick.
 *
 * @param {Error|null|undefined} err - error passed to destroy(), if any.
 * @param {Function} cb - completion callback.
 */
Clone.prototype._destroy = function (err, cb) {
  var failed = Boolean(err)

  if (failed === false) {
    this.push(null)
    this.end()
    this.emit('close')
  }

  p.nextTick(cb, err)
}
2016-10-10 03:43:44 +02:00
// The module's export is the Cloneable constructor, which also works when
// called without `new`.
module.exports = Cloneable