Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cluster: fix edge cases that throw ERR_INTERNAL_ASSERTION #36764

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 26 additions & 13 deletions lib/internal/cluster/child.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ const {
ObjectAssign,
ReflectApply,
SafeMap,
SafeSet,
} = primordials;

const assert = require('internal/assert');
Expand Down Expand Up @@ -74,14 +75,14 @@ cluster._getServer = function(obj, options, cb) {
options.fd,
], ':');

let index = indexes.get(indexesKey);
let indexSet = indexes.get(indexesKey);

if (index === undefined)
index = 0;
else
index++;

indexes.set(indexesKey, index);
if (indexSet === undefined) {
indexSet = { nextIndex: 0, set: new SafeSet() };
indexes.set(indexesKey, indexSet);
}
const index = indexSet.nextIndex++;
indexSet.set.add(index);

const message = {
act: 'queryServer',
Expand All @@ -101,9 +102,9 @@ cluster._getServer = function(obj, options, cb) {
obj._setServerData(reply.data);

if (handle)
shared(reply, handle, indexesKey, cb); // Shared listen socket.
shared(reply, handle, indexesKey, index, cb); // Shared listen socket.
else
rr(reply, indexesKey, cb); // Round-robin.
rr(reply, indexesKey, index, cb); // Round-robin.
});

obj.once('listening', () => {
Expand All @@ -115,8 +116,20 @@ cluster._getServer = function(obj, options, cb) {
});
};

function removeIndexesKey(indexesKey, index) {
const indexSet = indexes.get(indexesKey);
if (!indexSet) {
return;
}

indexSet.set.delete(index);
if (indexSet.set.size === 0) {
indexes.delete(indexesKey);
}
}

// Shared listen socket.
function shared(message, handle, indexesKey, cb) {
function shared(message, handle, indexesKey, index, cb) {
const key = message.key;
// Monkey-patch the close() method so we can keep track of when it's
// closed. Avoids resource leaks when the handle is short-lived.
Expand All @@ -125,7 +138,7 @@ function shared(message, handle, indexesKey, cb) {
handle.close = function() {
send({ act: 'close', key });
handles.delete(key);
indexes.delete(indexesKey);
removeIndexesKey(indexesKey, index);
return ReflectApply(close, handle, arguments);
};
assert(handles.has(key) === false);
Expand All @@ -134,7 +147,7 @@ function shared(message, handle, indexesKey, cb) {
}

// Round-robin. Primary distributes handles across workers.
function rr(message, indexesKey, cb) {
function rr(message, indexesKey, index, cb) {
if (message.errno)
return cb(message.errno, null);

Expand All @@ -158,7 +171,7 @@ function rr(message, indexesKey, cb) {

send({ act: 'close', key });
handles.delete(key);
indexes.delete(indexesKey);
removeIndexesKey(indexesKey, index);
key = undefined;
}

Expand Down
40 changes: 40 additions & 0 deletions test/parallel/test-cluster-child-index-dgram.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
'use strict';
const common = require('../common');
const Countdown = require('../common/countdown');
if (common.isWindows)
common.skip('dgram clustering is currently not supported on Windows.');

const cluster = require('cluster');
const dgram = require('dgram');

// Test an edge case when using `cluster` and `dgram.Socket.bind()`
// the port of `0`.
const kPort = 0;

function child() {
const kTime = 2;
const countdown = new Countdown(kTime * 2, () => {
process.exit(0);
});
for (let i = 0; i < kTime; i += 1) {
const socket = new dgram.Socket('udp4');
socket.bind(kPort, common.mustCall(() => {
// `process.nextTick()` or `socket2.close()` would throw
// ERR_SOCKET_DGRAM_NOT_RUNNING
process.nextTick(() => {
socket.close(countdown.dec());
const socket2 = new dgram.Socket('udp4');
socket2.bind(kPort, common.mustCall(() => {
process.nextTick(() => {
socket2.close(countdown.dec());
});
}));
});
}));
}
}

if (cluster.isMaster)
cluster.fork(__filename);
else
child();
31 changes: 31 additions & 0 deletions test/parallel/test-cluster-child-index-net.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
'use strict';
const common = require('../common');
const Countdown = require('../common/countdown');
const cluster = require('cluster');
const net = require('net');

// Test an edge case when using `cluster` and `net.Server.listen()` to
// the port of `0`.
const kPort = 0;

function child() {
const kTime = 2;
const countdown = new Countdown(kTime * 2, () => {
process.exit(0);
});
for (let i = 0; i < kTime; i += 1) {
const server = net.createServer();
server.listen(kPort, common.mustCall(() => {
server.close(countdown.dec());
const server2 = net.createServer();
server2.listen(kPort, common.mustCall(() => {
server2.close(countdown.dec());
}));
}));
}
}

if (cluster.isMaster)
cluster.fork(__filename);
else
child();