Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

streams: remove all references to readableState/writableState from docs #12855

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 53 additions & 0 deletions benchmark/http/upgrade.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
'use strict';

const common = require('../common.js');
const PORT = common.PORT;
const net = require('net');

const bench = common.createBenchmark(main, {
n: [5, 1000]
});

const reqData = 'GET / HTTP/1.1\r\n' +
'Upgrade: WebSocket\r\n' +
'Connection: Upgrade\r\n' +
'\r\n' +
'WjN}|M(6';

const resData = 'HTTP/1.1 101 Web Socket Protocol Handshake\r\n' +
'Upgrade: WebSocket\r\n' +
'Connection: Upgrade\r\n' +
'\r\n\r\n';

function main({ n }) {
process.env.PORT = PORT;
var server = require('../fixtures/simple-http-server.js')
.listen(common.PORT)
.on('listening', function() {
bench.start();
doBench(server.address(), n, function() {
bench.end(n);
server.close();
});
})
.on('upgrade', function(req, socket, upgradeHead) {
socket.resume();
socket.write(resData);
socket.end();
});
}

function doBench(address, count, done) {
if (count === 0) {
done();
return;
}

const conn = net.createConnection(address.port);
conn.write(reqData);
conn.resume();

conn.on('end', function() {
doBench(address, count - 1, done);
});
}
22 changes: 11 additions & 11 deletions doc/api/stream.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,8 @@ object mode is not safe.
<!--type=misc-->

Both [Writable][] and [Readable][] streams will store data in an internal
buffer that can be retrieved using `writable._writableState.getBuffer()` or
`readable._readableState.buffer`, respectively.
buffer that can be retrieved using `writable.writableBuffer` or
`readable.readableBuffer`, respectively.

The amount of data potentially buffered depends on the `highWaterMark` option
passed into the streams constructor. For normal streams, the `highWaterMark`
Expand Down Expand Up @@ -602,22 +602,22 @@ Readable stream implementation.
Specifically, at any given point in time, every Readable is in one of three
possible states:

* `readable._readableState.flowing = null`
* `readable._readableState.flowing = false`
* `readable._readableState.flowing = true`
* `readable.readableFlowing = null`
* `readable.readableFlowing = false`
* `readable.readableFlowing = true`

When `readable._readableState.flowing` is `null`, no mechanism for consuming the
When `readable.readableFlowing` is `null`, no mechanism for consuming the
streams data is provided so the stream will not generate its data. While in this
state, attaching a listener for the `'data'` event, calling the `readable.pipe()`
method, or calling the `readable.resume()` method will switch
`readable._readableState.flowing` to `true`, causing the Readable to begin
`readable.readableFlowing` to `true`, causing the Readable to begin
actively emitting events as data is generated.

Calling `readable.pause()`, `readable.unpipe()`, or receiving "back pressure"
will cause the `readable._readableState.flowing` to be set as `false`,
will cause the `readable.readableFlowing` to be set as `false`,
temporarily halting the flowing of events but *not* halting the generation of
data. While in this state, attaching a listener for the `'data'` event
would not cause `readable._readableState.flowing` to switch to `true`.
would not cause `readable.readableFlowing` to switch to `true`.

```js
const { PassThrough, Writable } = require('stream');
Expand All @@ -626,14 +626,14 @@ const writable = new Writable();

pass.pipe(writable);
pass.unpipe(writable);
// flowing is now false
// readableFlowing is now false

pass.on('data', (chunk) => { console.log(chunk.toString()); });
pass.write('ok'); // will not emit 'data'
pass.resume(); // must be called to make 'data' being emitted
```

While `readable._readableState.flowing` is `false`, data may be accumulating
While `readable.readableFlowing` is `false`, data may be accumulating
within the streams internal buffer.

