From 25dedbab6380f77262ee6fd871a9b3036c076bd7 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Thu, 13 Feb 2020 11:18:47 +0100 Subject: [PATCH] stream: fix sync callback leak --- lib/_stream_writable.js | 13 ++++-- lib/internal/bootstrap/node.js | 3 +- lib/internal/process/task_queues.js | 4 +- .../test-stream-writable-sync-error.js | 44 +++++++++++++++++++ 4 files changed, 59 insertions(+), 5 deletions(-) create mode 100644 test/parallel/test-stream-writable-sync-error.js diff --git a/lib/_stream_writable.js b/lib/_stream_writable.js index 8d1f33748b3088..53c0894d0f2981 100644 --- a/lib/_stream_writable.js +++ b/lib/_stream_writable.js @@ -131,7 +131,7 @@ function WritableState(options, stream, isDuplex) { // or on a later tick. We set this to true at first, because any // actions that shouldn't happen until "later" should generally also // not happen before the first write call. - this.sync = true; + this.tick = process.tickId; // A flag to know if we're processing previously buffered items, which // may call the _write() callback in the same tick, so that we don't @@ -199,6 +199,14 @@ WritableState.prototype.getBuffer = function getBuffer() { return out; }; +ObjectDefineProperties(WritableState.prototype, { + sync: { + get() { + return this.tick === process.tickId; + } + }, +}); + // Test _writableState for inheritance to account for Duplex streams, // whose prototype chain only points to Readable. var realHasInstance; @@ -377,14 +385,13 @@ function doWrite(stream, state, writev, len, chunk, encoding, cb) { state.writelen = len; state.writecb = cb; state.writing = true; - state.sync = true; + state.tick = process.tickId; if (state.destroyed) state.onwrite(new ERR_STREAM_DESTROYED('write')); else if (writev) stream._writev(chunk, state.onwrite); else stream._write(chunk, encoding, state.onwrite); - state.sync = false; } function onwriteError(stream, state, er, cb) { diff --git a/lib/internal/bootstrap/node.js b/lib/internal/bootstrap/node.js index aa166cc931b94f..cca239eb5416d9 100644 --- a/lib/internal/bootstrap/node.js +++ b/lib/internal/bootstrap/node.js @@ -228,8 +228,9 @@ process.emitWarning = emitWarning; // bootstrap to make sure that any operation done before this are synchronous. // If any ticks or timers are scheduled before this they are unlikely to work. { - const { nextTick, runNextTicks } = setupTaskQueue(); + const { nextTick, runNextTicks, tickId } = setupTaskQueue(); process.nextTick = nextTick; + process.tickId = tickId; // Used to emulate a tick manually in the JS land. // A better name for this function would be `runNextTicks` but // it has been exposed to the process object so we keep this legacy name diff --git a/lib/internal/process/task_queues.js b/lib/internal/process/task_queues.js index c07942587ca9c2..bbb98037a1648c 100644 --- a/lib/internal/process/task_queues.js +++ b/lib/internal/process/task_queues.js @@ -55,6 +55,7 @@ function setHasTickScheduled(value) { } const queue = new FixedQueue(); +let tickId = newAsyncId(); // Should be in sync with RunNextTicksNative in node_task_queue.cc function runNextTicks() { @@ -70,7 +71,7 @@ function processTicksAndRejections() { let tock; do { while (tock = queue.shift()) { - const asyncId = tock[async_id_symbol]; + const asyncId = tickId = tock[async_id_symbol]; emitBefore(asyncId, tock[trigger_async_id_symbol], tock); try { @@ -179,6 +180,7 @@ module.exports = { // Sets the callback to be run in every tick. setTickCallback(processTicksAndRejections); return { + tickId, nextTick, runNextTicks }; diff --git a/test/parallel/test-stream-writable-sync-error.js b/test/parallel/test-stream-writable-sync-error.js new file mode 100644 index 00000000000000..291e5943b280c5 --- /dev/null +++ b/test/parallel/test-stream-writable-sync-error.js @@ -0,0 +1,44 @@ +'use strict'; +const common = require('../common'); + +// Tests for the regression in _stream_writable fixed in +// https://github.com/nodejs/node/pull/31756 + +// Specifically, when a write callback is invoked synchronously +// with an error, and autoDestroy is not being used, the error +// should still be emitted on nextTick. + +const { Writable } = require('stream'); + +class MyStream extends Writable { + #cb = undefined; + + constructor() { + super({ autoDestroy: false }); + } + + _write(_, __, cb) { + this.#cb = cb; + } + + close() { + // Synchronously invoke the callback with an error. + this.#cb(new Error('foo')); + } +} + +const stream = new MyStream(); + +const mustError = common.mustCall(2); + +stream.write('test', () => {}); + +// Both error callbacks should be invoked. + +stream.on('error', mustError); + +stream.close(); + +// Without the fix in #31756, the error handler +// added after the call to close will not be invoked. +stream.on('error', mustError);