diff --git a/test/parallel/test-stream-pipe-await-drain-manual-resume.js b/test/parallel/test-stream-pipe-await-drain-manual-resume.js index d26741b8bb4825..f7bff02bb1658a 100644 --- a/test/parallel/test-stream-pipe-await-drain-manual-resume.js +++ b/test/parallel/test-stream-pipe-await-drain-manual-resume.js @@ -1,6 +1,7 @@ 'use strict'; const common = require('../common'); const stream = require('stream'); +const assert = require('assert'); // A consumer stream with a very low highWaterMark, which starts in a state // where it buffers the chunk it receives rather than indicating that they @@ -26,6 +27,11 @@ const readable = new stream.Readable({ readable.pipe(writable); readable.once('pause', common.mustCall(() => { + assert.strictEqual( + readable._readableState.awaitDrain, + 1, + 'awaitDrain doesn\'t increase' + ); // First pause, resume manually. The next write() to writable will still // return false, because chunks are still being buffered, so it will increase // the awaitDrain counter again. @@ -34,6 +40,11 @@ readable.once('pause', common.mustCall(() => { })); readable.once('pause', common.mustCall(() => { + assert.strictEqual( + readable._readableState.awaitDrain, + 1, + '.resume() does not reset counter' + ); // Second pause, handle all chunks from now on. Once all callbacks that // are currently queued up are handled, the awaitDrain drain counter should // fall back to 0 and all chunks that are pending on the readable side @@ -50,5 +61,10 @@ readable.push(Buffer(100)); // Should get through to the writable. readable.push(null); writable.on('finish', common.mustCall(() => { + assert.strictEqual( + readable._readableState.awaitDrain, + 0, + 'awaitDrain not 0 after all chunks are written' + ); // Everything okay, all chunks were written. })); diff --git a/test/parallel/test-stream-pipe-await-drain-push-while-write.js b/test/parallel/test-stream-pipe-await-drain-push-while-write.js index 1dfdfdb80c8d71..67a8f304c31614 100644 --- a/test/parallel/test-stream-pipe-await-drain-push-while-write.js +++ b/test/parallel/test-stream-pipe-await-drain-push-while-write.js @@ -1,16 +1,34 @@ 'use strict'; const common = require('../common'); const stream = require('stream'); +const assert = require('assert'); + +const awaitDrainStates = [ + 1, // after first chunk before callback + 1, // after second chunk before callback + 0 // resolving chunk pushed after first chunk, awaitDrain is decreased +]; // A writable stream which pushes data onto the stream which pipes into it, // but only the first time it's written to. Since it's not paused at this time, // a second write will occur. If the pipe increases awaitDrain twice, we'll // never get subsequent chunks because 'drain' is only emitted once. const writable = new stream.Writable({ - write: common.mustCall((chunk, encoding, cb) => { + write: common.mustCall(function(chunk, encoding, cb) { if (chunk.length === 32 * 1024) { // first chunk - readable.push(new Buffer(33 * 1024)); // above hwm + const beforePush = readable._readableState.awaitDrain; + readable.push(new Buffer(34 * 1024)); // above hwm + // We should check if awaitDrain counter is increased. + const afterPush = readable._readableState.awaitDrain; + assert.strictEqual(afterPush - beforePush, 1, + 'Counter is not increased for awaitDrain'); } + + assert.strictEqual( + awaitDrainStates.shift(), + readable._readableState.awaitDrain, + 'State variable awaitDrain is not correct.' + ); cb(); }, 3) }); diff --git a/test/parallel/test-stream-pipe-await-drain.js b/test/parallel/test-stream-pipe-await-drain.js index 0e8d3497123538..ff232157c6cbe0 100644 --- a/test/parallel/test-stream-pipe-await-drain.js +++ b/test/parallel/test-stream-pipe-await-drain.js @@ -1,12 +1,14 @@ 'use strict'; const common = require('../common'); const stream = require('stream'); +const assert = require('assert'); // This is very similar to test-stream-pipe-cleanup-pause.js. const reader = new stream.Readable(); const writer1 = new stream.Writable(); const writer2 = new stream.Writable(); +const writer3 = new stream.Writable(); // 560000 is chosen here because it is larger than the (default) highWaterMark // and will cause `.write()` to return false @@ -19,7 +21,10 @@ writer1._write = common.mustCall(function(chunk, encoding, cb) { this.emit('chunk-received'); cb(); }, 1); + writer1.once('chunk-received', function() { + assert.strictEqual(reader._readableState.awaitDrain, 0, + 'initial value is not 0'); setImmediate(function() { // This one should *not* get through to writer1 because writer2 is not // "done" processing. @@ -29,12 +34,26 @@ writer1.once('chunk-received', function() { // A "slow" consumer: writer2._write = common.mustCall(function(chunk, encoding, cb) { + assert.strictEqual( + reader._readableState.awaitDrain, 1, + 'awaitDrain isn\'t 1 after first push' + ); // Not calling cb here to "simulate" slow stream. + // This should be called exactly once, since the first .write() call + // will return false. +}, 1); +writer3._write = common.mustCall(function(chunk, encoding, cb) { + assert.strictEqual( + reader._readableState.awaitDrain, 2, + 'awaitDrain isn\'t 2 after second push' + ); + // Not calling cb here to "simulate" slow stream. // This should be called exactly once, since the first .write() call // will return false. }, 1); reader.pipe(writer1); reader.pipe(writer2); +reader.pipe(writer3); reader.push(buffer);