Skip to content

Commit

Permalink
stream: fix sync callback leak
Browse files Browse the repository at this point in the history
  • Loading branch information
ronag committed Feb 13, 2020
1 parent a751389 commit 25dedba
Show file tree
Hide file tree
Showing 4 changed files with 59 additions and 5 deletions.
13 changes: 10 additions & 3 deletions lib/_stream_writable.js
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ function WritableState(options, stream, isDuplex) {
// or on a later tick. We set this to true at first, because any
// actions that shouldn't happen until "later" should generally also
// not happen before the first write call.
this.sync = true;
this.tick = process.tickId;

// A flag to know if we're processing previously buffered items, which
// may call the _write() callback in the same tick, so that we don't
Expand Down Expand Up @@ -199,6 +199,14 @@ WritableState.prototype.getBuffer = function getBuffer() {
return out;
};

ObjectDefineProperties(WritableState.prototype, {
sync: {
get() {
return this.tick === process.tickId;
}
},
});

// Test _writableState for inheritance to account for Duplex streams,
// whose prototype chain only points to Readable.
var realHasInstance;
Expand Down Expand Up @@ -377,14 +385,13 @@ function doWrite(stream, state, writev, len, chunk, encoding, cb) {
state.writelen = len;
state.writecb = cb;
state.writing = true;
state.sync = true;
state.tick = process.tickId;
if (state.destroyed)
state.onwrite(new ERR_STREAM_DESTROYED('write'));
else if (writev)
stream._writev(chunk, state.onwrite);
else
stream._write(chunk, encoding, state.onwrite);
state.sync = false;
}

function onwriteError(stream, state, er, cb) {
Expand Down
3 changes: 2 additions & 1 deletion lib/internal/bootstrap/node.js
Original file line number Diff line number Diff line change
Expand Up @@ -228,8 +228,9 @@ process.emitWarning = emitWarning;
// bootstrap to make sure that any operation done before this are synchronous.
// If any ticks or timers are scheduled before this they are unlikely to work.
{
const { nextTick, runNextTicks } = setupTaskQueue();
const { nextTick, runNextTicks, tickId } = setupTaskQueue();
process.nextTick = nextTick;
process.tickId = tickId;
// Used to emulate a tick manually in the JS land.
// A better name for this function would be `runNextTicks` but
// it has been exposed to the process object so we keep this legacy name
Expand Down
4 changes: 3 additions & 1 deletion lib/internal/process/task_queues.js
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ function setHasTickScheduled(value) {
}

const queue = new FixedQueue();
let tickId = newAsyncId();

// Should be in sync with RunNextTicksNative in node_task_queue.cc
function runNextTicks() {
Expand All @@ -70,7 +71,7 @@ function processTicksAndRejections() {
let tock;
do {
while (tock = queue.shift()) {
const asyncId = tock[async_id_symbol];
const asyncId = tickId = tock[async_id_symbol];
emitBefore(asyncId, tock[trigger_async_id_symbol], tock);

try {
Expand Down Expand Up @@ -179,6 +180,7 @@ module.exports = {
// Sets the callback to be run in every tick.
setTickCallback(processTicksAndRejections);
return {
tickId,
nextTick,
runNextTicks
};
Expand Down
44 changes: 44 additions & 0 deletions test/parallel/test-stream-writable-sync-error.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
'use strict';
const common = require('../common');

// Tests for the regression in _stream_writable fixed in
// https://github.com/nodejs/node/pull/31756

// Specifically, when a write callback is invoked synchronously
// with an error, and autoDestroy is not being used, the error
// should still be emitted on nextTick.

const { Writable } = require('stream');

class MyStream extends Writable {
#cb = undefined;

constructor() {
super({ autoDestroy: false });
}

_write(_, __, cb) {
this.#cb = cb;
}

close() {
// Synchronously invoke the callback with an error.
this.#cb(new Error('foo'));
}
}

const stream = new MyStream();

const mustError = common.mustCall(2);

stream.write('test', () => {});

// Both error callbacks should be invoked.

stream.on('error', mustError);

stream.close();

// Without the fix in #31756, the error handler
// added after the call to close will not be invoked.
stream.on('error', mustError);

0 comments on commit 25dedba

Please sign in to comment.