Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

dgram: implement socket.bind({ fd }) #21745

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions doc/api/dgram.md
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ added: v0.11.14
* `port` {integer}
* `address` {string}
* `exclusive` {boolean}
* `fd` {integer}
* `callback` {Function}

For UDP sockets, causes the `dgram.Socket` to listen for datagram
Expand All @@ -177,6 +178,11 @@ system will attempt to listen on all addresses. Once binding is
complete, a `'listening'` event is emitted and the optional `callback`
function is called.

The `options` object may contain a `fd` property. When a `fd` greater
than `0` is set, it will wrap around an existing socket with the given
file descriptor. In this case, the properties of `port` and `address`
will be ignored.

Note that specifying both a `'listening'` event listener and passing a
`callback` to the `socket.bind()` method is not harmful but not very
useful.
Expand Down
91 changes: 72 additions & 19 deletions lib/dgram.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ const errors = require('internal/errors');
const {
kStateSymbol,
_createSocketHandle,
newHandle
newHandle,
guessHandleType,
} = require('internal/dgram');
const {
ERR_INVALID_ARG_TYPE,
Expand All @@ -35,7 +36,8 @@ const {
ERR_SOCKET_BAD_PORT,
ERR_SOCKET_BUFFER_SIZE,
ERR_SOCKET_CANNOT_SEND,
ERR_SOCKET_DGRAM_NOT_RUNNING
ERR_SOCKET_DGRAM_NOT_RUNNING,
ERR_INVALID_FD_TYPE
} = errors.codes;
const { Buffer } = require('buffer');
const util = require('util');
Expand All @@ -45,6 +47,7 @@ const {
defaultTriggerAsyncIdScope,
symbols: { async_id_symbol }
} = require('internal/async_hooks');
const { isInt32 } = require('internal/validators');
const { UV_UDP_REUSEADDR } = process.binding('constants').os;

const { UDP, SendWrap } = process.binding('udp_wrap');
Expand Down Expand Up @@ -151,6 +154,28 @@ function bufferSize(self, size, buffer) {
return ret;
}

// Query master process to get the server handle and utilize it.
function bindServerHandle(self, options, errCb) {
if (!cluster)
cluster = require('cluster');

const state = self[kStateSymbol];
cluster._getServer(self, options, (err, handle) => {
if (err) {
errCb(err);
return;
}

if (!state.handle) {
// Handle has been closed in the mean time.
return handle.close();
}

replaceHandle(self, handle);
startListening(self);
});
}

Socket.prototype.bind = function(port_, address_ /* , callback */) {
let port = port_;

Expand All @@ -171,6 +196,44 @@ Socket.prototype.bind = function(port_, address_ /* , callback */) {
return this;
}

// Open an existing fd instead of creating a new one.
if (port !== null && typeof port === 'object' &&
isInt32(port.fd) && port.fd > 0) {
const fd = port.fd;
const exclusive = !!port.exclusive;
const state = this[kStateSymbol];

if (!cluster)
cluster = require('cluster');

if (cluster.isWorker && !exclusive) {
bindServerHandle(this, {
address: null,
port: null,
addressType: this.type,
fd,
flags: null
}, (err) => {
// Callback to handle error.
const ex = errnoException(err, 'open');
this.emit('error', ex);
state.bindState = BIND_STATE_UNBOUND;
});
return this;
}

const type = guessHandleType(fd);
if (type !== 'UDP')
throw new ERR_INVALID_FD_TYPE(type);
const err = state.handle.open(fd);

if (err)
throw errnoException(err, 'open');

startListening(this);
return this;
}

var address;
var exclusive;

Expand Down Expand Up @@ -207,28 +270,18 @@ Socket.prototype.bind = function(port_, address_ /* , callback */) {
flags |= UV_UDP_REUSEADDR;

if (cluster.isWorker && !exclusive) {
const onHandle = (err, handle) => {
if (err) {
var ex = exceptionWithHostPort(err, 'bind', ip, port);
this.emit('error', ex);
state.bindState = BIND_STATE_UNBOUND;
return;
}

if (!state.handle)
// handle has been closed in the mean time.
return handle.close();

replaceHandle(this, handle);
startListening(this);
};
cluster._getServer(this, {
bindServerHandle(this, {
address: ip,
port: port,
addressType: this.type,
fd: -1,
flags: flags
}, onHandle);
}, (err) => {
// Callback to handle error.
const ex = exceptionWithHostPort(err, 'bind', ip, port);
this.emit('error', ex);
state.bindState = BIND_STATE_UNBOUND;
});
} else {
if (!state.handle)
return; // handle has been closed in the mean time
Expand Down
39 changes: 27 additions & 12 deletions lib/internal/dgram.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
'use strict';
const assert = require('assert');
const { codes } = require('internal/errors');
const { UDP } = process.binding('udp_wrap');
const { isInt32 } = require('internal/validators');
const TTYWrap = process.binding('tty_wrap');
const { UV_EINVAL } = process.binding('uv');
const { ERR_INVALID_ARG_TYPE, ERR_SOCKET_BAD_TYPE } = codes;
const kStateSymbol = Symbol('state symbol');
let dns; // Lazy load for startup performance.
Expand All @@ -17,6 +19,9 @@ function lookup6(lookup, address, callback) {
}


const guessHandleType = TTYWrap.guessHandleType;


function newHandle(type, lookup) {
if (lookup === undefined) {
if (dns === undefined) {
Expand Down Expand Up @@ -49,22 +54,32 @@ function newHandle(type, lookup) {


function _createSocketHandle(address, port, addressType, fd, flags) {
// Opening an existing fd is not supported for UDP handles.
assert(typeof fd !== 'number' || fd < 0);

const handle = newHandle(addressType);

if (port || address) {
const err = handle.bind(address, port || 0, flags);

if (err) {
handle.close();
return err;
let err;

if (isInt32(fd) && fd > 0) {
const type = guessHandleType(fd);
if (type !== 'UDP') {
err = UV_EINVAL;
} else {
err = handle.open(fd);
}
} else if (port || address) {
err = handle.bind(address, port || 0, flags);
}

if (err) {
handle.close();
return err;
}

return handle;
}


module.exports = { kStateSymbol, _createSocketHandle, newHandle };
module.exports = {
kStateSymbol,
_createSocketHandle,
newHandle,
guessHandleType,
};
13 changes: 13 additions & 0 deletions src/udp_wrap.cc
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ void UDPWrap::Initialize(Local<Object> target,
Local<FunctionTemplate>(),
attributes);

env->SetProtoMethod(t, "open", Open);
env->SetProtoMethod(t, "bind", Bind);
env->SetProtoMethod(t, "send", Send);
env->SetProtoMethod(t, "bind6", Bind6);
Expand Down Expand Up @@ -206,6 +207,18 @@ void UDPWrap::DoBind(const FunctionCallbackInfo<Value>& args, int family) {
}


void UDPWrap::Open(const FunctionCallbackInfo<Value>& args) {
UDPWrap* wrap;
ASSIGN_OR_RETURN_UNWRAP(&wrap,
args.Holder(),
args.GetReturnValue().Set(UV_EBADF));
int fd = static_cast<int>(args[0]->IntegerValue());
int err = uv_udp_open(&wrap->handle_, fd);

args.GetReturnValue().Set(err);
}


void UDPWrap::Bind(const FunctionCallbackInfo<Value>& args) {
DoBind(args, AF_INET);
}
Expand Down
1 change: 1 addition & 0 deletions src/udp_wrap.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ class UDPWrap: public HandleWrap {
v8::Local<v8::Context> context);
static void GetFD(const v8::FunctionCallbackInfo<v8::Value>& args);
static void New(const v8::FunctionCallbackInfo<v8::Value>& args);
static void Open(const v8::FunctionCallbackInfo<v8::Value>& args);
static void Bind(const v8::FunctionCallbackInfo<v8::Value>& args);
static void Send(const v8::FunctionCallbackInfo<v8::Value>& args);
static void Bind6(const v8::FunctionCallbackInfo<v8::Value>& args);
Expand Down
108 changes: 108 additions & 0 deletions test/parallel/test-cluster-dgram-bind-fd.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
'use strict';
const common = require('../common');
if (common.isWindows)
common.skip('dgram clustering is currently not supported on Windows.');

const NUM_WORKERS = 4;
const PACKETS_PER_WORKER = 10;

const assert = require('assert');
const cluster = require('cluster');
const dgram = require('dgram');
const { UDP } = process.binding('udp_wrap');

if (cluster.isMaster)
master();
else
worker();


function master() {
// Create a handle and use its fd.
const rawHandle = new UDP();
const err = rawHandle.bind(common.localhostIPv4, 0, 0);
assert(err >= 0, String(err));
assert.notStrictEqual(rawHandle.fd, -1);

const fd = rawHandle.fd;

let listening = 0;

// Fork 4 workers.
for (let i = 0; i < NUM_WORKERS; i++)
cluster.fork();

// Wait until all workers are listening.
cluster.on('listening', common.mustCall((worker, address) => {
if (++listening < NUM_WORKERS)
return;

// Start sending messages.
const buf = Buffer.from('hello world');
const socket = dgram.createSocket('udp4');
let sent = 0;
doSend();

function doSend() {
socket.send(buf, 0, buf.length, address.port, address.address, afterSend);
}

function afterSend() {
sent++;
if (sent < NUM_WORKERS * PACKETS_PER_WORKER) {
doSend();
} else {
socket.close();
}
}
}, NUM_WORKERS));

// Set up event handlers for every worker. Each worker sends a message when
// it has received the expected number of packets. After that it disconnects.
for (const key in cluster.workers) {
if (cluster.workers.hasOwnProperty(key))
setupWorker(cluster.workers[key]);
}

function setupWorker(worker) {
let received = 0;

worker.send({
fd,
});

worker.on('message', common.mustCall((msg) => {
received = msg.received;
worker.disconnect();
}));

worker.on('exit', common.mustCall(() => {
assert.strictEqual(received, PACKETS_PER_WORKER);
}));
}
}


function worker() {
let received = 0;

process.on('message', common.mustCall((data) => {
const { fd } = data;
// Create udp socket and start listening.
const socket = dgram.createSocket('udp4');

socket.on('message', common.mustCall((data, info) => {
received++;

// Every 10 messages, notify the master.
if (received === PACKETS_PER_WORKER) {
process.send({ received });
socket.close();
}
}, PACKETS_PER_WORKER));

socket.bind({
fd,
});
}));
}
Loading