diff --git a/benchmark/http/upgrade.js b/benchmark/http/upgrade.js new file mode 100644 index 00000000000000..0feaecc8ff19e6 --- /dev/null +++ b/benchmark/http/upgrade.js @@ -0,0 +1,53 @@ +'use strict'; + +const common = require('../common.js'); +const PORT = common.PORT; +const net = require('net'); + +const bench = common.createBenchmark(main, { + n: [5, 1000] +}); + +const reqData = 'GET / HTTP/1.1\r\n' + + 'Upgrade: WebSocket\r\n' + + 'Connection: Upgrade\r\n' + + '\r\n' + + 'WjN}|M(6'; + +const resData = 'HTTP/1.1 101 Web Socket Protocol Handshake\r\n' + + 'Upgrade: WebSocket\r\n' + + 'Connection: Upgrade\r\n' + + '\r\n\r\n'; + +function main({ n }) { + process.env.PORT = PORT; + var server = require('../fixtures/simple-http-server.js') + .listen(common.PORT) + .on('listening', function() { + bench.start(); + doBench(server.address(), n, function() { + bench.end(n); + server.close(); + }); + }) + .on('upgrade', function(req, socket, upgradeHead) { + socket.resume(); + socket.write(resData); + socket.end(); + }); +} + +function doBench(address, count, done) { + if (count === 0) { + done(); + return; + } + + const conn = net.createConnection(address.port); + conn.write(reqData); + conn.resume(); + + conn.on('end', function() { + doBench(address, count - 1, done); + }); +} diff --git a/doc/api/stream.md b/doc/api/stream.md index f3780ee4c49b01..dc74df7720ee19 100644 --- a/doc/api/stream.md +++ b/doc/api/stream.md @@ -63,8 +63,8 @@ object mode is not safe. Both [Writable][] and [Readable][] streams will store data in an internal -buffer that can be retrieved using `writable._writableState.getBuffer()` or -`readable._readableState.buffer`, respectively. +buffer that can be retrieved using `writable.writableBuffer` or +`readable.readableBuffer`, respectively. The amount of data potentially buffered depends on the `highWaterMark` option passed into the streams constructor. For normal streams, the `highWaterMark` @@ -602,22 +602,22 @@ Readable stream implementation. Specifically, at any given point in time, every Readable is in one of three possible states: -* `readable._readableState.flowing = null` -* `readable._readableState.flowing = false` -* `readable._readableState.flowing = true` +* `readable.readableFlowing = null` +* `readable.readableFlowing = false` +* `readable.readableFlowing = true` -When `readable._readableState.flowing` is `null`, no mechanism for consuming the +When `readable.readableFlowing` is `null`, no mechanism for consuming the streams data is provided so the stream will not generate its data. While in this state, attaching a listener for the `'data'` event, calling the `readable.pipe()` method, or calling the `readable.resume()` method will switch -`readable._readableState.flowing` to `true`, causing the Readable to begin +`readable.readableFlowing` to `true`, causing the Readable to begin actively emitting events as data is generated. Calling `readable.pause()`, `readable.unpipe()`, or receiving "back pressure" -will cause the `readable._readableState.flowing` to be set as `false`, +will cause the `readable.readableFlowing` to be set as `false`, temporarily halting the flowing of events but *not* halting the generation of data. While in this state, attaching a listener for the `'data'` event -would not cause `readable._readableState.flowing` to switch to `true`. +would not cause `readable.readableFlowing` to switch to `true`. ```js const { PassThrough, Writable } = require('stream'); @@ -626,14 +626,14 @@ const writable = new Writable(); pass.pipe(writable); pass.unpipe(writable); -// flowing is now false +// readableFlowing is now false pass.on('data', (chunk) => { console.log(chunk.toString()); }); pass.write('ok'); // will not emit 'data' pass.resume(); // must be called to make 'data' being emitted ``` -While `readable._readableState.flowing` is `false`, data may be accumulating +While `readable.readableFlowing` is `false`, data may be accumulating within the streams internal buffer. #### Choose One diff --git a/lib/_http_client.js b/lib/_http_client.js index bdda708493adba..d9a2d10ae24fd6 100644 --- a/lib/_http_client.js +++ b/lib/_http_client.js @@ -432,9 +432,7 @@ function socketOnData(d) { socket.removeListener('close', socketCloseListener); socket.removeListener('error', socketErrorListener); - // TODO(isaacs): Need a way to reset a stream to fresh state - // IE, not flowing, and not explicitly paused. - socket._readableState.flowing = null; + socket.readableFlowing = null; req.emit(eventName, res, socket, bodyHead); req.emit('close'); diff --git a/lib/_http_server.js b/lib/_http_server.js index a1cbb63c1d3531..cab48245010ae7 100644 --- a/lib/_http_server.js +++ b/lib/_http_server.js @@ -502,9 +502,7 @@ function onParserExecuteCommon(server, socket, parser, state, ret, d) { debug('SERVER have listener for %s', eventName); var bodyHead = d.slice(bytesParsed, d.length); - // TODO(isaacs): Need a way to reset a stream to fresh state - // IE, not flowing, and not explicitly paused. - socket._readableState.flowing = null; + socket.readableFlowing = null; server.emit(eventName, req, socket, bodyHead); } else { // Got upgrade header or CONNECT method, but have no handler. diff --git a/lib/_stream_duplex.js b/lib/_stream_duplex.js index 05f649340845b0..e99d246396f6cd 100644 --- a/lib/_stream_duplex.js +++ b/lib/_stream_duplex.js @@ -74,6 +74,16 @@ Object.defineProperty(Duplex.prototype, 'writableHighWaterMark', { } }); +Object.defineProperty(Duplex.prototype, 'writableBuffer', { + // making it explicit this property is not enumerable + // because otherwise some prototype manipulation in + // userland will fail + enumerable: false, + get: function() { + return this._writableState && this._writableState.getBuffer(); + } +}); + // the no-half-open enforcer function onend() { // if we allow half-open state, or if the writable side ended, diff --git a/lib/_stream_readable.js b/lib/_stream_readable.js index 01d886e6cab2eb..2653d3835d8d76 100644 --- a/lib/_stream_readable.js +++ b/lib/_stream_readable.js @@ -925,6 +925,31 @@ Object.defineProperty(Readable.prototype, 'readableHighWaterMark', { } }); +Object.defineProperty(Readable.prototype, 'readableBuffer', { + // making it explicit this property is not enumerable + // because otherwise some prototype manipulation in + // userland will fail + enumerable: false, + get: function() { + return this._readableState && this._readableState.buffer; + } +}); + +Object.defineProperty(Readable.prototype, 'readableFlowing', { + // making it explicit this property is not enumerable + // because otherwise some prototype manipulation in + // userland will fail + enumerable: false, + get: function() { + return this._readableState.flowing; + }, + set: function(state) { + if (this._readableState) { + this._readableState.flowing = state; + } + } +}); + // exposed for testing purposes only. Readable._fromList = fromList; diff --git a/lib/_stream_writable.js b/lib/_stream_writable.js index f5f05486d233a9..97fbebdd66ddc5 100644 --- a/lib/_stream_writable.js +++ b/lib/_stream_writable.js @@ -324,6 +324,16 @@ Writable.prototype.setDefaultEncoding = function setDefaultEncoding(encoding) { return this; }; +Object.defineProperty(Writable.prototype, 'writableBuffer', { + // making it explicit this property is not enumerable + // because otherwise some prototype manipulation in + // userland will fail + enumerable: false, + get: function() { + return this._writableState && this._writableState.getBuffer(); + } +}); + function decodeChunk(state, chunk, encoding) { if (!state.objectMode && state.decodeStrings !== false && diff --git a/lib/net.js b/lib/net.js index 5562db6ed61e38..aa27e700508a50 100644 --- a/lib/net.js +++ b/lib/net.js @@ -254,7 +254,7 @@ function Socket(options) { // stop the handle from reading and pause the stream this._handle.reading = false; this._handle.readStop(); - this._readableState.flowing = false; + this.readableFlowing = false; } else if (!options.manualStart) { this.read(0); } @@ -819,7 +819,7 @@ protoGetter('bytesWritten', function bytesWritten() { if (!state) return undefined; - state.getBuffer().forEach(function(el) { + this.writableBuffer.forEach(function(el) { if (el.chunk instanceof Buffer) bytes += el.chunk.length; else diff --git a/test/parallel/test-stream-push-order.js b/test/parallel/test-stream-push-order.js index be2db9f44a6691..ce4f336b0254d5 100644 --- a/test/parallel/test-stream-push-order.js +++ b/test/parallel/test-stream-push-order.js @@ -47,6 +47,6 @@ s.read(0); // ACTUALLY [1, 3, 5, 6, 4, 2] process.on('exit', function() { - assert.deepStrictEqual(s._readableState.buffer.join(','), '1,2,3,4,5,6'); + assert.deepStrictEqual(s.readableBuffer.join(','), '1,2,3,4,5,6'); console.log('ok'); }); diff --git a/test/parallel/test-stream-readable-reading-readingMore.js b/test/parallel/test-stream-readable-reading-readingMore.js index bee3a1c82a8678..e31d2dd921ce5b 100644 --- a/test/parallel/test-stream-readable-reading-readingMore.js +++ b/test/parallel/test-stream-readable-reading-readingMore.js @@ -15,7 +15,7 @@ assert.strictEqual(state.readingMore, false); readable.on('data', common.mustCall((data) => { // while in a flowing state, should try to read more. - if (state.flowing) + if (readable.readableFlowing) assert.strictEqual(state.readingMore, true); // reading as long as we've not ended diff --git a/test/parallel/test-stream2-transform.js b/test/parallel/test-stream2-transform.js index 819b088e2757f2..16a0523994c19f 100644 --- a/test/parallel/test-stream2-transform.js +++ b/test/parallel/test-stream2-transform.js @@ -46,7 +46,7 @@ const Transform = require('_stream_transform'); assert.strictEqual(tx._readableState.length, 10); assert.strictEqual(transformed, 10); assert.strictEqual(tx._transformState.writechunk.length, 5); - assert.deepStrictEqual(tx._writableState.getBuffer().map(function(c) { + assert.deepStrictEqual(tx.writableBuffer.map(function(c) { return c.chunk.length; }), [6, 7, 8, 9, 10]); } diff --git a/test/parallel/test-stream2-unpipe-leak.js b/test/parallel/test-stream2-unpipe-leak.js index cc331d58217e25..5c19be061fd54c 100644 --- a/test/parallel/test-stream2-unpipe-leak.js +++ b/test/parallel/test-stream2-unpipe-leak.js @@ -66,7 +66,7 @@ assert.strictEqual(dest.listeners('finish').length, 0); console.error(src._readableState); process.on('exit', function() { - src._readableState.buffer.length = 0; + src.readableBuffer.length = 0; console.error(src._readableState); assert(src._readableState.length >= src.readableHighWaterMark); console.log('ok'); diff --git a/test/parallel/test-stream3-pause-then-read.js b/test/parallel/test-stream3-pause-then-read.js index d75fe697081b32..f7bfadaf9d124c 100644 --- a/test/parallel/test-stream3-pause-then-read.js +++ b/test/parallel/test-stream3-pause-then-read.js @@ -68,7 +68,7 @@ function readn(n, then) { r.once('readable', read); else { assert.strictEqual(c.length, n); - assert(!r._readableState.flowing); + assert(!r.readableFlowing); then(); } })();