#### Choose One
Expand Down
4 changes: 1 addition & 3 deletions lib/_http_client.js
Original file line number Diff line number Diff line change
Expand Up @@ -464,9 +464,7 @@ function socketOnData(d) {
socket.removeListener('close', socketCloseListener);
socket.removeListener('error', socketErrorListener);

// TODO(isaacs): Need a way to reset a stream to fresh state
// IE, not flowing, and not explicitly paused.
socket._readableState.flowing = null;
socket.readableFlowing = null;

req.emit(eventName, res, socket, bodyHead);
req.emit('close');
Expand Down
4 changes: 1 addition & 3 deletions lib/_http_server.js
Original file line number Diff line number Diff line change
Expand Up @@ -502,9 +502,7 @@ function onParserExecuteCommon(server, socket, parser, state, ret, d) {
debug('SERVER have listener for %s', eventName);
var bodyHead = d.slice(bytesParsed, d.length);

// TODO(isaacs): Need a way to reset a stream to fresh state
// IE, not flowing, and not explicitly paused.
socket._readableState.flowing = null;
socket.readableFlowing = null;
server.emit(eventName, req, socket, bodyHead);
} else {
// Got upgrade header or CONNECT method, but have no handler.
Expand Down
10 changes: 10 additions & 0 deletions lib/_stream_duplex.js
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,16 @@ Object.defineProperty(Duplex.prototype, 'writableHighWaterMark', {
}
});

Object.defineProperty(Duplex.prototype, 'writableBuffer', {
// making it explicit this property is not enumerable
// because otherwise some prototype manipulation in
// userland will fail
enumerable: false,
get: function() {
return this._writableState && this._writableState.getBuffer();
}
});

// the no-half-open enforcer
function onend() {
// if we allow half-open state, or if the writable side ended,
Expand Down
25 changes: 25 additions & 0 deletions lib/_stream_readable.js
Original file line number Diff line number Diff line change
Expand Up @@ -926,6 +926,31 @@ Object.defineProperty(Readable.prototype, 'readableHighWaterMark', {
}
});

Object.defineProperty(Readable.prototype, 'readableBuffer', {
// making it explicit this property is not enumerable
// because otherwise some prototype manipulation in
// userland will fail
enumerable: false,
get: function() {
return this._readableState && this._readableState.buffer;
}
});

Object.defineProperty(Readable.prototype, 'readableFlowing', {
// making it explicit this property is not enumerable
// because otherwise some prototype manipulation in
// userland will fail
enumerable: false,
get: function() {
return this._readableState.flowing;
},
set: function(state) {
if (this._readableState) {
this._readableState.flowing = state;
}
}
});

// exposed for testing purposes only.
Readable._fromList = fromList;

Expand Down
10 changes: 10 additions & 0 deletions lib/_stream_writable.js
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,16 @@ Writable.prototype.setDefaultEncoding = function setDefaultEncoding(encoding) {
return this;
};

Object.defineProperty(Writable.prototype, 'writableBuffer', {
// making it explicit this property is not enumerable
// because otherwise some prototype manipulation in
// userland will fail
enumerable: false,
get: function() {
return this._writableState && this._writableState.getBuffer();
}
});

function decodeChunk(state, chunk, encoding) {
if (!state.objectMode &&
state.decodeStrings !== false &&
Expand Down
4 changes: 2 additions & 2 deletions lib/net.js
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ function Socket(options) {
// stop the handle from reading and pause the stream
this._handle.reading = false;
this._handle.readStop();
this._readableState.flowing = false;
this.readableFlowing = false;
} else if (!options.manualStart) {
this.read(0);
}
Expand Down Expand Up @@ -829,7 +829,7 @@ protoGetter('bytesWritten', function bytesWritten() {
if (!state)
return undefined;

state.getBuffer().forEach(function(el) {
this.writableBuffer.forEach(function(el) {
if (el.chunk instanceof Buffer)
bytes += el.chunk.length;
else
Expand Down
2 changes: 1 addition & 1 deletion test/parallel/test-stream-push-order.js
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,6 @@ s.read(0);
// ACTUALLY [1, 3, 5, 6, 4, 2]

process.on('exit', function() {
assert.deepStrictEqual(s._readableState.buffer.join(','), '1,2,3,4,5,6');
assert.deepStrictEqual(s.readableBuffer.join(','), '1,2,3,4,5,6');
console.log('ok');
});
2 changes: 1 addition & 1 deletion test/parallel/test-stream-readable-reading-readingMore.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ assert.strictEqual(state.readingMore, false);

readable.on('data', common.mustCall((data) => {
// while in a flowing state, should try to read more.
if (state.flowing)
if (readable.readableFlowing)
assert.strictEqual(state.readingMore, true);

// reading as long as we've not ended
Expand Down
2 changes: 1 addition & 1 deletion test/parallel/test-stream2-transform.js
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ const Transform = require('_stream_transform');
assert.strictEqual(tx._readableState.length, 10);
assert.strictEqual(transformed, 10);
assert.strictEqual(tx._transformState.writechunk.length, 5);
assert.deepStrictEqual(tx._writableState.getBuffer().map(function(c) {
assert.deepStrictEqual(tx.writableBuffer.map(function(c) {
return c.chunk.length;
}), [6, 7, 8, 9, 10]);
}
Expand Down
2 changes: 1 addition & 1 deletion test/parallel/test-stream2-unpipe-leak.js
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ assert.strictEqual(dest.listeners('finish').length, 0);

console.error(src._readableState);
process.on('exit', function() {
src._readableState.buffer.length = 0;
src.readableBuffer.length = 0;
console.error(src._readableState);
assert(src._readableState.length >= src.readableHighWaterMark);
console.log('ok');
Expand Down
2 changes: 1 addition & 1 deletion test/parallel/test-stream3-pause-then-read.js
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ function readn(n, then) {
r.once('readable', read);
else {
assert.strictEqual(c.length, n);
assert(!r._readableState.flowing);
assert(!r.readableFlowing);
then();
}
})();
Expand Down