Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

stream: fix sync callback leak #31765

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 10 additions & 3 deletions lib/_stream_writable.js
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ function WritableState(options, stream, isDuplex) {
// or on a later tick. We set this to true at first, because any
// actions that shouldn't happen until "later" should generally also
// not happen before the first write call.
this.sync = true;
this.tick = process.tickId;

// A flag to know if we're processing previously buffered items, which
// may call the _write() callback in the same tick, so that we don't
Expand Down Expand Up @@ -199,6 +199,14 @@ WritableState.prototype.getBuffer = function getBuffer() {
return out;
};

ObjectDefineProperties(WritableState.prototype, {
sync: {
get() {
return this.tick === process.tickId;
Copy link
Member Author

@ronag ronag Feb 15, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mcollina: one way to get around the readable-stream problem could be:

process.tickId === undefined || this.tick === process.tickId

The downside would be that it would be slower for sync streams (but probably correct?) on old node versions, i.e . it would always think it is "sync" and use nextTick even though it is not strictly necessary.

We could wait with backporting this to readable-stream until v14 becomes LTS?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

readable-stream has to work on old version of Node.js as well, as long as browsers. I would prefer to avoid this completely.

}
},
});

// Test _writableState for inheritance to account for Duplex streams,
// whose prototype chain only points to Readable.
var realHasInstance;
Expand Down Expand Up @@ -377,14 +385,13 @@ function doWrite(stream, state, writev, len, chunk, encoding, cb) {
state.writelen = len;
state.writecb = cb;
state.writing = true;
state.sync = true;
state.tick = process.tickId;
if (state.destroyed)
state.onwrite(new ERR_STREAM_DESTROYED('write'));
else if (writev)
stream._writev(chunk, state.onwrite);
else
stream._write(chunk, encoding, state.onwrite);
state.sync = false;
}

function onwriteError(stream, state, er, cb) {
Expand Down
3 changes: 2 additions & 1 deletion lib/internal/bootstrap/node.js
Original file line number Diff line number Diff line change
Expand Up @@ -228,8 +228,9 @@ process.emitWarning = emitWarning;
// bootstrap to make sure that any operation done before this are synchronous.
// If any ticks or timers are scheduled before this they are unlikely to work.
{
const { nextTick, runNextTicks } = setupTaskQueue();
const { nextTick, runNextTicks, tickId } = setupTaskQueue();
process.nextTick = nextTick;
process.tickId = tickId;
// Used to emulate a tick manually in the JS land.
// A better name for this function would be `runNextTicks` but
// it has been exposed to the process object so we keep this legacy name
Expand Down
4 changes: 3 additions & 1 deletion lib/internal/process/task_queues.js
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ function setHasTickScheduled(value) {
}

const queue = new FixedQueue();
let tickId = newAsyncId();

// Should be in sync with RunNextTicksNative in node_task_queue.cc
function runNextTicks() {
Expand All @@ -70,7 +71,7 @@ function processTicksAndRejections() {
let tock;
do {
while (tock = queue.shift()) {
const asyncId = tock[async_id_symbol];
const asyncId = tickId = tock[async_id_symbol];
Copy link
Member Author

@ronag ronag Feb 13, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This might not be exactly correct since it will only run if there is a nextTick scheduled and won't take other possible async entry points into account such as IO and what not. This is not my area of expertise.

Maybe would require some help from @addaleax if this approach is deemed worth looking further into.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I’m happy to investigate this in more detail… It looks like you’re essentially going for an id for each individual synchronous block of JS execution, which should be doable, but I agree that it makes sense to get a green light for this approach first :)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

btw, fwiw I've been mulling over the details of a proposal to TC-39 that does precisely that: assign execution and trigger IDs for every JS execution at the language level. It would make several things significantly easier.

I'm not convinced that this PR is the way to go forward tho and definitely need to give it a bit more thought.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm yeah … I’m not sure if this would work fully when using async_hooks because we’d basically always want the lowest async id on the stack, not the highest. But maybe we could expose + use that? That would also solve the readable-stream issue for all supported versions of Node.js (although obviously not browsers…)

emitBefore(asyncId, tock[trigger_async_id_symbol], tock);

try {
Expand Down Expand Up @@ -179,6 +180,7 @@ module.exports = {
// Sets the callback to be run in every tick.
setTickCallback(processTicksAndRejections);
return {
tickId,
nextTick,
runNextTicks
};
Expand Down
44 changes: 44 additions & 0 deletions test/parallel/test-stream-writable-sync-error.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
'use strict';
const common = require('../common');

// Tests for the regression in _stream_writable fixed in
// https://github.com/nodejs/node/pull/31756

// Specifically, when a write callback is invoked synchronously
// with an error, and autoDestroy is not being used, the error
// should still be emitted on nextTick.

const { Writable } = require('stream');

class MyStream extends Writable {
#cb = undefined;

constructor() {
super({ autoDestroy: false });
}

_write(_, __, cb) {
this.#cb = cb;
}

close() {
// Synchronously invoke the callback with an error.
this.#cb(new Error('foo'));
}
}

const stream = new MyStream();

const mustError = common.mustCall(2);

stream.write('test', () => {});

// Both error callbacks should be invoked.

stream.on('error', mustError);

stream.close();

// Without the fix in #31756, the error handler
// added after the call to close will not be invoked.
stream.on('error', mustError);