Skip to content

Commit

Permalink
feat: implement body mixin on Readable (nodejs#907)
Browse files Browse the repository at this point in the history
* stream: implement body mixin on Readable

Port over PR from node core. It will not be possible to land
these changes on core due to ecosystem breakage.

Refs: nodejs/node#39520

* fixup: tests

* fixup

* fixup

* fixuP

* fixuP

* fixup: remove node specific stuff

* fixup: formData

* fixup: simplify

* fixup

* fixup

* fixup

* fixup: README

* fixup
  • Loading branch information
ronag authored and crysmags committed Feb 27, 2024
1 parent ed752f4 commit a15e2f4
Show file tree
Hide file tree
Showing 5 changed files with 364 additions and 24 deletions.
18 changes: 18 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,24 @@ for await (const data of body) {
console.log('trailers', trailers)
```

Using [the body mixin from the Fetch Standard](https://fetch.spec.whatwg.org/#body-mixin).

```js
import { request } from 'undici'

const {
statusCode,
headers,
trailers,
body
} = await request('http://localhost:3000/foo')

console.log('response received', statusCode)
console.log('headers', headers)
console.log('data', await body.json())
console.log('trailers', trailers)
```

## Common API Methods

This section documents our most commonly used API methods. Additional APIs are documented in their own files within the [docs](./docs/) folder and are accessible via the navigation list on the left side of the docs site.
Expand Down
10 changes: 9 additions & 1 deletion docs/api/Dispatcher.md
Original file line number Diff line number Diff line change
Expand Up @@ -418,12 +418,20 @@ The `RequestOptions.method` property should not be value `'CONNECT'`.

* **statusCode** `number`
* **headers** `http.IncomingHttpHeaders`
* **body** `stream.Readable`
* **body** `stream.Readable` which also implements [the body mixin from the Fetch Standard](https://fetch.spec.whatwg.org/#body-mixin).
* **trailers** `Record<string, string>` - This object starts out
as empty and will be mutated to contain trailers after `body` has emitted `'end'`.
* **opaque** `unknown`
* **context** `object`

`body` contains the following additional [body mixin](https://fetch.spec.whatwg.org/#body-mixin) methods and properties:

- `text()`
- `json()`
- `arrayBuffer()`
- `body`
- `bodyUsed`

#### Example 1 - Basic GET Request

```js
Expand Down
25 changes: 2 additions & 23 deletions lib/api/api-request.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
'use strict'

const { Readable } = require('stream')
const Readable = require('./readable')
const {
InvalidArgumentError,
RequestAbortedError
Expand All @@ -9,27 +9,6 @@ const util = require('../core/util')
const { AsyncResource } = require('async_hooks')
const { addSignal, removeSignal } = require('./abort-signal')

const kAbort = Symbol('abort')

class RequestResponse extends Readable {
constructor (resume, abort) {
super({ autoDestroy: true, read: resume })
this[kAbort] = abort
}

_destroy (err, callback) {
if (!err && !this._readableState.endEmitted) {
err = new RequestAbortedError()
}

if (err) {
this[kAbort]()
}

callback(err)
}
}

class RequestHandler extends AsyncResource {
constructor (opts, callback) {
if (!opts || typeof opts !== 'object') {
Expand Down Expand Up @@ -92,7 +71,7 @@ class RequestHandler extends AsyncResource {
return
}

const body = new RequestResponse(resume, abort)
const body = new Readable(resume, abort)

this.callback = null
this.res = body
Expand Down
272 changes: 272 additions & 0 deletions lib/api/readable.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,272 @@
// Ported from https://github.com/nodejs/undici/pull/907

'use strict'

const assert = require('assert')
const { Readable } = require('stream')
const { RequestAbortedError, NotSupportedError } = require('../core/errors')

let Blob

const kConsume = Symbol('kConsume')
const kReading = Symbol('kReading')
const kBody = Symbol('kBody')
const kAbort = Symbol('abort')

module.exports = class BodyReadable extends Readable {
constructor (resume, abort) {
super({ autoDestroy: true, read: resume })

this._readableState.dataEmitted = false

this[kAbort] = abort
this[kConsume] = null
this[kBody] = null

// Is stream being consumed through Readable API?
// This is an optimization so that we avoid checking
// for 'data' and 'readable' listeners in the hot path
// inside push().
this[kReading] = false
}

destroy (err) {
if (this.destroyed) {
// Node < 16
return this
}

if (!err && !this._readableState.endEmitted) {
err = new RequestAbortedError()
}

if (err) {
this[kAbort]()
}

return super.destroy(err)
}

emit (ev, ...args) {
// Waiting for: https://github.com/nodejs/node/pull/39589
if (ev === 'data') {
this._readableState.dataEmitted = true
} else if (ev === 'error') {
// Node < 16
this._readableState.errorEmitted = true
}
return super.emit(ev, ...args)
}

on (ev, ...args) {
if (ev === 'data' || ev === 'readable') {
this[kReading] = true
}
return super.on(ev, ...args)
}

addListener (ev, ...args) {
return this.on(ev, ...args)
}

off (ev, ...args) {
const ret = super.off(ev, ...args)
if (ev === 'data' || ev === 'readable') {
this[kReading] = (
this.listenerCount('data') > 0 ||
this.listenerCount('readable') > 0
)
}
return ret
}

removeListener (ev, ...args) {
return this.off(ev, ...args)
}

push (chunk) {
if (this[kConsume] && chunk !== null && !this[kReading]) {
consumePush(this[kConsume], chunk)
return true
} else {
return super.push(chunk)
}
}

// https://fetch.spec.whatwg.org/#dom-body-text
text () {
return consume(this, 'text')
}

// https://fetch.spec.whatwg.org/#dom-body-json
json () {
return consume(this, 'json')
}

// https://fetch.spec.whatwg.org/#dom-body-blob
blob () {
return consume(this, 'blob')
}

// https://fetch.spec.whatwg.org/#dom-body-arraybuffer
arrayBuffer () {
return consume(this, 'arrayBuffer')
}

// https://fetch.spec.whatwg.org/#dom-body-formdata
formData () {
// TODO: Implement.
throw new NotSupportedError()
}

// https://fetch.spec.whatwg.org/#dom-body-bodyused
get bodyUsed () {
return isDisturbed(this)
}

// https://fetch.spec.whatwg.org/#dom-body-body
get body () {
if (!this[kBody]) {
this[kBody] = Readable.toWeb(this)
if (this[kConsume]) {
// TODO: Is this the best way to force a lock?
this[kBody].getReader() // Ensure stream is locked.
assert(this[kBody].locked)
}
}
return this[kBody]
}
}

// https://streams.spec.whatwg.org/#readablestream-locked
function isLocked (self) {
// Consume is an implicit lock.
return (self[kBody] && self[kBody].locked === true) || self[kConsume]
}

// https://streams.spec.whatwg.org/#readablestream-disturbed
function isDisturbed (self) {
// Waiting for: https://github.com/nodejs/node/pull/39589
const { _readableState: state } = self
return !!(
state.dataEmitted ||
state.endEmitted ||
state.errorEmitted ||
state.closeEmitted
)
}

// https://fetch.spec.whatwg.org/#body-unusable
function isUnusable (self) {
return isDisturbed(self) || isLocked(self)
}

async function consume (stream, type) {
if (isUnusable(stream)) {
throw new TypeError('unusable')
}

assert(!stream[kConsume])

return new Promise((resolve, reject) => {
stream[kConsume] = {
type,
stream,
resolve,
reject,
length: 0,
body: []
}

stream
.on('error', function (err) {
consumeFinish(this[kConsume], err)
})
.on('close', function () {
if (this[kConsume].body !== null) {
consumeFinish(this[kConsume], new RequestAbortedError())
}
})

process.nextTick(consumeStart, stream[kConsume])
})
}

function consumeStart (consume) {
if (consume.body === null) {
return
}

const { _readableState: state } = consume.stream

for (const chunk of state.buffer) {
consumePush(consume, chunk)
}

if (state.endEmitted) {
consumeEnd(this[kConsume])
} else {
consume.stream.on('end', function () {
consumeEnd(this[kConsume])
})
}

consume.stream.resume()

while (consume.stream.read() != null) {
// Loop
}
}

function consumeEnd (consume) {
const { type, body, resolve, stream, length } = consume

try {
if (type === 'text') {
resolve(body.join(''))
} else if (type === 'json') {
resolve(JSON.parse(body.join('')))
} else if (type === 'arrayBuffer') {
const dst = new Uint8Array(length)

let pos = 0
for (const buf of body) {
dst.set(buf, pos)
pos += buf.byteLength
}

resolve(dst)
} else if (type === 'blob') {
if (!Blob) {
Blob = require('buffer').Blob
}
resolve(new Blob(body))
}

consumeFinish(consume)
} catch (err) {
stream.destroy(err)
}
}

function consumePush (consume, chunk) {
consume.length += chunk.length
consume.body.push(chunk)
}

function consumeFinish (consume, err) {
if (consume.body === null) {
return
}

if (err) {
consume.reject(err)
} else {
consume.resolve()
}

consume.reject = null
consume.resolve = null
consume.decoder = null
consume.body = null
}
Loading

0 comments on commit a15e2f4

Please sign in to comment.