294 lines
8.6 KiB
JavaScript
294 lines
8.6 KiB
JavaScript
'use strict';
|
|
const common = require('../common');
|
|
const assert = require('assert');
|
|
const dgram = require('dgram');
|
|
const util = require('util');
|
|
|
|
if (common.inFreeBSDJail) {
|
|
common.skip('in a FreeBSD jail');
|
|
return;
|
|
}
|
|
|
|
// All SunOS systems must be able to pass this manual test before the
|
|
// following barrier can be removed:
|
|
// $ socat UDP-RECVFROM:12356,ip-add-membership=224.0.0.115:127.0.0.1,fork \
|
|
// EXEC:hostname &
|
|
// $ echo hi |socat STDIO \
|
|
// UDP4-DATAGRAM:224.0.0.115:12356,ip-multicast-if=127.0.0.1
|
|
|
|
if (common.isSunOS) {
|
|
common.skip('SunOs is not correctly delivering to loopback multicast.');
|
|
return;
|
|
}
|
|
|
|
const networkInterfaces = require('os').networkInterfaces();
|
|
const fork = require('child_process').fork;
|
|
// Multicast groups used by the test, keyed by address family.
const MULTICASTS = {
  IPv4: [
    '224.0.0.115',
    '224.0.0.116',
    '224.0.0.117',
  ],
  IPv6: [
    'ff02::1:115',
    'ff02::1:116',
    'ff02::1:117',
  ],
};

// Loopback address for each address family.
const LOOPBACK = {
  IPv4: '127.0.0.1',
  IPv6: '::1',
};

// Wildcard address for each address family.
const ANY = {
  IPv4: '0.0.0.0',
  IPv6: '::',
};

// Address family this test runs with; it selects the entry used from each of
// the per-family tables.
const FAM = 'IPv4';
|
|
|
|
// Windows won't bind on multicast addresses, so its filtering is by port:
// there every group gets its own port (common.PORT plus the group's index),
// while on other platforms all groups share common.PORT.
const PORTS = {};
MULTICASTS[FAM].forEach((group, offset) => {
  PORTS[group] = common.PORT + (common.isWindows ? offset : 0);
});
|
|
|
|
// dgram socket type for each address family.
const UDP = {
  IPv4: 'udp4',
  IPv6: 'udp6',
};

// Time (in ms) the parent waits for all responses before giving up; the base
// value of 5000 is passed through common.platformTimeout().
const TIMEOUT = common.platformTimeout(5000);
|
|
// Timestamp taken at startup. It prefixes every payload and is handed to the
// child processes, which use it to recognise datagrams belonging to this run.
const NOW = Date.now();

// Build the payload for a message tail: "<NOW> - <tail>".
const TMPL = (tail) => {
  return `${NOW} - ${tail}`;
};
|
|
|
|
// Take the first non-internal interface as the other interface to isolate
// from loopback. Ideally, this should check for whether or not this interface
// and the loopback have the MULTICAST flag.
//
// Evaluates to the address of the first non-internal interface whose family
// is FAM, or to undefined when there is none.
const interfaceAddress = ((networkInterfaces) => {
  for (const name in networkInterfaces) {
    for (const localInterface of networkInterfaces[name]) {
      if (localInterface.internal || localInterface.family !== FAM)
        continue;

      // An IPv6 address is qualified with the interface name as its scope.
      // Only the `%name` suffix is appended; appending the address again
      // would yield an invalid "<addr><addr>%name" string.
      // On Windows, IPv6 would need: `%${localInterface.scopeid}`
      if (FAM === 'IPv6')
        return `${localInterface.address}%${name}`;

      return localInterface.address;
    }
  }
})(networkInterfaces);

