aboutsummaryrefslogtreecommitdiff
path: root/node_modules/map-stream/package/test/simple-map.asynct.js
diff options
context:
space:
mode:
Diffstat (limited to 'node_modules/map-stream/package/test/simple-map.asynct.js')
-rw-r--r--node_modules/map-stream/package/test/simple-map.asynct.js318
1 files changed, 318 insertions, 0 deletions
diff --git a/node_modules/map-stream/package/test/simple-map.asynct.js b/node_modules/map-stream/package/test/simple-map.asynct.js
new file mode 100644
index 000000000..2b9a29223
--- /dev/null
+++ b/node_modules/map-stream/package/test/simple-map.asynct.js
@@ -0,0 +1,318 @@
+'use strict';
+
+var map = require('../')
+ , it = require('it-is')
+ , u = require('ubelt')
+ , spec = require('stream-spec')
+ , from = require('from')
+ , Stream = require('stream')
+ , es = require('event-stream')
+
+//REFACTOR THIS TEST TO USE es.readArray and es.writeArray
+
+function writeArray(array, stream) {
+
+ array.forEach( function (j) {
+ stream.write(j)
+ })
+ stream.end()
+
+}
+
+function readStream(stream, done) {
+
+ var array = []
+ stream.on('data', function (data) {
+ array.push(data)
+ })
+ stream.on('error', done)
+ stream.on('end', function (data) {
+ done(null, array)
+ })
+
+}
+
+//call sink on each write,
+//and complete when finished.
+
+function pauseStream (prob, delay) {
+ var pauseIf = (
+ 'number' == typeof prob
+ ? function () {
+ return Math.random() < prob
+ }
+ : 'function' == typeof prob
+ ? prob
+ : 0.1
+ )
+ var delayer = (
+ !delay
+ ? process.nextTick
+ : 'number' == typeof delay
+ ? function (next) { setTimeout(next, delay) }
+ : delay
+ )
+
+ return es.through(function (data) {
+ if(!this.paused && pauseIf()) {
+ console.log('PAUSE STREAM PAUSING')
+ this.pause()
+ var self = this
+ delayer(function () {
+ console.log('PAUSE STREAM RESUMING')
+ self.resume()
+ })
+ }
+ console.log("emit ('data', " + data + ')')
+ this.emit('data', data)
+ })
+}
+
+exports ['simple map applied to a stream'] = function (test) {
+
+ var input = [1,2,3,7,5,3,1,9,0,2,4,6]
+ //create event stream from
+
+ var doubler = map(function (data, cb) {
+ cb(null, data * 2)
+ })
+
+ spec(doubler).through().validateOnExit()
+
+ //a map is only a middle man, so it is both readable and writable
+
+ it(doubler).has({
+ readable: true,
+ writable: true,
+ })
+
+ readStream(doubler, function (err, output) {
+ it(output).deepEqual(input.map(function (j) {
+ return j * 2
+ }))
+// process.nextTick(x.validate)
+ test.done()
+ })
+
+ writeArray(input, doubler)
+
+}
+
+exports ['stream comes back in the correct order'] = function (test) {
+ var input = [3, 2, 1]
+
+ var delayer = map(function(data, cb){
+ setTimeout(function () {
+ cb(null, data)
+ }, 100 * data)
+ })
+
+ readStream(delayer, function (err, output) {
+ it(output).deepEqual(input)
+ test.done()
+ })
+
+ writeArray(input, delayer)
+}
+
+exports ['continues on error event with failures `true`'] = function (test) {
+ var input = [1, 2, 3]
+
+ var delayer = map(function(data, cb){
+ cb(new Error('Something gone wrong'), data)
+ }, { failures: true })
+
+ readStream(delayer, function (err, output) {
+ it(output).deepEqual(input)
+ test.done()
+ })
+
+ writeArray(input, delayer)
+}
+
+exports['pipe two maps together'] = function (test) {
+
+ var input = [1,2,3,7,5,3,1,9,0,2,4,6]
+ //create event stream from
+ function dd (data, cb) {
+ cb(null, data * 2)
+ }
+ var doubler1 = map(dd), doubler2 = map(dd)
+
+ doubler1.pipe(doubler2)
+
+ spec(doubler1).through().validateOnExit()
+ spec(doubler2).through().validateOnExit()
+
+ readStream(doubler2, function (err, output) {
+ it(output).deepEqual(input.map(function (j) {
+ return j * 4
+ }))
+ test.done()
+ })
+
+ writeArray(input, doubler1)
+
+}
+
+//next:
+//
+// test pause, resume and drian.
+//
+
+// then make a pipe joiner:
+//
+// plumber (evStr1, evStr2, evStr3, evStr4, evStr5)
+//
+// will return a single stream that write goes to the first
+
+exports ['map will not call end until the callback'] = function (test) {
+
+ var ticker = map(function (data, cb) {
+ process.nextTick(function () {
+ cb(null, data * 2)
+ })
+ })
+
+ spec(ticker).through().validateOnExit()
+
+ ticker.write('x')
+ ticker.end()
+
+ ticker.on('end', function () {
+ test.done()
+ })
+}
+
+exports ['emit failures with opts.failures === `ture`'] = function (test) {
+
+ var err = new Error('INTENSIONAL ERROR')
+ , mapper =
+ map(function () {
+ throw err
+ }, { failures: true })
+
+ mapper.on('failure', function (_err) {
+ it(_err).equal(err)
+ test.done()
+ })
+
+ mapper.write('hello')
+
+}
+
+exports ['emit error thrown'] = function (test) {
+
+ var err = new Error('INTENSIONAL ERROR')
+ , mapper =
+ map(function () {
+ throw err
+ })
+
+ mapper.on('error', function (_err) {
+ it(_err).equal(err)
+ test.done()
+ })
+
+ mapper.write('hello')
+
+}
+
+exports ['emit error calledback'] = function (test) {
+
+ var err = new Error('INTENSIONAL ERROR')
+ , mapper =
+ map(function (data, callback) {
+ callback(err)
+ })
+
+ mapper.on('error', function (_err) {
+ it(_err).equal(err)
+ test.done()
+ })
+
+ mapper.write('hello')
+
+}
+
+exports ['do not emit drain if not paused'] = function (test) {
+
+ var maps = map(function (data, callback) {
+ u.delay(callback)(null, 1)
+ return true
+ })
+
+ spec(maps).through().pausable().validateOnExit()
+
+ maps.on('drain', function () {
+ it(false).ok('should not emit drain unless the stream is paused')
+ })
+
+ it(maps.write('hello')).equal(true)
+ it(maps.write('hello')).equal(true)
+ it(maps.write('hello')).equal(true)
+ setTimeout(function () {maps.end()},10)
+ maps.on('end', test.done)
+}
+
+exports ['emits drain if paused, when all '] = function (test) {
+ var active = 0
+ var drained = false
+ var maps = map(function (data, callback) {
+ active ++
+ u.delay(function () {
+ active --
+ callback(null, 1)
+ })()
+ console.log('WRITE', false)
+ return false
+ })
+
+ spec(maps).through().validateOnExit()
+
+ maps.on('drain', function () {
+ drained = true
+ it(active).equal(0, 'should emit drain when all maps are done')
+ })
+
+ it(maps.write('hello')).equal(false)
+ it(maps.write('hello')).equal(false)
+ it(maps.write('hello')).equal(false)
+
+ process.nextTick(function () {maps.end()},10)
+
+ maps.on('end', function () {
+ console.log('end')
+ it(drained).ok('shoud have emitted drain before end')
+ test.done()
+ })
+
+}
+
+exports ['map applied to a stream with filtering'] = function (test) {
+
+ var input = [1,2,3,7,5,3,1,9,0,2,4,6]
+
+ var doubler = map(function (data, callback) {
+ if (data % 2)
+ callback(null, data * 2)
+ else
+ callback()
+ })
+
+ readStream(doubler, function (err, output) {
+ it(output).deepEqual(input.filter(function (j) {
+ return j % 2
+ }).map(function (j) {
+ return j * 2
+ }))
+ test.done()
+ })
+
+ spec(doubler).through().validateOnExit()
+
+ writeArray(input, doubler)
+
+}
+
+