aboutsummaryrefslogtreecommitdiff
path: root/node_modules/map-stream/package/index.js
diff options
context:
space:
mode:
authorFlorian Dold <florian.dold@gmail.com>2017-08-14 05:01:11 +0200
committerFlorian Dold <florian.dold@gmail.com>2017-08-14 05:02:09 +0200
commit363723fc84f7b8477592e0105aeb331ec9a017af (patch)
tree29f92724f34131bac64d6a318dd7e30612e631c7 /node_modules/map-stream/package/index.js
parent5634e77ad96bfe1818f6b6ee70b7379652e5487f (diff)
node_modules
Diffstat (limited to 'node_modules/map-stream/package/index.js')
-rw-r--r--node_modules/map-stream/package/index.js145
1 files changed, 0 insertions, 145 deletions
diff --git a/node_modules/map-stream/package/index.js b/node_modules/map-stream/package/index.js
deleted file mode 100644
index 09ae829de..000000000
--- a/node_modules/map-stream/package/index.js
+++ /dev/null
@@ -1,145 +0,0 @@
-//filter will reemit the data if cb(err,pass) pass is truthy
-
-// reduce is more tricky
-// maybe we want to group the reductions or emit progress updates occasionally
-// the most basic reduce just emits one 'data' event after it has recieved 'end'
-
-
-var Stream = require('stream').Stream
-
-
-//create an event stream and apply function to each .write
-//emitting each response as data
-//unless it's an empty callback
-
-module.exports = function (mapper, opts) {
-
- var stream = new Stream()
- , self = this
- , inputs = 0
- , outputs = 0
- , ended = false
- , paused = false
- , destroyed = false
- , lastWritten = 0
- , inNext = false
-
- this.opts = opts || {};
- var errorEventName = this.opts.failures ? 'failure' : 'error';
-
- // Items that are not ready to be written yet (because they would come out of
- // order) get stuck in a queue for later.
- var writeQueue = {}
-
- stream.writable = true
- stream.readable = true
-
- function queueData (data, number) {
- var nextToWrite = lastWritten + 1
-
- if (number === nextToWrite) {
- // If it's next, and its not undefined write it
- if (data !== undefined) {
- stream.emit.apply(stream, ['data', data])
- }
- lastWritten ++
- nextToWrite ++
- } else {
- // Otherwise queue it for later.
- writeQueue[number] = data
- }
-
- // If the next value is in the queue, write it
- if (writeQueue.hasOwnProperty(nextToWrite)) {
- var dataToWrite = writeQueue[nextToWrite]
- delete writeQueue[nextToWrite]
- return queueData(dataToWrite, nextToWrite)
- }
-
- outputs ++
- if(inputs === outputs) {
- if(paused) paused = false, stream.emit('drain') //written all the incoming events
- if(ended) end()
- }
- }
-
- function next (err, data, number) {
- if(destroyed) return
- inNext = true
-
- if (!err || self.opts.failures) {
- queueData(data, number)
- }
-
- if (err) {
- stream.emit.apply(stream, [ errorEventName, err ]);
- }
-
- inNext = false;
- }
-
- // Wrap the mapper function by calling its callback with the order number of
- // the item in the stream.
- function wrappedMapper (input, number, callback) {
- return mapper.call(null, input, function(err, data){
- callback(err, data, number)
- })
- }
-
- stream.write = function (data) {
- if(ended) throw new Error('map stream is not writable')
- inNext = false
- inputs ++
-
- try {
- //catch sync errors and handle them like async errors
- var written = wrappedMapper(data, inputs, next)
- paused = (written === false)
- return !paused
- } catch (err) {
- //if the callback has been called syncronously, and the error
- //has occured in an listener, throw it again.
- if(inNext)
- throw err
- next(err)
- return !paused
- }
- }
-
- function end (data) {
- //if end was called with args, write it,
- ended = true //write will emit 'end' if ended is true
- stream.writable = false
- if(data !== undefined) {
- return queueData(data, inputs)
- } else if (inputs == outputs) { //wait for processing
- stream.readable = false, stream.emit('end'), stream.destroy()
- }
- }
-
- stream.end = function (data) {
- if(ended) return
- end()
- }
-
- stream.destroy = function () {
- ended = destroyed = true
- stream.writable = stream.readable = paused = false
- process.nextTick(function () {
- stream.emit('close')
- })
- }
- stream.pause = function () {
- paused = true
- }
-
- stream.resume = function () {
- paused = false
- }
-
- return stream
-}
-
-
-
-