test: tests for _readableStream.awaitDrain #8914

Closed
wants to merge 1 commit into from
16 changes: 16 additions & 0 deletions test/parallel/test-stream-pipe-await-drain-manual-resume.js
@@ -1,6 +1,7 @@
'use strict';
const common = require('../common');
const stream = require('stream');
const assert = require('assert');

// A consumer stream with a very low highWaterMark, which starts in a state
// where it buffers the chunk it receives rather than indicating that they
@@ -26,6 +27,11 @@ const readable = new stream.Readable({
readable.pipe(writable);

readable.once('pause', common.mustCall(() => {
assert.strictEqual(
readable._readableState.awaitDrain,
1,
'awaitDrain doesn\'t increase'
);
// First pause, resume manually. The next write() to writable will still
// return false, because chunks are still being buffered, so it will increase
// the awaitDrain counter again.
@@ -34,6 +40,11 @@ readable.once('pause', common.mustCall(() => {
}));

readable.once('pause', common.mustCall(() => {
assert.strictEqual(
readable._readableState.awaitDrain,
1,
'.resume() does not reset counter'
);
// Second pause, handle all chunks from now on. Once all callbacks that
// are currently queued up are handled, the awaitDrain drain counter should
// fall back to 0 and all chunks that are pending on the readable side
@@ -50,5 +61,10 @@ readable.push(Buffer.alloc(100)); // Should get through to the writable.
readable.push(null);

writable.on('finish', common.mustCall(() => {
assert.strictEqual(
readable._readableState.awaitDrain,
0,
'awaitDrain not 0 after all chunks are written'
);
// Everything okay, all chunks were written.
}));
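
For context (this sketch is not part of the PR): a minimal standalone illustration of the mechanism the test above exercises. When a piped destination's write() returns false, pipe() increments the source's internal _readableState.awaitDrain counter and pauses the source; the counter drops back to 0 as the destination emits 'drain', at which point flow resumes. The highWaterMark and chunk sizes below are arbitrary, and _readableState.awaitDrain is an internal field as it existed around the Node.js 6/7 era of this PR; later releases reworked that bookkeeping.

'use strict';
const stream = require('stream');

// A deliberately slow writable with a tiny highWaterMark, so that every
// write() call buffers its chunk and returns false.
const writable = new stream.Writable({
  highWaterMark: 5,
  write(chunk, encoding, cb) {
    setImmediate(cb); // finish the write on a later tick
  }
});

const readable = new stream.Readable({
  read() {} // data is pushed manually below
});

readable.pipe(writable);
readable.push(Buffer.alloc(100)); // write() will return false

readable.once('pause', () => {
  // pipe() saw write() return false, incremented the counter and paused us.
  console.log(readable._readableState.awaitDrain); // 1
});

writable.on('finish', () => {
  // 'drain' fired, the counter went back to 0 and the stream could end.
  console.log(readable._readableState.awaitDrain); // 0
});

readable.push(null);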
22 changes: 20 additions & 2 deletions test/parallel/test-stream-pipe-await-drain-push-while-write.js
@@ -1,16 +1,34 @@
'use strict';
const common = require('../common');
const stream = require('stream');
const assert = require('assert');

const awaitDrainStates = [
1, // after first chunk before callback
1, // after second chunk before callback
0 // resolving chunk pushed after first chunk, awaitDrain is decreased
];

// A writable stream which pushes data onto the stream which pipes into it,
// but only the first time it's written to. Since it's not paused at this time,
// a second write will occur. If the pipe increases awaitDrain twice, we'll
// never get subsequent chunks because 'drain' is only emitted once.
const writable = new stream.Writable({
write: common.mustCall((chunk, encoding, cb) => {
write: common.mustCall(function(chunk, encoding, cb) {
if (chunk.length === 32 * 1024) { // first chunk
readable.push(new Buffer(33 * 1024)); // above hwm
const beforePush = readable._readableState.awaitDrain;
readable.push(new Buffer(34 * 1024)); // above hwm
// We should check if awaitDrain counter is increased.
const afterPush = readable._readableState.awaitDrain;
assert.strictEqual(afterPush - beforePush, 1,
'Counter is not increased for awaitDrain');
}

assert.strictEqual(
awaitDrainStates.shift(),
readable._readableState.awaitDrain,
'State variable awaitDrain is not correct.'
);
cb();
}, 3)
});
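
Background for the deadlock the comment in this test warns about, as a small sketch that is separate from the PR: write() returning false is the backpressure signal, and the matching 'drain' event fires only once for that backlog. That single event is what pipe() uses to decrement awaitDrain, which is why counting the same destination twice would leave the source paused forever. The sizes below are arbitrary.

'use strict';
const stream = require('stream');

const slow = new stream.Writable({
  highWaterMark: 4, // tiny, so a single write overflows it
  write(chunk, encoding, cb) {
    setImmediate(cb); // complete the write on a later tick
  }
});

const ok = slow.write(Buffer.alloc(16)); // 16 bytes > highWaterMark
console.log(ok); // false: stop writing and wait for 'drain'

slow.once('drain', () => {
  // Fires once, after the buffered data has been flushed.
  console.log('drained, safe to write again');
});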
19 changes: 19 additions & 0 deletions test/parallel/test-stream-pipe-await-drain.js
@@ -1,12 +1,14 @@
'use strict';
const common = require('../common');
const stream = require('stream');
const assert = require('assert');

// This is very similar to test-stream-pipe-cleanup-pause.js.

const reader = new stream.Readable();
const writer1 = new stream.Writable();
const writer2 = new stream.Writable();
const writer3 = new stream.Writable();
Member

I don't think we should add a new writer here. Why this change?

Contributor Author
@shmuga Oct 3, 2016

I thought it would be better to check that the counter does not get stuck at 1 when there are multiple writers in the pipeline. For example, if someone breaks the behavior of this counter so that it acts as a flag instead, they will get an error from this test.

Member

Perfect, thanks.


// 560000 is chosen here because it is larger than the (default) highWaterMark
// and will cause `.write()` to return false
@@ -19,7 +21,10 @@ writer1._write = common.mustCall(function(chunk, encoding, cb) {
this.emit('chunk-received');
cb();
}, 1);

writer1.once('chunk-received', function() {
assert.strictEqual(reader._readableState.awaitDrain, 0,
'initial value is not 0');
setImmediate(function() {
// This one should *not* get through to writer1 because writer2 is not
// "done" processing.
@@ -29,12 +34,26 @@ writer1.once('chunk-received', function() {

// A "slow" consumer:
writer2._write = common.mustCall(function(chunk, encoding, cb) {
assert.strictEqual(
reader._readableState.awaitDrain, 1,
'awaitDrain isn\'t 1 after first push'
);
// Not calling cb here to "simulate" slow stream.
// This should be called exactly once, since the first .write() call
// will return false.
}, 1);

writer3._write = common.mustCall(function(chunk, encoding, cb) {
assert.strictEqual(
reader._readableState.awaitDrain, 2,
'awaitDrain isn\'t 2 after second push'
);
// Not calling cb here to "simulate" slow stream.
// This should be called exactly once, since the first .write() call
// will return false.
}, 1);

reader.pipe(writer1);
reader.pipe(writer2);
reader.pipe(writer3);
reader.push(buffer);
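
Following up on the review thread about adding writer3: a rough sketch, separate from the test, of why awaitDrain must behave as a counter rather than a flag once a readable is piped into several slow destinations. It peeks at the same internal _readableState.awaitDrain field, so it is tied to the stream internals of that era.

'use strict';
const stream = require('stream');

const reader = new stream.Readable({ read() {} });

// Two destinations that buffer their chunk and return false from write().
function makeSlowWriter() {
  return new stream.Writable({
    highWaterMark: 1,
    write(chunk, encoding, cb) {
      setImmediate(cb); // hold the callback for a tick
    }
  });
}

const writerA = makeSlowWriter();
const writerB = makeSlowWriter();

reader.pipe(writerA);
reader.pipe(writerB);
reader.push(Buffer.alloc(64 * 1024)); // both write() calls will return false

setImmediate(() => {
  // One increment per destination still waiting for 'drain'. If this were a
  // boolean flag, the first 'drain' would resume the reader even though the
  // other destination is still backed up.
  console.log(reader._readableState.awaitDrain); // 2 with these internals
  reader.push(null); // end the source so both pipes can finish
});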