Skip to content

Commit

Permalink
fixup: bodyMixin
Browse files Browse the repository at this point in the history
  • Loading branch information
ronag committed Aug 2, 2021
1 parent bb368bf commit 1b7f509
Show file tree
Hide file tree
Showing 2 changed files with 92 additions and 48 deletions.
105 changes: 57 additions & 48 deletions lib/internal/streams/readable.js
Original file line number Diff line number Diff line change
Expand Up @@ -1138,26 +1138,6 @@ Readable.prototype.iterator = function(options) {
return streamToAsyncIterator(this, options);
};

// https://fetch.spec.whatwg.org/#dom-body-text
Readable.prototype.text = function text() {
return consume(this, kTextType);
};

// https://fetch.spec.whatwg.org/#dom-body-json
Readable.prototype.json = function json() {
return consume(this, kJSONType);
};

// https://fetch.spec.whatwg.org/#dom-body-blob
Readable.prototype.blob = function blob() {
return consume(this, kBlobType);
};

// https://fetch.spec.whatwg.org/#dom-body-arraybuffer
Readable.prototype.arrayBuffer = function arrayBuffer() {
return consume(this, kArrayBufferType);
};

function streamToAsyncIterator(stream, options) {
if (typeof stream.read !== 'function') {
// v1 stream
Expand Down Expand Up @@ -1325,34 +1305,6 @@ ObjectDefineProperties(Readable.prototype, {
get() {
return this._readableState ? this._readableState.endEmitted : false;
}
},

// https://fetch.spec.whatwg.org/#dom-body-bodyused
bodyUsed: {
get() {
return isDisturbed(this);
}
},

// https://fetch.spec.whatwg.org/#dom-body-body
body: {
get() {
// Compat
if (this[kHasBody]) {
return this[kBody];
}

if (this[kConsume] && this[kConsume].type === kWebStreamType) {
return this[kConsume].stream;
}

return consume(this, kWebStreamType);
},
// Compat
set(value) {
this[kBody] = value;
this[kHasBody] = true;
}
}
});

Expand Down Expand Up @@ -1696,6 +1648,63 @@ function endWritableNT(state, stream) {
}
}

function bodyMixin(readable) {
// https://fetch.spec.whatwg.org/#dom-body-text
readable.text = function text() {
return consume(this, kTextType);
};

// https://fetch.spec.whatwg.org/#dom-body-json
readable.json = function json() {
return consume(this, kJSONType);
};

// https://fetch.spec.whatwg.org/#dom-body-blob
readable.blob = function blob() {
return consume(this, kBlobType);
};

// https://fetch.spec.whatwg.org/#dom-body-arraybuffer
readable.arrayBuffer = function arrayBuffer() {
return consume(this, kArrayBufferType);
};


ObjectDefineProperties(readable, {
// https://fetch.spec.whatwg.org/#dom-body-bodyused
bodyUsed: {
get() {
return isDisturbed(this);
}
},

// https://fetch.spec.whatwg.org/#dom-body-body
body: {
get() {
// Compat
if (this[kHasBody]) {
return this[kBody];
}

if (this[kConsume] && this[kConsume].type === kWebStreamType) {
return this[kConsume].stream;
}

return consume(this, kWebStreamType);
},
// Compat
set(value) {
this[kBody] = value;
this[kHasBody] = true;
}
}
});

return readable;
}

Readable.bodyMixin = bodyMixin;

Readable.from = function(iterable, opts) {
return from(Readable, iterable, opts);
};
Expand Down
35 changes: 35 additions & 0 deletions tmp.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
'use strict';
const {
Transform,
Writable,
pipeline,
} = require('stream');
const assert = require('assert');

function createTransformStream(tf, context) {
return new Transform({
readableObjectMode: true,
writableObjectMode: true,

transform(chunk, encoding, done) {
tf(chunk, context, done);
}
});
}

const write = new Writable({
write(data, enc, cb) {
cb();
}
});

const ts = createTransformStream((chunk, _, done) => {
return done(new Error('Artificial error'));
});

pipeline(ts, write, (err) => {
assert.ok(err, 'should have an error');
console.log(err);
});

ts.write('test');

0 comments on commit 1b7f509

Please sign in to comment.