diff --git a/.changeset/silver-walls-remain.md b/.changeset/silver-walls-remain.md new file mode 100644 index 00000000..e821a2ca --- /dev/null +++ b/.changeset/silver-walls-remain.md @@ -0,0 +1,5 @@ +--- +"barnard59-core": patch +--- + +Pipeline would never finish if the last step was an async generator with a `break` statement diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 71073393..bf03447d 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -46,6 +46,16 @@ jobs: - run: npm ci - run: npx wsrun -mc build + test-e2e: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + - uses: actions/setup-node@v4 + with: + node-version: 20 + - run: npm ci + - run: npm run -w barnard59-test-e2e test + test-cli: runs-on: ubuntu-latest strategy: diff --git a/package-lock.json b/package-lock.json index 68528930..41789561 100644 --- a/package-lock.json +++ b/package-lock.json @@ -27679,7 +27679,7 @@ }, "packages/cli": { "name": "barnard59", - "version": "5.0.4", + "version": "5.0.5", "license": "MIT", "dependencies": { "@opentelemetry/api": "^1.0.0", @@ -27691,7 +27691,7 @@ "@opentelemetry/semantic-conventions": "^0.24.0", "@opentelemetry/tracing": "^0.24.0", "@rdfjs/namespace": "^2.0.0", - "barnard59-core": "6.0.1", + "barnard59-core": "6.1.0", "barnard59-env": "1.2.6", "commander": "^11.0.0", "find-up": "^7.0.0", @@ -27718,7 +27718,7 @@ "barnard59-base": "^2.4.2", "barnard59-formats": "^4.0.0", "barnard59-graph-store": "^6.0.1", - "barnard59-http": "^2.0.0", + "barnard59-http": "^2.1.0", "barnard59-shell": "^0.1.0", "barnard59-test-support": "^0.0.3", "chai": "^4.3.7", @@ -27847,7 +27847,7 @@ }, "packages/core": { "name": "barnard59-core", - "version": "6.0.1", + "version": "6.1.0", "license": "MIT", "dependencies": { "@opentelemetry/api": "^1.0.1", @@ -27867,7 +27867,7 @@ "@rdfjs/namespace": "^2.0.0", "@types/readable-stream": "^4.0.9", "barnard59-env": "^1.2.2", - "barnard59-http": "^2.0.0", + "barnard59-http": "^2.1.0", "barnard59-test-support": "^0.0.3", "chai": "^4.3.7", "get-stream": "^6.0.1", @@ -27940,12 +27940,12 @@ }, "packages/cube": { "name": "barnard59-cube", - "version": "1.4.8", + "version": "1.4.9", "license": "MIT", "dependencies": { "barnard59-base": "^2.4.2", "barnard59-formats": "^4.0.0", - "barnard59-http": "^2.0.0", + "barnard59-http": "^2.1.0", "barnard59-rdf": "^3.4.0", "barnard59-shacl": "^1.4.9", "barnard59-sparql": "^2.3.0", @@ -28254,7 +28254,7 @@ }, "packages/http": { "name": "barnard59-http", - "version": "2.0.0", + "version": "2.1.0", "license": "MIT", "dependencies": { "@opentelemetry/api": "^1.0.1", @@ -28466,7 +28466,7 @@ }, "packages/shacl": { "name": "barnard59-shacl", - "version": "1.4.9", + "version": "1.4.10", "license": "MIT", "dependencies": { "@rdfjs/fetch": "^3.0.0", diff --git a/packages/core/lib/Pipeline.ts b/packages/core/lib/Pipeline.ts index dd80266b..dba7d809 100644 --- a/packages/core/lib/Pipeline.ts +++ b/packages/core/lib/Pipeline.ts @@ -3,7 +3,7 @@ import once from 'onetime' import type { Stream } from 'readable-stream' import streams from 'readable-stream' import createStream, { assertWritable } from './factory/stream.js' -import { isReadable } from './isStream.js' +import { isReadable, isWritable } from './isStream.js' import nextLoop from './nextLoop.js' import type { Options as BaseOptions } from './StreamObject.js' import StreamObject from './StreamObject.js' @@ -116,6 +116,15 @@ class Pipeline extends StreamObject { this.children[index].stream.pipe(child.stream) } + this.lastChild.stream.on('end', () => { + // in some cases, a duplex stream emits the end event but is still writable + // which prevents the pipeline from reaching the finished callback below + // in such case, a pipeline never finishes, and the process hangs + if (!isWritable(this.lastChild.stream) && !isReadable(this.lastChild.stream)) { + this.finish() + } + }) + finished(this.lastChild.stream, err => { if (!err) { this.finish() @@ -123,8 +132,7 @@ class Pipeline extends StreamObject { this.logger.error(err) } }) - // eslint-disable-next-line @typescript-eslint/no-explicit-any - } catch (err: any) { + } catch (err: any) { // eslint-disable-line @typescript-eslint/no-explicit-any this.destroy(err) this.logger.error(err, { iri: this.ptr.value }) diff --git a/test/e2e/forEach.e2e.test.js b/test/e2e/forEach.e2e.test.js index 990b5929..34373519 100644 --- a/test/e2e/forEach.e2e.test.js +++ b/test/e2e/forEach.e2e.test.js @@ -1,8 +1,9 @@ -import { deepStrictEqual, strictEqual } from 'node:assert' +import { strictEqual } from 'node:assert' import { createPipeline } from 'barnard59-core' import getStream from 'get-stream' import { pipelineDefinitionLoader } from 'barnard59-test-support/loadPipelineDefinition.js' import env from 'barnard59-env' +import { expect } from 'chai' const loadPipelineDefinition = pipelineDefinitionLoader(import.meta.url, 'definitions') @@ -36,7 +37,7 @@ describe('forEach', () => { const out = await getStream.array(pipeline.stream) - deepStrictEqual(out, [ + expect(out).to.contain.all.members([ '/root/definitions/foreach/csv-duplicate.ttl', '/root/definitions/foreach/with-handler.ttl', '/root/definitions/foreach/with-variable.ttl', diff --git a/test/e2e/pipeline.e2e.test.js b/test/e2e/pipeline.e2e.test.js index a22ce95e..7766db46 100644 --- a/test/e2e/pipeline.e2e.test.js +++ b/test/e2e/pipeline.e2e.test.js @@ -76,7 +76,7 @@ describe('Pipeline', () => { const out = await fromStream(env.dataset(), env.formats.parsers.import('text/turtle', pipeline.stream)) // then - const source = await fromStream(env.dataset(), fromFile('definitions/file-loader.ttl')) + const source = await fromStream(env.dataset(), fromFile(env, 'definitions/file-loader.ttl')) expect(toCanonical(out)).to.eq(toCanonical(source)) })