// The test cannot run without a second, non-loopback interface.
assert.ok(interfaceAddress, `No non-internal ${FAM} interface was found`);
|
|
|
|
// Messages the parent sends, in order. Fields:
//   tail    - text passed to TMPL() to form the payload
//   mcast   - multicast group the datagram is sent to
//   rcv     - when truthy, workers listening on that group are expected to
//             receive the message
//   newAddr - when present, the outgoing multicast interface is switched to
//             this address before the message is sent
const messages = [
  {
    tail: 'First message to send',
    mcast: MULTICASTS[FAM][0],
    rcv: true,
  },
  {
    tail: 'Second message to send',
    mcast: MULTICASTS[FAM][0],
    rcv: true,
  },
  {
    tail: 'Third message to send',
    mcast: MULTICASTS[FAM][1],
    rcv: true,
    newAddr: interfaceAddress,
  },
  {
    tail: 'Fourth message to send',
    mcast: MULTICASTS[FAM][2],
  },
  {
    tail: 'Fifth message to send',
    mcast: MULTICASTS[FAM][1],
    rcv: true,
  },
  {
    tail: 'Sixth message to send',
    mcast: MULTICASTS[FAM][2],
    rcv: true,
    newAddr: LOOPBACK[FAM],
  },
];
|
|
|
|
|
|
if (process.argv[2] !== 'child') {
|
|
// Interfaces the workers are distributed over (round-robin).
const IFACES = [ANY[FAM], interfaceAddress, LOOPBACK[FAM]];
// Forked workers, keyed by pid.
const workers = {};
// Total number of workers to fork: two per multicast group.
const listeners = MULTICASTS[FAM].length * 2;

// Number of workers that reported they are listening.
let listening = 0;
// Number of workers that exited without finishing successfully.
let dead = 0;
// Number of workers that received all the messages they need.
let done = 0;
// Index of the next entry of `messages` to send.
let i = 0;
// Handle of the timeout that aborts the test.
let timer = null;
|
|
|
|
// Call kill() on every value stored in `subprocesses`.
const killSubprocesses = (subprocesses) => {
  for (const key in subprocesses) {
    const subprocess = subprocesses[key];
    subprocess.kill();
  }
};
|
|
|
|
// Exit the test if it doesn't succeed within the TIMEOUT: report, kill the
// workers, mark the test as skipped and exit.
const onTimeout = () => {
  console.error('[PARENT] Responses were not received within %d ms.', TIMEOUT);
  console.error('[PARENT] Skip');

  killSubprocesses(workers);
  common.skip('Check filter policy');

  process.exit(1);
};
timer = setTimeout(onTimeout, TIMEOUT);
|
|
|
|
// Launch the child processes. Each worker gets an interface and a multicast
// group (both assigned round-robin) plus the number of messages it must
// receive and the session timestamp.
for (let index = 0; index < listeners; index++) {
  const groups = MULTICASTS[FAM];
  const IFACE = IFACES[index % IFACES.length];
  const MULTICAST = groups[index % groups.length];

  // Payloads this worker is expected to receive: every receivable message
  // addressed to its multicast group.
  const messagesNeeded = [];
  for (const m of messages) {
    if (m.rcv && m.mcast === MULTICAST)
      messagesNeeded.push(TMPL(m.tail));
  }

  const forkArgs = ['child', IFACE, MULTICAST, messagesNeeded.length, NOW];
  const worker = fork(process.argv[1], forkArgs);
  workers[worker.pid] = worker;

  worker.messagesReceived = [];
  worker.messagesNeeded = messagesNeeded;

  // Handle the death of workers.
  worker.on('exit', (code) => {
    // A worker that finished successfully, or that exited with code 0, does
    // not count as dead.
    const diedUnexpectedly = !worker.isDone && code !== 0;
    if (!diedUnexpectedly)
      return;

    dead += 1;
    console.error('[PARENT] Worker %d died. %d dead of %d',
                  worker.pid, dead, listeners);

    if (dead !== listeners)
      return;

    console.error('[PARENT] All workers have died.');
    console.error('[PARENT] Fail');

    killSubprocesses(workers);

    process.exit(1);
  });

  worker.on('message', (msg) => {
    if (msg.listening) {
      listening += 1;

      // All child process are listening, so start sending.
      if (listening === listeners)
        sendSocket.sendNext();
      return;
    }

    if (!msg.message)
      return;

    worker.messagesReceived.push(msg.message);

    if (worker.messagesReceived.length === worker.messagesNeeded.length) {
      done += 1;
      worker.isDone = true;
      console.error('[PARENT] %d received %d messages total.',
                    worker.pid, worker.messagesReceived.length);
    }

    if (done !== listeners)
      return;

    console.error('[PARENT] All workers have received the ' +
                  'required number of ' +
                  'messages. Will now compare.');

    for (const pid of Object.keys(workers)) {
      const checked = workers[pid];

      // Count the received messages that are among the expected payloads.
      const count = checked.messagesReceived.filter(
        (received) => checked.messagesNeeded.includes(received.toString())
      ).length;

      console.error('[PARENT] %d received %d matching messages.',
                    checked.pid, count);

      assert.strictEqual(count, checked.messagesNeeded.length,
                         'A worker received ' +
                         'an invalid multicast message');
    }

    clearTimeout(timer);
    console.error('[PARENT] Success');
    killSubprocesses(workers);
  });
}
|
|
|
|
// Socket the parent uses to send the multicast datagrams.
const sendSocket = dgram.createSocket({ type: UDP[FAM], reuseAddr: true });

// Bind to the wildcard address rather than to a specific interface, so that
// sending starts with the OS's default multicast interface selection.
sendSocket.bind(common.PORT, ANY[FAM]);

sendSocket
  .on('listening', () => {
    console.error(`outgoing iface ${interfaceAddress}`);
  })
  .on('close', () => {
    console.error('[PARENT] sendSocket closed');
  });
|
|
|
|
// Send the next entry of `messages` and schedule the one after it once the
// send callback has fired. When no entries are left, close the socket.
sendSocket.sendNext = () => {
  const msg = messages[i++];

  if (!msg) {
    sendSocket.close();
    return;
  }

  // TMPL() takes only the tail and adds the NOW prefix itself, so the line
  // logged here is exactly the payload that is sent.
  const payload = TMPL(msg.tail);
  console.error(payload);
  const buf = Buffer.from(payload);

  // Some messages switch the outgoing multicast interface before being sent.
  if (msg.newAddr) {
    console.error(`changing outgoing multicast ${msg.newAddr}`);
    sendSocket.setMulticastInterface(msg.newAddr);
  }

  sendSocket.send(
    buf,
    0,
    buf.length,
    PORTS[msg.mcast],
    msg.mcast,
    (err) => {
      assert.ifError(err);
      console.error('[PARENT] sent %s to %s:%s',
                    util.inspect(buf.toString()),
                    msg.mcast, PORTS[msg.mcast]);

      process.nextTick(sendSocket.sendNext);
    }
  );
};
|
|
}
|
|
|
|
// Child process: join one multicast group on one interface, forward every
// datagram of this session to the parent and exit once the expected number
// of datagrams has arrived.
if (process.argv[2] === 'child') {
  // argv layout: [node, script, 'child', IFACE, MULTICAST, NEEDEDMSGS, SESSION]
  const [, , , IFACE, MULTICAST, neededArg, sessionArg] = process.argv;
  const NEEDEDMSGS = Number(neededArg);
  const SESSION = Number(sessionArg);
  const receivedMessages = [];

  console.error(`pid ${process.pid} iface ${IFACE} MULTICAST ${MULTICAST}`);
  const listenSocket = dgram.createSocket({ type: UDP[FAM], reuseAddr: true });

  listenSocket.on('message', (buf, rinfo) => {
    const text = buf.toString();

    // Examine udp messages only when they were sent by the parent.
    if (!text.startsWith(SESSION)) return;

    console.error('[CHILD] %s received %s from %j',
                  process.pid, util.inspect(text), rinfo);

    receivedMessages.push(buf);

    // After the last expected message, close the socket and exit once the
    // report to the parent has been handed off.
    const isLast = receivedMessages.length === NEEDEDMSGS;
    if (isLast)
      listenSocket.close();

    process.send({ message: text }, isLast ? () => process.exit() : undefined);
  });

  listenSocket.on('listening', () => {
    listenSocket.addMembership(MULTICAST, IFACE);
    process.send({ listening: true });
  });

  // On Windows bind to the wildcard address on the group's own port; on
  // other platforms bind to the multicast address itself.
  const [port, address] = common.isWindows ?
    [PORTS[MULTICAST], ANY[FAM]] :
    [common.PORT, MULTICAST];
  listenSocket.bind(port, address);
}
|