diff options
Diffstat (limited to 'node_modules/cloneable-readable')
-rw-r--r-- | node_modules/cloneable-readable/.npmignore | 33 | ||||
-rw-r--r-- | node_modules/cloneable-readable/.travis.yml | 3 | ||||
-rw-r--r-- | node_modules/cloneable-readable/index.js | 68 | ||||
-rw-r--r-- | node_modules/cloneable-readable/package.json | 12 | ||||
-rw-r--r-- | node_modules/cloneable-readable/test.js | 162 |
5 files changed, 199 insertions, 79 deletions
diff --git a/node_modules/cloneable-readable/.npmignore b/node_modules/cloneable-readable/.npmignore deleted file mode 100644 index e920c1671..000000000 --- a/node_modules/cloneable-readable/.npmignore +++ /dev/null @@ -1,33 +0,0 @@ -# Logs -logs -*.log -npm-debug.log* - -# Runtime data -pids -*.pid -*.seed - -# Directory for instrumented libs generated by jscoverage/JSCover -lib-cov - -# Coverage directory used by tools like istanbul -coverage - -# Grunt intermediate storage (http://gruntjs.com/creating-plugins#storing-task-files) -.grunt - -# node-waf configuration -.lock-wscript - -# Compiled binary addons (http://nodejs.org/api/addons.html) -build/Release - -# Dependency directory -node_modules - -# Optional npm cache directory -.npm - -# Optional REPL history -.node_repl_history diff --git a/node_modules/cloneable-readable/.travis.yml b/node_modules/cloneable-readable/.travis.yml index 3e4281a78..bf894c7c1 100644 --- a/node_modules/cloneable-readable/.travis.yml +++ b/node_modules/cloneable-readable/.travis.yml @@ -6,3 +6,6 @@ node_js: - "4" - "5" - "6" + - "7" + - "8" + - "9" diff --git a/node_modules/cloneable-readable/index.js b/node_modules/cloneable-readable/index.js index 7696886dd..b6e490d49 100644 --- a/node_modules/cloneable-readable/index.js +++ b/node_modules/cloneable-readable/index.js @@ -1,9 +1,8 @@ 'use strict' -var through2 = require('through2') +var PassThrough = require('readable-stream').PassThrough var inherits = require('inherits') -var nextTick = require('process-nextick-args') -var Ctor = through2.ctor() +var p = require('process-nextick-args') function Cloneable (stream, opts) { if (!(this instanceof Cloneable)) { @@ -17,22 +16,33 @@ function Cloneable (stream, opts) { opts = opts || {} opts.objectMode = objectMode - Ctor.call(this, opts) + PassThrough.call(this, opts) forwardDestroy(stream, this) this.on('newListener', onData) + this.once('resume', onResume) + + this._hasListener = true } -inherits(Cloneable, Ctor) +inherits(Cloneable, PassThrough) function onData (event, listener) { if (event === 'data' || event === 'readable') { + this._hasListener = false this.removeListener('newListener', onData) - nextTick(clonePiped, this) + this.removeListener('resume', onResume) + p.nextTick(clonePiped, this) } } +function onResume () { + this._hasListener = false + this.removeListener('newListener', onData) + p.nextTick(clonePiped, this) +} + Cloneable.prototype.clone = function () { if (!this._original) { throw new Error('already started') @@ -44,22 +54,38 @@ Cloneable.prototype.clone = function () { // for starting the flow this.removeListener('newListener', onData) var clone = new Clone(this) - this.on('newListener', onData) + if (this._hasListener) { + this.on('newListener', onData) + } return clone } +Cloneable.prototype._destroy = function (err, cb) { + if (!err) { + this.push(null) + this.end() + this.emit('close') + } + + p.nextTick(cb, err) +} + function forwardDestroy (src, dest) { src.on('error', destroy) - src.on('close', destroy) + src.on('close', onClose) function destroy (err) { dest.destroy(err) } + + function onClose () { + dest.end() + } } function clonePiped (that) { - if (--that._clonesCount === 0 && !that._destroyed) { + if (--that._clonesCount === 0 && !that._readableState.destroyed) { that._original.pipe(that) that._original = undefined } @@ -77,9 +103,9 @@ function Clone (parent, opts) { this.parent = parent - Ctor.call(this, opts) + PassThrough.call(this, opts) - forwardDestroy(this.parent, this) + forwardDestroy(parent, this) parent.pipe(this) @@ -87,17 +113,23 @@ function Clone (parent, opts) { // for starting the flow // so we add the newListener handle after we are done this.on('newListener', onDataClone) + this.on('resume', onResumeClone) } function onDataClone (event, listener) { // We start the flow once all clones are piped or destroyed if (event === 'data' || event === 'readable' || event === 'close') { - nextTick(clonePiped, this.parent) + p.nextTick(clonePiped, this.parent) this.removeListener('newListener', onDataClone) } } -inherits(Clone, Ctor) +function onResumeClone () { + this.removeListener('newListener', onDataClone) + p.nextTick(clonePiped, this.parent) +} + +inherits(Clone, PassThrough) Clone.prototype.clone = function () { return this.parent.clone() @@ -107,4 +139,14 @@ Cloneable.isCloneable = function (stream) { return stream instanceof Cloneable || stream instanceof Clone } +Clone.prototype._destroy = function (err, cb) { + if (!err) { + this.push(null) + this.end() + this.emit('close') + } + + p.nextTick(cb, err) +} + module.exports = Cloneable diff --git a/node_modules/cloneable-readable/package.json b/node_modules/cloneable-readable/package.json index 30ad00c47..ba8e37e75 100644 --- a/node_modules/cloneable-readable/package.json +++ b/node_modules/cloneable-readable/package.json @@ -1,6 +1,6 @@ { "name": "cloneable-readable", - "version": "1.0.0", + "version": "1.1.2", "description": "Clone a Readable stream, safely", "main": "index.js", "scripts": { @@ -26,14 +26,14 @@ "flush-write-stream": "^1.0.0", "from2": "^2.1.1", "pre-commit": "^1.1.2", - "readable-stream": "^2.1.0", - "standard": "^8.0.0", + "readable-stream": "^2.3.5", + "standard": "^11.0.0", "tap-spec": "^4.1.1", - "tape": "^4.6.0" + "tape": "^4.9.0" }, "dependencies": { "inherits": "^2.0.1", - "process-nextick-args": "^1.0.6", - "through2": "^2.0.1" + "process-nextick-args": "^2.0.0", + "readable-stream": "^2.3.5" } } diff --git a/node_modules/cloneable-readable/test.js b/node_modules/cloneable-readable/test.js index e1d88d4d9..37379c361 100644 --- a/node_modules/cloneable-readable/test.js +++ b/node_modules/cloneable-readable/test.js @@ -1,7 +1,10 @@ 'use strict' +var fs = require('fs') +var path = require('path') var test = require('tape').test var from = require('from2') +var crypto = require('crypto') var sink = require('flush-write-stream') var cloneable = require('./') @@ -297,7 +300,7 @@ test('basic passthrough with readable event on clone', function (t) { }) test('source error destroys all', function (t) { - t.plan(5) + t.plan(3) var source = from() var instance = cloneable(source) @@ -310,17 +313,9 @@ test('source error destroys all', function (t) { t.ok(err === err2, 'instance receives same error') }) - instance.on('close', function () { - t.pass('instance is closed') - }) - clone.on('error', function (err3) { t.ok(err === err3, 'clone receives same error') }) - - clone.on('close', function () { - t.pass('clone is closed') - }) }) source.emit('error', new Error()) @@ -333,19 +328,22 @@ test('source destroy destroys all', function (t) { var instance = cloneable(source) var clone = instance.clone() - instance.on('close', function () { - t.pass('instance is closed') + instance.on('end', function () { + t.pass('instance has ended') }) - clone.on('close', function () { - t.pass('clone is closed') + clone.on('end', function () { + t.pass('clone has ended') }) + clone.resume() + instance.resume() + source.destroy() }) test('instance error destroys all but the source', function (t) { - t.plan(4) + t.plan(2) var source = from() var instance = cloneable(source) @@ -357,18 +355,18 @@ test('instance error destroys all but the source', function (t) { instance.on('error', function (err) { t.is(err.message, 'beep', 'instance errors') + }) - instance.on('close', function () { - t.pass('instance is closed') - }) + instance.on('close', function () { + t.fail('close should not be emitted') + }) - clone.on('error', function (err3) { - t.ok(err === err3, 'clone receives same error') - }) + clone.on('error', function (err) { + t.is(err.message, 'beep', 'instance errors') + }) - clone.on('close', function () { - t.pass('clone is closed') - }) + clone.on('close', function () { + t.fail('close should not be emitted') }) instance.destroy(new Error('beep')) @@ -385,14 +383,17 @@ test('instance destroy destroys all but the source', function (t) { t.fail('source should not be closed') }) - instance.on('close', function () { - t.pass('instance is closed') + instance.on('end', function () { + t.pass('instance has ended') }) - clone.on('close', function () { - t.pass('clone is closed') + clone.on('end', function () { + t.pass('clone has ended') }) + instance.resume() + clone.resume() + instance.destroy() }) @@ -574,3 +575,110 @@ test('isCloneable', function (t) { var cloneClone = clone.clone() t.ok(cloneable.isCloneable(cloneClone), 'a clone of a clone is cloneable') }) + +test('emits finish', function (t) { + var chunks = ['a', 'b', 'c', 'd', null] + var e1 = ['a', 'b', 'c', 'd'] + var e2 = ['a', 'b', 'c', 'd'] + + t.plan(2 + e1.length + e2.length) + + var source = from(function (size, next) { + setImmediate(next, null, chunks.shift()) + }) + + var instance = cloneable(source) + + var clone = instance.clone() + + clone.on('finish', t.pass.bind(null, 'clone emits finish')) + instance.on('finish', t.pass.bind(null, 'main emits finish')) + + instance.pipe(sink(function (chunk, enc, cb) { + t.equal(chunk.toString(), e1.shift(), 'chunk matches') + cb() + })) + + clone.on('data', function (chunk) { + t.equal(chunk.toString(), e2.shift(), 'chunk matches') + }) +}) + +test('clone async w resume', function (t) { + t.plan(4) + + var read = false + var source = from(function (size, next) { + if (read) { + this.push(null) + } else { + read = true + this.push('hello world') + } + next() + }) + + var instance = cloneable(source) + t.notOk(read, 'stream not started') + + var cloned = instance.clone() + t.notOk(read, 'stream not started') + + instance.on('end', t.pass.bind(null, 'end emitted')) + instance.resume() + + setImmediate(function () { + cloned.on('end', t.pass.bind(null, 'end emitted')) + cloned.resume() + }) +}) + +test('big file', function (t) { + t.plan(13) + + var stream = cloneable(fs.createReadStream(path.join(__dirname, 'big'))) + var hash = crypto.createHash('sha1') + hash.setEncoding('hex') + + var toCheck + + fs.createReadStream(path.join(__dirname, 'big')) + .pipe(hash) + .once('readable', function () { + toCheck = hash.read() + t.ok(toCheck) + }) + + function pipe (s, num) { + s.on('end', function () { + t.pass('end for ' + num) + }) + + var dest = path.join(__dirname, 'out') + + s.pipe(fs.createWriteStream(dest)) + .on('finish', function () { + t.pass('finish for ' + num) + + var destHash = crypto.createHash('sha1') + destHash.setEncoding('hex') + + fs.createReadStream(dest) + .pipe(destHash) + .once('readable', function () { + var hash = destHash.read() + t.ok(hash) + t.equal(hash, toCheck) + }) + }) + } + + // Pipe in another event loop tick <-- this one finished only, it's the original cloneable. + setImmediate(pipe.bind(null, stream, 1)) + + // Pipe in the same event loop tick + pipe(stream.clone(), 0) + + // Pipe a long time after + setTimeout(pipe.bind(null, stream.clone(), 2), 1000) +}) |