Skip to content

Commit

Permalink
stream: first error wins and cannot be overriden
Browse files Browse the repository at this point in the history
The first stream error is the only one that gets
emitted as 'error' or forwarded in callbacks. Also
it cannot be override by _destroy.

Refs: #30979
  • Loading branch information
ronag committed Dec 15, 2019
1 parent 67ed526 commit 2c96024
Show file tree
Hide file tree
Showing 8 changed files with 94 additions and 53 deletions.
4 changes: 2 additions & 2 deletions lib/_stream_readable.js
Original file line number Diff line number Diff line change
Expand Up @@ -140,8 +140,8 @@ function ReadableState(options, stream, isDuplex) {
// Has it been destroyed
this.destroyed = false;

// Indicates whether the stream has errored.
this.errored = false;
// Indicates whether the stream has errored. Contains the error.
this.errored = null;

// Crypto is kind of old and crusty. Historically, its default string
// encoding is 'binary' so we have to make this configurable.
Expand Down
10 changes: 8 additions & 2 deletions lib/_stream_writable.js
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,8 @@ function WritableState(options, stream, isDuplex) {
// Indicates whether the stream has errored. When true all write() calls
// should return false. This is needed since when autoDestroy
// is disabled we need a way to tell whether the stream has failed.
this.errored = false;
// Contains the error.
this.errored = null;

// Count buffered requests
this.bufferedRequestCount = 0;
Expand Down Expand Up @@ -488,7 +489,12 @@ function onwrite(stream, er) {
state.writelen = 0;

if (er) {
state.errored = true;
if (!state.errored) {
state.errored = er;
}
if (stream._readableState && !stream._readableState.errored) {
stream._readableState.errored = er;
}
if (sync) {
process.nextTick(onwriteError, stream, state, er, cb);
} else {
Expand Down
54 changes: 28 additions & 26 deletions lib/internal/streams/destroy.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,21 +8,19 @@ function destroy(err, cb) {
const w = this._writableState;

if (err) {
if (w) {
w.errored = true;
if (w && !w.errored) {
w.errored = err;
}
if (r) {
r.errored = true;
if (r && !r.errored) {
r.errored = err;
}
}

if ((w && w.destroyed) || (r && r.destroyed)) {
if (cb) {
cb(err);
} else if (err) {
process.nextTick(emitErrorNT, this, err);
// TODO(ronag): Wait until 'close' is emitted (if not already emitted).
cb((w && w.errored) || (r && r.errored));
}

return this;
}

Expand All @@ -38,32 +36,30 @@ function destroy(err, cb) {

this._destroy(err || null, (err) => {
if (err) {
if (w) {
w.errored = true;
if (w && !w.errored) {
w.errored = err;
}
if (r) {
r.errored = true;
if (r && !r.errored) {
r.errored = err;
}
}

if (cb) {
// Invoke callback before scheduling emitClose so that callback
// can schedule before.
cb(err);
cb((w && w.errored) || (r && r.errored));
// Don't emit 'error' if passed a callback.
process.nextTick(emitCloseNT, this);
} else if (err) {
process.nextTick(emitErrorCloseNT, this, err);
} else {
process.nextTick(emitCloseNT, this);
process.nextTick(emitErrorCloseNT, this);
}
});

return this;
}

function emitErrorCloseNT(self, err) {
emitErrorNT(self, err);
function emitErrorCloseNT(self) {
emitErrorNT(self);
emitCloseNT(self);
}

Expand All @@ -76,10 +72,16 @@ function emitCloseNT(self) {
}
}

function emitErrorNT(self, err) {
function emitErrorNT(self) {
const r = self._readableState;
const w = self._writableState;

const err = (w && w.errored) || (r && r.errored);

if (!err) {
return;
}

if ((w && w.errorEmitted) || (r && r.errorEmitted)) {
return;
}
Expand All @@ -100,7 +102,7 @@ function undestroy() {

if (r) {
r.destroyed = false;
r.errored = false;
r.errored = null;
r.reading = false;
r.ended = false;
r.endEmitted = false;
Expand All @@ -109,7 +111,7 @@ function undestroy() {

if (w) {
w.destroyed = false;
w.errored = false;
w.errored = null;
w.ended = false;
w.ending = false;
w.finalCalled = false;
Expand All @@ -132,13 +134,13 @@ function errorOrDestroy(stream, err) {
if ((r && r.autoDestroy) || (w && w.autoDestroy))
stream.destroy(err);
else if (err) {
if (w) {
w.errored = true;
if (w && !w.errored) {
w.errored = err;
}
if (r) {
r.errored = true;
if (r && !r.errored) {
r.errored = err;
}
emitErrorNT(stream, err);
emitErrorNT(stream);
}
}

Expand Down
6 changes: 4 additions & 2 deletions test/parallel/test-stream-auto-destroy.js
Original file line number Diff line number Diff line change
Expand Up @@ -86,27 +86,29 @@ const assert = require('assert');
{
const r = new stream.Readable({
read() {
r2.emit('error', new Error('fail'));
r2.destroy(new Error('fail'));
}
});
const r2 = new stream.Readable({
autoDestroy: true,
destroy: common.mustCall((err, cb) => cb())
});
r2.on('error', common.mustCall());

r.pipe(r2);
}

{
const r = new stream.Readable({
read() {
w.emit('error', new Error('fail'));
w.destroy(new Error('fail'));
}
});
const w = new stream.Writable({
autoDestroy: true,
destroy: common.mustCall((err, cb) => cb())
});
w.on('error', common.mustCall());

r.pipe(w);
}
4 changes: 2 additions & 2 deletions test/parallel/test-stream-duplex-destroy.js
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,8 @@ const assert = require('assert');
duplex.on('end', common.mustNotCall('no end event'));
duplex.on('finish', common.mustNotCall('no finish event'));

// Error is swallowed by the custom _destroy
duplex.on('error', common.mustNotCall('no error event'));
// Error is NOT swallowed by the custom _destroy
duplex.on('error', common.mustCall());
duplex.on('close', common.mustCall());

duplex.destroy(expected);
Expand Down
21 changes: 11 additions & 10 deletions test/parallel/test-stream-readable-destroy.js
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,8 @@ const assert = require('assert');

read.on('end', common.mustNotCall('no end event'));

// Error is swallowed by the custom _destroy
read.on('error', common.mustNotCall('no error event'));
// Error is NOT swallowed by the custom _destroy
read.on('error', common.mustCall());
read.on('close', common.mustCall());

read.destroy(expected);
Expand Down Expand Up @@ -134,13 +134,13 @@ const assert = require('assert');
read.on('error', common.mustCall((err) => {
assert.strictEqual(ticked, true);
assert.strictEqual(read._readableState.errorEmitted, true);
assert.strictEqual(read._readableState.errored, true);
assert.strictEqual(read._readableState.errored, expected);
assert.strictEqual(err, expected);
}));

read.destroy();
assert.strictEqual(read._readableState.errorEmitted, false);
assert.strictEqual(read._readableState.errored, true);
assert.strictEqual(read._readableState.errored, expected);
assert.strictEqual(read.destroyed, true);
ticked = true;
}
Expand Down Expand Up @@ -190,15 +190,15 @@ const assert = require('assert');
// destroy(err, callback);
read.on('error', common.mustNotCall());

assert.strictEqual(read._readableState.errored, false);
assert.strictEqual(read._readableState.errored, null);
assert.strictEqual(read._readableState.errorEmitted, false);

read.destroy(expected, common.mustCall(function(err) {
assert.strictEqual(read._readableState.errored, true);
assert.strictEqual(read._readableState.errored, expected);
assert.strictEqual(err, expected);
}));
assert.strictEqual(read._readableState.errorEmitted, false);
assert.strictEqual(read._readableState.errored, true);
assert.strictEqual(read._readableState.errored, expected);
ticked = true;
}

Expand All @@ -223,14 +223,15 @@ const assert = require('assert');

readable.destroy();
assert.strictEqual(readable.destroyed, true);
assert.strictEqual(readable._readableState.errored, false);
assert.strictEqual(readable._readableState.errored, null);
assert.strictEqual(readable._readableState.errorEmitted, false);

// Test case where `readable.destroy()` is called again with an error before
// the `_destroy()` callback is called.
readable.destroy(new Error('kaboom 2'));
const expected = new Error('kaboom 2');
readable.destroy(expected);
assert.strictEqual(readable._readableState.errorEmitted, false);
assert.strictEqual(readable._readableState.errored, true);
assert.strictEqual(readable._readableState.errored, expected);

ticked = true;
}
Expand Down
4 changes: 2 additions & 2 deletions test/parallel/test-stream-transform-destroy.js
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,8 @@ const assert = require('assert');
transform.on('close', common.mustCall());
transform.on('finish', common.mustNotCall('no finish event'));

// Error is swallowed by the custom _destroy
transform.on('error', common.mustNotCall('no error event'));
// Error is NOT swallowed by the custom _destroy
transform.on('error', common.mustCall());

transform.destroy(expected);
}
Expand Down
44 changes: 37 additions & 7 deletions test/parallel/test-stream-writable-destroy.js
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,33 @@ const assert = require('assert');
write.on('finish', common.mustNotCall('no finish event'));
write.on('close', common.mustCall());

// Error is swallowed by the custom _destroy
write.on('error', common.mustNotCall('no error event'));
// Error is NOT swallowed by the custom _destroy
write.on('error', common.mustCall((err) => {
assert.strictEqual(err, expected);
}));

write.destroy(expected);
assert.strictEqual(write.destroyed, true);
}

{
const write = new Writable({
write(chunk, enc, cb) { cb(); },
destroy: common.mustCall(function(err, cb) {
assert.strictEqual(err, expected);
cb(new Error('not this error'));
})
});

const expected = new Error('kaboom');

write.on('finish', common.mustNotCall('no finish event'));
write.on('close', common.mustCall());

// Error is NOT overriden by the custom _destroy
write.on('error', common.mustCall((err) => {
assert.strictEqual(err, expected);
}));

write.destroy(expected);
assert.strictEqual(write.destroyed, true);
Expand Down Expand Up @@ -167,9 +192,10 @@ const assert = require('assert');
assert.strictEqual(write._writableState.errorEmitted, true);
}));

write.destroy(new Error('kaboom 1'));
const expected = new Error('kaboom 1');
write.destroy(expected);
write.destroy(new Error('kaboom 2'));
assert.strictEqual(write._writableState.errored, true);
assert.strictEqual(write._writableState.errored, expected);
assert.strictEqual(write._writableState.errorEmitted, false);
assert.strictEqual(write.destroyed, true);
ticked = true;
Expand Down Expand Up @@ -198,14 +224,15 @@ const assert = require('assert');

writable.destroy();
assert.strictEqual(writable.destroyed, true);
assert.strictEqual(writable._writableState.errored, false);
assert.strictEqual(writable._writableState.errored, null);
assert.strictEqual(writable._writableState.errorEmitted, false);

// Test case where `writable.destroy()` is called again with an error before
// the `_destroy()` callback is called.
writable.destroy(new Error('kaboom 2'));
const expected = new Error('kaboom 2');
writable.destroy(expected);
assert.strictEqual(writable._writableState.errorEmitted, false);
assert.strictEqual(writable._writableState.errored, true);
assert.strictEqual(writable._writableState.errored, expected);

ticked = true;
}
Expand Down Expand Up @@ -249,6 +276,9 @@ const assert = require('assert');
write.destroy(expected, common.mustCall(function(err) {
assert.strictEqual(err, expected);
}));
write.on('error', common.mustCall((err) => {
assert.strictEqual(err, expected);
}));
}

{
Expand Down

0 comments on commit 2c96024

Please sign in to comment.