1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
|
var assert = require('assert');
var pipe = require('..');
var Stream = require('stream');
describe('pipe(a)', function(){
it('should return a', function(){
var readable = Readable();
var stream = pipe(readable);
assert.equal(stream, readable);
});
});
describe('pipe(a, b, c)', function(){
it('should pipe internally', function(done){
pipe(Readable(), Transform(), Writable(done));
});
it('should be writable', function(done){
var stream = pipe(Transform(), Writable(done));
assert(stream.writable);
Readable().pipe(stream);
});
it('should be readable', function(done){
var stream = pipe(Readable(), Transform());
assert(stream.readable);
stream.pipe(Writable(done));
});
it('should be readable and writable', function(done){
var stream = pipe(Transform(), Transform());
assert(stream.readable);
assert(stream.writable);
Readable()
.pipe(stream)
.pipe(Writable(done));
});
describe('errors', function(){
it('should reemit', function(done){
var a = Transform();
var b = Transform();
var c = Transform();
var stream = pipe(a, b, c);
var err = new Error;
var i = 0;
stream.on('error', function(_err){
i++;
assert.equal(_err, err);
assert(i <= 3);
if (i == 3) done();
});
a.emit('error', err);
b.emit('error', err);
c.emit('error', err);
});
it('should not reemit endlessly', function(done){
var a = Transform();
var b = Transform();
var c = Transform();
c.readable = false;
var stream = pipe(a, b, c);
var err = new Error;
var i = 0;
stream.on('error', function(_err){
i++;
assert.equal(_err, err);
assert(i <= 3);
if (i == 3) done();
});
a.emit('error', err);
b.emit('error', err);
c.emit('error', err);
});
});
});
describe('pipe(a, b, c, fn)', function(){
it('should call on finish', function(done){
var finished = false;
var a = Readable();
var b = Transform();
var c = Writable(function(){
finished = true;
});
pipe(a, b, c, function(err){
assert(!err);
assert(finished);
done();
});
});
it('should call with error once', function(done){
var a = Readable();
var b = Transform();
var c = Writable();
var err = new Error;
pipe(a, b, c, function(err){
assert(err);
done();
});
a.emit('error', err);
b.emit('error', err);
c.emit('error', err);
});
});
function Readable(){
var readable = new Stream.Readable({ objectMode: true });
readable._read = function(){
this.push('a');
this.push(null);
};
return readable;
}
function Transform(){
var transform = new Stream.Transform({ objectMode: true });
transform._transform = function(chunk, _, done){
done(null, chunk.toUpperCase());
};
return transform;
}
function Writable(cb){
var writable = new Stream.Writable({ objectMode: true });
writable._write = function(chunk, _, done){
assert.equal(chunk, 'A');
done();
cb();
};
return writable;
}
|