Skip to content

Commit

Permalink
return router from middleware
Browse files Browse the repository at this point in the history
instead of a list of handlers
- expose engines on middleware
- implement multer _removeFile
  • Loading branch information
paed01 committed Jul 18, 2023
1 parent f9e9cf1 commit 7c2ac1d
Show file tree
Hide file tree
Showing 7 changed files with 206 additions and 42 deletions.
8 changes: 5 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import express from 'express';
import { bpmnEngineMiddleware, HttpError } from 'bpmn-express-middleware';

const app = express();
app.use('/rest', bpmnEngineMiddleware());
app.use('/rest', bpmnEngineMiddleware({ idleTimeout: 90000 }));

app.use(errorHandler);

Expand All @@ -27,6 +27,8 @@ function errorHandler(err, req, res, next) {
}
```

# Middleware

## Options

- `adapter`: Optional [storage adapter](#storage-adapter). Defaults to in-memory adapter based on LRU cache
Expand Down Expand Up @@ -174,8 +176,8 @@ Persistent storage adapter, defaults to in memory storage.
Three types will be saved to adapter:

- `deployment`: BPMN deployment with references to BPMN files
- `file`: BPMN file with content
- `state`: BPMN process engine state
- `file`: BPMN file with meta and content
- `state`: BPMN engine state

## `async upsert(type, key, value[, options])`

Expand Down
10 changes: 7 additions & 3 deletions index.d.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { Request, Response, NextFunction, Handler } from 'express'
import { Request, Response, NextFunction, Handler, Router } from 'express'
import { Engine, BpmnEngineOptions, BpmnEngineExecutionState, BpmnMessage } from 'bpmn-engine'
import { LRUCache } from 'lru-cache';
import { Broker } from 'smqp';
Expand Down Expand Up @@ -143,7 +143,7 @@ export class BpmnEngineMiddleware {
/** GET (*)?/status/:token */
getStatusByToken(req: Request<tokenParam>, res: Response<RunningResponseBody>, next: NextFunction): Promise<void>;
/** GET (*)?/status/:token/:activityId */
getActivityStatus(req: Request<tokenParam | activityIdParam>, res: Response<PostponedActivity>, next: NextFunction): Promise<void>;
getActivityStatus(req: Request<tokenParam & activityIdParam>, res: Response<PostponedActivity>, next: NextFunction): Promise<void>;
/** POST (*)?/resume/:token */
resumeByToken(req: Request<tokenParam>, res: Response, next: NextFunction): Promise<void>;
/** POST (*)?/signal/:token */
Expand All @@ -162,4 +162,8 @@ export class BpmnEngineMiddleware {
internalStopByToken(req: Request<tokenParam>, res: Response, next: NextFunction): void;
}

export function bpmnEngineMiddleware(options?: BpmnEngineMiddlewareOptions): Handler[];
interface MiddlewareReturnType extends Router {
engines: Engines;
}

export function bpmnEngineMiddleware(options?: BpmnEngineMiddlewareOptions): MiddlewareReturnType;
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "bpmn-middleware",
"version": "0.0.3",
"version": "0.0.4",
"description": "BPMN engine express middleware",
"type": "module",
"main": "dist/main.cjs",
Expand Down
10 changes: 7 additions & 3 deletions src/MulterAdapterStorage.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ MulterAdapterStorage.prototype._handleFile = function handleFile(req, file, call
})
.on('end', async () => {
try {
await this.adapter.upsert(STORAGE_TYPE_FILE, fileName, content);
await this.adapter.upsert(STORAGE_TYPE_FILE, fileName, { ...file, content });
return callback(null, { path: fileName, size });
} catch (err) {
return callback(err);
Expand All @@ -25,6 +25,10 @@ MulterAdapterStorage.prototype._handleFile = function handleFile(req, file, call
.on('error', callback);
};

MulterAdapterStorage.prototype._removeFile = function removeFile(req, file, callback) {
callback();
MulterAdapterStorage.prototype._removeFile = async function removeFile(req, file, callback) {
try {
await this.adapter.delete(STORAGE_TYPE_FILE, file.originalname);
} finally {
callback();
}
};
57 changes: 28 additions & 29 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@ import { fromActivityApi } from './Caller.js';

const packageInfo = fs.promises.readFile(join(process.cwd(), 'package.json')).then((content) => JSON.parse(content));

const kInitialized = Symbol.for('adapter init');

export { Engines, MemoryAdapter, HttpError, MiddlewareEngine };
export * from './constants.js';

Expand All @@ -34,37 +32,39 @@ export function bpmnEngineMiddleware(options) {

const router = new Router({ mergeParams: true });

const middleware = [
router.use(engineMiddleware.init),
router.get('(*)?/version', engineMiddleware.getVersion),
router.get('(*)?/deployment', engineMiddleware.getDeployment),
router.post('(*)?/deployment/create', multer({ storage }).any(), engineMiddleware.create),
router.post('(*)?/process-definition/:deploymentName/start', json(), engineMiddleware.start),
router.get('(*)?/running', engineMiddleware.getRunning),
router.get('(*)?/status/:token', engineMiddleware.getStatusByToken),
router.get('(*)?/status/:token/:activityId', engineMiddleware.getActivityStatus),
router.post('(*)?/resume/:token', json(), engineMiddleware.resumeByToken),
router.post('(*)?/signal/:token', json(), engineMiddleware.signalActivity),
router.post('(*)?/cancel/:token', json(), engineMiddleware.cancelActivity),
router.post('(*)?/fail/:token', json(), engineMiddleware.failActivity),
router.get('(*)?/state/:token', engineMiddleware.getStateByToken),
router.delete('(*)?/state/:token', engineMiddleware.deleteStateByToken),
router.delete('(*)?/internal/stop', engineMiddleware.internalStopAll),
router.delete('(*)?/internal/stop/:token', engineMiddleware.internalStopByToken),
];

Object.defineProperties(middleware, { engines: { value: engines } });

return middleware;
let initialized = false;

router.use((req, res, next) => {
if (initialized) return next();
initialized = true;
return engineMiddleware.init(req, res, next);
});
router.get('(*)?/version', engineMiddleware.getVersion);
router.get('(*)?/deployment', engineMiddleware.getDeployment);
router.post('(*)?/deployment/create', multer({ storage }).any(), engineMiddleware.create);
router.post('(*)?/process-definition/:deploymentName/start', json(), engineMiddleware.start);
router.get('(*)?/running', engineMiddleware.getRunning);
router.get('(*)?/status/:token', engineMiddleware.getStatusByToken);
router.get('(*)?/status/:token/:activityId', engineMiddleware.getActivityStatus);
router.post('(*)?/resume/:token', json(), engineMiddleware.resumeByToken);
router.post('(*)?/signal/:token', json(), engineMiddleware.signalActivity);
router.post('(*)?/cancel/:token', json(), engineMiddleware.cancelActivity);
router.post('(*)?/fail/:token', json(), engineMiddleware.failActivity);
router.get('(*)?/state/:token', engineMiddleware.getStateByToken);
router.delete('(*)?/state/:token', engineMiddleware.deleteStateByToken);
router.delete('(*)?/internal/stop', engineMiddleware.internalStopAll);
router.delete('(*)?/internal/stop/:token', engineMiddleware.internalStopByToken);

Object.defineProperties(router, { engines: { value: engines } });

return router;
}

export function BpmnEngineMiddleware(options) {
this.adapter = options.adapter;
this.engines = options.engines;
this.engineOptions = { ...options.engineOptions };
this[kInitialized] = false;

this.init = this.init.bind(this);
this.getVersion = this.getVersion.bind(this);
this.getDeployment = this.getDeployment.bind(this);
this.create = this.create.bind(this);
Expand All @@ -83,8 +83,6 @@ export function BpmnEngineMiddleware(options) {
}

BpmnEngineMiddleware.prototype.init = function init(req, res, next) {
if (this[kInitialized]) return next();
this[kInitialized] = true;
req.app.on('bpmn/end', (engine) => this._postProcessRun(engine));
req.app.on('bpmn/error', (err, engine) => this._postProcessRun(engine, err));
req.app.on('bpmn/activity.call', (callActivityApi) => this._startProcessByCallActivity(callActivityApi));
Expand Down Expand Up @@ -249,13 +247,14 @@ BpmnEngineMiddleware.prototype._startDeployment = async function startDeployment
if (!deployment) return;

const { listener, variables, businessKey, caller, idleTimeout } = options;
const deploymentSource = await this.adapter.fetch(STORAGE_TYPE_FILE, deployment[0].path);

const token = randomUUID();
await this.engines.execute({
...this.engineOptions,
name: deploymentName,
token,
source: (await this.adapter.fetch(STORAGE_TYPE_FILE, deployment[0].path)),
source: deploymentSource.content,
listener,
variables: {
...this.engineOptions.variables,
Expand Down
7 changes: 4 additions & 3 deletions test-d/index.test-d.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
import { expectType } from 'tsd';
import { Handler } from 'express';
import { expectAssignable, expectType } from 'tsd';
import { Router } from 'express';
import { LRUCache } from 'lru-cache';

import { bpmnEngineMiddleware, Engines, MemoryAdapter, MiddlewareEngine } from '../';

expectType<Handler[]>(bpmnEngineMiddleware());
expectAssignable<Router>(bpmnEngineMiddleware());
expectType<Engines>(bpmnEngineMiddleware().engines);
expectType<Engines>(new Engines({
adapter: new MemoryAdapter(),
engineCache: new LRUCache<string, MiddlewareEngine>({max: 1000}),
Expand Down
154 changes: 154 additions & 0 deletions test/features/storage-adapter-feature.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import request from 'supertest';
import * as ck from 'chronokinesis';
import { LRUCache } from 'lru-cache';
import FormData from 'form-data';

import { getAppWithExtensions, createDeployment, waitForProcess, horizontallyScaled } from '../helpers/testHelpers.js';
import { MemoryAdapter } from '../../src/index.js';
Expand Down Expand Up @@ -667,4 +668,157 @@ Feature('storage adapter', () => {
expect(apps.getRunning()).to.have.length(1);
});
});

Scenario('storage adapter throws on upsert file', () => {
let apps, storage;
after(() => {
return apps.stop();
});

Given('a faulty storage adapter', () => {
storage = new LRUCache({ max: 1000 });
class VolatileAdapter extends MemoryAdapter {
upsert(type, key, value) {
if (type === STORAGE_TYPE_FILE) {
return Promise.reject(new Error('DB file error'));
} else {
return super.upsert(type, key, value);
}
}
}

apps = horizontallyScaled(2, { adapter: new VolatileAdapter(storage) });
});

let response;
And('a process is deployed', async () => {
response = await createDeployment(apps.balance(), 'faulty-adapter', `<?xml version="1.0" encoding="UTF-8"?>
<definitions xmlns="http://www.omg.org/spec/BPMN/20100524/MODEL"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<process id="bp" isExecutable="true">
<userTask id="task" />
</process>
</definitions>`);
});

Then('error is returned', () => {
expect(response.statusCode, response.text).to.equal(502);
expect(response.body.message).to.equal('DB file error');
});
});

Scenario('storage adapter throws on upsert multiple files', () => {
let apps, adapter;
after(() => {
return apps.stop();
});

Given('a faulty storage adapter', () => {
class VolatileAdapter extends MemoryAdapter {
upsert(type, key, value) {
if (type === STORAGE_TYPE_FILE && key === 'multiple-file-process.json') {
return Promise.reject(new Error('DB json file error'));
} else {
return super.upsert(type, key, value);
}
}
}

adapter = new VolatileAdapter();
apps = horizontallyScaled(2, { adapter });
});

let deploymentName, response;
And('a process is with multiple files', async () => {
deploymentName = 'multiple-file-process';
const form = new FormData();
form.append('deployment-name', deploymentName);
form.append('deployment-source', 'Test modeler');
form.append(`${deploymentName}.bpmn`, `<?xml version="1.0" encoding="UTF-8"?>
<definitions xmlns="http://www.omg.org/spec/BPMN/20100524/MODEL"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<process id="bp" isExecutable="true">
<userTask id="task" />
</process>
</definitions>`, `${deploymentName}.bpmn`);

form.append(`${deploymentName}.json`, Buffer.from('{"foo":"bar"}'), `${deploymentName}.json`);

response = await apps.request()
.post('/rest/deployment/create')
.set(form.getHeaders())
.send(form.getBuffer().toString());
});

Then('error is returned', () => {
expect(response.statusCode, response.text).to.equal(502);
expect(response.body.message).to.equal('DB json file error');
});

And('storage lack both files', async () => {
expect(await adapter.fetch(STORAGE_TYPE_FILE, `${deploymentName}.json`), `${deploymentName}.json`).to.not.be.ok;
expect(await adapter.fetch(STORAGE_TYPE_FILE, `${deploymentName}.bpmn`), `${deploymentName}.bpmn`).to.not.be.ok;
});
});

Scenario('storage adapter throws on delete file', () => {
let apps, adapter;
after(() => {
return apps.stop();
});

Given('a faulty storage adapter', () => {
class VolatileAdapter extends MemoryAdapter {
upsert(type, key, value) {
if (type === STORAGE_TYPE_FILE && key === 'multiple-file-process.json') {
return Promise.reject(new Error('DB json file error'));
} else {
return super.upsert(type, key, value);
}
}
delete(type, key, value) {
if (type === STORAGE_TYPE_FILE && key === 'multiple-file-process.bpmn') {
return Promise.reject(new Error('Delete DB bpmn file error'));
} else {
return super.delete(type, key, value);
}
}
}

adapter = new VolatileAdapter();
apps = horizontallyScaled(2, { adapter });
});

let deploymentName, response;
And('a process is with multiple files', async () => {
deploymentName = 'multiple-file-process';
const form = new FormData();
form.append('deployment-name', deploymentName);
form.append('deployment-source', 'Test modeler');
form.append(`${deploymentName}.bpmn`, `<?xml version="1.0" encoding="UTF-8"?>
<definitions xmlns="http://www.omg.org/spec/BPMN/20100524/MODEL"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<process id="bp" isExecutable="true">
<userTask id="task" />
</process>
</definitions>`, `${deploymentName}.bpmn`);

form.append(`${deploymentName}.json`, Buffer.from('{"foo":"bar"}'), `${deploymentName}.json`);

response = await apps.request()
.post('/rest/deployment/create')
.set(form.getHeaders())
.send(form.getBuffer().toString());
});

Then('error is returned', () => {
expect(response.statusCode, response.text).to.equal(502);
expect(response.body.message).to.equal('DB json file error');
});

And('storage has kept file', async () => {
expect(await adapter.fetch(STORAGE_TYPE_FILE, `${deploymentName}.json`), `${deploymentName}.json`).to.not.be.ok;
expect(await adapter.fetch(STORAGE_TYPE_FILE, `${deploymentName}.bpmn`), `${deploymentName}.bpmn`).to.be.ok;
});
});
});

0 comments on commit 7c2ac1d

Please sign in to comment.