Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

stream: first error wins and cannot be overriden #30982

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions lib/_stream_readable.js
Original file line number Diff line number Diff line change
Expand Up @@ -140,8 +140,8 @@ function ReadableState(options, stream, isDuplex) {
// Has it been destroyed
this.destroyed = false;

// Indicates whether the stream has errored.
this.errored = false;
// Indicates whether the stream has errored. Contains the error.
this.errored = null;

// Crypto is kind of old and crusty. Historically, its default string
// encoding is 'binary' so we have to make this configurable.
Expand Down
10 changes: 8 additions & 2 deletions lib/_stream_writable.js
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,8 @@ function WritableState(options, stream, isDuplex) {
// Indicates whether the stream has errored. When true all write() calls
// should return false. This is needed since when autoDestroy
// is disabled we need a way to tell whether the stream has failed.
this.errored = false;
// Contains the error.
this.errored = null;

// Count buffered requests
this.bufferedRequestCount = 0;
Expand Down Expand Up @@ -488,7 +489,12 @@ function onwrite(stream, er) {
state.writelen = 0;

if (er) {
state.errored = true;
if (!state.errored) {
state.errored = er;
}
if (stream._readableState && !stream._readableState.errored) {
stream._readableState.errored = er;
}
if (sync) {
process.nextTick(onwriteError, stream, state, er, cb);
} else {
Expand Down
54 changes: 28 additions & 26 deletions lib/internal/streams/destroy.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,21 +8,19 @@ function destroy(err, cb) {
const w = this._writableState;

if (err) {
if (w) {
w.errored = true;
if (w && !w.errored) {
w.errored = err;
}
if (r) {
r.errored = true;
if (r && !r.errored) {
r.errored = err;
}
}

if ((w && w.destroyed) || (r && r.destroyed)) {
if (cb) {
cb(err);
} else if (err) {
process.nextTick(emitErrorNT, this, err);
// TODO(ronag): Wait until 'close' is emitted (if not already emitted).
cb((w && w.errored) || (r && r.errored));
}

return this;
}

Expand All @@ -38,32 +36,30 @@ function destroy(err, cb) {

this._destroy(err || null, (err) => {
if (err) {
if (w) {
w.errored = true;
if (w && !w.errored) {
w.errored = err;
}
if (r) {
r.errored = true;
if (r && !r.errored) {
r.errored = err;
}
}

if (cb) {
// Invoke callback before scheduling emitClose so that callback
// can schedule before.
cb(err);
cb((w && w.errored) || (r && r.errored));
// Don't emit 'error' if passed a callback.
process.nextTick(emitCloseNT, this);
} else if (err) {
process.nextTick(emitErrorCloseNT, this, err);
} else {
process.nextTick(emitCloseNT, this);
process.nextTick(emitErrorCloseNT, this);
}
});

return this;
}

function emitErrorCloseNT(self, err) {
emitErrorNT(self, err);
function emitErrorCloseNT(self) {
emitErrorNT(self);
emitCloseNT(self);
}

Expand All @@ -76,10 +72,16 @@ function emitCloseNT(self) {
}
}

function emitErrorNT(self, err) {
function emitErrorNT(self) {
const r = self._readableState;
const w = self._writableState;

const err = (w && w.errored) || (r && r.errored);

if (!err) {
return;
}

if ((w && w.errorEmitted) || (r && r.errorEmitted)) {
return;
}
Expand All @@ -100,7 +102,7 @@ function undestroy() {

if (r) {
r.destroyed = false;
r.errored = false;
r.errored = null;
r.reading = false;
r.ended = false;
r.endEmitted = false;
Expand All @@ -109,7 +111,7 @@ function undestroy() {

if (w) {
w.destroyed = false;
w.errored = false;
w.errored = null;
w.ended = false;
w.ending = false;
w.finalCalled = false;
Expand All @@ -132,13 +134,13 @@ function errorOrDestroy(stream, err) {
if ((r && r.autoDestroy) || (w && w.autoDestroy))
stream.destroy(err);
else if (err) {
if (w) {
w.errored = true;
if (w && !w.errored) {
w.errored = err;
}
if (r) {
r.errored = true;
if (r && !r.errored) {
r.errored = err;
}
emitErrorNT(stream, err);
emitErrorNT(stream);
}
}

Expand Down
6 changes: 4 additions & 2 deletions test/parallel/test-stream-auto-destroy.js
Original file line number Diff line number Diff line change
Expand Up @@ -86,27 +86,29 @@ const assert = require('assert');
{
const r = new stream.Readable({
read() {
r2.emit('error', new Error('fail'));
r2.destroy(new Error('fail'));
Copy link
Member Author

@ronag ronag Dec 15, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please note this change. Calling emit('error' is not something that should be done. But do we need to support it?

}
});
const r2 = new stream.Readable({
autoDestroy: true,
destroy: common.mustCall((err, cb) => cb())
});
r2.on('error', common.mustCall());

r.pipe(r2);
}

{
const r = new stream.Readable({
read() {
w.emit('error', new Error('fail'));
w.destroy(new Error('fail'));
Copy link
Member Author

@ronag ronag Dec 15, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please note this change. Calling emit('error' is not something that should be done. But do we need to support it?

}
});
const w = new stream.Writable({
autoDestroy: true,
destroy: common.mustCall((err, cb) => cb())
});
w.on('error', common.mustCall());

r.pipe(w);
}
4 changes: 2 additions & 2 deletions test/parallel/test-stream-duplex-destroy.js
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,8 @@ const assert = require('assert');
duplex.on('end', common.mustNotCall('no end event'));
duplex.on('finish', common.mustNotCall('no finish event'));

// Error is swallowed by the custom _destroy
duplex.on('error', common.mustNotCall('no error event'));
// Error is NOT swallowed by the custom _destroy
duplex.on('error', common.mustCall());
duplex.on('close', common.mustCall());

duplex.destroy(expected);
Expand Down
21 changes: 11 additions & 10 deletions test/parallel/test-stream-readable-destroy.js
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,8 @@ const assert = require('assert');

read.on('end', common.mustNotCall('no end event'));

// Error is swallowed by the custom _destroy
read.on('error', common.mustNotCall('no error event'));
// Error is NOT swallowed by the custom _destroy
read.on('error', common.mustCall());
read.on('close', common.mustCall());

read.destroy(expected);
Expand Down Expand Up @@ -134,13 +134,13 @@ const assert = require('assert');
read.on('error', common.mustCall((err) => {
assert.strictEqual(ticked, true);
assert.strictEqual(read._readableState.errorEmitted, true);
assert.strictEqual(read._readableState.errored, true);
assert.strictEqual(read._readableState.errored, expected);
assert.strictEqual(err, expected);
}));

read.destroy();
assert.strictEqual(read._readableState.errorEmitted, false);
assert.strictEqual(read._readableState.errored, true);
assert.strictEqual(read._readableState.errored, expected);
assert.strictEqual(read.destroyed, true);
ticked = true;
}
Expand Down Expand Up @@ -190,15 +190,15 @@ const assert = require('assert');
// destroy(err, callback);
read.on('error', common.mustNotCall());

assert.strictEqual(read._readableState.errored, false);
assert.strictEqual(read._readableState.errored, null);
assert.strictEqual(read._readableState.errorEmitted, false);

read.destroy(expected, common.mustCall(function(err) {
assert.strictEqual(read._readableState.errored, true);
assert.strictEqual(read._readableState.errored, expected);
assert.strictEqual(err, expected);
}));
assert.strictEqual(read._readableState.errorEmitted, false);
assert.strictEqual(read._readableState.errored, true);
assert.strictEqual(read._readableState.errored, expected);
ticked = true;
}

Expand All @@ -223,14 +223,15 @@ const assert = require('assert');

readable.destroy();
assert.strictEqual(readable.destroyed, true);
assert.strictEqual(readable._readableState.errored, false);
assert.strictEqual(readable._readableState.errored, null);
assert.strictEqual(readable._readableState.errorEmitted, false);

// Test case where `readable.destroy()` is called again with an error before
// the `_destroy()` callback is called.
readable.destroy(new Error('kaboom 2'));
const expected = new Error('kaboom 2');
readable.destroy(expected);
assert.strictEqual(readable._readableState.errorEmitted, false);
assert.strictEqual(readable._readableState.errored, true);
assert.strictEqual(readable._readableState.errored, expected);

ticked = true;
}
Expand Down
4 changes: 2 additions & 2 deletions test/parallel/test-stream-transform-destroy.js
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,8 @@ const assert = require('assert');
transform.on('close', common.mustCall());
transform.on('finish', common.mustNotCall('no finish event'));

// Error is swallowed by the custom _destroy
transform.on('error', common.mustNotCall('no error event'));
// Error is NOT swallowed by the custom _destroy
transform.on('error', common.mustCall());

transform.destroy(expected);
}
Expand Down
44 changes: 37 additions & 7 deletions test/parallel/test-stream-writable-destroy.js
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,33 @@ const assert = require('assert');
write.on('finish', common.mustNotCall('no finish event'));
write.on('close', common.mustCall());

// Error is swallowed by the custom _destroy
write.on('error', common.mustNotCall('no error event'));
// Error is NOT swallowed by the custom _destroy
write.on('error', common.mustCall((err) => {
assert.strictEqual(err, expected);
}));

write.destroy(expected);
assert.strictEqual(write.destroyed, true);
}

{
const write = new Writable({
write(chunk, enc, cb) { cb(); },
destroy: common.mustCall(function(err, cb) {
assert.strictEqual(err, expected);
cb(new Error('not this error'));
})
});

const expected = new Error('kaboom');

write.on('finish', common.mustNotCall('no finish event'));
write.on('close', common.mustCall());

// Error is NOT overriden by the custom _destroy
write.on('error', common.mustCall((err) => {
assert.strictEqual(err, expected);
}));

write.destroy(expected);
assert.strictEqual(write.destroyed, true);
Expand Down Expand Up @@ -167,9 +192,10 @@ const assert = require('assert');
assert.strictEqual(write._writableState.errorEmitted, true);
}));

write.destroy(new Error('kaboom 1'));
const expected = new Error('kaboom 1');
write.destroy(expected);
write.destroy(new Error('kaboom 2'));
assert.strictEqual(write._writableState.errored, true);
assert.strictEqual(write._writableState.errored, expected);
assert.strictEqual(write._writableState.errorEmitted, false);
assert.strictEqual(write.destroyed, true);
ticked = true;
Expand Down Expand Up @@ -198,14 +224,15 @@ const assert = require('assert');

writable.destroy();
assert.strictEqual(writable.destroyed, true);
assert.strictEqual(writable._writableState.errored, false);
assert.strictEqual(writable._writableState.errored, null);
assert.strictEqual(writable._writableState.errorEmitted, false);

// Test case where `writable.destroy()` is called again with an error before
// the `_destroy()` callback is called.
writable.destroy(new Error('kaboom 2'));
const expected = new Error('kaboom 2');
writable.destroy(expected);
assert.strictEqual(writable._writableState.errorEmitted, false);
assert.strictEqual(writable._writableState.errored, true);
assert.strictEqual(writable._writableState.errored, expected);

ticked = true;
}
Expand Down Expand Up @@ -249,6 +276,9 @@ const assert = require('assert');
write.destroy(expected, common.mustCall(function(err) {
Copy link
Member Author

@ronag ronag Dec 15, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

destroy(err, cb)is now able to override the no error state from destroy() above.

assert.strictEqual(err, expected);
}));
write.on('error', common.mustCall((err) => {
assert.strictEqual(err, expected);
}));
}

{
Expand Down