Skip to content

Commit

Permalink
ci: run e2e tests (#327)
Browse files Browse the repository at this point in the history
* ci: run e2e tests

* test: fix usage of `fromFile`

* test: better assertion

* fix(limit): stalling stream

* revert: keep break in limit step

* Delete .changeset/odd-monkeys-wave.md

* fix: unending pipelines

* fix: handle end and finish events

* fix: ensure stream is finished

* refactor: comment + call this.finish()
  • Loading branch information
tpluscode authored Sep 19, 2024
1 parent ad5f176 commit 10e1437
Show file tree
Hide file tree
Showing 6 changed files with 39 additions and 15 deletions.
5 changes: 5 additions & 0 deletions .changeset/silver-walls-remain.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"barnard59-core": patch
---

Pipeline would never finish if the last step was an async generator with a `break` statement
10 changes: 10 additions & 0 deletions .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,16 @@ jobs:
- run: npm ci
- run: npx wsrun -mc build

test-e2e:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- uses: actions/setup-node@v4
with:
node-version: 20
- run: npm ci
- run: npm run -w barnard59-test-e2e test

test-cli:
runs-on: ubuntu-latest
strategy:
Expand Down
18 changes: 9 additions & 9 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 11 additions & 3 deletions packages/core/lib/Pipeline.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import once from 'onetime'
import type { Stream } from 'readable-stream'
import streams from 'readable-stream'
import createStream, { assertWritable } from './factory/stream.js'
import { isReadable } from './isStream.js'
import { isReadable, isWritable } from './isStream.js'
import nextLoop from './nextLoop.js'
import type { Options as BaseOptions } from './StreamObject.js'
import StreamObject from './StreamObject.js'
Expand Down Expand Up @@ -116,15 +116,23 @@ class Pipeline extends StreamObject<Stream & { pipeline?: Pipeline }> {
this.children[index].stream.pipe(child.stream)
}

this.lastChild.stream.on('end', () => {
// in some cases, a duplex stream emits the end event but is still writable
// which prevents the pipeline from reaching the finished callback below
// in such case, a pipeline never finishes, and the process hangs
if (!isWritable(this.lastChild.stream) && !isReadable(this.lastChild.stream)) {
this.finish()
}
})

finished(this.lastChild.stream, err => {
if (!err) {
this.finish()
} else {
this.logger.error(err)
}
})
// eslint-disable-next-line @typescript-eslint/no-explicit-any
} catch (err: any) {
} catch (err: any) { // eslint-disable-line @typescript-eslint/no-explicit-any
this.destroy(err)

this.logger.error(err, { iri: this.ptr.value })
Expand Down
5 changes: 3 additions & 2 deletions test/e2e/forEach.e2e.test.js
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import { deepStrictEqual, strictEqual } from 'node:assert'
import { strictEqual } from 'node:assert'
import { createPipeline } from 'barnard59-core'
import getStream from 'get-stream'
import { pipelineDefinitionLoader } from 'barnard59-test-support/loadPipelineDefinition.js'
import env from 'barnard59-env'
import { expect } from 'chai'

const loadPipelineDefinition = pipelineDefinitionLoader(import.meta.url, 'definitions')

Expand Down Expand Up @@ -36,7 +37,7 @@ describe('forEach', () => {

const out = await getStream.array(pipeline.stream)

deepStrictEqual(out, [
expect(out).to.contain.all.members([
'/root/definitions/foreach/csv-duplicate.ttl',
'/root/definitions/foreach/with-handler.ttl',
'/root/definitions/foreach/with-variable.ttl',
Expand Down
2 changes: 1 addition & 1 deletion test/e2e/pipeline.e2e.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ describe('Pipeline', () => {
const out = await fromStream(env.dataset(), env.formats.parsers.import('text/turtle', pipeline.stream))

// then
const source = await fromStream(env.dataset(), fromFile('definitions/file-loader.ttl'))
const source = await fromStream(env.dataset(), fromFile(env, 'definitions/file-loader.ttl'))
expect(toCanonical(out)).to.eq(toCanonical(source))
})

Expand Down

0 comments on commit 10e1437

Please sign in to comment.