nodejs/test/parallel/test-stream-pipeline.js

541 lines
9.6 KiB
JavaScript

'use strict';
const common = require('../common');
const {
Stream,
Writable,
Readable,
Transform,
pipeline,
PassThrough
} = require('stream');
const assert = require('assert');
const http = require('http');
const { promisify } = require('util');
{
let finished = false;
const processed = [];
const expected = [
Buffer.from('a'),
Buffer.from('b'),
Buffer.from('c')
];
const read = new Readable({
read() {}
});
const write = new Writable({
write(data, enc, cb) {
processed.push(data);
cb();
}
});
write.on('finish', () => {
finished = true;
});
for (let i = 0; i < expected.length; i++) {
read.push(expected[i]);
}
read.push(null);
pipeline(read, write, common.mustCall((err) => {
assert.ok(!err, 'no error');
assert.ok(finished);
assert.deepStrictEqual(processed, expected);
}));
}
{
const read = new Readable({
read() {}
});
assert.throws(() => {
pipeline(read, () => {});
}, /ERR_MISSING_ARGS/);
assert.throws(() => {
pipeline(() => {});
}, /ERR_MISSING_ARGS/);
assert.throws(() => {
pipeline();
}, /ERR_INVALID_CALLBACK/);
}
{
const read = new Readable({
read() {}
});
const write = new Writable({
write(data, enc, cb) {
cb();
}
});
read.push('data');
setImmediate(() => read.destroy());
pipeline(read, write, common.mustCall((err) => {
assert.ok(err, 'should have an error');
}));
}
{
const read = new Readable({
read() {}
});
const write = new Writable({
write(data, enc, cb) {
cb();
}
});
read.push('data');
setImmediate(() => read.destroy(new Error('kaboom')));
const dst = pipeline(read, write, common.mustCall((err) => {
assert.deepStrictEqual(err, new Error('kaboom'));
}));
assert.strictEqual(dst, write);
}
{
const read = new Readable({
read() {}
});
const transform = new Transform({
transform(data, enc, cb) {
cb(new Error('kaboom'));
}
});
const write = new Writable({
write(data, enc, cb) {
cb();
}
});
read.on('close', common.mustCall());
transform.on('close', common.mustCall());
write.on('close', common.mustCall());
[read, transform, write].forEach((stream) => {
stream.on('error', common.mustCall((err) => {
assert.deepStrictEqual(err, new Error('kaboom'));
}));
});
const dst = pipeline(read, transform, write, common.mustCall((err) => {
assert.deepStrictEqual(err, new Error('kaboom'));
}));
assert.strictEqual(dst, write);
read.push('hello');
}
{
const server = http.createServer((req, res) => {
const rs = new Readable({
read() {
rs.push('hello');
rs.push(null);
}
});
pipeline(rs, res, () => {});
});
server.listen(0, () => {
const req = http.request({
port: server.address().port
});
req.end();
req.on('response', (res) => {
const buf = [];
res.on('data', (data) => buf.push(data));
res.on('end', common.mustCall(() => {
assert.deepStrictEqual(
Buffer.concat(buf),
Buffer.from('hello')
);
server.close();
}));
});
});
}
{
const server = http.createServer((req, res) => {
let sent = false;
const rs = new Readable({
read() {
if (sent) {
return;
}
sent = true;
rs.push('hello');
},
destroy: common.mustCall((err, cb) => {
// Prevents fd leaks by destroying http pipelines
cb();
})
});
pipeline(rs, res, () => {});
});
server.listen(0, () => {
const req = http.request({
port: server.address().port
});
req.end();
req.on('response', (res) => {
setImmediate(() => {
res.destroy();
server.close();
});
});
});
}
{
const server = http.createServer((req, res) => {
let sent = 0;
const rs = new Readable({
read() {
if (sent++ > 10) {
return;
}
rs.push('hello');
},
destroy: common.mustCall((err, cb) => {
cb();
})
});
pipeline(rs, res, () => {});
});
let cnt = 10;
const badSink = new Writable({
write(data, enc, cb) {
cnt--;
if (cnt === 0) cb(new Error('kaboom'));
else cb();
}
});
server.listen(0, () => {
const req = http.request({
port: server.address().port
});
req.end();
req.on('response', (res) => {
pipeline(res, badSink, common.mustCall((err) => {
assert.deepStrictEqual(err, new Error('kaboom'));
server.close();
}));
});
});
}
{
const server = http.createServer((req, res) => {
pipeline(req, res, common.mustCall());
});
server.listen(0, () => {
const req = http.request({
port: server.address().port
});
let sent = 0;
const rs = new Readable({
read() {
if (sent++ > 10) {
return;
}
rs.push('hello');
}
});
pipeline(rs, req, common.mustCall(() => {
server.close();
}));
req.on('response', (res) => {
let cnt = 10;
res.on('data', () => {
cnt--;
if (cnt === 0) rs.destroy();
});
});
});
}
{
const makeTransform = () => {
const tr = new Transform({
transform(data, enc, cb) {
cb(null, data);
}
});
tr.on('close', common.mustCall());
return tr;
};
const rs = new Readable({
read() {
rs.push('hello');
}
});
let cnt = 10;
const ws = new Writable({
write(data, enc, cb) {
cnt--;
if (cnt === 0) return cb(new Error('kaboom'));
cb();
}
});
rs.on('close', common.mustCall());
ws.on('close', common.mustCall());
pipeline(
rs,
makeTransform(),
makeTransform(),
makeTransform(),
makeTransform(),
makeTransform(),
makeTransform(),
ws,
common.mustCall((err) => {
assert.deepStrictEqual(err, new Error('kaboom'));
})
);
}
{
const oldStream = new Stream();
oldStream.pause = oldStream.resume = () => {};
oldStream.write = (data) => {
oldStream.emit('data', data);
return true;
};
oldStream.end = () => {
oldStream.emit('end');
};
const expected = [
Buffer.from('hello'),
Buffer.from('world')
];
const rs = new Readable({
read() {
for (let i = 0; i < expected.length; i++) {
rs.push(expected[i]);
}
rs.push(null);
}
});
const ws = new Writable({
write(data, enc, cb) {
assert.deepStrictEqual(data, expected.shift());
cb();
}
});
let finished = false;
ws.on('finish', () => {
finished = true;
});
pipeline(
rs,
oldStream,
ws,
common.mustCall((err) => {
assert(!err, 'no error');
assert(finished, 'last stream finished');
})
);
}
{
const oldStream = new Stream();
oldStream.pause = oldStream.resume = () => {};
oldStream.write = (data) => {
oldStream.emit('data', data);
return true;
};
oldStream.end = () => {
oldStream.emit('end');
};
const destroyableOldStream = new Stream();
destroyableOldStream.pause = destroyableOldStream.resume = () => {};
destroyableOldStream.destroy = common.mustCall(() => {
destroyableOldStream.emit('close');
});
destroyableOldStream.write = (data) => {
destroyableOldStream.emit('data', data);
return true;
};
destroyableOldStream.end = () => {
destroyableOldStream.emit('end');
};
const rs = new Readable({
read() {
rs.destroy(new Error('stop'));
}
});
const ws = new Writable({
write(data, enc, cb) {
cb();
}
});
let finished = false;
ws.on('finish', () => {
finished = true;
});
pipeline(
rs,
oldStream,
destroyableOldStream,
ws,
common.mustCall((err) => {
assert.deepStrictEqual(err, new Error('stop'));
assert(!finished, 'should not finish');
})
);
}
{
const pipelinePromise = promisify(pipeline);
async function run() {
const read = new Readable({
read() {}
});
const write = new Writable({
write(data, enc, cb) {
cb();
}
});
read.push('data');
read.push(null);
let finished = false;
write.on('finish', () => {
finished = true;
});
await pipelinePromise(read, write);
assert(finished);
}
run();
}
{
const read = new Readable({
read() {}
});
const transform = new Transform({
transform(data, enc, cb) {
cb(new Error('kaboom'));
}
});
const write = new Writable({
write(data, enc, cb) {
cb();
}
});
assert.throws(
() => pipeline(read, transform, write),
{ code: 'ERR_INVALID_CALLBACK' }
);
}
{
const server = http.Server(function(req, res) {
res.write('asd');
});
server.listen(0, function() {
http.get({ port: this.address().port }, (res) => {
const stream = new PassThrough();
// NOTE: 2 because Node 12 streams can emit 'error'
// multiple times.
stream.on('error', common.mustCall(2));
pipeline(
res,
stream,
common.mustCall((err) => {
assert.ok(err);
// TODO(ronag):
// assert.strictEqual(err.message, 'oh no');
server.close();
})
);
stream.destroy(new Error('oh no'));
}).on('error', common.mustNotCall());
});
}
{
const r = new Readable({
read() {}
});
r.push('hello');
r.push('world');
r.push(null);
let res = '';
const w = new Writable({
write(chunk, encoding, callback) {
res += chunk;
callback();
}
});
pipeline([r, w], common.mustCall((err) => {
assert.ok(!err);
assert.strictEqual(res, 'helloworld');
}));
}