From 68990948fe473262ec5f7d0b06112835ffd613aa Mon Sep 17 00:00:00 2001 From: Brian White Date: Sun, 11 Oct 2015 14:06:17 -0400 Subject: [PATCH] stream: avoid pause with unpipe in buffered write If a pipe is cleaned up (due to unpipe) during a write that returned false, the source stream can get stuck in a paused state. Fixes: https://github.com/nodejs/node/issues/2323 PR-URL: https://github.com/nodejs/node/pull/2325 Reviewed-By: Sakthipriyan Vairamani --- lib/_stream_readable.js | 16 ++++++-- .../test-stream-pipe-cleanup-pause.js | 38 +++++++++++++++++++ 2 files changed, 51 insertions(+), 3 deletions(-) create mode 100644 test/parallel/test-stream-pipe-cleanup-pause.js diff --git a/lib/_stream_readable.js b/lib/_stream_readable.js index 64ff744b8db677..5a375321615a7f 100644 --- a/lib/_stream_readable.js +++ b/lib/_stream_readable.js @@ -497,6 +497,7 @@ Readable.prototype.pipe = function(dest, pipeOpts) { var ondrain = pipeOnDrain(src); dest.on('drain', ondrain); + var cleanedUp = false; function cleanup() { debug('cleanup'); // cleanup event handlers once the pipe is broken @@ -509,6 +510,8 @@ Readable.prototype.pipe = function(dest, pipeOpts) { src.removeListener('end', cleanup); src.removeListener('data', ondata); + cleanedUp = true; + // if the reader is waiting for a drain event from this // specific writer, then it would cause it to never start // flowing again. @@ -524,9 +527,16 @@ Readable.prototype.pipe = function(dest, pipeOpts) { debug('ondata'); var ret = dest.write(chunk); if (false === ret) { - debug('false write response, pause', - src._readableState.awaitDrain); - src._readableState.awaitDrain++; + // If the user unpiped during `dest.write()`, it is possible + // to get stuck in a permanently paused state if that write + // also returned false. + if (state.pipesCount === 1 && + state.pipes[0] === dest && + src.listenerCount('data') === 1 && + !cleanedUp) { + debug('false write response, pause', src._readableState.awaitDrain); + src._readableState.awaitDrain++; + } src.pause(); } } diff --git a/test/parallel/test-stream-pipe-cleanup-pause.js b/test/parallel/test-stream-pipe-cleanup-pause.js new file mode 100644 index 00000000000000..b38f57a4ff0ba1 --- /dev/null +++ b/test/parallel/test-stream-pipe-cleanup-pause.js @@ -0,0 +1,38 @@ +'use strict'; +const common = require('../common'); +const assert = require('assert'); +const stream = require('stream'); + +const reader = new stream.Readable(); +const writer1 = new stream.Writable(); +const writer2 = new stream.Writable(); + +// 560000 is chosen here because it is larger than the (default) highWaterMark +// and will cause `.write()` to return false +// See: https://github.com/nodejs/node/issues/2323 +const buffer = new Buffer(560000); + +reader._read = function(n) {}; + +writer1._write = common.mustCall(function(chunk, encoding, cb) { + this.emit('chunk-received'); + cb(); +}, 1); +writer1.once('chunk-received', function() { + reader.unpipe(writer1); + reader.pipe(writer2); + reader.push(buffer); + setImmediate(function() { + reader.push(buffer); + setImmediate(function() { + reader.push(buffer); + }); + }); +}); + +writer2._write = common.mustCall(function(chunk, encoding, cb) { + cb(); +}, 3); + +reader.pipe(writer1); +reader.push(buffer);