From 7c2ac1d1ca502e71bf7fc281b51e95e954b788c8 Mon Sep 17 00:00:00 2001 From: paed01 Date: Tue, 18 Jul 2023 07:04:16 +0200 Subject: [PATCH] return router from middleware instead of a list of handlers - expose engines on middleware - implement multer _removeFile --- README.md | 8 +- index.d.ts | 10 +- package.json | 2 +- src/MulterAdapterStorage.js | 10 +- src/index.js | 57 +++++---- test-d/index.test-d.ts | 7 +- test/features/storage-adapter-feature.js | 154 +++++++++++++++++++++++ 7 files changed, 206 insertions(+), 42 deletions(-) diff --git a/README.md b/README.md index 469e01c..702e8ce 100644 --- a/README.md +++ b/README.md @@ -14,7 +14,7 @@ import express from 'express'; import { bpmnEngineMiddleware, HttpError } from 'bpmn-express-middleware'; const app = express(); -app.use('/rest', bpmnEngineMiddleware()); +app.use('/rest', bpmnEngineMiddleware({ idleTimeout: 90000 })); app.use(errorHandler); @@ -27,6 +27,8 @@ function errorHandler(err, req, res, next) { } ``` +# Middleware + ## Options - `adapter`: Optional [storage adapter](#storage-adapter). Defaults to in-memory adapter based on LRU cache @@ -174,8 +176,8 @@ Persistent storage adapter, defaults to in memory storage. Three types will be saved to adapter: - `deployment`: BPMN deployment with references to BPMN files -- `file`: BPMN file with content -- `state`: BPMN process engine state +- `file`: BPMN file with meta and content +- `state`: BPMN engine state ## `async upsert(type, key, value[, options])` diff --git a/index.d.ts b/index.d.ts index 0a75b68..54ee148 100644 --- a/index.d.ts +++ b/index.d.ts @@ -1,4 +1,4 @@ -import { Request, Response, NextFunction, Handler } from 'express' +import { Request, Response, NextFunction, Handler, Router } from 'express' import { Engine, BpmnEngineOptions, BpmnEngineExecutionState, BpmnMessage } from 'bpmn-engine' import { LRUCache } from 'lru-cache'; import { Broker } from 'smqp'; @@ -143,7 +143,7 @@ export class BpmnEngineMiddleware { /** GET (*)?/status/:token */ getStatusByToken(req: Request, res: Response, next: NextFunction): Promise; /** GET (*)?/status/:token/:activityId */ - getActivityStatus(req: Request, res: Response, next: NextFunction): Promise; + getActivityStatus(req: Request, res: Response, next: NextFunction): Promise; /** POST (*)?/resume/:token */ resumeByToken(req: Request, res: Response, next: NextFunction): Promise; /** POST (*)?/signal/:token */ @@ -162,4 +162,8 @@ export class BpmnEngineMiddleware { internalStopByToken(req: Request, res: Response, next: NextFunction): void; } -export function bpmnEngineMiddleware(options?: BpmnEngineMiddlewareOptions): Handler[]; +interface MiddlewareReturnType extends Router { + engines: Engines; +} + +export function bpmnEngineMiddleware(options?: BpmnEngineMiddlewareOptions): MiddlewareReturnType; diff --git a/package.json b/package.json index f2309ce..2f19b20 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "bpmn-middleware", - "version": "0.0.3", + "version": "0.0.4", "description": "BPMN engine express middleware", "type": "module", "main": "dist/main.cjs", diff --git a/src/MulterAdapterStorage.js b/src/MulterAdapterStorage.js index 028a609..e2016e5 100644 --- a/src/MulterAdapterStorage.js +++ b/src/MulterAdapterStorage.js @@ -16,7 +16,7 @@ MulterAdapterStorage.prototype._handleFile = function handleFile(req, file, call }) .on('end', async () => { try { - await this.adapter.upsert(STORAGE_TYPE_FILE, fileName, content); + await this.adapter.upsert(STORAGE_TYPE_FILE, fileName, { ...file, content }); return callback(null, { path: fileName, size }); } catch (err) { return callback(err); @@ -25,6 +25,10 @@ MulterAdapterStorage.prototype._handleFile = function handleFile(req, file, call .on('error', callback); }; -MulterAdapterStorage.prototype._removeFile = function removeFile(req, file, callback) { - callback(); +MulterAdapterStorage.prototype._removeFile = async function removeFile(req, file, callback) { + try { + await this.adapter.delete(STORAGE_TYPE_FILE, file.originalname); + } finally { + callback(); + } }; diff --git a/src/index.js b/src/index.js index e5f10ca..b213e5f 100644 --- a/src/index.js +++ b/src/index.js @@ -14,8 +14,6 @@ import { fromActivityApi } from './Caller.js'; const packageInfo = fs.promises.readFile(join(process.cwd(), 'package.json')).then((content) => JSON.parse(content)); -const kInitialized = Symbol.for('adapter init'); - export { Engines, MemoryAdapter, HttpError, MiddlewareEngine }; export * from './constants.js'; @@ -34,37 +32,39 @@ export function bpmnEngineMiddleware(options) { const router = new Router({ mergeParams: true }); - const middleware = [ - router.use(engineMiddleware.init), - router.get('(*)?/version', engineMiddleware.getVersion), - router.get('(*)?/deployment', engineMiddleware.getDeployment), - router.post('(*)?/deployment/create', multer({ storage }).any(), engineMiddleware.create), - router.post('(*)?/process-definition/:deploymentName/start', json(), engineMiddleware.start), - router.get('(*)?/running', engineMiddleware.getRunning), - router.get('(*)?/status/:token', engineMiddleware.getStatusByToken), - router.get('(*)?/status/:token/:activityId', engineMiddleware.getActivityStatus), - router.post('(*)?/resume/:token', json(), engineMiddleware.resumeByToken), - router.post('(*)?/signal/:token', json(), engineMiddleware.signalActivity), - router.post('(*)?/cancel/:token', json(), engineMiddleware.cancelActivity), - router.post('(*)?/fail/:token', json(), engineMiddleware.failActivity), - router.get('(*)?/state/:token', engineMiddleware.getStateByToken), - router.delete('(*)?/state/:token', engineMiddleware.deleteStateByToken), - router.delete('(*)?/internal/stop', engineMiddleware.internalStopAll), - router.delete('(*)?/internal/stop/:token', engineMiddleware.internalStopByToken), - ]; - - Object.defineProperties(middleware, { engines: { value: engines } }); - - return middleware; + let initialized = false; + + router.use((req, res, next) => { + if (initialized) return next(); + initialized = true; + return engineMiddleware.init(req, res, next); + }); + router.get('(*)?/version', engineMiddleware.getVersion); + router.get('(*)?/deployment', engineMiddleware.getDeployment); + router.post('(*)?/deployment/create', multer({ storage }).any(), engineMiddleware.create); + router.post('(*)?/process-definition/:deploymentName/start', json(), engineMiddleware.start); + router.get('(*)?/running', engineMiddleware.getRunning); + router.get('(*)?/status/:token', engineMiddleware.getStatusByToken); + router.get('(*)?/status/:token/:activityId', engineMiddleware.getActivityStatus); + router.post('(*)?/resume/:token', json(), engineMiddleware.resumeByToken); + router.post('(*)?/signal/:token', json(), engineMiddleware.signalActivity); + router.post('(*)?/cancel/:token', json(), engineMiddleware.cancelActivity); + router.post('(*)?/fail/:token', json(), engineMiddleware.failActivity); + router.get('(*)?/state/:token', engineMiddleware.getStateByToken); + router.delete('(*)?/state/:token', engineMiddleware.deleteStateByToken); + router.delete('(*)?/internal/stop', engineMiddleware.internalStopAll); + router.delete('(*)?/internal/stop/:token', engineMiddleware.internalStopByToken); + + Object.defineProperties(router, { engines: { value: engines } }); + + return router; } export function BpmnEngineMiddleware(options) { this.adapter = options.adapter; this.engines = options.engines; this.engineOptions = { ...options.engineOptions }; - this[kInitialized] = false; - this.init = this.init.bind(this); this.getVersion = this.getVersion.bind(this); this.getDeployment = this.getDeployment.bind(this); this.create = this.create.bind(this); @@ -83,8 +83,6 @@ export function BpmnEngineMiddleware(options) { } BpmnEngineMiddleware.prototype.init = function init(req, res, next) { - if (this[kInitialized]) return next(); - this[kInitialized] = true; req.app.on('bpmn/end', (engine) => this._postProcessRun(engine)); req.app.on('bpmn/error', (err, engine) => this._postProcessRun(engine, err)); req.app.on('bpmn/activity.call', (callActivityApi) => this._startProcessByCallActivity(callActivityApi)); @@ -249,13 +247,14 @@ BpmnEngineMiddleware.prototype._startDeployment = async function startDeployment if (!deployment) return; const { listener, variables, businessKey, caller, idleTimeout } = options; + const deploymentSource = await this.adapter.fetch(STORAGE_TYPE_FILE, deployment[0].path); const token = randomUUID(); await this.engines.execute({ ...this.engineOptions, name: deploymentName, token, - source: (await this.adapter.fetch(STORAGE_TYPE_FILE, deployment[0].path)), + source: deploymentSource.content, listener, variables: { ...this.engineOptions.variables, diff --git a/test-d/index.test-d.ts b/test-d/index.test-d.ts index b3a9095..e06ae99 100644 --- a/test-d/index.test-d.ts +++ b/test-d/index.test-d.ts @@ -1,10 +1,11 @@ -import { expectType } from 'tsd'; -import { Handler } from 'express'; +import { expectAssignable, expectType } from 'tsd'; +import { Router } from 'express'; import { LRUCache } from 'lru-cache'; import { bpmnEngineMiddleware, Engines, MemoryAdapter, MiddlewareEngine } from '../'; -expectType(bpmnEngineMiddleware()); +expectAssignable(bpmnEngineMiddleware()); +expectType(bpmnEngineMiddleware().engines); expectType(new Engines({ adapter: new MemoryAdapter(), engineCache: new LRUCache({max: 1000}), diff --git a/test/features/storage-adapter-feature.js b/test/features/storage-adapter-feature.js index d9d60ac..f5b0574 100644 --- a/test/features/storage-adapter-feature.js +++ b/test/features/storage-adapter-feature.js @@ -1,6 +1,7 @@ import request from 'supertest'; import * as ck from 'chronokinesis'; import { LRUCache } from 'lru-cache'; +import FormData from 'form-data'; import { getAppWithExtensions, createDeployment, waitForProcess, horizontallyScaled } from '../helpers/testHelpers.js'; import { MemoryAdapter } from '../../src/index.js'; @@ -667,4 +668,157 @@ Feature('storage adapter', () => { expect(apps.getRunning()).to.have.length(1); }); }); + + Scenario('storage adapter throws on upsert file', () => { + let apps, storage; + after(() => { + return apps.stop(); + }); + + Given('a faulty storage adapter', () => { + storage = new LRUCache({ max: 1000 }); + class VolatileAdapter extends MemoryAdapter { + upsert(type, key, value) { + if (type === STORAGE_TYPE_FILE) { + return Promise.reject(new Error('DB file error')); + } else { + return super.upsert(type, key, value); + } + } + } + + apps = horizontallyScaled(2, { adapter: new VolatileAdapter(storage) }); + }); + + let response; + And('a process is deployed', async () => { + response = await createDeployment(apps.balance(), 'faulty-adapter', ` + + + + + `); + }); + + Then('error is returned', () => { + expect(response.statusCode, response.text).to.equal(502); + expect(response.body.message).to.equal('DB file error'); + }); + }); + + Scenario('storage adapter throws on upsert multiple files', () => { + let apps, adapter; + after(() => { + return apps.stop(); + }); + + Given('a faulty storage adapter', () => { + class VolatileAdapter extends MemoryAdapter { + upsert(type, key, value) { + if (type === STORAGE_TYPE_FILE && key === 'multiple-file-process.json') { + return Promise.reject(new Error('DB json file error')); + } else { + return super.upsert(type, key, value); + } + } + } + + adapter = new VolatileAdapter(); + apps = horizontallyScaled(2, { adapter }); + }); + + let deploymentName, response; + And('a process is with multiple files', async () => { + deploymentName = 'multiple-file-process'; + const form = new FormData(); + form.append('deployment-name', deploymentName); + form.append('deployment-source', 'Test modeler'); + form.append(`${deploymentName}.bpmn`, ` + + + + + `, `${deploymentName}.bpmn`); + + form.append(`${deploymentName}.json`, Buffer.from('{"foo":"bar"}'), `${deploymentName}.json`); + + response = await apps.request() + .post('/rest/deployment/create') + .set(form.getHeaders()) + .send(form.getBuffer().toString()); + }); + + Then('error is returned', () => { + expect(response.statusCode, response.text).to.equal(502); + expect(response.body.message).to.equal('DB json file error'); + }); + + And('storage lack both files', async () => { + expect(await adapter.fetch(STORAGE_TYPE_FILE, `${deploymentName}.json`), `${deploymentName}.json`).to.not.be.ok; + expect(await adapter.fetch(STORAGE_TYPE_FILE, `${deploymentName}.bpmn`), `${deploymentName}.bpmn`).to.not.be.ok; + }); + }); + + Scenario('storage adapter throws on delete file', () => { + let apps, adapter; + after(() => { + return apps.stop(); + }); + + Given('a faulty storage adapter', () => { + class VolatileAdapter extends MemoryAdapter { + upsert(type, key, value) { + if (type === STORAGE_TYPE_FILE && key === 'multiple-file-process.json') { + return Promise.reject(new Error('DB json file error')); + } else { + return super.upsert(type, key, value); + } + } + delete(type, key, value) { + if (type === STORAGE_TYPE_FILE && key === 'multiple-file-process.bpmn') { + return Promise.reject(new Error('Delete DB bpmn file error')); + } else { + return super.delete(type, key, value); + } + } + } + + adapter = new VolatileAdapter(); + apps = horizontallyScaled(2, { adapter }); + }); + + let deploymentName, response; + And('a process is with multiple files', async () => { + deploymentName = 'multiple-file-process'; + const form = new FormData(); + form.append('deployment-name', deploymentName); + form.append('deployment-source', 'Test modeler'); + form.append(`${deploymentName}.bpmn`, ` + + + + + `, `${deploymentName}.bpmn`); + + form.append(`${deploymentName}.json`, Buffer.from('{"foo":"bar"}'), `${deploymentName}.json`); + + response = await apps.request() + .post('/rest/deployment/create') + .set(form.getHeaders()) + .send(form.getBuffer().toString()); + }); + + Then('error is returned', () => { + expect(response.statusCode, response.text).to.equal(502); + expect(response.body.message).to.equal('DB json file error'); + }); + + And('storage has kept file', async () => { + expect(await adapter.fetch(STORAGE_TYPE_FILE, `${deploymentName}.json`), `${deploymentName}.json`).to.not.be.ok; + expect(await adapter.fetch(STORAGE_TYPE_FILE, `${deploymentName}.bpmn`), `${deploymentName}.bpmn`).to.be.ok; + }); + }); });