aboutsummaryrefslogtreecommitdiff
path: root/node_modules/readable-stream/lib
diff options
context:
space:
mode:
authorFlorian Dold <florian.dold@gmail.com>2016-11-03 01:33:53 +0100
committerFlorian Dold <florian.dold@gmail.com>2016-11-03 01:33:53 +0100
commitd1291f67551c58168af43698a359cb5ddfd266b0 (patch)
tree55a13ed29fe1915e3f42f1b1b7038dafa2e975a7 /node_modules/readable-stream/lib
parentd0a0695fb5d34996850723f7d4b1b59c3df909c2 (diff)
node_modules
Diffstat (limited to 'node_modules/readable-stream/lib')
-rw-r--r--node_modules/readable-stream/lib/_stream_duplex.js70
-rw-r--r--node_modules/readable-stream/lib/_stream_passthrough.js30
-rw-r--r--node_modules/readable-stream/lib/_stream_readable.js670
-rw-r--r--node_modules/readable-stream/lib/_stream_transform.js91
-rw-r--r--node_modules/readable-stream/lib/_stream_writable.js367
-rw-r--r--node_modules/readable-stream/lib/internal/streams/BufferList.js64
6 files changed, 664 insertions, 628 deletions
diff --git a/node_modules/readable-stream/lib/_stream_duplex.js b/node_modules/readable-stream/lib/_stream_duplex.js
index b513d61a9..736693b84 100644
--- a/node_modules/readable-stream/lib/_stream_duplex.js
+++ b/node_modules/readable-stream/lib/_stream_duplex.js
@@ -1,39 +1,25 @@
-// Copyright Joyent, Inc. and other Node contributors.
-//
-// Permission is hereby granted, free of charge, to any person obtaining a
-// copy of this software and associated documentation files (the
-// "Software"), to deal in the Software without restriction, including
-// without limitation the rights to use, copy, modify, merge, publish,
-// distribute, sublicense, and/or sell copies of the Software, and to permit
-// persons to whom the Software is furnished to do so, subject to the
-// following conditions:
-//
-// The above copyright notice and this permission notice shall be included
-// in all copies or substantial portions of the Software.
-//
-// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
-// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
-// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
-// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
-// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
-// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
-// USE OR OTHER DEALINGS IN THE SOFTWARE.
-
// a duplex stream is just a stream that is both readable and writable.
// Since JS doesn't have multiple prototypal inheritance, this class
// prototypally inherits from Readable, and then parasitically from
// Writable.
-module.exports = Duplex;
+'use strict';
/*<replacement>*/
+
var objectKeys = Object.keys || function (obj) {
var keys = [];
- for (var key in obj) keys.push(key);
- return keys;
-}
+ for (var key in obj) {
+ keys.push(key);
+ }return keys;
+};
/*</replacement>*/
+module.exports = Duplex;
+
+/*<replacement>*/
+var processNextTick = require('process-nextick-args');
+/*</replacement>*/
/*<replacement>*/
var util = require('core-util-is');
@@ -45,27 +31,24 @@ var Writable = require('./_stream_writable');
util.inherits(Duplex, Readable);
-forEach(objectKeys(Writable.prototype), function(method) {
- if (!Duplex.prototype[method])
- Duplex.prototype[method] = Writable.prototype[method];
-});
+var keys = objectKeys(Writable.prototype);
+for (var v = 0; v < keys.length; v++) {
+ var method = keys[v];
+ if (!Duplex.prototype[method]) Duplex.prototype[method] = Writable.prototype[method];
+}
function Duplex(options) {
- if (!(this instanceof Duplex))
- return new Duplex(options);
+ if (!(this instanceof Duplex)) return new Duplex(options);
Readable.call(this, options);
Writable.call(this, options);
- if (options && options.readable === false)
- this.readable = false;
+ if (options && options.readable === false) this.readable = false;
- if (options && options.writable === false)
- this.writable = false;
+ if (options && options.writable === false) this.writable = false;
this.allowHalfOpen = true;
- if (options && options.allowHalfOpen === false)
- this.allowHalfOpen = false;
+ if (options && options.allowHalfOpen === false) this.allowHalfOpen = false;
this.once('end', onend);
}
@@ -74,16 +57,19 @@ function Duplex(options) {
function onend() {
// if we allow half-open state, or if the writable side ended,
// then we're ok.
- if (this.allowHalfOpen || this._writableState.ended)
- return;
+ if (this.allowHalfOpen || this._writableState.ended) return;
// no more data can be written.
// But allow more writes to happen in this tick.
- process.nextTick(this.end.bind(this));
+ processNextTick(onEndNT, this);
}
-function forEach (xs, f) {
+function onEndNT(self) {
+ self.end();
+}
+
+function forEach(xs, f) {
for (var i = 0, l = xs.length; i < l; i++) {
f(xs[i], i);
}
-}
+} \ No newline at end of file
diff --git a/node_modules/readable-stream/lib/_stream_passthrough.js b/node_modules/readable-stream/lib/_stream_passthrough.js
index 895ca50a1..d06f71f18 100644
--- a/node_modules/readable-stream/lib/_stream_passthrough.js
+++ b/node_modules/readable-stream/lib/_stream_passthrough.js
@@ -1,28 +1,9 @@
-// Copyright Joyent, Inc. and other Node contributors.
-//
-// Permission is hereby granted, free of charge, to any person obtaining a
-// copy of this software and associated documentation files (the
-// "Software"), to deal in the Software without restriction, including
-// without limitation the rights to use, copy, modify, merge, publish,
-// distribute, sublicense, and/or sell copies of the Software, and to permit
-// persons to whom the Software is furnished to do so, subject to the
-// following conditions:
-//
-// The above copyright notice and this permission notice shall be included
-// in all copies or substantial portions of the Software.
-//
-// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
-// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
-// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
-// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
-// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
-// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
-// USE OR OTHER DEALINGS IN THE SOFTWARE.
-
// a passthrough stream.
// basically just the most minimal sort of Transform stream.
// Every written chunk gets output as-is.
+'use strict';
+
module.exports = PassThrough;
var Transform = require('./_stream_transform');
@@ -35,12 +16,11 @@ util.inherits = require('inherits');
util.inherits(PassThrough, Transform);
function PassThrough(options) {
- if (!(this instanceof PassThrough))
- return new PassThrough(options);
+ if (!(this instanceof PassThrough)) return new PassThrough(options);
Transform.call(this, options);
}
-PassThrough.prototype._transform = function(chunk, encoding, cb) {
+PassThrough.prototype._transform = function (chunk, encoding, cb) {
cb(null, chunk);
-};
+}; \ No newline at end of file
diff --git a/node_modules/readable-stream/lib/_stream_readable.js b/node_modules/readable-stream/lib/_stream_readable.js
index 19ab35889..208cc65f1 100644
--- a/node_modules/readable-stream/lib/_stream_readable.js
+++ b/node_modules/readable-stream/lib/_stream_readable.js
@@ -1,82 +1,98 @@
-// Copyright Joyent, Inc. and other Node contributors.
-//
-// Permission is hereby granted, free of charge, to any person obtaining a
-// copy of this software and associated documentation files (the
-// "Software"), to deal in the Software without restriction, including
-// without limitation the rights to use, copy, modify, merge, publish,
-// distribute, sublicense, and/or sell copies of the Software, and to permit
-// persons to whom the Software is furnished to do so, subject to the
-// following conditions:
-//
-// The above copyright notice and this permission notice shall be included
-// in all copies or substantial portions of the Software.
-//
-// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
-// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
-// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
-// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
-// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
-// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
-// USE OR OTHER DEALINGS IN THE SOFTWARE.
+'use strict';
module.exports = Readable;
/*<replacement>*/
-var isArray = require('isarray');
+var processNextTick = require('process-nextick-args');
/*</replacement>*/
-
/*<replacement>*/
-var Buffer = require('buffer').Buffer;
+var isArray = require('isarray');
/*</replacement>*/
Readable.ReadableState = ReadableState;
+/*<replacement>*/
var EE = require('events').EventEmitter;
-/*<replacement>*/
-if (!EE.listenerCount) EE.listenerCount = function(emitter, type) {
+var EElistenerCount = function (emitter, type) {
return emitter.listeners(type).length;
};
/*</replacement>*/
-var Stream = require('stream');
+/*<replacement>*/
+var Stream;
+(function () {
+ try {
+ Stream = require('st' + 'ream');
+ } catch (_) {} finally {
+ if (!Stream) Stream = require('events').EventEmitter;
+ }
+})();
+/*</replacement>*/
+
+var Buffer = require('buffer').Buffer;
+/*<replacement>*/
+var bufferShim = require('buffer-shims');
+/*</replacement>*/
/*<replacement>*/
var util = require('core-util-is');
util.inherits = require('inherits');
/*</replacement>*/
-var StringDecoder;
-
-
/*<replacement>*/
-var debug = require('util');
-if (debug && debug.debuglog) {
- debug = debug.debuglog('stream');
+var debugUtil = require('util');
+var debug = void 0;
+if (debugUtil && debugUtil.debuglog) {
+ debug = debugUtil.debuglog('stream');
} else {
debug = function () {};
}
/*</replacement>*/
+var BufferList = require('./internal/streams/BufferList');
+var StringDecoder;
util.inherits(Readable, Stream);
+function prependListener(emitter, event, fn) {
+ if (typeof emitter.prependListener === 'function') {
+ return emitter.prependListener(event, fn);
+ } else {
+ // This is a hack to make sure that our error handler is attached before any
+ // userland ones. NEVER DO THIS. This is here only because this code needs
+ // to continue to work with older versions of Node.js that do not include
+ // the prependListener() method. The goal is to eventually remove this hack.
+ if (!emitter._events || !emitter._events[event]) emitter.on(event, fn);else if (isArray(emitter._events[event])) emitter._events[event].unshift(fn);else emitter._events[event] = [fn, emitter._events[event]];
+ }
+}
+
+var Duplex;
function ReadableState(options, stream) {
- var Duplex = require('./_stream_duplex');
+ Duplex = Duplex || require('./_stream_duplex');
options = options || {};
+ // object stream flag. Used to make read(n) ignore n and to
+ // make all the buffer merging and length checks go away
+ this.objectMode = !!options.objectMode;
+
+ if (stream instanceof Duplex) this.objectMode = this.objectMode || !!options.readableObjectMode;
+
// the point at which it stops calling _read() to fill the buffer
// Note: 0 is a valid value, means "don't call _read preemptively ever"
var hwm = options.highWaterMark;
- var defaultHwm = options.objectMode ? 16 : 16 * 1024;
- this.highWaterMark = (hwm || hwm === 0) ? hwm : defaultHwm;
+ var defaultHwm = this.objectMode ? 16 : 16 * 1024;
+ this.highWaterMark = hwm || hwm === 0 ? hwm : defaultHwm;
// cast to ints.
- this.highWaterMark = ~~this.highWaterMark;
+ this.highWaterMark = ~ ~this.highWaterMark;
- this.buffer = [];
+ // A linked list is used to store data chunks instead of an array because the
+ // linked list can remove elements from the beginning faster than
+ // array.shift()
+ this.buffer = new BufferList();
this.length = 0;
this.pipes = null;
this.pipesCount = 0;
@@ -96,14 +112,7 @@ function ReadableState(options, stream) {
this.needReadable = false;
this.emittedReadable = false;
this.readableListening = false;
-
-
- // object stream flag. Used to make read(n) ignore n and to
- // make all the buffer merging and length checks go away
- this.objectMode = !!options.objectMode;
-
- if (stream instanceof Duplex)
- this.objectMode = this.objectMode || !!options.readableObjectMode;
+ this.resumeScheduled = false;
// Crypto is kind of old and crusty. Historically, its default string
// encoding is 'binary' so we have to make this configurable.
@@ -123,24 +132,25 @@ function ReadableState(options, stream) {
this.decoder = null;
this.encoding = null;
if (options.encoding) {
- if (!StringDecoder)
- StringDecoder = require('string_decoder/').StringDecoder;
+ if (!StringDecoder) StringDecoder = require('string_decoder/').StringDecoder;
this.decoder = new StringDecoder(options.encoding);
this.encoding = options.encoding;
}
}
+var Duplex;
function Readable(options) {
- var Duplex = require('./_stream_duplex');
+ Duplex = Duplex || require('./_stream_duplex');
- if (!(this instanceof Readable))
- return new Readable(options);
+ if (!(this instanceof Readable)) return new Readable(options);
this._readableState = new ReadableState(options, this);
// legacy
this.readable = true;
+ if (options && typeof options.read === 'function') this._read = options.read;
+
Stream.call(this);
}
@@ -148,13 +158,13 @@ function Readable(options) {
// This returns true if the highWaterMark has not been hit yet,
// similar to how Writable.write() returns true if you should
// write() some more.
-Readable.prototype.push = function(chunk, encoding) {
+Readable.prototype.push = function (chunk, encoding) {
var state = this._readableState;
- if (util.isString(chunk) && !state.objectMode) {
+ if (!state.objectMode && typeof chunk === 'string') {
encoding = encoding || state.defaultEncoding;
if (encoding !== state.encoding) {
- chunk = new Buffer(chunk, encoding);
+ chunk = bufferShim.from(chunk, encoding);
encoding = '';
}
}
@@ -163,47 +173,52 @@ Readable.prototype.push = function(chunk, encoding) {
};
// Unshift should *always* be something directly out of read()
-Readable.prototype.unshift = function(chunk) {
+Readable.prototype.unshift = function (chunk) {
var state = this._readableState;
return readableAddChunk(this, state, chunk, '', true);
};
+Readable.prototype.isPaused = function () {
+ return this._readableState.flowing === false;
+};
+
function readableAddChunk(stream, state, chunk, encoding, addToFront) {
var er = chunkInvalid(state, chunk);
if (er) {
stream.emit('error', er);
- } else if (util.isNullOrUndefined(chunk)) {
+ } else if (chunk === null) {
state.reading = false;
- if (!state.ended)
- onEofChunk(stream, state);
+ onEofChunk(stream, state);
} else if (state.objectMode || chunk && chunk.length > 0) {
if (state.ended && !addToFront) {
var e = new Error('stream.push() after EOF');
stream.emit('error', e);
} else if (state.endEmitted && addToFront) {
- var e = new Error('stream.unshift() after end event');
- stream.emit('error', e);
+ var _e = new Error('stream.unshift() after end event');
+ stream.emit('error', _e);
} else {
- if (state.decoder && !addToFront && !encoding)
+ var skipAdd;
+ if (state.decoder && !addToFront && !encoding) {
chunk = state.decoder.write(chunk);
+ skipAdd = !state.objectMode && chunk.length === 0;
+ }
- if (!addToFront)
- state.reading = false;
-
- // if we want the data now, just emit it.
- if (state.flowing && state.length === 0 && !state.sync) {
- stream.emit('data', chunk);
- stream.read(0);
- } else {
- // update the buffer info.
- state.length += state.objectMode ? 1 : chunk.length;
- if (addToFront)
- state.buffer.unshift(chunk);
- else
- state.buffer.push(chunk);
-
- if (state.needReadable)
- emitReadable(stream);
+ if (!addToFront) state.reading = false;
+
+ // Don't add to the buffer if we've decoded to an empty string chunk and
+ // we're not in object mode
+ if (!skipAdd) {
+ // if we want the data now, just emit it.
+ if (state.flowing && state.length === 0 && !state.sync) {
+ stream.emit('data', chunk);
+ stream.read(0);
+ } else {
+ // update the buffer info.
+ state.length += state.objectMode ? 1 : chunk.length;
+ if (addToFront) state.buffer.unshift(chunk);else state.buffer.push(chunk);
+
+ if (state.needReadable) emitReadable(stream);
+ }
}
maybeReadMore(stream, state);
@@ -215,8 +230,6 @@ function readableAddChunk(stream, state, chunk, encoding, addToFront) {
return needMoreData(state);
}
-
-
// if it's past the high water mark, we can push in some more.
// Also, if we have no data yet, we can stand some
// more bytes. This is to work around cases where hwm=0,
@@ -225,92 +238,71 @@ function readableAddChunk(stream, state, chunk, encoding, addToFront) {
// needReadable was set, then we ought to push more, so that another
// 'readable' event will be triggered.
function needMoreData(state) {
- return !state.ended &&
- (state.needReadable ||
- state.length < state.highWaterMark ||
- state.length === 0);
+ return !state.ended && (state.needReadable || state.length < state.highWaterMark || state.length === 0);
}
// backwards compatibility.
-Readable.prototype.setEncoding = function(enc) {
- if (!StringDecoder)
- StringDecoder = require('string_decoder/').StringDecoder;
+Readable.prototype.setEncoding = function (enc) {
+ if (!StringDecoder) StringDecoder = require('string_decoder/').StringDecoder;
this._readableState.decoder = new StringDecoder(enc);
this._readableState.encoding = enc;
return this;
};
-// Don't raise the hwm > 128MB
+// Don't raise the hwm > 8MB
var MAX_HWM = 0x800000;
-function roundUpToNextPowerOf2(n) {
+function computeNewHighWaterMark(n) {
if (n >= MAX_HWM) {
n = MAX_HWM;
} else {
- // Get the next highest power of 2
+ // Get the next highest power of 2 to prevent increasing hwm excessively in
+ // tiny amounts
n--;
- for (var p = 1; p < 32; p <<= 1) n |= n >> p;
+ n |= n >>> 1;
+ n |= n >>> 2;
+ n |= n >>> 4;
+ n |= n >>> 8;
+ n |= n >>> 16;
n++;
}
return n;
}
+// This function is designed to be inlinable, so please take care when making
+// changes to the function body.
function howMuchToRead(n, state) {
- if (state.length === 0 && state.ended)
+ if (n <= 0 || state.length === 0 && state.ended) return 0;
+ if (state.objectMode) return 1;
+ if (n !== n) {
+ // Only flow one buffer at a time
+ if (state.flowing && state.length) return state.buffer.head.data.length;else return state.length;
+ }
+ // If we're asking for more than the current hwm, then raise the hwm.
+ if (n > state.highWaterMark) state.highWaterMark = computeNewHighWaterMark(n);
+ if (n <= state.length) return n;
+ // Don't have enough
+ if (!state.ended) {
+ state.needReadable = true;
return 0;
-
- if (state.objectMode)
- return n === 0 ? 0 : 1;
-
- if (isNaN(n) || util.isNull(n)) {
- // only flow one buffer at a time
- if (state.flowing && state.buffer.length)
- return state.buffer[0].length;
- else
- return state.length;
}
-
- if (n <= 0)
- return 0;
-
- // If we're asking for more than the target buffer level,
- // then raise the water mark. Bump up to the next highest
- // power of 2, to prevent increasing it excessively in tiny
- // amounts.
- if (n > state.highWaterMark)
- state.highWaterMark = roundUpToNextPowerOf2(n);
-
- // don't have that much. return null, unless we've ended.
- if (n > state.length) {
- if (!state.ended) {
- state.needReadable = true;
- return 0;
- } else
- return state.length;
- }
-
- return n;
+ return state.length;
}
// you can override either this method, or the async _read(n) below.
-Readable.prototype.read = function(n) {
+Readable.prototype.read = function (n) {
debug('read', n);
+ n = parseInt(n, 10);
var state = this._readableState;
var nOrig = n;
- if (!util.isNumber(n) || n > 0)
- state.emittedReadable = false;
+ if (n !== 0) state.emittedReadable = false;
// if we're doing read(0) to trigger a readable event, but we
// already have a bunch of data in the buffer, then just trigger
// the 'readable' event and move on.
- if (n === 0 &&
- state.needReadable &&
- (state.length >= state.highWaterMark || state.ended)) {
+ if (n === 0 && state.needReadable && (state.length >= state.highWaterMark || state.ended)) {
debug('read: emitReadable', state.length, state.ended);
- if (state.length === 0 && state.ended)
- endReadable(this);
- else
- emitReadable(this);
+ if (state.length === 0 && state.ended) endReadable(this);else emitReadable(this);
return null;
}
@@ -318,8 +310,7 @@ Readable.prototype.read = function(n) {
// if we've ended, and we're now clear, then finish it up.
if (n === 0 && state.ended) {
- if (state.length === 0)
- endReadable(this);
+ if (state.length === 0) endReadable(this);
return null;
}
@@ -360,67 +351,55 @@ Readable.prototype.read = function(n) {
if (state.ended || state.reading) {
doRead = false;
debug('reading or ended', doRead);
- }
-
- if (doRead) {
+ } else if (doRead) {
debug('do read');
state.reading = true;
state.sync = true;
// if the length is currently zero, then we *need* a readable event.
- if (state.length === 0)
- state.needReadable = true;
+ if (state.length === 0) state.needReadable = true;
// call internal read method
this._read(state.highWaterMark);
state.sync = false;
+ // If _read pushed data synchronously, then `reading` will be false,
+ // and we need to re-evaluate how much data we can return to the user.
+ if (!state.reading) n = howMuchToRead(nOrig, state);
}
- // If _read pushed data synchronously, then `reading` will be false,
- // and we need to re-evaluate how much data we can return to the user.
- if (doRead && !state.reading)
- n = howMuchToRead(nOrig, state);
-
var ret;
- if (n > 0)
- ret = fromList(n, state);
- else
- ret = null;
+ if (n > 0) ret = fromList(n, state);else ret = null;
- if (util.isNull(ret)) {
+ if (ret === null) {
state.needReadable = true;
n = 0;
+ } else {
+ state.length -= n;
}
- state.length -= n;
-
- // If we have nothing in the buffer, then we want to know
- // as soon as we *do* get something into the buffer.
- if (state.length === 0 && !state.ended)
- state.needReadable = true;
+ if (state.length === 0) {
+ // If we have nothing in the buffer, then we want to know
+ // as soon as we *do* get something into the buffer.
+ if (!state.ended) state.needReadable = true;
- // If we tried to read() past the EOF, then emit end on the next tick.
- if (nOrig !== n && state.ended && state.length === 0)
- endReadable(this);
+ // If we tried to read() past the EOF, then emit end on the next tick.
+ if (nOrig !== n && state.ended) endReadable(this);
+ }
- if (!util.isNull(ret))
- this.emit('data', ret);
+ if (ret !== null) this.emit('data', ret);
return ret;
};
function chunkInvalid(state, chunk) {
var er = null;
- if (!util.isBuffer(chunk) &&
- !util.isString(chunk) &&
- !util.isNullOrUndefined(chunk) &&
- !state.objectMode) {
+ if (!Buffer.isBuffer(chunk) && typeof chunk !== 'string' && chunk !== null && chunk !== undefined && !state.objectMode) {
er = new TypeError('Invalid non-string/buffer chunk');
}
return er;
}
-
function onEofChunk(stream, state) {
- if (state.decoder && !state.ended) {
+ if (state.ended) return;
+ if (state.decoder) {
var chunk = state.decoder.end();
if (chunk && chunk.length) {
state.buffer.push(chunk);
@@ -442,12 +421,7 @@ function emitReadable(stream) {
if (!state.emittedReadable) {
debug('emitReadable', state.flowing);
state.emittedReadable = true;
- if (state.sync)
- process.nextTick(function() {
- emitReadable_(stream);
- });
- else
- emitReadable_(stream);
+ if (state.sync) processNextTick(emitReadable_, stream);else emitReadable_(stream);
}
}
@@ -457,7 +431,6 @@ function emitReadable_(stream) {
flow(stream);
}
-
// at this point, the user has presumably seen the 'readable' event,
// and called read() to consume some data. that may have triggered
// in turn another _read(n) call, in which case reading = true if
@@ -467,23 +440,18 @@ function emitReadable_(stream) {
function maybeReadMore(stream, state) {
if (!state.readingMore) {
state.readingMore = true;
- process.nextTick(function() {
- maybeReadMore_(stream, state);
- });
+ processNextTick(maybeReadMore_, stream, state);
}
}
function maybeReadMore_(stream, state) {
var len = state.length;
- while (!state.reading && !state.flowing && !state.ended &&
- state.length < state.highWaterMark) {
+ while (!state.reading && !state.flowing && !state.ended && state.length < state.highWaterMark) {
debug('maybeReadMore read 0');
stream.read(0);
if (len === state.length)
// didn't get any data, stop spinning.
- break;
- else
- len = state.length;
+ break;else len = state.length;
}
state.readingMore = false;
}
@@ -492,11 +460,11 @@ function maybeReadMore_(stream, state) {
// call cb(er, data) where data is <= n in length.
// for virtual (non-string, non-buffer) streams, "length" is somewhat
// arbitrary, and perhaps not very meaningful.
-Readable.prototype._read = function(n) {
+Readable.prototype._read = function (n) {
this.emit('error', new Error('not implemented'));
};
-Readable.prototype.pipe = function(dest, pipeOpts) {
+Readable.prototype.pipe = function (dest, pipeOpts) {
var src = this;
var state = this._readableState;
@@ -514,15 +482,10 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
state.pipesCount += 1;
debug('pipe count=%d opts=%j', state.pipesCount, pipeOpts);
- var doEnd = (!pipeOpts || pipeOpts.end !== false) &&
- dest !== process.stdout &&
- dest !== process.stderr;
+ var doEnd = (!pipeOpts || pipeOpts.end !== false) && dest !== process.stdout && dest !== process.stderr;
var endFn = doEnd ? onend : cleanup;
- if (state.endEmitted)
- process.nextTick(endFn);
- else
- src.once('end', endFn);
+ if (state.endEmitted) processNextTick(endFn);else src.once('end', endFn);
dest.on('unpipe', onunpipe);
function onunpipe(readable) {
@@ -544,6 +507,7 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
var ondrain = pipeOnDrain(src);
dest.on('drain', ondrain);
+ var cleanedUp = false;
function cleanup() {
debug('cleanup');
// cleanup event handlers once the pipe is broken
@@ -556,24 +520,36 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
src.removeListener('end', cleanup);
src.removeListener('data', ondata);
+ cleanedUp = true;
+
// if the reader is waiting for a drain event from this
// specific writer, then it would cause it to never start
// flowing again.
// So, if this is awaiting a drain, then we just call it now.
// If we don't know, then assume that we are waiting for one.
- if (state.awaitDrain &&
- (!dest._writableState || dest._writableState.needDrain))
- ondrain();
+ if (state.awaitDrain && (!dest._writableState || dest._writableState.needDrain)) ondrain();
}
+ // If the user pushes more data while we're writing to dest then we'll end up
+ // in ondata again. However, we only want to increase awaitDrain once because
+ // dest will only emit one 'drain' event for the multiple writes.
+ // => Introduce a guard on increasing awaitDrain.
+ var increasedAwaitDrain = false;
src.on('data', ondata);
function ondata(chunk) {
debug('ondata');
+ increasedAwaitDrain = false;
var ret = dest.write(chunk);
- if (false === ret) {
- debug('false write response, pause',
- src._readableState.awaitDrain);
- src._readableState.awaitDrain++;
+ if (false === ret && !increasedAwaitDrain) {
+ // If the user unpiped during `dest.write()`, it is possible
+ // to get stuck in a permanently paused state if that write
+ // also returned false.
+ // => Check whether `dest` is still a piping destination.
+ if ((state.pipesCount === 1 && state.pipes === dest || state.pipesCount > 1 && indexOf(state.pipes, dest) !== -1) && !cleanedUp) {
+ debug('false write response, pause', src._readableState.awaitDrain);
+ src._readableState.awaitDrain++;
+ increasedAwaitDrain = true;
+ }
src.pause();
}
}
@@ -584,19 +560,11 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
debug('onerror', er);
unpipe();
dest.removeListener('error', onerror);
- if (EE.listenerCount(dest, 'error') === 0)
- dest.emit('error', er);
+ if (EElistenerCount(dest, 'error') === 0) dest.emit('error', er);
}
- // This is a brutally ugly hack to make sure that our error handler
- // is attached before any userland ones. NEVER DO THIS.
- if (!dest._events || !dest._events.error)
- dest.on('error', onerror);
- else if (isArray(dest._events.error))
- dest._events.error.unshift(onerror);
- else
- dest._events.error = [onerror, dest._events.error];
-
+ // Make sure our error handler is attached before userland ones.
+ prependListener(dest, 'error', onerror);
// Both close and finish should trigger unpipe, but only once.
function onclose() {
@@ -629,41 +597,35 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
};
function pipeOnDrain(src) {
- return function() {
+ return function () {
var state = src._readableState;
debug('pipeOnDrain', state.awaitDrain);
- if (state.awaitDrain)
- state.awaitDrain--;
- if (state.awaitDrain === 0 && EE.listenerCount(src, 'data')) {
+ if (state.awaitDrain) state.awaitDrain--;
+ if (state.awaitDrain === 0 && EElistenerCount(src, 'data')) {
state.flowing = true;
flow(src);
}
};
}
-
-Readable.prototype.unpipe = function(dest) {
+Readable.prototype.unpipe = function (dest) {
var state = this._readableState;
// if we're not piping anywhere, then do nothing.
- if (state.pipesCount === 0)
- return this;
+ if (state.pipesCount === 0) return this;
// just one destination. most common case.
if (state.pipesCount === 1) {
// passed in one, but it's not the right one.
- if (dest && dest !== state.pipes)
- return this;
+ if (dest && dest !== state.pipes) return this;
- if (!dest)
- dest = state.pipes;
+ if (!dest) dest = state.pipes;
// got a match.
state.pipes = null;
state.pipesCount = 0;
state.flowing = false;
- if (dest)
- dest.emit('unpipe', this);
+ if (dest) dest.emit('unpipe', this);
return this;
}
@@ -677,20 +639,18 @@ Readable.prototype.unpipe = function(dest) {
state.pipesCount = 0;
state.flowing = false;
- for (var i = 0; i < len; i++)
- dests[i].emit('unpipe', this);
- return this;
+ for (var _i = 0; _i < len; _i++) {
+ dests[_i].emit('unpipe', this);
+ }return this;
}
// try to find the right one.
var i = indexOf(state.pipes, dest);
- if (i === -1)
- return this;
+ if (i === -1) return this;
state.pipes.splice(i, 1);
state.pipesCount -= 1;
- if (state.pipesCount === 1)
- state.pipes = state.pipes[0];
+ if (state.pipesCount === 1) state.pipes = state.pipes[0];
dest.emit('unpipe', this);
@@ -699,27 +659,19 @@ Readable.prototype.unpipe = function(dest) {
// set up data events if they are asked for
// Ensure readable listeners eventually get something
-Readable.prototype.on = function(ev, fn) {
+Readable.prototype.on = function (ev, fn) {
var res = Stream.prototype.on.call(this, ev, fn);
- // If listening to data, and it has not explicitly been paused,
- // then call resume to start the flow of data on the next tick.
- if (ev === 'data' && false !== this._readableState.flowing) {
- this.resume();
- }
-
- if (ev === 'readable' && this.readable) {
+ if (ev === 'data') {
+ // Start flowing on next tick if stream isn't explicitly paused
+ if (this._readableState.flowing !== false) this.resume();
+ } else if (ev === 'readable') {
var state = this._readableState;
- if (!state.readableListening) {
- state.readableListening = true;
+ if (!state.endEmitted && !state.readableListening) {
+ state.readableListening = state.needReadable = true;
state.emittedReadable = false;
- state.needReadable = true;
if (!state.reading) {
- var self = this;
- process.nextTick(function() {
- debug('readable nexttick read 0');
- self.read(0);
- });
+ processNextTick(nReadingNextTick, this);
} else if (state.length) {
emitReadable(this, state);
}
@@ -730,17 +682,18 @@ Readable.prototype.on = function(ev, fn) {
};
Readable.prototype.addListener = Readable.prototype.on;
+function nReadingNextTick(self) {
+ debug('readable nexttick read 0');
+ self.read(0);
+}
+
// pause() and resume() are remnants of the legacy readable stream API
// If the user uses them, then switch into old mode.
-Readable.prototype.resume = function() {
+Readable.prototype.resume = function () {
var state = this._readableState;
if (!state.flowing) {
debug('resume');
state.flowing = true;
- if (!state.reading) {
- debug('resume read 0');
- this.read(0);
- }
resume(this, state);
}
return this;
@@ -749,21 +702,24 @@ Readable.prototype.resume = function() {
function resume(stream, state) {
if (!state.resumeScheduled) {
state.resumeScheduled = true;
- process.nextTick(function() {
- resume_(stream, state);
- });
+ processNextTick(resume_, stream, state);
}
}
function resume_(stream, state) {
+ if (!state.reading) {
+ debug('resume read 0');
+ stream.read(0);
+ }
+
state.resumeScheduled = false;
+ state.awaitDrain = 0;
stream.emit('resume');
flow(stream);
- if (state.flowing && !state.reading)
- stream.read(0);
+ if (state.flowing && !state.reading) stream.read(0);
}
-Readable.prototype.pause = function() {
+Readable.prototype.pause = function () {
debug('call pause flowing=%j', this._readableState.flowing);
if (false !== this._readableState.flowing) {
debug('pause');
@@ -776,38 +732,33 @@ Readable.prototype.pause = function() {
function flow(stream) {
var state = stream._readableState;
debug('flow', state.flowing);
- if (state.flowing) {
- do {
- var chunk = stream.read();
- } while (null !== chunk && state.flowing);
- }
+ while (state.flowing && stream.read() !== null) {}
}
// wrap an old-style stream as the async data source.
// This is *not* part of the readable stream interface.
// It is an ugly unfortunate mess of history.
-Readable.prototype.wrap = function(stream) {
+Readable.prototype.wrap = function (stream) {
var state = this._readableState;
var paused = false;
var self = this;
- stream.on('end', function() {
+ stream.on('end', function () {
debug('wrapped end');
if (state.decoder && !state.ended) {
var chunk = state.decoder.end();
- if (chunk && chunk.length)
- self.push(chunk);
+ if (chunk && chunk.length) self.push(chunk);
}
self.push(null);
});
- stream.on('data', function(chunk) {
+ stream.on('data', function (chunk) {
debug('wrapped data');
- if (state.decoder)
- chunk = state.decoder.write(chunk);
- if (!chunk || !state.objectMode && !chunk.length)
- return;
+ if (state.decoder) chunk = state.decoder.write(chunk);
+
+ // don't skip over falsy values in objectMode
+ if (state.objectMode && (chunk === null || chunk === undefined)) return;else if (!state.objectMode && (!chunk || !chunk.length)) return;
var ret = self.push(chunk);
if (!ret) {
@@ -819,22 +770,24 @@ Readable.prototype.wrap = function(stream) {
// proxy all the other methods.
// important when wrapping filters and duplexes.
for (var i in stream) {
- if (util.isFunction(stream[i]) && util.isUndefined(this[i])) {
- this[i] = function(method) { return function() {
- return stream[method].apply(stream, arguments);
- }}(i);
+ if (this[i] === undefined && typeof stream[i] === 'function') {
+ this[i] = function (method) {
+ return function () {
+ return stream[method].apply(stream, arguments);
+ };
+ }(i);
}
}
// proxy certain important events.
var events = ['error', 'close', 'destroy', 'pause', 'resume'];
- forEach(events, function(ev) {
+ forEach(events, function (ev) {
stream.on(ev, self.emit.bind(self, ev));
});
// when we try to consume some more bytes, simply unpause the
// underlying stream.
- self._read = function(n) {
+ self._read = function (n) {
debug('wrapped _read', n);
if (paused) {
paused = false;
@@ -845,74 +798,106 @@ Readable.prototype.wrap = function(stream) {
return self;
};
-
-
// exposed for testing purposes only.
Readable._fromList = fromList;
// Pluck off n bytes from an array of buffers.
// Length is the combined lengths of all the buffers in the list.
+// This function is designed to be inlinable, so please take care when making
+// changes to the function body.
function fromList(n, state) {
- var list = state.buffer;
- var length = state.length;
- var stringMode = !!state.decoder;
- var objectMode = !!state.objectMode;
+ // nothing buffered
+ if (state.length === 0) return null;
+
var ret;
+ if (state.objectMode) ret = state.buffer.shift();else if (!n || n >= state.length) {
+ // read it all, truncate the list
+ if (state.decoder) ret = state.buffer.join('');else if (state.buffer.length === 1) ret = state.buffer.head.data;else ret = state.buffer.concat(state.length);
+ state.buffer.clear();
+ } else {
+ // read part of list
+ ret = fromListPartial(n, state.buffer, state.decoder);
+ }
- // nothing in the list, definitely empty.
- if (list.length === 0)
- return null;
+ return ret;
+}
- if (length === 0)
- ret = null;
- else if (objectMode)
+// Extracts only enough buffered data to satisfy the amount requested.
+// This function is designed to be inlinable, so please take care when making
+// changes to the function body.
+function fromListPartial(n, list, hasStrings) {
+ var ret;
+ if (n < list.head.data.length) {
+ // slice is the same for buffers and strings
+ ret = list.head.data.slice(0, n);
+ list.head.data = list.head.data.slice(n);
+ } else if (n === list.head.data.length) {
+ // first chunk is a perfect match
ret = list.shift();
- else if (!n || n >= length) {
- // read it all, truncate the array.
- if (stringMode)
- ret = list.join('');
- else
- ret = Buffer.concat(list, length);
- list.length = 0;
} else {
- // read just some of it.
- if (n < list[0].length) {
- // just take a part of the first list item.
- // slice is the same for buffers and strings.
- var buf = list[0];
- ret = buf.slice(0, n);
- list[0] = buf.slice(n);
- } else if (n === list[0].length) {
- // first list is a perfect match
- ret = list.shift();
- } else {
- // complex case.
- // we have enough to cover it, but it spans past the first buffer.
- if (stringMode)
- ret = '';
- else
- ret = new Buffer(n);
-
- var c = 0;
- for (var i = 0, l = list.length; i < l && c < n; i++) {
- var buf = list[0];
- var cpy = Math.min(n - c, buf.length);
-
- if (stringMode)
- ret += buf.slice(0, cpy);
- else
- buf.copy(ret, c, 0, cpy);
-
- if (cpy < buf.length)
- list[0] = buf.slice(cpy);
- else
- list.shift();
-
- c += cpy;
+ // result spans more than one buffer
+ ret = hasStrings ? copyFromBufferString(n, list) : copyFromBuffer(n, list);
+ }
+ return ret;
+}
+
+// Copies a specified amount of characters from the list of buffered data
+// chunks.
+// This function is designed to be inlinable, so please take care when making
+// changes to the function body.
+function copyFromBufferString(n, list) {
+ var p = list.head;
+ var c = 1;
+ var ret = p.data;
+ n -= ret.length;
+ while (p = p.next) {
+ var str = p.data;
+ var nb = n > str.length ? str.length : n;
+ if (nb === str.length) ret += str;else ret += str.slice(0, n);
+ n -= nb;
+ if (n === 0) {
+ if (nb === str.length) {
+ ++c;
+ if (p.next) list.head = p.next;else list.head = list.tail = null;
+ } else {
+ list.head = p;
+ p.data = str.slice(nb);
}
+ break;
}
+ ++c;
}
+ list.length -= c;
+ return ret;
+}
+// Copies a specified amount of bytes from the list of buffered data chunks.
+// This function is designed to be inlinable, so please take care when making
+// changes to the function body.
+function copyFromBuffer(n, list) {
+ var ret = bufferShim.allocUnsafe(n);
+ var p = list.head;
+ var c = 1;
+ p.data.copy(ret);
+ n -= p.data.length;
+ while (p = p.next) {
+ var buf = p.data;
+ var nb = n > buf.length ? buf.length : n;
+ buf.copy(ret, ret.length - n, 0, nb);
+ n -= nb;
+ if (n === 0) {
+ if (nb === buf.length) {
+ ++c;
+ if (p.next) list.head = p.next;else list.head = list.tail = null;
+ } else {
+ list.head = p;
+ p.data = buf.slice(nb);
+ }
+ break;
+ }
+ ++c;
+ }
+ list.length -= c;
return ret;
}
@@ -921,31 +906,32 @@ function endReadable(stream) {
// If we get here before consuming all the bytes, then that is a
// bug in node. Should never happen.
- if (state.length > 0)
- throw new Error('endReadable called on non-empty stream');
+ if (state.length > 0) throw new Error('"endReadable()" called on non-empty stream');
if (!state.endEmitted) {
state.ended = true;
- process.nextTick(function() {
- // Check that we didn't get one last unshift.
- if (!state.endEmitted && state.length === 0) {
- state.endEmitted = true;
- stream.readable = false;
- stream.emit('end');
- }
- });
+ processNextTick(endReadableNT, state, stream);
+ }
+}
+
+function endReadableNT(state, stream) {
+ // Check that we didn't get one last unshift.
+ if (!state.endEmitted && state.length === 0) {
+ state.endEmitted = true;
+ stream.readable = false;
+ stream.emit('end');
}
}
-function forEach (xs, f) {
+function forEach(xs, f) {
for (var i = 0, l = xs.length; i < l; i++) {
f(xs[i], i);
}
}
-function indexOf (xs, x) {
+function indexOf(xs, x) {
for (var i = 0, l = xs.length; i < l; i++) {
if (xs[i] === x) return i;
}
return -1;
-}
+} \ No newline at end of file
diff --git a/node_modules/readable-stream/lib/_stream_transform.js b/node_modules/readable-stream/lib/_stream_transform.js
index 905c5e450..dbc996ede 100644
--- a/node_modules/readable-stream/lib/_stream_transform.js
+++ b/node_modules/readable-stream/lib/_stream_transform.js
@@ -1,25 +1,3 @@
-// Copyright Joyent, Inc. and other Node contributors.
-//
-// Permission is hereby granted, free of charge, to any person obtaining a
-// copy of this software and associated documentation files (the
-// "Software"), to deal in the Software without restriction, including
-// without limitation the rights to use, copy, modify, merge, publish,
-// distribute, sublicense, and/or sell copies of the Software, and to permit
-// persons to whom the Software is furnished to do so, subject to the
-// following conditions:
-//
-// The above copyright notice and this permission notice shall be included
-// in all copies or substantial portions of the Software.
-//
-// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
-// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
-// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
-// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
-// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
-// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
-// USE OR OTHER DEALINGS IN THE SOFTWARE.
-
-
// a transform stream is a readable/writable stream where you do
// something with the data. Sometimes it's called a "filter",
// but that's not a great name for it, since that implies a thing where
@@ -62,6 +40,8 @@
// would be consumed, and then the rest would wait (un-transformed) until
// the results of the previous transformed chunk were consumed.
+'use strict';
+
module.exports = Transform;
var Duplex = require('./_stream_duplex');
@@ -73,9 +53,8 @@ util.inherits = require('inherits');
util.inherits(Transform, Duplex);
-
-function TransformState(options, stream) {
- this.afterTransform = function(er, data) {
+function TransformState(stream) {
+ this.afterTransform = function (er, data) {
return afterTransform(stream, er, data);
};
@@ -83,6 +62,7 @@ function TransformState(options, stream) {
this.transforming = false;
this.writecb = null;
this.writechunk = null;
+ this.writeencoding = null;
}
function afterTransform(stream, er, data) {
@@ -91,17 +71,14 @@ function afterTransform(stream, er, data) {
var cb = ts.writecb;
- if (!cb)
- return stream.emit('error', new Error('no writecb in Transform class'));
+ if (!cb) return stream.emit('error', new Error('no writecb in Transform class'));
ts.writechunk = null;
ts.writecb = null;
- if (!util.isNullOrUndefined(data))
- stream.push(data);
+ if (data !== null && data !== undefined) stream.push(data);
- if (cb)
- cb(er);
+ cb(er);
var rs = stream._readableState;
rs.reading = false;
@@ -110,14 +87,12 @@ function afterTransform(stream, er, data) {
}
}
-
function Transform(options) {
- if (!(this instanceof Transform))
- return new Transform(options);
+ if (!(this instanceof Transform)) return new Transform(options);
Duplex.call(this, options);
- this._transformState = new TransformState(options, this);
+ this._transformState = new TransformState(this);
// when the writable side finishes, then flush out anything remaining.
var stream = this;
@@ -130,17 +105,20 @@ function Transform(options) {
// sync guard flag.
this._readableState.sync = false;
- this.once('prefinish', function() {
- if (util.isFunction(this._flush))
- this._flush(function(er) {
- done(stream, er);
- });
- else
- done(stream);
+ if (options) {
+ if (typeof options.transform === 'function') this._transform = options.transform;
+
+ if (typeof options.flush === 'function') this._flush = options.flush;
+ }
+
+ this.once('prefinish', function () {
+ if (typeof this._flush === 'function') this._flush(function (er) {
+ done(stream, er);
+ });else done(stream);
});
}
-Transform.prototype.push = function(chunk, encoding) {
+Transform.prototype.push = function (chunk, encoding) {
this._transformState.needTransform = false;
return Duplex.prototype.push.call(this, chunk, encoding);
};
@@ -155,31 +133,28 @@ Transform.prototype.push = function(chunk, encoding) {
// Call `cb(err)` when you are done with this chunk. If you pass
// an error, then that'll put the hurt on the whole operation. If you
// never call cb(), then you'll never get another chunk.
-Transform.prototype._transform = function(chunk, encoding, cb) {
- throw new Error('not implemented');
+Transform.prototype._transform = function (chunk, encoding, cb) {
+ throw new Error('Not implemented');
};
-Transform.prototype._write = function(chunk, encoding, cb) {
+Transform.prototype._write = function (chunk, encoding, cb) {
var ts = this._transformState;
ts.writecb = cb;
ts.writechunk = chunk;
ts.writeencoding = encoding;
if (!ts.transforming) {
var rs = this._readableState;
- if (ts.needTransform ||
- rs.needReadable ||
- rs.length < rs.highWaterMark)
- this._read(rs.highWaterMark);
+ if (ts.needTransform || rs.needReadable || rs.length < rs.highWaterMark) this._read(rs.highWaterMark);
}
};
// Doesn't matter what the args are here.
// _transform does all the work.
// That we got here means that the readable side wants more data.
-Transform.prototype._read = function(n) {
+Transform.prototype._read = function (n) {
var ts = this._transformState;
- if (!util.isNull(ts.writechunk) && ts.writecb && !ts.transforming) {
+ if (ts.writechunk !== null && ts.writecb && !ts.transforming) {
ts.transforming = true;
this._transform(ts.writechunk, ts.writeencoding, ts.afterTransform);
} else {
@@ -189,21 +164,17 @@ Transform.prototype._read = function(n) {
}
};
-
function done(stream, er) {
- if (er)
- return stream.emit('error', er);
+ if (er) return stream.emit('error', er);
// if there's nothing in the write buffer, then that means
// that nothing more will ever be provided
var ws = stream._writableState;
var ts = stream._transformState;
- if (ws.length)
- throw new Error('calling transform done when ws.length != 0');
+ if (ws.length) throw new Error('Calling transform done when ws.length != 0');
- if (ts.transforming)
- throw new Error('calling transform done when still transforming');
+ if (ts.transforming) throw new Error('Calling transform done when still transforming');
return stream.push(null);
-}
+} \ No newline at end of file
diff --git a/node_modules/readable-stream/lib/_stream_writable.js b/node_modules/readable-stream/lib/_stream_writable.js
index db8539cd5..ed5efcbd2 100644
--- a/node_modules/readable-stream/lib/_stream_writable.js
+++ b/node_modules/readable-stream/lib/_stream_writable.js
@@ -1,73 +1,80 @@
-// Copyright Joyent, Inc. and other Node contributors.
-//
-// Permission is hereby granted, free of charge, to any person obtaining a
-// copy of this software and associated documentation files (the
-// "Software"), to deal in the Software without restriction, including
-// without limitation the rights to use, copy, modify, merge, publish,
-// distribute, sublicense, and/or sell copies of the Software, and to permit
-// persons to whom the Software is furnished to do so, subject to the
-// following conditions:
-//
-// The above copyright notice and this permission notice shall be included
-// in all copies or substantial portions of the Software.
-//
-// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
-// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
-// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
-// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
-// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
-// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
-// USE OR OTHER DEALINGS IN THE SOFTWARE.
-
// A bit simpler than readable streams.
-// Implement an async ._write(chunk, cb), and it'll handle all
+// Implement an async ._write(chunk, encoding, cb), and it'll handle all
// the drain event emission and buffering.
+'use strict';
+
module.exports = Writable;
/*<replacement>*/
-var Buffer = require('buffer').Buffer;
+var processNextTick = require('process-nextick-args');
/*</replacement>*/
-Writable.WritableState = WritableState;
+/*<replacement>*/
+var asyncWrite = !process.browser && ['v0.10', 'v0.9.'].indexOf(process.version.slice(0, 5)) > -1 ? setImmediate : processNextTick;
+/*</replacement>*/
+Writable.WritableState = WritableState;
/*<replacement>*/
var util = require('core-util-is');
util.inherits = require('inherits');
/*</replacement>*/
-var Stream = require('stream');
+/*<replacement>*/
+var internalUtil = {
+ deprecate: require('util-deprecate')
+};
+/*</replacement>*/
+
+/*<replacement>*/
+var Stream;
+(function () {
+ try {
+ Stream = require('st' + 'ream');
+ } catch (_) {} finally {
+ if (!Stream) Stream = require('events').EventEmitter;
+ }
+})();
+/*</replacement>*/
+
+var Buffer = require('buffer').Buffer;
+/*<replacement>*/
+var bufferShim = require('buffer-shims');
+/*</replacement>*/
util.inherits(Writable, Stream);
+function nop() {}
+
function WriteReq(chunk, encoding, cb) {
this.chunk = chunk;
this.encoding = encoding;
this.callback = cb;
+ this.next = null;
}
+var Duplex;
function WritableState(options, stream) {
- var Duplex = require('./_stream_duplex');
+ Duplex = Duplex || require('./_stream_duplex');
options = options || {};
- // the point at which write() starts returning false
- // Note: 0 is a valid value, means that we always return false if
- // the entire buffer is not flushed immediately on write()
- var hwm = options.highWaterMark;
- var defaultHwm = options.objectMode ? 16 : 16 * 1024;
- this.highWaterMark = (hwm || hwm === 0) ? hwm : defaultHwm;
-
// object stream flag to indicate whether or not this stream
// contains buffers or objects.
this.objectMode = !!options.objectMode;
- if (stream instanceof Duplex)
- this.objectMode = this.objectMode || !!options.writableObjectMode;
+ if (stream instanceof Duplex) this.objectMode = this.objectMode || !!options.writableObjectMode;
+
+ // the point at which write() starts returning false
+ // Note: 0 is a valid value, means that we always return false if
+ // the entire buffer is not flushed immediately on write()
+ var hwm = options.highWaterMark;
+ var defaultHwm = this.objectMode ? 16 : 16 * 1024;
+ this.highWaterMark = hwm || hwm === 0 ? hwm : defaultHwm;
// cast to ints.
- this.highWaterMark = ~~this.highWaterMark;
+ this.highWaterMark = ~ ~this.highWaterMark;
this.needDrain = false;
// at the start of calling end()
@@ -111,7 +118,7 @@ function WritableState(options, stream) {
this.bufferProcessing = false;
// the callback that's passed to _write(chunk,cb)
- this.onwrite = function(er) {
+ this.onwrite = function (er) {
onwrite(stream, er);
};
@@ -121,7 +128,8 @@ function WritableState(options, stream) {
// the amount that is being written when _write is called.
this.writelen = 0;
- this.buffer = [];
+ this.bufferedRequest = null;
+ this.lastBufferedRequest = null;
// number of pending user-supplied write callbacks
// this must be 0 before 'finish' can be emitted
@@ -133,37 +141,67 @@ function WritableState(options, stream) {
// True if the error was already emitted and should not be thrown again
this.errorEmitted = false;
+
+ // count buffered requests
+ this.bufferedRequestCount = 0;
+
+ // allocate the first CorkedRequest, there is always
+ // one allocated and free to use, and we maintain at most two
+ this.corkedRequestsFree = new CorkedRequest(this);
}
+WritableState.prototype.getBuffer = function writableStateGetBuffer() {
+ var current = this.bufferedRequest;
+ var out = [];
+ while (current) {
+ out.push(current);
+ current = current.next;
+ }
+ return out;
+};
+
+(function () {
+ try {
+ Object.defineProperty(WritableState.prototype, 'buffer', {
+ get: internalUtil.deprecate(function () {
+ return this.getBuffer();
+ }, '_writableState.buffer is deprecated. Use _writableState.getBuffer ' + 'instead.')
+ });
+ } catch (_) {}
+})();
+
+var Duplex;
function Writable(options) {
- var Duplex = require('./_stream_duplex');
+ Duplex = Duplex || require('./_stream_duplex');
// Writable ctor is applied to Duplexes, though they're not
// instanceof Writable, they're instanceof Readable.
- if (!(this instanceof Writable) && !(this instanceof Duplex))
- return new Writable(options);
+ if (!(this instanceof Writable) && !(this instanceof Duplex)) return new Writable(options);
this._writableState = new WritableState(options, this);
// legacy.
this.writable = true;
+ if (options) {
+ if (typeof options.write === 'function') this._write = options.write;
+
+ if (typeof options.writev === 'function') this._writev = options.writev;
+ }
+
Stream.call(this);
}
// Otherwise people can pipe Writable streams, which is just wrong.
-Writable.prototype.pipe = function() {
- this.emit('error', new Error('Cannot pipe. Not readable.'));
+Writable.prototype.pipe = function () {
+ this.emit('error', new Error('Cannot pipe, not readable'));
};
-
-function writeAfterEnd(stream, state, cb) {
+function writeAfterEnd(stream, cb) {
var er = new Error('write after end');
// TODO: defer error events consistently everywhere, not just the cb
stream.emit('error', er);
- process.nextTick(function() {
- cb(er);
- });
+ processNextTick(cb, er);
}
// If we get something that is not a buffer, string, null, or undefined,
@@ -173,40 +211,37 @@ function writeAfterEnd(stream, state, cb) {
// how many bytes or characters.
function validChunk(stream, state, chunk, cb) {
var valid = true;
- if (!util.isBuffer(chunk) &&
- !util.isString(chunk) &&
- !util.isNullOrUndefined(chunk) &&
- !state.objectMode) {
- var er = new TypeError('Invalid non-string/buffer chunk');
+ var er = false;
+ // Always throw error if a null is written
+ // if we are not in object mode then throw
+ // if it is not a buffer, string, or undefined.
+ if (chunk === null) {
+ er = new TypeError('May not write null values to stream');
+ } else if (!Buffer.isBuffer(chunk) && typeof chunk !== 'string' && chunk !== undefined && !state.objectMode) {
+ er = new TypeError('Invalid non-string/buffer chunk');
+ }
+ if (er) {
stream.emit('error', er);
- process.nextTick(function() {
- cb(er);
- });
+ processNextTick(cb, er);
valid = false;
}
return valid;
}
-Writable.prototype.write = function(chunk, encoding, cb) {
+Writable.prototype.write = function (chunk, encoding, cb) {
var state = this._writableState;
var ret = false;
- if (util.isFunction(encoding)) {
+ if (typeof encoding === 'function') {
cb = encoding;
encoding = null;
}
- if (util.isBuffer(chunk))
- encoding = 'buffer';
- else if (!encoding)
- encoding = state.defaultEncoding;
+ if (Buffer.isBuffer(chunk)) encoding = 'buffer';else if (!encoding) encoding = state.defaultEncoding;
- if (!util.isFunction(cb))
- cb = function() {};
+ if (typeof cb !== 'function') cb = nop;
- if (state.ended)
- writeAfterEnd(this, state, cb);
- else if (validChunk(this, state, chunk, cb)) {
+ if (state.ended) writeAfterEnd(this, cb);else if (validChunk(this, state, chunk, cb)) {
state.pendingcb++;
ret = writeOrBuffer(this, state, chunk, encoding, cb);
}
@@ -214,32 +249,33 @@ Writable.prototype.write = function(chunk, encoding, cb) {
return ret;
};
-Writable.prototype.cork = function() {
+Writable.prototype.cork = function () {
var state = this._writableState;
state.corked++;
};
-Writable.prototype.uncork = function() {
+Writable.prototype.uncork = function () {
var state = this._writableState;
if (state.corked) {
state.corked--;
- if (!state.writing &&
- !state.corked &&
- !state.finished &&
- !state.bufferProcessing &&
- state.buffer.length)
- clearBuffer(this, state);
+ if (!state.writing && !state.corked && !state.finished && !state.bufferProcessing && state.bufferedRequest) clearBuffer(this, state);
}
};
+Writable.prototype.setDefaultEncoding = function setDefaultEncoding(encoding) {
+ // node::ParseEncoding() requires lower case.
+ if (typeof encoding === 'string') encoding = encoding.toLowerCase();
+ if (!(['hex', 'utf8', 'utf-8', 'ascii', 'binary', 'base64', 'ucs2', 'ucs-2', 'utf16le', 'utf-16le', 'raw'].indexOf((encoding + '').toLowerCase()) > -1)) throw new TypeError('Unknown encoding: ' + encoding);
+ this._writableState.defaultEncoding = encoding;
+ return this;
+};
+
function decodeChunk(state, chunk, encoding) {
- if (!state.objectMode &&
- state.decodeStrings !== false &&
- util.isString(chunk)) {
- chunk = new Buffer(chunk, encoding);
+ if (!state.objectMode && state.decodeStrings !== false && typeof chunk === 'string') {
+ chunk = bufferShim.from(chunk, encoding);
}
return chunk;
}
@@ -249,21 +285,28 @@ function decodeChunk(state, chunk, encoding) {
// If we return false, then we need a drain event, so set that flag.
function writeOrBuffer(stream, state, chunk, encoding, cb) {
chunk = decodeChunk(state, chunk, encoding);
- if (util.isBuffer(chunk))
- encoding = 'buffer';
+
+ if (Buffer.isBuffer(chunk)) encoding = 'buffer';
var len = state.objectMode ? 1 : chunk.length;
state.length += len;
var ret = state.length < state.highWaterMark;
// we must ensure that previous needDrain will not be reset to false.
- if (!ret)
- state.needDrain = true;
+ if (!ret) state.needDrain = true;
- if (state.writing || state.corked)
- state.buffer.push(new WriteReq(chunk, encoding, cb));
- else
+ if (state.writing || state.corked) {
+ var last = state.lastBufferedRequest;
+ state.lastBufferedRequest = new WriteReq(chunk, encoding, cb);
+ if (last) {
+ last.next = state.lastBufferedRequest;
+ } else {
+ state.bufferedRequest = state.lastBufferedRequest;
+ }
+ state.bufferedRequestCount += 1;
+ } else {
doWrite(stream, state, false, len, chunk, encoding, cb);
+ }
return ret;
}
@@ -273,23 +316,13 @@ function doWrite(stream, state, writev, len, chunk, encoding, cb) {
state.writecb = cb;
state.writing = true;
state.sync = true;
- if (writev)
- stream._writev(chunk, state.onwrite);
- else
- stream._write(chunk, encoding, state.onwrite);
+ if (writev) stream._writev(chunk, state.onwrite);else stream._write(chunk, encoding, state.onwrite);
state.sync = false;
}
function onwriteError(stream, state, sync, er, cb) {
- if (sync)
- process.nextTick(function() {
- state.pendingcb--;
- cb(er);
- });
- else {
- state.pendingcb--;
- cb(er);
- }
+ --state.pendingcb;
+ if (sync) processNextTick(cb, er);else cb(er);
stream._writableState.errorEmitted = true;
stream.emit('error', er);
@@ -309,32 +342,26 @@ function onwrite(stream, er) {
onwriteStateUpdate(state);
- if (er)
- onwriteError(stream, state, sync, er, cb);
- else {
+ if (er) onwriteError(stream, state, sync, er, cb);else {
// Check if we're actually ready to finish, but don't emit yet
- var finished = needFinish(stream, state);
+ var finished = needFinish(state);
- if (!finished &&
- !state.corked &&
- !state.bufferProcessing &&
- state.buffer.length) {
+ if (!finished && !state.corked && !state.bufferProcessing && state.bufferedRequest) {
clearBuffer(stream, state);
}
if (sync) {
- process.nextTick(function() {
- afterWrite(stream, state, finished, cb);
- });
+ /*<replacement>*/
+ asyncWrite(afterWrite, stream, state, finished, cb);
+ /*</replacement>*/
} else {
- afterWrite(stream, state, finished, cb);
- }
+ afterWrite(stream, state, finished, cb);
+ }
}
}
function afterWrite(stream, state, finished, cb) {
- if (!finished)
- onwriteDrain(stream, state);
+ if (!finished) onwriteDrain(stream, state);
state.pendingcb--;
cb();
finishMaybe(stream, state);
@@ -350,80 +377,83 @@ function onwriteDrain(stream, state) {
}
}
-
// if there's something in the buffer waiting, then process it
function clearBuffer(stream, state) {
state.bufferProcessing = true;
+ var entry = state.bufferedRequest;
- if (stream._writev && state.buffer.length > 1) {
+ if (stream._writev && entry && entry.next) {
// Fast case, write everything using _writev()
- var cbs = [];
- for (var c = 0; c < state.buffer.length; c++)
- cbs.push(state.buffer[c].callback);
+ var l = state.bufferedRequestCount;
+ var buffer = new Array(l);
+ var holder = state.corkedRequestsFree;
+ holder.entry = entry;
+
+ var count = 0;
+ while (entry) {
+ buffer[count] = entry;
+ entry = entry.next;
+ count += 1;
+ }
- // count the one we are adding, as well.
- // TODO(isaacs) clean this up
- state.pendingcb++;
- doWrite(stream, state, true, state.length, state.buffer, '', function(err) {
- for (var i = 0; i < cbs.length; i++) {
- state.pendingcb--;
- cbs[i](err);
- }
- });
+ doWrite(stream, state, true, state.length, buffer, '', holder.finish);
- // Clear buffer
- state.buffer = [];
+ // doWrite is almost always async, defer these to save a bit of time
+ // as the hot path ends with doWrite
+ state.pendingcb++;
+ state.lastBufferedRequest = null;
+ if (holder.next) {
+ state.corkedRequestsFree = holder.next;
+ holder.next = null;
+ } else {
+ state.corkedRequestsFree = new CorkedRequest(state);
+ }
} else {
// Slow case, write chunks one-by-one
- for (var c = 0; c < state.buffer.length; c++) {
- var entry = state.buffer[c];
+ while (entry) {
var chunk = entry.chunk;
var encoding = entry.encoding;
var cb = entry.callback;
var len = state.objectMode ? 1 : chunk.length;
doWrite(stream, state, false, len, chunk, encoding, cb);
-
+ entry = entry.next;
// if we didn't call the onwrite immediately, then
// it means that we need to wait until it does.
// also, that means that the chunk and cb are currently
// being processed, so move the buffer counter past them.
if (state.writing) {
- c++;
break;
}
}
- if (c < state.buffer.length)
- state.buffer = state.buffer.slice(c);
- else
- state.buffer.length = 0;
+ if (entry === null) state.lastBufferedRequest = null;
}
+ state.bufferedRequestCount = 0;
+ state.bufferedRequest = entry;
state.bufferProcessing = false;
}
-Writable.prototype._write = function(chunk, encoding, cb) {
+Writable.prototype._write = function (chunk, encoding, cb) {
cb(new Error('not implemented'));
-
};
Writable.prototype._writev = null;
-Writable.prototype.end = function(chunk, encoding, cb) {
+Writable.prototype.end = function (chunk, encoding, cb) {
var state = this._writableState;
- if (util.isFunction(chunk)) {
+ if (typeof chunk === 'function') {
cb = chunk;
chunk = null;
encoding = null;
- } else if (util.isFunction(encoding)) {
+ } else if (typeof encoding === 'function') {
cb = encoding;
encoding = null;
}
- if (!util.isNullOrUndefined(chunk))
- this.write(chunk, encoding);
+ if (chunk !== null && chunk !== undefined) this.write(chunk, encoding);
// .end() fully uncorks
if (state.corked) {
@@ -432,16 +462,11 @@ Writable.prototype.end = function(chunk, encoding, cb) {
}
// ignore unnecessary end() calls.
- if (!state.ending && !state.finished)
- endWritable(this, state, cb);
+ if (!state.ending && !state.finished) endWritable(this, state, cb);
};
-
-function needFinish(stream, state) {
- return (state.ending &&
- state.length === 0 &&
- !state.finished &&
- !state.writing);
+function needFinish(state) {
+ return state.ending && state.length === 0 && state.bufferedRequest === null && !state.finished && !state.writing;
}
function prefinish(stream, state) {
@@ -452,14 +477,15 @@ function prefinish(stream, state) {
}
function finishMaybe(stream, state) {
- var need = needFinish(stream, state);
+ var need = needFinish(state);
if (need) {
if (state.pendingcb === 0) {
prefinish(stream, state);
state.finished = true;
stream.emit('finish');
- } else
+ } else {
prefinish(stream, state);
+ }
}
return need;
}
@@ -468,10 +494,33 @@ function endWritable(stream, state, cb) {
state.ending = true;
finishMaybe(stream, state);
if (cb) {
- if (state.finished)
- process.nextTick(cb);
- else
- stream.once('finish', cb);
+ if (state.finished) processNextTick(cb);else stream.once('finish', cb);
}
state.ended = true;
+ stream.writable = false;
}
+
+// It seems a linked list but it is not
+// there will be only 2 of these for each stream
+function CorkedRequest(state) {
+ var _this = this;
+
+ this.next = null;
+ this.entry = null;
+
+ this.finish = function (err) {
+ var entry = _this.entry;
+ _this.entry = null;
+ while (entry) {
+ var cb = entry.callback;
+ state.pendingcb--;
+ cb(err);
+ entry = entry.next;
+ }
+ if (state.corkedRequestsFree) {
+ state.corkedRequestsFree.next = _this;
+ } else {
+ state.corkedRequestsFree = _this;
+ }
+ };
+} \ No newline at end of file
diff --git a/node_modules/readable-stream/lib/internal/streams/BufferList.js b/node_modules/readable-stream/lib/internal/streams/BufferList.js
new file mode 100644
index 000000000..e4bfcf02d
--- /dev/null
+++ b/node_modules/readable-stream/lib/internal/streams/BufferList.js
@@ -0,0 +1,64 @@
+'use strict';
+
+var Buffer = require('buffer').Buffer;
+/*<replacement>*/
+var bufferShim = require('buffer-shims');
+/*</replacement>*/
+
+module.exports = BufferList;
+
+function BufferList() {
+ this.head = null;
+ this.tail = null;
+ this.length = 0;
+}
+
+BufferList.prototype.push = function (v) {
+ var entry = { data: v, next: null };
+ if (this.length > 0) this.tail.next = entry;else this.head = entry;
+ this.tail = entry;
+ ++this.length;
+};
+
+BufferList.prototype.unshift = function (v) {
+ var entry = { data: v, next: this.head };
+ if (this.length === 0) this.tail = entry;
+ this.head = entry;
+ ++this.length;
+};
+
+BufferList.prototype.shift = function () {
+ if (this.length === 0) return;
+ var ret = this.head.data;
+ if (this.length === 1) this.head = this.tail = null;else this.head = this.head.next;
+ --this.length;
+ return ret;
+};
+
+BufferList.prototype.clear = function () {
+ this.head = this.tail = null;
+ this.length = 0;
+};
+
+BufferList.prototype.join = function (s) {
+ if (this.length === 0) return '';
+ var p = this.head;
+ var ret = '' + p.data;
+ while (p = p.next) {
+ ret += s + p.data;
+ }return ret;
+};
+
+BufferList.prototype.concat = function (n) {
+ if (this.length === 0) return bufferShim.alloc(0);
+ if (this.length === 1) return this.head.data;
+ var ret = bufferShim.allocUnsafe(n >>> 0);
+ var p = this.head;
+ var i = 0;
+ while (p) {
+ p.data.copy(ret, i);
+ i += p.data.length;
+ p = p.next;
+ }
+ return ret;
+}; \ No newline at end of file