Skip to content

Commit

Permalink
fix: propagate duplex transform sink errors (#8)
Browse files Browse the repository at this point in the history
Sink errors were either getting smothered or causing unhandled promise rejections depending on your platform.

This breaks on Chrome when creating an abortable-iterator from a duplex transform stream that you then try to abort.

Instead if the sink of a duplex stream being used as a transform returns a promise, propagate errors thrown by the sink into it's source so they can be caught and handled by the consumer.
  • Loading branch information
achingbrain committed Jan 13, 2022
1 parent 45859d3 commit 89df9be
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 2 deletions.
3 changes: 3 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -120,11 +120,14 @@
"license": "MIT",
"devDependencies": {
"aegir": "^36.1.3",
"delay": "^5.0.0",
"it-all": "^1.0.6",
"it-drain": "^1.0.5",
"streaming-iterables": "^6.0.0"
},
"dependencies": {
"it-merge": "^1.0.4",
"it-pushable": "^2.0.0",
"it-stream-types": "^1.0.3"
},
"repository": {
Expand Down
16 changes: 15 additions & 1 deletion src/index.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import { pushable } from 'it-pushable'
import merge from 'it-merge'
import type * as it from 'it-stream-types'

export const rawPipe = (...fns: any) => {
Expand All @@ -22,7 +24,19 @@ export const isDuplex = <TSource, TSink = TSource, RSink = Promise<void>> (obj:

const duplexPipelineFn = <TSource> (duplex: any) => {
return (source: any): it.Source<TSource> => {
duplex.sink(source) // TODO: error on sink side is unhandled rejection - this is the same as pull streams
const p = duplex.sink(source)

if (p.then != null) {
const stream = pushable<TSource>()
p.then(() => {
stream.end()
}, (err: Error) => {
stream.end(err)
})

return merge(stream, duplex.source)
}

return duplex.source
}
}
Expand Down
23 changes: 22 additions & 1 deletion test/index.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@ import { pipe } from '../src/index.js'
import all from 'it-all'
import drain from 'it-drain'
import { filter, collect, consume } from 'streaming-iterables'
import type { Duplex } from 'it-stream-types'
import delay from 'delay'
import type { Duplex, Source } from 'it-stream-types'

const oneTwoThree = () => [1, 2, 3]

Expand Down Expand Up @@ -116,4 +117,24 @@ describe('it-pipe', () => {

expect(result).to.be.undefined()
})

it('should propagate duplex transform sink errors', async () => {
const err = new Error('Aaargh')

await expect(
pipe(
oneTwoThree, {
source: (async function * () {
await delay(1000)
yield 5
}()),
sink: async (source: Source<number>) => {
await delay(20)
throw err
}
},
async (source) => await drain(source)
)
).to.eventually.be.rejected.with.property('message', err.message)
})
})

0 comments on commit 89df9be

Please sign in to comment.