wallet-core/node_modules/cloneable-readable/index.js
2016-10-10 03:43:44 +02:00

111 lines
2.3 KiB
JavaScript

'use strict'
var through2 = require('through2')
var inherits = require('inherits')
var nextTick = require('process-nextick-args')
// Base stream constructor obtained from through2.ctor(). Both Cloneable and
// Clone inherit from it and invoke it from inside their own constructors.
var Ctor = through2.ctor()
/**
 * Wraps a readable stream so that it can be cloned before it starts flowing.
 *
 * Can be called with or without `new`.
 *
 * @param {Object} stream - source stream; its `_readableState.objectMode`
 *   decides the object mode of the wrapper.
 * @param {Object} [opts] - extra options handed to the base constructor.
 *   The object is never modified; `objectMode` is always taken from `stream`.
 * @returns {Cloneable} only meaningful when called without `new`.
 */
function Cloneable (stream, opts) {
  if (!(this instanceof Cloneable)) {
    return new Cloneable(stream, opts)
  }

  var objectMode = stream._readableState.objectMode

  this._original = stream
  // The wrapper itself counts as the first consumer that has to subscribe
  // before the original stream is piped in.
  this._clonesCount = 1

  // Copy the caller's options instead of writing `objectMode` onto them, so
  // an options object shared between several calls is left untouched.
  var streamOpts = {}
  if (opts) {
    for (var key in opts) {
      if (Object.prototype.hasOwnProperty.call(opts, key)) {
        streamOpts[key] = opts[key]
      }
    }
  }
  streamOpts.objectMode = objectMode

  Ctor.call(this, streamOpts)

  // Errors and 'close' on the source tear this wrapper down as well.
  forwardDestroy(stream, this)

  // Registered last so that listeners added during construction do not
  // count as a consumer asking for data.
  this.on('newListener', onData)
}
// Cloneable extends the base constructor stored in Ctor.
inherits(Cloneable, Ctor)
// 'newListener' handler installed on a Cloneable instance (`this`).
// The first time somebody subscribes to 'data' or 'readable' the handler
// unregisters itself and schedules clonePiped for this instance on the
// next tick. Any other event name is ignored.
function onData (event, listener) {
  var wantsData = event === 'data' || event === 'readable'
  if (!wantsData) {
    return
  }

  this.removeListener('newListener', onData)
  nextTick(clonePiped, this)
}
// Creates a new Clone fed by this instance and bumps the number of
// consumers that must subscribe before the original stream is piped in.
// Throws Error('already started') once the original has been released.
Cloneable.prototype.clone = function () {
  var source = this

  if (!source._original) {
    throw new Error('already started')
  }

  source._clonesCount += 1

  // Constructing the clone attaches listeners to this instance; detach the
  // 'newListener' hook while that happens so those listeners do not count
  // for starting the flow, then put it back.
  source.removeListener('newListener', onData)
  var copy = new Clone(source)
  source.on('newListener', onData)

  return copy
}
// Makes `dest` follow the fate of `src`: whenever `src` emits 'error' or
// 'close', `dest.destroy` is called with the first argument of that event.
function forwardDestroy (src, dest) {
  var teardownEvents = ['error', 'close']

  function relay (reason) {
    dest.destroy(reason)
  }

  for (var i = 0; i < teardownEvents.length; i++) {
    src.on(teardownEvents[i], relay)
  }
}
// Records that one more consumer of `that` is ready. When the counter
// reaches zero and `that` has not been destroyed, the original stream is
// piped into `that` and the reference to it is dropped.
function clonePiped (that) {
  that._clonesCount -= 1

  if (that._clonesCount !== 0 || that._destroyed) {
    return
  }

  var source = that._original
  source.pipe(that)
  that._original = undefined
}
// A stream fed by `parent` (a Cloneable). Can be called with or without
// `new`. The object mode is always taken from the parent's readable state
// and written onto the options object passed to the base constructor.
function Clone (parent, opts) {
  var self = this

  if (!(self instanceof Clone)) {
    return new Clone(parent, opts)
  }

  var streamOptions = opts || {}
  streamOptions.objectMode = parent._readableState.objectMode

  self.parent = parent

  Ctor.call(self, streamOptions)

  forwardDestroy(self.parent, self)

  parent.pipe(self)

  // Listeners attached above (by the pipe) must not count for starting the
  // flow, so the 'newListener' hook is only installed once we are done.
  self.on('newListener', onDataClone)
}
// 'newListener' handler installed on a Clone instance (`this`).
// The flow is started once every clone is either piped or destroyed, so a
// subscription to 'data', 'readable' or 'close' schedules clonePiped for
// the parent on the next tick and then unregisters this handler.
function onDataClone (event, listener) {
  var flowTriggers = ['data', 'readable', 'close']

  if (flowTriggers.indexOf(event) === -1) {
    return
  }

  nextTick(clonePiped, this.parent)
  this.removeListener('newListener', onDataClone)
}
// Clone extends the same base constructor (Ctor) as Cloneable.
inherits(Clone, Ctor)
// Cloning a clone delegates to the parent it was created from, so the
// returned stream comes from the parent's own clone() method.
Clone.prototype.clone = function () {
  var root = this.parent
  return root.clone()
}
// Returns true when `stream` was created by this module, i.e. it is either
// a Cloneable wrapper or one of the Clone streams; false otherwise.
Cloneable.isCloneable = function (stream) {
  if (stream instanceof Cloneable) {
    return true
  }
  return stream instanceof Clone
}
module.exports = Cloneable